⬆️ ⬇️

Hadoop Part 2: Collecting Data with Flume

image



In a previous post, we looked at the process of deploying a Cloudera-based Hadoop cluster in detail. In this article, we would like to elaborate on the methods and tools for collecting data in Hadoop. You can upload data to the system using simple copying to HDFS, and using special tools.



The easiest way to transfer data to a cluster is to copy files through the web interface of the file manager in the Hue control panel. The web interface is located at http: // [Hue_node]: 8888 / filebrowser / (instead of [Hue_node], the address of the node on which Hue is deployed is indicated). It is intuitive and does not require additional explanation.



The web interface is good for novice users; With it, it is convenient to investigate the structure of HDFS directories. At the same time, it is inconvenient for downloading large files (from several gigabytes).

')

image



To download a large number of files or large files, it is preferable to copy files to HDFS using the hadoop utility. This operation is performed using the following command, which is executed from any server that is part of the HDFS cluster:



  hadoop fs -put file_for_hadoop / path / to / put / file / in / HDFS / 


In this case, you can always use traditional pipes or pre-copy files to the server.



The described methods are well suited for situations where you need to transfer already existing data to HDFS. But it seems more natural to collect data immediately in Hadoop. For this purpose, specialized tools are used. One of these tools is being developed as part of the Apache Hadoop project. This is Flume, the universal tool for collecting logs and other data. In this article, we would like to talk about the architecture of Flume and share practical experience of using it.



About project Flume



The word flume means "channel". This tool is designed to manage data flows: collect them from various sources and send them to a centralized repository.



The Flume project is being developed by the Apache Software Foundation. The first versions of Flume came out in 2009. All versions released before 2011 (up to version 0.9.4) are called Flume OG (OG - old generation, i.e. the old generation). In 2011, work began on a new project branch, within the framework of which it is planned to significantly expand the functionality of Flume OG. This branch is known as Flume 728 (by task number in JIRA, which listed all the main notes and suggestions for improvement) or Flume NG (NG means new generation, ie, new generation). The latest to date version 1.4.0 was released in July 2013.



Architecture



Basic concepts


The description of the Flume architecture will begin with a definition of basic concepts:







Flow structure


The stream starts with the client, which sends the event to the agent (more precisely, to the source in the agent). The source that received the event transmits it to one or more channels. From the channels the event is transmitted to the drains that are part of the same agent. He can transfer it to another agent, or (if it is the final agent) to the destination node.



Since the source can transmit events to several channels, streams can be sent to several destination nodes. This is clearly shown in the figure below: the agent reads the event into two channels (Channel 1 and Channel 2), and then transfers them to independent drains.



image



Multiple threads can be combined into one. For this, it is necessary that several sources with the same agent transmit data to the same channel. The scheme of interaction between the components when combining flows is shown in the figure below (here each of the three agents, including several sources, transmits data to the same channel and then to the drain):



image



Reliability and error handling


Data transfer between sources and channels, as well as between agents, is carried out using transactions, which ensures data integrity.



Error handling is also carried out on the basis of the transaction mechanism. When a stream passes through several different agents, and when passing through, communication problems occur, events are buffered on the last agent available in the stream. More clearly the error handling scheme is presented in the figure below:



image



Error handling in the stream: (a) events move from the client to the central repository without communication problems; (b) there was a communication problem between the agent2 and the central repository, and events are buffered on the agent2; (c) after resolving communication problems, events buffered on Agent2 were sent to the central repository.



Installing Flume through Cloudera Manager



Since we have already deployed the Cloudera Hadoop cluster (see previous article), we will install Flume using the Cloudera Manager. On the page with the list of cluster services, select “Actions” → “Add a Service”.



image



Select "Flume" and click on the "Continue" button:



image



Now select the Zookeeper service with which our Flume service will be associated. You can create multiple clusters or similar services controlled by different Zookeeper instances.



image



Next, we specify the hosts in the cluster where Flume agents will be installed. You can configure several independent services with agents located on different hosts and having different settings. In our case, we will select all available hosts:



image



Click on the button "Continue". A message about the successful addition of a new service will soon appear on the screen:



image



Now let's move on to the Flume dashboard by selecting “Services” → “flume1” in Cloudera Manager:



image



The service page will open with the following tabs: general status, service instances (in this case, agents are listed in this tab), service control commands (on, off, reboot), service settings, access rights settings, statistics graphics and load. Open the configuration tab “Configuration →“ View and Edit ”:



