In a previous post, we looked in detail at the process of deploying a Cloudera-based Hadoop cluster. In this article, we would like to elaborate on the methods and tools for collecting data in Hadoop. Data can be uploaded to the system either by simply copying it to HDFS or with the help of specialized tools.
The easiest way to transfer data to a cluster is to copy files through the web interface of the file manager in the Hue control panel. The web interface is located at http://[Hue_node]:8888/filebrowser/ (instead of [Hue_node], specify the address of the node on which Hue is deployed). It is intuitive and requires no additional explanation.
The web interface is good for novice users; it is also convenient for exploring the structure of HDFS directories. At the same time, it is poorly suited for uploading large files (several gigabytes and up).
To upload a large number of files or very large files, it is preferable to copy them to HDFS using the hadoop utility. This is done with the following command, executed from any server that is part of the HDFS cluster:
hadoop fs -put file_for_hadoop /path/to/put/file/in/HDFS/
In this case, you can always use traditional pipes or copy the files to the server beforehand.
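For example, here is a minimal sketch of such a pipe (the file names and paths are hypothetical); hadoop fs -put accepts "-" as the source and reads from standard input, so data can be streamed into HDFS without an intermediate local copy:

# stream a compressed log archive into HDFS without unpacking it on local disk first
gunzip -c access_log.gz | hadoop fs -put - /data/logs/access_log
# or pull a file from a remote machine over SSH straight into HDFS
ssh user@remote-host "cat /var/log/app.log" | hadoop fs -put - /data/logs/app.log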
The described methods are well suited for situations where already existing data needs to be transferred to HDFS. But it seems more natural to collect data in Hadoop as it arrives. Specialized tools exist for this purpose; one of them, developed under the Apache umbrella, is Flume, a universal tool for collecting logs and other data. In this article, we would like to talk about the architecture of Flume and share our practical experience of using it.
About the Flume project
The word "flume" means a channel for carrying water. The tool is designed to manage data flows: to collect them from various sources and deliver them to a centralized repository.
The Flume project is developed by the Apache Software Foundation. The first versions of Flume came out in 2009. All versions released before 2011 (up to version 0.9.4) are known as Flume OG (OG for "old generation"). In 2011, work began on a new branch of the project, intended to significantly extend the functionality of Flume OG. This branch is known as Flume 728 (after the number of the JIRA task that collected the main notes and suggestions for improvement) or Flume NG (NG for "new generation"). The latest version to date, 1.4.0, was released in July 2013.
Architecture
Basic concepts
We will begin the description of the Flume architecture with definitions of its basic concepts:
- event: a data set transferred by Flume from the point of origin to the point of destination;
- flow: the movement of events from the point of origin to the point of destination;
- client: any application that sends data to a Flume agent;
- agent: an independent process that hosts components such as sources, channels, and sinks; it stores events and passes them on to the next node;
- source: an interface that receives messages over various data transfer protocols. Received events are passed by the source to one or more channels. Flume supports the following standards for receiving data from external sources: Avro, log4j, syslog, and HTTP POST with a JSON body. Using the ExecSource library, support for other standards can be implemented;
- channel: temporary storage for events. An event stays in the channel until a sink takes it. Channels store queues of events, which makes it possible to decouple sources and sinks with different throughput and architecture. Depending on the channel type, events can be stored in memory, in a regular file on disk, or in a database (for example, the JDBC channel);
- sink: an implementation of the interface that takes an event from a channel and either passes it to the next agent in the flow or saves it to the final storage (for example, HDFS). Sinks that deliver events to the final storage are called terminal sinks. Examples of terminal sinks are the HDFS file system, the Hive database, and the Solr search server. An example of an ordinary sink is Avro, which simply forwards messages to other agents.
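To make these concepts concrete, below is a minimal single-agent sketch in Flume's configuration syntax (the agent and component names are arbitrary, and the example is not part of the production setup described later): a netcat source listens on a TCP port, feeds a memory channel, and a logger sink simply prints the events to the agent's log.

# minimal illustrative agent: netcat source -> memory channel -> logger sink
agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.s1.type = netcat
agent1.sources.s1.bind = localhost
agent1.sources.s1.port = 44444
agent1.sources.s1.channels = c1
agent1.channels.c1.type = memory
agent1.sinks.k1.type = logger
agent1.sinks.k1.channel = c1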
Flow structure
The flow starts with a client, which sends an event to an agent (more precisely, to a source within the agent). The source that received the event passes it to one or more channels. From the channels the event is passed to the sinks belonging to the same agent. A sink can pass the event on to another agent or, if this is the last agent in the flow, deliver it to the destination node.
Since a source can pass events to several channels, flows can be directed to several destination nodes. This is shown in the figure below: the source writes the event to two channels (Channel 1 and Channel 2), which then pass it to independent sinks.
Multiple flows can also be merged into one. For this, several sources within the same agent must pass data to the same channel. The interaction of the components when merging flows is shown in the figure below (here each of the three agents, each containing several sources, passes data to a single channel and then on to a sink):
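As an illustration, a fan-out like the one in the first figure might be described roughly as follows (a hypothetical sketch; component types are omitted for brevity, and by default the source replicates each event to all of its channels):

# hypothetical fan-out: one source feeds two channels, each drained by its own sink
agent2.sources = r1
agent2.channels = c1 c2
agent2.sinks = k1 k2
agent2.sources.r1.channels = c1 c2
agent2.sinks.k1.channel = c1
agent2.sinks.k2.channel = c2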
Reliability and error handling
Data transfer between sources and channels, as well as between agents, is carried out using transactions, which ensures data integrity.
Error handling is also based on the transaction mechanism. When a flow passes through several agents and a communication problem occurs along the way, events are buffered on the last agent still reachable in the flow. The error handling scheme is illustrated in the figure below:
Error handling in a flow: (a) events move from the client to the central repository without communication problems; (b) a communication problem occurred between Agent 2 and the central repository, and events are buffered on Agent 2; (c) after the communication problem is resolved, the events buffered on Agent 2 are sent to the central repository.
Installing Flume through Cloudera Manager
Since we have already deployed a Cloudera Hadoop cluster (see the previous article), we will install Flume using Cloudera Manager. On the page with the list of cluster services, select “Actions” → “Add a Service”.
Select "Flume" and click on the "Continue" button:
Now select the ZooKeeper service with which our Flume service will be associated. Several similar services, controlled by different ZooKeeper instances, can be created.
Next, we specify the hosts in the cluster where Flume agents will be installed. You can configure several independent services with agents located on different hosts and having different settings. In our case, we will select all available hosts:
Click on the button "Continue". A message about the successful addition of a new service will soon appear on the screen:
Now let's move on to the Flume dashboard by selecting “Services” → “flume1” in Cloudera Manager:
The service page opens with the following tabs: general status, service instances (in this case, the agents are listed on this tab), service control commands (start, stop, restart), service settings, access rights, and graphs of statistics and load. Open the configuration tab: “Configuration” → “View and Edit”:
By default, the settings of all Flume agents are stored in one configuration file (its contents are displayed in the Configuration File field). This file is common to all agents and is inherited by each of them:
Flume Agent Setup
Consider an example of setting up a Flume agent that collects logs from syslog over UDP and saves them to HDFS on the cluster.
### syslog cfg
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = cdh2.example.com
a1.sources.r1.channels = c1
# insert timestamp
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /log/flume/events/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollCount = 100000
a1.sinks.k1.hdfs.rollSize = 0
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
As the example above shows, all entries in the file have a hierarchical structure, and the order of the lines does not matter. Each parameter is prefixed with the name of the agent it belongs to, followed by the type of the object (source, channel, or sink) and its name, then the parameter types and subtypes, and finally the value itself.
By default, a single configuration file is created for all agents. Thanks to the shared configuration file, several agents can have the same name and, accordingly, the same set of settings. This is useful for making agents fault tolerant or for balancing load between them. To change an agent's role, it is enough to change its name, without rewriting the configuration file.
Details on setting up Flume agents can be found in the documentation.
Let us consider the structure of the configuration file in more detail. First, we set the names of the main objects and “bind” them to a specific agent. In our case, for the agent “a1” we specify the source “r1”, the channel “c1”, and the sink “k1”:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
When specifying several objects, their names are listed separated by spaces (for example, “a1.sources = r1 r2 r3”).
Let's start with the channel setup. As the channel we use a so-called memory channel, which simply keeps a queue of events in memory. To absorb unexpected bursts of activity, the maximum queue size is set to 1000 messages, although under normal load the number of messages in the queue rarely exceeds 10.
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
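A memory channel is fast, but its queue is lost if the agent restarts. If durability matters more than throughput, a file channel could be used instead; here is a hypothetical sketch (the directories are placeholders, not our production settings):

# hypothetical alternative: a durable file channel instead of the memory channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/lib/flume-ng/checkpoint
a1.channels.c1.dataDirs = /var/lib/flume-ng/data
a1.channels.c1.capacity = 100000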
Configuring a Syslog UDP Source
As the source, we will use the UDP syslog source included in the standard Flume distribution:
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = cdh2.example.com
a1.sources.r1.channels = c1
The type, port, and host parameters speak for themselves. The channels parameter specifies the channels to which the source is connected. When specifying several channels, their names are separated by spaces. The channel name is given without the agent-name prefix: each agent can only use objects that belong to it.
The next object deserves a more detailed description: the interceptor. Interceptors are not separate entities but are part of sources.
# insert timestamp
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
A Flume event consists of a body (the data itself) and additional headers, whose set may vary depending on the type of source. The interceptor performs preliminary processing of the data on the fly, before it is passed to the channel.
Interceptors can be chained; they are executed in the order specified in the .interceptors directive (for example, "a1.sources.r1.interceptors = i1 i2 i3"). In our case, only one interceptor, “i1”, is used for the source “r1”.
The syslog source writes only the message itself to the event body. All other fields (as defined by the syslog RFC) are recorded in the corresponding Flume event headers.
The syslog source writes the timestamp to the timestamp header not in unixtime format but in human-readable form (for example, "2013:09:17-09:03:00"). To fix this, we use the timestamp interceptor, which rewrites the timestamp header in unixtime format. We will need this header in the sink settings, which are discussed below.
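To illustrate chaining, the source could be extended with Flume's host interceptor after the timestamp one, adding the agent's address as an extra header (a hypothetical addition, not part of the configuration shown above):

# hypothetical chain: rewrite the timestamp first, then record the agent host
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = agentHost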
HDFS sink setup
Finally, let us proceed to setting up the sink, which will save our data to HDFS:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /log/flume/events/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollCount = 100000
a1.sinks.k1.hdfs.rollSize = 0
The type and channel parameters are self-explanatory; the rest deserve a closer look.
The path parameter specifies the path to the HDFS files in which the event data will be saved. When saving logs, it is highly desirable to distribute the files into subfolders by timestamp: this simplifies monitoring and subsequent processing, since it is enough to specify the directory for a date and a file mask to process logs for a certain period. To build the file path, the sink needs the event's timestamp, which we rewrote earlier with the interceptor.
The filePrefix parameter sets the prefix for files, which is convenient to use when collecting logs from different agents in the same folders.
The fileType parameter specifies the format of the files to which events are saved. DataStream is a standard format in which each event is stored as a line in a plain text file.
The round, roundValue, and roundUnit parameters indicate that the timestamp value will be rounded down to a multiple of 5 minutes. This lets us save files into subfolders in 5-minute increments. Without these parameters, subfolders would be created in 1-minute increments, which is not very convenient.
When working with large message streams, it makes sense to additionally split the files inside the subfolders rather than write one huge file. This is done with the roll* parameters:
rollCount = 100000 sets the number of messages in one file, after which the current file is closed and a new one is created.
rollSize = 0 means that we do not limit the size of individual files.
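With these settings, files end up in per-interval subfolders, so logs for a given period can be selected with ordinary HDFS path masks. A rough sketch (the dates are illustrative):

# list the 5-minute subfolders for one day
hadoop fs -ls /log/flume/events/13-09-17/
# peek at all events collected during one hour of that day
hadoop fs -cat /log/flume/events/13-09-17/09-*/flume-* | head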
Client setup
So, our agent is configured and ready to receive and transform data and then save it to HDFS. It only remains to send the data itself over UDP in syslog format.
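Before connecting a real client, the whole pipeline can be smoke-tested by hand. A sketch assuming a reasonably recent util-linux logger on any machine that can reach the agent (the message text is arbitrary):

# send a single test message over UDP to the Flume syslog source
logger --server cdh2.example.com --port 5140 --udp "flume pipeline test"
# a new file should shortly appear under /log/flume/events/ in HDFS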
Let us consider the data collection procedure using our “Cloud Storage” service as an example. As the load balancer we use haproxy, which sends HTTP request logs to the Flume agent. These logs contain the client's address, the request URL, the amount of data transferred, and other service data.
Here is an example of the part of the haproxy config file that is responsible for logging:
global
log [FQDN_Flume_agent]:5140 local0 info
maxconn 60000
nbproc 16
user haproxy
group haproxy
daemon
tune.maxaccept -1
tune.bufsize 65536
defaults
log global
mode http
# for hadoop
option httplog
#option logasap
log-format \t%T\t%ci\t%cp\t%ft\t%b\t%s\t%ST\t%B\t%sq\t%bq\t%r
option dontlognull
#option dontlog-normal
The log option specifies the address of the server and the port on which the Flume agent is listening, as well as the standard syslog facility local0 and the log level info.
The mode http and option httplog directives indicate that we will save HTTP access logs. More information about haproxy log formats can be found in the documentation.
To retain as much information as possible, we disable the logasap and dontlog-normal options. With logasap disabled, haproxy logs an HTTP request only after it completes, including the amount of data received and transmitted. For haproxy to log all requests, including successful ones, the dontlog-normal option must be disabled.
To present the data in a format closer to machine-readable and to simplify further processing, we changed the log formatting (the log-format directive). By default, the space character is used as the separator in the logs, but spaces can also appear in the data itself, which complicates further processing. We therefore replaced the separator with a tab character. In addition, we disabled the quotes around the URL and the request method.
For reference: the access logs from the Storage amount to 20–30 GB per day. At the moment, we have already collected more than 200 TB of data for further research. The Flume agent puts practically no load on the server it runs on (load average ~0.01), and the HDFS service easily distributes and replicates the collected data across three independent cluster nodes. And all of this on far from the most powerful servers, with ordinary spinning disks.
Conclusion
In this article, we looked at the procedure for collecting and storing data in a Hadoop cluster with the Flume service, using the logs of our “Cloud Storage” as an example. Naturally, this example far from exhausts Flume's capabilities.
In the next article we will move on to the most interesting part - data processing in Hadoop using the Pig tool. Stay tuned!
For those who cannot comment on posts on Habré, we invite you to our blog.