Hi, Habr! After a long pause, we finally return to parsing
Apache Flume . In previous articles, we met Flume (
Part 1 ) and figured out how to configure its main components (
Part 2 ). In this, the final part of the cycle, we will consider the following questions:
- How to set up monitoring of node components.
- How to write your own implementation of the Flume component.
- Designing a complete transport network.
Monitoring component state
So, we set up and run all the nodes, checked their performance - the data is successfully delivered to the destination. But some time passes, we look at the result of the work of our transport network (for example, the folder with the files into which the data is packed) and we understand that a problem has arisen - since some time new files do not appear in our folder. The next step seems obvious - open the logs, look for the cause. The only trouble is that there may be a lot of nodes in our transport network, which means you need to manually view the logs of all the nodes, which, to put it mildly, is not very convenient. When such problems arise, I would like to respond to them as quickly as possible, and even better - to avoid such critical situations at all.
Flume components in the process of writing metrics that allow you to assess the status of the site. Using the values ​​of these metrics, it is quite easy to determine that everything is not all right with the node.
')
To store the counters and other attributes of its components, Flume uses
java.lang.management.ManagementFactory , registering its own bean classes for maintaining metrics. All these classes are inherited from
MonitoredCounterGroup (for the curious, the
link to the source code ).
If you do not plan to develop your own components of Flume, then digging into the mechanism of maintaining metrics is completely optional, it is enough to figure out how to get them. This can be done quite simply with the utilitarian class
JMXPollUtil :
package ru.test.flume.monitoring; import java.util.Map; import org.apache.flume.instrumentation.util.JMXPollUtil; public class FlumeMetrics { public static Map<String, Map<String, String>> getMetrics() { Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); return metricsMap; } }
As a result, you will get metrics grouped by node components that look like this:
Flume Component Metrics (JSON) { "SOURCE.my-source": { "EventReceivedCount": "567393607", "AppendBatchAcceptedCount": "5689696", "Type": "SOURCE", "EventAcceptedCount": "567393607", "AppendReceivedCount": "0", "StartTime": "1467797931288", "AppendAcceptedCount": "0", "OpenConnectionCount": "1", "AppendBatchReceivedCount": "5689696", "StopTime": "0" }, "CHANNEL.my-channel": { "ChannelCapacity": "100000000", "ChannelFillPercentage": "5.0E-4", "Type": "CHANNEL", "ChannelSize": "500", "EventTakeSuccessCount": "567393374", "StartTime": "1467797930967", "EventTakeAttemptCount": "569291443", "EventPutSuccessCount": "567393607", "EventPutAttemptCount": "567393607", "StopTime": "0" }, "SINK.my-sink": { "ConnectionCreatedCount": "1", "ConnectionClosedCount": "0", "Type": "SINK", "BatchCompleteCount": "2", "EventDrainAttemptCount": "567393374", "BatchEmptyCount": "959650", "StartTime": "1467797930968", "EventDrainSuccessCount": "567393374", "BatchUnderflowCount": "938419", "StopTime": "0", "ConnectionFailedCount": "0" } }
Metrics received, now you need to send them somewhere. Here you can go two ways.
- Use Flume features to provide metrics.
- Write your own implementation of processing metrics.
Flume provides an API that allows you to specify a monitoring method — for this, the implementations of the
MonitorService interface are
used . In order to enable monitoring, you must specify a class that implements
MonitorService
as a system property when the node is started (or in code).
java -Dflume.monitoring.type=org.apache.flume.instrumentation.http.HTTPMetricsServer ...
System.setProperty("flume.monitoring.type", "org.apache.flume.instrumentation.http.HTTPMetricsServer");
The
HTTPMetricsServer
class provides a standard way to monitor the status of a node. It is a small web server that, upon request, provides a complete list of host metrics as JSON (as in the example above). To specify the port on which this server will listen to requests, it is enough to add a parameter to the Flume configuration (by default, it uses port 41414):
flume.monitoring.port = 61509
The request to this server looks like this:
localhost:61509/metrics
.
If this way of keeping track of metrics is not enough, then you have to go the second way and write your own implementation of
MonitorService
. This is exactly what we did to monitor the state of our nodes with Graphite. Below is a simple example of such an implementation.
FlumeGraphiteMonitor package ru.dmp.flume.monitoring; import com.google.common.base.CaseFormat; import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.flume.Context; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.util.JMXPollUtil; public class FlumeGraphiteMonitor implements MonitorService {
As a result, we get a neat Graphite branch with all node metrics.
Below are descriptions of graphs and metrics that we use for one of our services.
- The intensity of the sending service messages to the node Flume. The graph is not built according to the node metrics - these values ​​in Graphite send services that generate data and are the starting point of our transport system. If your data sources do not allow tracking the sending of data to Flume, then similar graphs can be removed from the source (s) of the site.
If the value on this graph drops to zero, then the client for some reason cannot send messages to Flume. To diagnose who is to blame in such situations, we separately display a graph of errors that occur on the client side. Accordingly, if it is non-zero, the problem is on the Flume node, the source cannot accept the data. If the drop in intensity does not lead to an increase in the number of errors, then the problem is on the side of the service, he stopped sending messages.
- Node channel fullness. With this schedule, everything is simple - it should always be very close to zero. If the channel does not have time to empty, then somewhere in our transport network there is a bottleneck and you need to look for nodes that do not have time to cope with the load. Metrics on the chart:
flume.channel.{CHANNEL-NAME}.channel-fill-percentage
- The intensity of the work of waste nodes. The expected indicators of stocks at this site are “how much they got, they sent so much”, since the events from the services are not duplicated in the channels. Thus, the intensity of emptying the effluent should be the same as the intensity of sending data by customers. Metrics on the chart:
flume.sink.{SINK-NAME}.event-drain-success-count
The drop in the intensity of any of the drains to zero indicates a potential problem at the next, receiving node. As a result, the channel, devastated by the “broken” drain, will begin to fill. It is also possible that the receiving nodes work normally, but simply do not have time to process the input data - in this case, the drain schedules will be non-zero, but the channels will be gradually filled.
Creating your own Flume components
Despite the fact that the set of standard Flume components is quite extensive, quite often there are situations that cannot be resolved with these standard components. In this case, you can write your Flume component and use it in the nodes. You can write your own implementation for any of the components of Flume - sink, source, channel, interceptor, etc.
The first thing that caught my eye when examining Flume stocks is the lack of flexible flow for the file system. Yes, there is a
File-Roll Sink , the capabilities of which were described in the
2nd part of the cycle . But this stock is completely deprived of the possibility to somehow influence the file name, which is not very convenient.
We decided to develop our stock, which allows forming files in the local file system. In developing the following considerations.
- We have quite a lot of services with a relatively small load. This means that in the end we will have quite a lot of disparate files - I would not want to configure a separate drain for each of them.
- Files must be rotated in time. Moreover, for different data, the rotation period may differ (by rotation, it means "cutting data" into files by time - 15 minutes, an hour, etc.).
- Data from each service should be stored in a separate folder. Moreover, one service can generate data for several subfolders.
Based on these theses, we came to the conclusion that the task of generating the file name is better left to the client (ie, data generating services), otherwise the flow configuration will be too cumbersome, and for each new “client” you will have to add a separate flow with individual settings .
Note A comparison with the HDFS drain, which we discussed in the previous article, is relevant here. This drain allows you to perform very fine tuning of the rotation and file naming. But this flexibility of adjustment has a reverse side - for example, for files that rotate once every 15 and once every 30 minutes, you have to do different drains, even if in all other respects the parameters are identical.So, the decision on the functionality of the file flow was taken as follows:
- The name of the file to which the data should be recorded is determined by the client and transmits it in the header along with the event.
- The file name may contain subfolders.
- Files that are recorded by the drain are closed by a certain timeout when events stop coming for them.
Schematically, the data processing by this drain looks like this:
What it gave in the end:
- There is no need to add stock for each new service or data type.
- Stoke is deprived of the costs of forming the file name (in the previous part we considered these costs on the example of HDFS drain)
- Since the file name is uniquely identified by one header, we can use the client side event grouping (this technique is also described in the second part of the cycle).
The source code of the file sink and an example of its configuration
are laid out on GitHub . I think it makes no sense to analyze the process of developing this flow in detail, I will limit myself to a few theses:
- The component is based on either an abstract class or a component interface (depending on whether you are developing - a sink, an interceptor, or something else).
- We make our own implementation - the easiest way is to take something from the ready-made Flume components as an example, since all the sources are available.
- When configuring, we specify not the reserved alias of the component (of the type 'avro' or 'logger'), but the entire class name.
We design transport network
We considered the general methods of data management in the previous parts of the cycle - events can be divided between nodes, duplicated, choose the “direction of movement” using headers, etc. Let us now try to use all these techniques in order to build a reliable transport network. Suppose the task is as follows.
- The data provider is a service running on several machines (it has several identical instances).
- The data generated by the service is heterogeneous - some of them need to be delivered in HDFS, some - in the file system to a certain log server.
- It is necessary to conduct in real time some non-atomic calculations associated with the data.
On condition 3 I’ll dwell more Suppose that the task is to count the unique users of the site for the last hour. In this case, we cannot afford to parallelize the data stream from the machines or calculate this value separately on each web service - it is necessary to count the unique users by their cookies on a single data stream from all machines. Otherwise, each instance will have its own set of unique users that cannot be “picked up and folded” to get the final result.
Note Of course, the example is a bit far-fetched - this problem can be solved in other, more efficient ways. The essence of the example is reduced to the situation when you need to pass some data flow centrally through one handler and it is impossible to divide the load due to the nature of the task.So, for the beginning let's prepare the client and end nodes:
For each of the web services, we put our own individual node on the same machine as the web service. This is done for the following reasons:
- These nodes play the role of a buffer - if for some reason the delivery of events to other machines becomes impossible, these nodes will allow you to “hold out” for some time without data loss due to the thick file channel.
- Reduced response time. Of course, sending data to Flume should be done asynchronously - but during peak loads or a clogged network, a situation may arise when the background thread does not have time to send new events. In this case, the queue for sending can grow greatly, mercilessly absorbing the memory of the service, which is not very good. In the case where the node is located on the same machine as the service, these costs are significantly reduced.
- If the further processing logic changes and you decide to rebuild the transport network, then the changes will need to be made only in the configuration of the client node, and not the web service. For him, everything will remain the same - “I send data to my site, then let him decide how to be”.
The question remains - how to deliver the data in such a way that nothing is lost if something breaks? We have already taken a number of measures - data for HDFS and FS are written to several machines. In this case, the data is not duplicated, but divided. Thus, if one of the end machines fails, the entire load will go to the remainder. The result of such a breakdown will be dysbalance on the recorded amount of data on different machines, but it is quite possible to live with this.
To ensure greater stability, add a few intermediate Flume nodes that will deal directly with data distribution:
It turned out pretty weird web. What's going on here:
- The web service sends events to the client node.
- Each event has a header that indicates “destination” (for example, dist = FS or dist = HDFS ), as well as a uniq header with possible values ​​of 1 or 0.
- Each client node has 3 streams that equally empty the channel and evenly distribute the events between the three intermediate nodes - Splitters (so far without looking at the dist header).
- Each Splitter has several channels - for HDFS, FS, and a unique user counter. The required channel is selected by the dist and uniq headers.
- Each of these channels on Splitter has several sinks that evenly distribute the events between the end machines (FS, HDFS or UNQ).
If everything is relatively simple with client nodes - they just divide the events between Splitters, then the structure of a single Splitter should be considered in more detail.
Here you can see that the end point for the data is determined using the
dist header. At the same time, the events for which unique users are counted do not depend on the
dist header - they are oriented on the
uniq header. This means that some events can be duplicated in several channels, for example HDFS and UNQ.
Earlier, I did not specifically indicate the direction from the Splitters to the UNQ nodes. The point is that these nodes do not accept distributed data, like HDFS or FS. Given the specifics of the task of counting unique users, the entire data stream must pass through only
one machine. A logical question - why do we need then 2 nodes to count unique users? The answer is because if one node fails, there will be no one to replace it. How can we be here - we cannot divide events between nodes, we cannot leave one alone too?
Here, another Flume tool can help us, allowing the drains to work in a group on the principle “If runoff 1 broke, use runoff 2”. This component is called
Failover Sink Processor . Its configuration is as follows:
The above setting group of threads allows you to use only one sink, but at the same time have a “spare” in case of an accident. Those. as long as the high priority drain is working properly, low priority drains will be idle.
Thus, the task is completed - the data is distributed between HDFS and FS, the counters of unique users work correctly. In this case, the failure of any machine will not lead to data loss:
- If a machine with a Web service breaks, this problem is solved by a balancer.
- If one of the Splitters fails, the load will be distributed among the rest.
- End nodes are also duplicated, the failure of one of them will not lead to stagnation or loss of data.
- The node for counting unique users has a “doubler” and in the event of a breakdown it will be replaced by it without disturbing the logic of data processing.
For such a scheme, scaling tasks are reduced to a simple change in the configuration of Flume nodes for the appropriate level of nodes (Client, Splitter, or Final):
- New Web service instance - no configuration changes are required, it is simply installed along with the Flume client node.
- New Splitter - you only need to change the configuration of the client nodes by adding a new stock.
- New end node - you only need to change the configuration of Splitter'a, adding a new stock.
Conclusion
This concludes the series of articles about Apache Flume. We covered all the most popular of its components, figured out how to manage the flow of data and looked at an example of a complete transport network. Nevertheless, the possibilities of Flume are not exhausted by all of this - in the standard package there are still quite a few components not considered by us that can make life much easier when solving certain tasks. We hope that this series of articles has helped you to get acquainted with Flume and to get a fairly complete picture of it.
Thanks for attention!