image



By default, the settings of all Flume agents are stored in one configuration file (its contents are displayed in the Configuration File field). This file is common to all agents and is inherited by each of them:



image



Flume Agent Setup



Consider an example of setting up a flume agent that collects logs from syslog over UDP and saves them to HDFS in a cluster.



  ### syslog cfg
 a1.sources = r1
 a1.channels = c1
 a1.sinks = k1

 # source
 a1.sources.r1.type = syslogudp
 a1.sources.r1.port = 5140
 a1.sources.r1.host = cdh2.example.com
 a1.sources.r1.channels = c1

 # insert timestamp
 a1.sources.r1.interceptors = i1
 a1.sources.r1.interceptors.i1.type = timestamp

 # sink
 a1.sinks.k1.type = hdfs
 a1.sinks.k1.channel = c1
 a1.sinks.k1.hdfs.path = / log / flume / events /% y-% m-% d /% H-% M
 a1.sinks.k1.hdfs.filePrefix = flume-
 a1.sinks.k1.hdfs.round = true
 a1.sinks.k1.hdfs.roundValue = 5
 a1.sinks.k1.hdfs.roundUnit = minute
 a1.sinks.k1.hdfs.fileType = DataStream
 a1.sinks.k1.hdfs.rollCount = 100000
 a1.sinks.k1.hdfs.rollSize = 0

 # channel
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000 


As you can see from the above example, all the entries in the file have a hierarchical structure; the order of the lines is not important. Before each parameter the name of the agent to which it belongs is indicated. Next, indicate the type of the object (source, channel or drain) and its name, and then the types and subtypes of the parameters and the value itself.



For all agents, a single configuration file is created by default. Due to the common configuration file, several agents can have the same name and, accordingly, the same set of settings. This is useful for ensuring fault tolerance of agents or for load balancing between them. To change the role of the agent, it is enough to simply change its name without rewriting the configuration file again.



Details on setting up Flume agents can be found in the documentation .



Consider the structure of the configuration file in more detail. First, we set the names for the main objects and “bind” them to a specific agent. In our case, we indicate for the agent “a1” the source “r1”, the channel “c1” and the drain “k1”:



  a1.sources = r1
 a1.channels = c1
 a1.sinks = k1 


When specifying several objects, their names are listed separated by spaces (for example, “a1.sources = r1 r2 r3”)



Let's start with the channel setup. As a channel, we use the so-called memory channel, which simply stores an event queue in memory. In case of unexpected bursts of activity, the maximum queue size is set to 1000 messages, although the number of messages in the queue usually does not exceed 10.



  # channel
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000 




Configuring a Syslog UDP Source




As a source, we will use UDP Syslog, included in the standard delivery of Flume:



  a1.sources.r1.type = syslogudp
 a1.sources.r1.port = 5140
 a1.sources.r1.host = cdh2.example.com
 a1.sources.r1.channels = c1 


The parameters type, port and host speak for themselves. The channels parameter indicates the channels to which the source will be connected. When specifying multiple channels, their names are separated by spaces. The name of the channel is specified without the prefix of the agent name: it is understood that only the objects belonging to it can be used for each agent.



The following is the object that should be described in more detail - the interceptor (interceptor). Interceptors are not separate entities, but are part of the sources.



  # insert timestamp
 a1.sources.r1.interceptors = i1
 a1.sources.r1.interceptors.i1.type = timestamp 


Components of events (events) in Flume are the body (the data itself) and additional headers, the list of which may vary depending on the type of source. The interceptor performs preliminary data processing on the fly before being transmitted to the channel.



Interceptors can be chained; they are executed according to the order specified in the .interceptors directive (for example, "a1.sources.r1.interceptors = i1 i2 i3"). In our case, only one interceptor “i1” is used for the source “r1”.



The syslog source writes only the message itself to the event body. All other headers (corresponding to Syslog RFC) are recorded in the corresponding Flume event headers.



The syslog source writes a timestamp not in the unixtime format, but in human-readable form (for example, "2013: 09: 17-09: 03: 00") in the timestamp header. To fix this, we use the timestamp interceptor, which rewrites the timestamp header in unixtime format. We will need this title in the settings of the flow, which will be discussed further.



HDFS drain setup


