📜 ⬆️ ⬇️

Flume - manage data streams. Part 3

Hi, Habr! After a long pause, we finally return to parsing Apache Flume . In previous articles, we met Flume ( Part 1 ) and figured out how to configure its main components ( Part 2 ). In this, the final part of the cycle, we will consider the following questions:



Monitoring component state


So, we set up and run all the nodes, checked their performance - the data is successfully delivered to the destination. But some time passes, we look at the result of the work of our transport network (for example, the folder with the files into which the data is packed) and we understand that a problem has arisen - since some time new files do not appear in our folder. The next step seems obvious - open the logs, look for the cause. The only trouble is that there may be a lot of nodes in our transport network, which means you need to manually view the logs of all the nodes, which, to put it mildly, is not very convenient. When such problems arise, I would like to respond to them as quickly as possible, and even better - to avoid such critical situations at all.

Flume components in the process of writing metrics that allow you to assess the status of the site. Using the values ​​of these metrics, it is quite easy to determine that everything is not all right with the node.
')
To store the counters and other attributes of its components, Flume uses java.lang.management.ManagementFactory , registering its own bean classes for maintaining metrics. All these classes are inherited from MonitoredCounterGroup (for the curious, the link to the source code ).

If you do not plan to develop your own components of Flume, then digging into the mechanism of maintaining metrics is completely optional, it is enough to figure out how to get them. This can be done quite simply with the utilitarian class JMXPollUtil :

package ru.test.flume.monitoring; import java.util.Map; import org.apache.flume.instrumentation.util.JMXPollUtil; public class FlumeMetrics { public static Map<String, Map<String, String>> getMetrics() { Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); return metricsMap; } } 

As a result, you will get metrics grouped by node components that look like this:

Flume Component Metrics (JSON)
 { "SOURCE.my-source": { "EventReceivedCount": "567393607", "AppendBatchAcceptedCount": "5689696", "Type": "SOURCE", "EventAcceptedCount": "567393607", "AppendReceivedCount": "0", "StartTime": "1467797931288", "AppendAcceptedCount": "0", "OpenConnectionCount": "1", "AppendBatchReceivedCount": "5689696", "StopTime": "0" }, "CHANNEL.my-channel": { "ChannelCapacity": "100000000", "ChannelFillPercentage": "5.0E-4", "Type": "CHANNEL", "ChannelSize": "500", "EventTakeSuccessCount": "567393374", "StartTime": "1467797930967", "EventTakeAttemptCount": "569291443", "EventPutSuccessCount": "567393607", "EventPutAttemptCount": "567393607", "StopTime": "0" }, "SINK.my-sink": { "ConnectionCreatedCount": "1", "ConnectionClosedCount": "0", "Type": "SINK", "BatchCompleteCount": "2", "EventDrainAttemptCount": "567393374", "BatchEmptyCount": "959650", "StartTime": "1467797930968", "EventDrainSuccessCount": "567393374", "BatchUnderflowCount": "938419", "StopTime": "0", "ConnectionFailedCount": "0" } } 

Metrics received, now you need to send them somewhere. Here you can go two ways.

  1. Use Flume features to provide metrics.
  2. Write your own implementation of processing metrics.

Flume provides an API that allows you to specify a monitoring method — for this, the implementations of the MonitorService interface are used . In order to enable monitoring, you must specify a class that implements MonitorService as a system property when the node is started (or in code).
 java -Dflume.monitoring.type=org.apache.flume.instrumentation.http.HTTPMetricsServer ... 

 System.setProperty("flume.monitoring.type", "org.apache.flume.instrumentation.http.HTTPMetricsServer"); 

The HTTPMetricsServer class provides a standard way to monitor the status of a node. It is a small web server that, upon request, provides a complete list of host metrics as JSON (as in the example above). To specify the port on which this server will listen to requests, it is enough to add a parameter to the Flume configuration (by default, it uses port 41414):

 flume.monitoring.port = 61509 

The request to this server looks like this: localhost:61509/metrics .

If this way of keeping track of metrics is not enough, then you have to go the second way and write your own implementation of MonitorService . This is exactly what we did to monitor the state of our nodes with Graphite. Below is a simple example of such an implementation.

