
Working with real-time logs using Heka: the Yandex.Money experience



In this article I will describe how the system for collecting and delivering the server logs of our payment services is organized at Yandex.Money. I have been shipping logs for the whole of the past year, and in that time I have accumulated enough nuances to be worth sharing with the public.


The system is built on the EHK stack (Elasticsearch / Heka / Kibana) and is designed to work in near real time. I will pay particular attention to the weak spots and to the nuances of processing billions of lines of text per day.


I believe in three things: monitoring, logs and backups.

For quick access to operational logs, Yandex.Money uses ElasticSearch, which receives data from almost a hundred microservices that make up the payment service.


By “operational” I mean logs that are available in real time: the delay between a new record appearing in a log file and its arrival in the Elasticsearch cluster is currently under a second. Thanks to this, the monitoring service knows the exact state of the service at any moment, and developers can quickly evaluate and correct the behavior of each new microservice version right after a release.


But it was not always like this. When I came to the company five months ago, I was given the task of getting the delivery of operational service logs to the ElasticSearch cluster (hereinafter ES) into shape. At that time, four different schemes for parsing logs and delivering them to ES were in use:



Almost all of them worked poorly: the Kafka cluster was unreliable, and Flume periodically hung, driving ES into a stupor.


Logs are actually a very simple thing: lots of files on the production servers, growing rapidly in volume. That is why, in this case, simple solutions are the most reliable.

Since “the more joints an umbrella has, the more likely it is to fail,” I threw out all the broken and needlessly complex schemes and settled on one in which Heka plays the role of both processor and transport.


Heka picks up the logs and sends them over TCP to another Heka for further forwarding to the ElasticSearch cluster.


The idea is as follows: a Heka instance is installed on every server; it packs the logs of all the services on that server into the binary Protobuf framing and ships them to a receiving Heka server for parsing, filtering and delivery to ES.


Why not the classic ELK stack, and why the deprecated Heka rather than Logstash


In terms of configuration and software, our ES cluster looks like this:



All ES nodes are in the same local network and are connected to the same router, which lets them communicate within the cluster over the transport protocol at maximum speed. This arrangement significantly speeds up index allocation and cluster rebalancing.


In today's world, the Elasticsearch / Logstash / Kibana stack has become practically the de facto standard for working with logs. And while there are no objections to Elasticsearch and Kibana, there is one nuance with Logstash: it is built on JRuby (a Ruby interpreter written in Java) and requires a JVM. Given that Yandex.Money is a multitude of microservices hosted on hundreds of physical servers and virtual containers, it would be wrong to put a heavy Logstash plus Java on each of them. The choice fell on Heka because of its simplicity, reliability, light weight, ability to filter messages in flight and excellent data buffering.


As for the product's status (deprecated), for us that is not an argument. The bow and arrow are also deprecated as weapons of war, but that does not stop you from hitting a target with a guaranteed result. And if we ever need, say, a non-standard plugin or processor, we have a whole staff of experienced developers who can extend the product.


But all of that was theory; the problems began when we moved to practice.


The devil was hiding in the amount of data


Given the financial nature of our work, almost all services write a great deal of varied information to their logs. To give a sense of scale: the log volume of some system components reaches 250 thousand lines per minute.


No single Heka, however powerful the hardware underneath it, can handle such a volume alone: performance degradation and data loss are inevitable, and both are, of course, completely unacceptable. So HAProxy comes to the rescue. The final scheme turned out as follows:


The diagram shows the general flow of log traffic from the source Hekas to the HAProxy + Heka bundles.


On each server there is a single Heka that collects the logs of that server's microservices. The data is gathered, packed into Protobuf and sent over TCP to the load balancer serving the data center. The backends are HAProxy instances located directly on the ES data nodes, with pools of Heka processes behind them. They, in turn, receive the data, repack it into ESJson and send it to the local data node over HTTP.


... and in different log file formats


Although the company's main language is Java and logs are written through the standard log4j library, there was no single agreed format when the “dream cluster” was being built. Each microservice wrote logs in its own style, with variations in the set of output fields and in the date-time formats. I did not want to wait for development to bring the logs to a common format, so we moved toward the goal in parallel: while the existing log formats were being analyzed, tickets were filed to fix them, and as new versions with the correct formats were released, the collector settings were updated.


Now for the juicy part: the nuances of configuration


The behavior of Heka is described by .toml files, which are assembled into a single configuration at startup; depending on the blocks described there, Heka builds its data-processing pipeline.
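As a point of reference, here is a minimal sketch of what this can look like on a single server; the paths and values are illustrative rather than taken from our production setup. The global [hekad] section and the per-service .toml files live in one directory, and hekad is started with that directory as its configuration:

# /etc/hekad/00-global.toml (illustrative)
[hekad]
maxprocs = 4                    # how many CPU cores Heka may use
base_dir = "/var/cache/hekad"   # working directory for buffers and state

# the other *.toml files in /etc/hekad describe inputs, splitters, decoders,
# encoders and outputs; the daemon is then launched as: hekad -config=/etc/hekad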


Each message goes through several stages: input, splitter, decoder, filter, encoder and output.



All these stages are just plugins written in Go and Lua. If necessary, you can write your own: for example, a Lua filter that keeps monitoring-service requests from being sent to ES, or one that strips confidential data out of the logs.
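To give a sense of what such a plugin looks like, below is a minimal sketch of a Lua sandbox decoder that masks card-like numbers in the msg field. This is not our production code: the file name, the masking pattern and the assumption that it is chained after service_decoder (for example through a MultiDecoder with cascade_strategy = "all") are all illustrative.

-- scrub_pan.lua: illustrative sketch, not production code
-- masks 16-digit sequences in the already-parsed "msg" field
function process_message()
    local msg = read_message("Fields[msg]")
    if type(msg) ~= "string" then
        return 0                      -- nothing to scrub
    end
    -- replace 16 consecutive digits with a placeholder (the pattern is an assumption)
    local scrubbed = string.gsub(msg, "%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d%d", "****MASKED****")
    write_message("Fields[msg]", scrubbed)  -- mutate the field in place (decoder-only API)
    return 0
end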


Heka, in ancient Egyptian mythology, is the god of magic. And what Heka lets you do with logs really is magical.

Log Source Server Settings


Let's walk through the Heka configuration using a log source server and its service.toml file as an example.


[money-service-log]
type = "LogstreamerInput"
log_directory = "/var/log/tomcat"
file_match = "money-service.log"
splitter = "RegexSplitter"
decoder = "service_decoder"

This is the simplest case: a single file whose rotation is handled by the system. If there are many files in different formats, a separate input/decoder pair should be described for each of them, as sketched below. For more detail it is better to consult the official documentation; if something remains unclear, be sure to ask in the comments.
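For instance, if the same Tomcat also produced an access log in its own single-line format, the configuration might gain a second pair along these lines (the names, paths and regex are hypothetical):

[money-service-access-log]
type = "LogstreamerInput"
log_directory = "/var/log/tomcat"
file_match = "money-service-access.log"
splitter = "TokenSplitter"      # access-log records fit on one line
decoder = "access_decoder"

[access_decoder]
type = "PayloadRegexDecoder"
match_regex = '^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>[^\]]+)\]\s+"(?P<request>[^"]*)"\s+(?P<status>\d{3})'
log_errors = true

[access_decoder.message_fields]
@timestamp = "%timestamp%"
ip = "%ip%"
request = "%request%"
status = "%status%"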


[RegexSplitter]
delimiter = '\n(\[\d\d\d\d-\d\d-\d\d)'
delimiter_eol = false

Since log records can span multiple lines (stack traces, for example), do not forget the RegexSplitter, which tells Heka where one block of text ends and the next begins; an illustrative record is shown below.
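For example, in a record format like the one parsed below, a stack trace line does not begin with a bracketed date, so the splitter glues it to the previous record instead of starting a new one (the lines are made up purely for illustration):

[2017-04-20T12:00:01.123+03:00] ERROR [pool-1-thread-7] [payments] [c0ffee] [label42] [ru.example.PaymentService] - payment failed
java.lang.IllegalStateException: gateway timeout
    at ru.example.PaymentService.process(PaymentService.java:42)
[2017-04-20T12:00:02.456+03:00] INFO [pool-1-thread-8] [payments] [c0ffef] [label43] [ru.example.PaymentService] - payment processed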


[service_decoder]
type = "PayloadRegexDecoder"
match_regex = '^\[(?P<timestamp>\d{4}-\d{2}-\d{2}T[\d:.\+]+)\]\s+(?P<level>[A-Z]+)\s+\[(?P<thread>.*)\]\s+\[(?P<context>\S*)\]\s+\[(?P<traceid>\S*)\]\s+\[(?P<unilabel>\S*)\]\s\[(?P<class>\S+)\]\s-\s(?P<msg>.*)'
log_errors = true

[service_decoder.message_fields]
@timestamp = "%timestamp%"
level = "%level%"
thread = "%thread%"
context = "%context%"
traceid = "%traceid%"
unilabel = "%unilabel%"
class = "%class%"
msg = "%msg%"

In match_regex we describe the log lines with a regular expression in Go syntax. Go regular expressions are largely identical to standard PCRE, but there are a number of nuances that can make Heka refuse to start. For example, some PCRE implementations will forgive this named-group syntax:


 (?<groupname>.*) 

But Go will not: it requires the form (?P<groupname>.*), which is what the decoder above uses.


Using the log_errors parameter, we collect all parsing errors into a separate log; they will come in handy later.


[ServiceOutput]
type = "TcpOutput"
address = "loadbalancer.server.address:port"
keep_alive = true
message_matcher = "Logger == 'money-appname'"
use_buffering = true

[ServiceOutput.buffering]
max_file_size = 100857600
max_buffer_size = 1073741824
full_action = "block"

Downstream buffering is one of the great Heka features. By default, it stores the output buffer in the following path:


 /var/cache/hekad/output_queue/OutputName 

In these settings we limit each buffer file to 100 MB and cap the total cache size of each Output module at 1 GB. The full_action parameter can take three values:

shutdown - when the buffer fills up, Heka stops and shuts down;
drop - new messages are discarded until space frees up;
block - processing pauses until the buffer is flushed downstream.



With block you are guaranteed not to lose a single line of the log. The only downside is that if the connection breaks or the channel degrades, you get a sharp spike in traffic once the connection is restored: Heka sends the accumulated buffer, trying to catch back up to real time. The receiving pool must be sized with a margin that takes such situations into account, otherwise you can easily DDoS yourself.


By the way, regarding double and single quotes in the Heka configuration, the convention is this: double quotes denote ordinary TOML strings in which backslash escape sequences are interpreted, while single quotes denote literal strings that are passed through verbatim. That is why the regular expressions and delimiters above are written in single quotes, whereas message_matcher expressions are double-quoted TOML strings whose internal string literals use single quotes.



At one point this nuance caused me a great deal of grief.
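To make the difference concrete, here is a small illustration (the section and key names are made up): in a double-quoted TOML string every backslash must be escaped, while a single-quoted literal string can hold a regex as-is, and message_matcher itself is a double-quoted string whose internal literals use single quotes.

[quoting-example]
# literal string: backslashes pass through untouched, convenient for regexes
delimiter = '\n(\[\d\d\d\d-\d\d-\d\d)'
# basic (double-quoted) string: every backslash has to be doubled to survive parsing
delimiter_escaped = "\\n(\\[\\d\\d\\d\\d-\\d\\d-\\d\\d)"
# matcher expression: double quotes outside, single quotes for the values inside
message_matcher = "Logger == 'money-appname'"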


Backend configuration


The backend for the balancer is a bundle of HAProxy and three copies of Heka on each ES data node.


In HAProxy, everything is quite simple and it seems to me that it does not require explanations:


listen pool_502
    bind 0.0.0.0:502
    balance roundrobin
    default-server fall 5 inter 5000 weight 10
    server heka_1502 127.0.0.1:1502 check
    server heka_2502 127.0.0.1:2502 check
    server heka_3502 127.0.0.1:3502 check

There are three Heka instances running on each server, differing only in ports:


[Input_502_1]
type = "TcpInput"
address = "0.0.0.0:1502"
keep_alive = true
keep_alive_period = 180
decoder = "MultiDecoder_502"

[MultiDecoder_502]
type = "MultiDecoder"
subs = ['Service1Decoder', 'Service2Decoder']
cascade_strategy = "first-wins"
log_sub_errors = true

A MultiDecoder is used here because the logs of many services pass through a single port. The first-wins policy means that once a decoder matches, the remaining decoders are not tried.


[Service1Decoder]
type = "ProtobufDecoder"

[Service2Decoder]
type = "ProtobufDecoder"

[Service1Encoder]
type = "ESJsonEncoder"
index = "service1-logs-%{%Y.%m.%d}"
es_index_from_timestamp = false
type_name = "%{Logger}"
fields = [ "DynamicFields", "Payload", "Hostname" ]
dynamic_fields = ["@timestamp", "level", "thread", "context", "traceid", "unilabel", "class", "msg"]

[Service1Encoder.field_mappings]
Payload = "message"
Hostname = "MessageSourceAddress"

The es_index_from_timestamp parameter indicates that the date and time used to build the index name are taken not from the incoming data but from the local server time. This avoids a mess when servers run in different time zones and some write their logs in UTC while others use MSK.


The index parameter implements the principle of “one service - one index”; a new index is created every day.
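For example (the dates are illustrative), after a few days of operation the cluster holds a set of per-day indices of the form:

service1-logs-2017.05.08
service1-logs-2017.05.09
service1-logs-2017.05.10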


[Service1Output]
type = "ElasticSearchOutput"
message_matcher = "Logger == 'money-service1'"
server = "http://localhost:9200"
encoder = "Service1Encoder"
use_buffering = true

Output plugins split the data stream by the message_matcher parameter, which here corresponds to the log file name. To avoid loading the network unnecessarily, Heka sends the data to the local data node it runs on; ES itself then distributes the indexed data between the cluster's data nodes over the transport protocol.


Conclusion


The scheme described above works successfully and indexes 25-30 thousand records per second. The headroom of the receiving Heka pools allows it to withstand load peaks of up to 100 thousand records per second:


Statistics from Zabbix.


We keep the last 21 days of logs in ElasticSearch. Experience shows that online access to older data is rarely needed; when it is, the data can always be requested from the log servers, which store it practically forever.


The current state of the cluster according to Kopf.


I have described only the part of the system that concerns the collection and delivery of logs, so in the next article I plan to cover the ElasticSearch cluster itself and its configuration: how we virtualized it, how we migrated from version 2.2 to 5.3 and carried 24 billion records along with us without losing faith in humanity.


Perhaps I should add something to the story, some detail I overlooked? Share your opinion in the comments.



Source: https://habr.com/ru/post/328018/