Finally, let's proceed to setting up the drain, which will save our data in HDFS:



  a1.sinks.k1.type = hdfs
 a1.sinks.k1.channel = c1
 a1.sinks.k1.hdfs.path = / log / flume / events /% y-% m-% d /% H-% M
 a1.sinks.k1.hdfs.filePrefix = flume-
 a1.sinks.k1.hdfs.fileType = DataStream
 a1.sinks.k1.hdfs.round = true
 a1.sinks.k1.hdfs.roundValue = 5
 a1.sinks.k1.hdfs.roundUnit = minute
 a1.sinks.k1.hdfs.rollCount = 100000
 a1.sinks.k1.hdfs.rollSize = 0 


With the parameters type and channel everything is clear, but the rest should be discussed in more detail.



The path parameter specifies the path to the HDFS files in which data from events will be saved. When saving logs, it is highly desirable to distribute files into subfolders according to timestamps - this simplifies monitoring and subsequent data processing. It is enough to specify a directory with a date and a mask to process logs for a certain period. To form the path to the file of the drain, you need a timestamp of the event, which we redefined earlier using an interceptor.



The filePrefix parameter sets the prefix for files, which is convenient to use when collecting logs from different agents in the same folders.



The fileType parameter specifies the format of the files to which the events will be saved. DataStream is a standard format in which each event is stored as a string in a plain text file.



The parameters round, roundValue and roundUnit indicate that the timestamp value will be rounded to a multiple of 5 minutes. This will allow you to save files in subfolders in 5-minute increments. Without these parameters, subfolders would be created in 1 minute increments, which is not very convenient.



When working with large streams of messages, it seems appropriate to additionally split the files inside the subfolders, rather than writing one huge file. This can be done using the roll * parameters:

rollCount = 100000 indicates the number of messages in one file, above which the current file is closed and a new one is created.

rollSize = 0 indicates that we do not limit the size of each file.



Client setup


So, our agent is configured and ready to receive and convert data, and then save it to HDFS. It remains only to send the data itself via the UDP protocol in the Syslog format.



Consider the procedure for collecting data on the example of our service "Cloud Storage". As a load balancer, we use haproxy, which transmits HTTP request logs to the Flume agent. These logs contain the client’s address, the request URL, the amount of data transferred and other service data.

Here is an example of the part of the haproxy config file that is responsible for logging:



  global
  log [FQDN_Flume_agent]: 5140 local0 info
  maxconn 60000
  nbproc 16
  user haproxy
  group haproxy
  daemon
  tune.maxaccept -1
  tune.bufsize 65536

 defaults
  log global
  mode http
  # for hadoop
  option httplog
  #option logasap
  log-format \ t% T \ t% ci \ t% cp \ t% ft \ t% b \ t% s \ t% ST \ t% B \ t% sq \ t% bq \ t% r
  option dontlognull
  #option dontlog-normal 


The log option indicates the address of the server and the port on which the Flume agent is running, as well as the standard syslog facility facility local0 and the logging level notify.



The mode http and option httplog directives indicate that we will save the HTTP access logs. More information about the haproxy log formats can be found in the documentation .



To keep as much information as possible, disable the logasap and dontlog-normal options. When the logasap option is disabled, haproxy will log an HTTP request after it is completed, indicating the amount of received and transmitted data. To haproxy log all requests, including successful ones, you need to disable the dontlog-normal option.



In order to present the data in a close-to-machine-readable format and simplify further data processing, we changed the formatting of the logs (log-format directive). By default, the space character is used as a separator in the logs, but it can be contained in the data itself, which makes it difficult to process them further. Therefore, we replaced it with a tab character. In addition, we have disabled the quotes for the URL and the request method.



For reference: access logs from the Storage per day are 20-30GB. At the moment, we have already collected more than 200TB of data for further research. The Flume agent practically does not load the server on which it works (Load Average ~ 0.01), and the HDFS service easily decomposes and reserves the obtained data into three independent cluster nodes. And all this is on far from the most productive servers with ordinary spindle disks.



Conclusion



In this article, we looked at the procedure for collecting and storing data in a Hadoop cluster using the Flume service using the example of our “Cloud Storage” logs. Naturally, this example of the possibility of Flume is far from exhausted.

In the next article we will move on to the most interesting part - data processing in Hadoop using the Pig tool. Stay tuned!



For those who can not comment on posts on Habré, we invite to our blog .

Source: https://habr.com/ru/post/201204/



All Articles