FlumeGraphiteMonitor
 package ru.dmp.flume.monitoring; import com.google.common.base.CaseFormat; import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.flume.Context; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.util.JMXPollUtil; public class FlumeGraphiteMonitor implements MonitorService { //   ,      Graphite private static final Set<String> EXCLUDED_METRICS = new HashSet<String>() {{ add("start-time"); add("stop-time"); }}; private volatile long period = 60 * 1000; //  , 1  private volatile boolean switcher = true; private Thread scheduler = new Thread(this::schedule); @Override public void configure(Context context) { //    -     } private void schedule() { while (switcher) { send(); synchronized (this) { try { wait(period); } catch (InterruptedException ex) {} } } } @Override public void start() { scheduler.start(); } @Override public void stop() { switcher = false; synchronized (this) { notifyAll(); } try { scheduler.join(); } catch (InterruptedException ex) {} } private void send() { Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); for (Map.Entry<String, Map<String, String>> e: metricsMap.entrySet()) { if (e.getValue() != null) { //     Flume    "flume" String group = "flume." + normalize(e.getKey().toLowerCase()) + "."; for (Map.Entry<String, String> metric : e.getValue().entrySet()) { try { Double value = Double.valueOf(metric.getValue()); String metricName = normalize(metric.getKey()); if (!EXCLUDED_METRICS.contains(metricName)) { String fullName = group + normalize(metric.getKey()); //      -  // Graphite.send(metricName, value); } } catch (NumberFormatException ex) { //   ,    } } } } } //    EventReceivedCount -> event-received-count () private static String normalize(String str) { return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str).replaceAll("_", "-"); } 

As a result, we get a neat Graphite branch with all node metrics.


Below are descriptions of graphs and metrics that we use for one of our services.

  1. The intensity of the sending service messages to the node Flume. The graph is not built according to the node metrics - these values ​​in Graphite send services that generate data and are the starting point of our transport system. If your data sources do not allow tracking the sending of data to Flume, then similar graphs can be removed from the source (s) of the site.

    If the value on this graph drops to zero, then the client for some reason cannot send messages to Flume. To diagnose who is to blame in such situations, we separately display a graph of errors that occur on the client side. Accordingly, if it is non-zero, the problem is on the Flume node, the source cannot accept the data. If the drop in intensity does not lead to an increase in the number of errors, then the problem is on the side of the service, he stopped sending messages.


  2. Node channel fullness. With this schedule, everything is simple - it should always be very close to zero. If the channel does not have time to empty, then somewhere in our transport network there is a bottleneck and you need to look for nodes that do not have time to cope with the load. Metrics on the chart:
    flume.channel.{CHANNEL-NAME}.channel-fill-percentage


  3. The intensity of the work of waste nodes. The expected indicators of stocks at this site are “how much they got, they sent so much”, since the events from the services are not duplicated in the channels. Thus, the intensity of emptying the effluent should be the same as the intensity of sending data by customers. Metrics on the chart:
    flume.sink.{SINK-NAME}.event-drain-success-count

    The drop in the intensity of any of the drains to zero indicates a potential problem at the next, receiving node. As a result, the channel, devastated by the “broken” drain, will begin to fill. It is also possible that the receiving nodes work normally, but simply do not have time to process the input data - in this case, the drain schedules will be non-zero, but the channels will be gradually filled.



Creating your own Flume components


Despite the fact that the set of standard Flume components is quite extensive, quite often there are situations that cannot be resolved with these standard components. In this case, you can write your Flume component and use it in the nodes. You can write your own implementation for any of the components of Flume - sink, source, channel, interceptor, etc.

The first thing that caught my eye when examining Flume stocks is the lack of flexible flow for the file system. Yes, there is a File-Roll Sink , the capabilities of which were described in the 2nd part of the cycle . But this stock is completely deprived of the possibility to somehow influence the file name, which is not very convenient.

We decided to develop our stock, which allows forming files in the local file system. In developing the following considerations.


Based on these theses, we came to the conclusion that the task of generating the file name is better left to the client (ie, data generating services), otherwise the flow configuration will be too cumbersome, and for each new “client” you will have to add a separate flow with individual settings .

Note A comparison with the HDFS drain, which we discussed in the previous article, is relevant here. This drain allows you to perform very fine tuning of the rotation and file naming. But this flexibility of adjustment has a reverse side - for example, for files that rotate once every 15 and once every 30 minutes, you have to do different drains, even if in all other respects the parameters are identical.

So, the decision on the functionality of the file flow was taken as follows:


Schematically, the data processing by this drain looks like this:

What it gave in the end:


The source code of the file sink and an example of its configuration are laid out on GitHub . I think it makes no sense to analyze the process of developing this flow in detail, I will limit myself to a few theses:


We design transport network


We considered the general methods of data management in the previous parts of the cycle - events can be divided between nodes, duplicated, choose the “direction of movement” using headers, etc. Let us now try to use all these techniques in order to build a reliable transport network. Suppose the task is as follows.

  1. The data provider is a service running on several machines (it has several identical instances).

  2. The data generated by the service is heterogeneous - some of them need to be delivered in HDFS, some - in the file system to a certain log server.

  3. It is necessary to conduct in real time some non-atomic calculations associated with the data.

On condition 3 I’ll dwell more Suppose that the task is to count the unique users of the site for the last hour. In this case, we cannot afford to parallelize the data stream from the machines or calculate this value separately on each web service - it is necessary to count the unique users by their cookies on a single data stream from all machines. Otherwise, each instance will have its own set of unique users that cannot be “picked up and folded” to get the final result.

Note Of course, the example is a bit far-fetched - this problem can be solved in other, more efficient ways. The essence of the example is reduced to the situation when you need to pass some data flow centrally through one handler and it is impossible to divide the load due to the nature of the task.

So, for the beginning let's prepare the client and end nodes:

For each of the web services, we put our own individual node on the same machine as the web service. This is done for the following reasons:


The question remains - how to deliver the data in such a way that nothing is lost if something breaks? We have already taken a number of measures - data for HDFS and FS are written to several machines. In this case, the data is not duplicated, but divided. Thus, if one of the end machines fails, the entire load will go to the remainder. The result of such a breakdown will be dysbalance on the recorded amount of data on different machines, but it is quite possible to live with this.

To ensure greater stability, add a few intermediate Flume nodes that will deal directly with data distribution:


It turned out pretty weird web. What's going on here:

  1. The web service sends events to the client node.

  2. Each event has a header that indicates “destination” (for example, dist = FS or dist = HDFS ), as well as a uniq header with possible values ​​of 1 or 0.

  3. Each client node has 3 streams that equally empty the channel and evenly distribute the events between the three intermediate nodes - Splitters (so far without looking at the dist header).

  4. Each Splitter has several channels - for HDFS, FS, and a unique user counter. The required channel is selected by the dist and uniq headers.

  5. Each of these channels on Splitter has several sinks that evenly distribute the events between the end machines (FS, HDFS or UNQ).

If everything is relatively simple with client nodes - they just divide the events between Splitters, then the structure of a single Splitter should be considered in more detail.


Here you can see that the end point for the data is determined using the dist header. At the same time, the events for which unique users are counted do not depend on the dist header - they are oriented on the uniq header. This means that some events can be duplicated in several channels, for example HDFS and UNQ.

Earlier, I did not specifically indicate the direction from the Splitters to the UNQ nodes. The point is that these nodes do not accept distributed data, like HDFS or FS. Given the specifics of the task of counting unique users, the entire data stream must pass through only one machine. A logical question - why do we need then 2 nodes to count unique users? The answer is because if one node fails, there will be no one to replace it. How can we be here - we cannot divide events between nodes, we cannot leave one alone too?

Here, another Flume tool can help us, allowing the drains to work in a group on the principle “If runoff 1 broke, use runoff 2”. This component is called Failover Sink Processor . Its configuration is as follows:

 #        agent.sinks.sink-unq-1.type = avro agent.sinks.sink-unq-1.batch-size = 5000 agent.sinks.sink-unq-1.channel = memchannel agent.sinks.sink-unq-1.hostname = unq-counter-1.my-company.com agent.sinks.sink-unq-1.port = 50001 agent.sinks.sink-unq-2.type = avro agent.sinks.sink-unq-2.batch-size = 5000 agent.sinks.sink-unq-2.channel = memchannel agent.sinks.sink-unq-2.hostname = unq-counter-2.my-company.com agent.sinks.sink-unq-2.port = 50001 #     agent.sinkgroups = failover-group agent.sinkgroups.failover-group.sinks = sink-unq-1 sink-unq-2 #     failover agent.sinkgroups.failover-group.processor.type = failover #    -        agent.sinkgroups.failover-group.processor.priority.sink-unq-1 = 10 agent.sinkgroups.failover-group.processor.priority.sink-unq-2 = 1 #    -      () agent.sinkgroups.failover-group.processor.maxpenalty = 10000 

The above setting group of threads allows you to use only one sink, but at the same time have a “spare” in case of an accident. Those. as long as the high priority drain is working properly, low priority drains will be idle.

Thus, the task is completed - the data is distributed between HDFS and FS, the counters of unique users work correctly. In this case, the failure of any machine will not lead to data loss:

  1. If a machine with a Web service breaks, this problem is solved by a balancer.

  2. If one of the Splitters fails, the load will be distributed among the rest.

  3. End nodes are also duplicated, the failure of one of them will not lead to stagnation or loss of data.

  4. The node for counting unique users has a “doubler” and in the event of a breakdown it will be replaced by it without disturbing the logic of data processing.

For such a scheme, scaling tasks are reduced to a simple change in the configuration of Flume nodes for the appropriate level of nodes (Client, Splitter, or Final):

  1. New Web service instance - no configuration changes are required, it is simply installed along with the Flume client node.

  2. New Splitter - you only need to change the configuration of the client nodes by adding a new stock.

  3. New end node - you only need to change the configuration of Splitter'a, adding a new stock.

Conclusion


This concludes the series of articles about Apache Flume. We covered all the most popular of its components, figured out how to manage the flow of data and looked at an example of a complete transport network. Nevertheless, the possibilities of Flume are not exhausted by all of this - in the standard package there are still quite a few components not considered by us that can make life much easier when solving certain tasks. We hope that this series of articles has helped you to get acquainted with Flume and to get a fairly complete picture of it.

Thanks for attention!

Source: https://habr.com/ru/post/305932/


All Articles