
To gather statistics, our site generates a huge number of events as it runs: for example, when one user sends a message to another, when a user visits the site, when a location changes, and so on. An event is a string in JSON or GPB (Google Protocol Buffers) format and contains the send time, the user ID, the event type, and fields related directly to the event itself (for example, the user's coordinates).
Hundreds of thousands of events are generated every second, and we need tools to collect and analyze them efficiently and with minimal delay. We considered several existing solutions for this task and until recently used a daemon from Facebook called Scribe. It generally suited us and allowed us to do everything we needed. However, at some point Facebook abandoned its development, and under certain conditions Scribe started crashing on us (for example, when upstream servers were overloaded). We could not eliminate the cause of the crashes, so we started looking for an alternative.
Our requirements for the event delivery system were as follows:
- the presence of a local (proxy) daemon;
- saving events to disk when the receiving server is unavailable;
- the ability to route events by category;
- sharding data streams by hash (of user_id or another key) and by round-robin;
- recording events in files on the receiving side (Scribe-like);
- normal operation over high-latency networks (event delivery between data centers);
- scalability of receiving and sending up to a million events per second;
- ease of operation, adequate resource consumption.
We considered the following options:
- Apache Flume: unstable and loses events when it crashes unless you use the Spooling Directory Source, which has a very awkward API;
- FluentD: performance too low, otherwise very good;
- Apache Kafka: no local agent (see issues.apache.org/jira/browse/KAFKA-1955).
Unfortunately, none of these options solved all our problems, so we decided to write our own system and called it Live Streaming Daemon (LSD).
What could Scribe do?
To understand what LSD does and why it is needed, let's first take a closer look at the Scribe features we used.
The presence of a local daemon
Scribe has a client-server architecture, where the clients are the machines that generate events and the servers are the machines that receive them. To save resources and to be able to buffer events to disk in case of delivery problems, Scribe suggests running a client instance on each machine that generates events. The application that generates events connects to the local client via a unix or tcp socket and sends events to it over the Apache Thrift protocol. The assumption is that the local proxy is always available and responds quickly.
In most cases this is how it works, but sometimes the local Scribe instance can start responding more slowly than usual or crash altogether. We therefore had a mechanism that saved events to local files instead of Scribe when it was unavailable; a separate cron script then resent those events to Scribe once it came back up.
The ability to route events by category
An event category is, in effect, the name of the directory in which a particular event will be recorded on the receiving server. It makes sense to put different types of events into different categories, since their handlers may differ. Scribe can send different categories to different servers; routes are defined by a mask on the category name, for example debug_*.
In our configuration, all events are delivered by default to the European DC. If an event needs to stay within the local DC, we send it with the local_ prefix; if we want to deliver an event to a specific DC, we add a prefix with the name of that DC. Different routes for these categories are registered in the daemon's configuration, and the events are delivered to the right place. For delivery to remote data centers, intermediate nodes can be used to buffer events.
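To make the scheme concrete, here is a minimal Go sketch of such prefix-based routing; the pickRoute function and the route names are hypothetical illustrations for this article, not LSD's actual configuration mechanism:

```go
package main

import (
	"fmt"
	"strings"
)

// pickRoute is a hypothetical illustration of prefix-based category routing:
// "local_" categories stay in the local DC, categories prefixed with a DC
// name go to that DC, and everything else defaults to the European DC.
func pickRoute(category string, dcNames []string) string {
	if strings.HasPrefix(category, "local_") {
		return "local-dc"
	}
	for _, dc := range dcNames {
		if strings.HasPrefix(category, dc+"_") {
			return dc
		}
	}
	return "europe-dc" // default destination
}

func main() {
	dcs := []string{"us", "asia"}
	fmt.Println(pickRoute("error_log", dcs))       // europe-dc
	fmt.Println(pickRoute("local_error_log", dcs)) // local-dc
	fmt.Println(pickRoute("us_error_log", dcs))    // us
}
```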
Sharding data streams
Sometimes it is convenient to deliver all data pertaining to a specific user to the same server. In some cases, this can significantly improve processing performance, because user data can be cached for a short time.
By default, data is distributed by the round-robin algorithm: each subsequent piece of data is sent to the next server in the list, and so on in a cycle. Scribe has a drawback in both modes: the daemon "remembers" which server a specific event should be delivered to, so if one of the receiving servers is unavailable, events accumulate on disk and are not delivered anywhere, even when other servers are available and able to receive and process the entire flow of events. A sketch of the two modes follows below.
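Here is a minimal Go sketch of the two distribution modes; the Sharder type and the CRC32 hash are assumptions made for illustration, not the actual implementation of Scribe or LSD:

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sync/atomic"
)

// Sharder illustrates the two distribution modes described above.
type Sharder struct {
	servers []string
	rr      uint64 // round-robin counter
}

// ByHash always maps the same key (e.g. a user_id) to the same server,
// which lets the receiving side cache per-user data for a short time.
func (s *Sharder) ByHash(key string) string {
	h := crc32.ChecksumIEEE([]byte(key))
	return s.servers[h%uint32(len(s.servers))]
}

// RoundRobin sends each subsequent batch to the next server in the list.
func (s *Sharder) RoundRobin() string {
	n := atomic.AddUint64(&s.rr, 1)
	return s.servers[n%uint64(len(s.servers))]
}

func main() {
	s := &Sharder{servers: []string{"srv1", "srv2", "srv3"}}
	fmt.Println(s.ByHash("user:12345")) // always the same server for this user
	fmt.Println(s.RoundRobin(), s.RoundRobin(), s.RoundRobin())
}
```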
Recording events in files on the receiving side
On the receiving side (i.e., on the server side), all events are written to files of the form <category>/<category>-<date>_<counter>, and a symlink of the form <category>/<category>_current is created pointing to the latest file in the category. Files are rotated based on elapsed time (for example, 60 seconds) or size (for example, 10 MB), whichever happens first.
If the category is called, for example, error_log, then the hierarchy of files and directories will be as follows:
/var/scribe/error_log/
|-- error_log-2016-09-13_004742
|-- error_log-2016-09-13_004743
|-- error_log-2016-09-13_004744
`-- error_log_current -> error_log-2016-09-13_004744
Writing always goes to the latest file. The server never writes to the previous files, so they can be freely read and deleted once fully processed.
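For illustration, here is a minimal Go sketch of a consumer for this layout; the processCategory function is our own invention (assuming line-delimited events) and is not part of Scribe or LSD:

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// processCategory reads every rotated file in a category directory, skipping
// the _current symlink and its target (the file still being written), and
// deletes each file once it has been fully processed.
func processCategory(dir string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	// Resolve which file is currently being written to.
	current := ""
	for _, e := range entries {
		if strings.HasSuffix(e.Name(), "_current") {
			if target, err := os.Readlink(filepath.Join(dir, e.Name())); err == nil {
				current = filepath.Base(target)
			}
		}
	}
	for _, e := range entries {
		if strings.HasSuffix(e.Name(), "_current") || e.Name() == current {
			continue // skip the symlink and the file still being written
		}
		path := filepath.Join(dir, e.Name())
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		sc := bufio.NewScanner(f)
		for sc.Scan() {
			fmt.Println(sc.Text()) // handle one event line here
		}
		f.Close()
		if err := sc.Err(); err != nil {
			return err
		}
		os.Remove(path) // safe: the server never writes to old files
	}
	return nil
}

func main() {
	if err := processCategory("/var/scribe/error_log"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```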
Normal operation in high-latency network conditions
The Scribe client sends data in small batches and waits for confirmation from the remote side before sending a new batch. This works very poorly when, for example, events are sent across the Atlantic Ocean, where the transmission delay is approximately 125 ms. If the maximum batch size is, say, 0.1 MB, then only 0.1 MB / 0.125 s = 0.8 MB/s can be transferred this way. This limitation can be circumvented by not waiting for a confirmation of each batch and instead sending events in streaming mode, as the sketch below illustrates.
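The toy Go sketch below contrasts the two approaches: stop-and-wait throughput is capped at batch size divided by round-trip time, while a streaming sender keeps a window of unacknowledged batches in flight. The window size and the channel-based model are illustrative assumptions, not LSD's actual wire protocol:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const batchMB = 0.1
	const rtt = 125 * time.Millisecond

	// Stop-and-wait: at most one batch per round trip, regardless of bandwidth.
	fmt.Printf("stop-and-wait cap: %.1f MB/s\n", batchMB/rtt.Seconds()) // 0.8 MB/s

	// Streaming: keep a window of unacknowledged batches in flight and
	// handle confirmations asynchronously instead of blocking on each one.
	inflight := make(chan []byte, 32) // window of 32 in-flight batches
	done := make(chan struct{})
	go func() { // ack reader: frees one window slot per confirmation
		for range inflight {
		}
		close(done)
	}()
	for i := 0; i < 100; i++ {
		inflight <- make([]byte, 100<<10) // send without waiting for the ack
	}
	close(inflight)
	<-done
}
```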
What does LSD offer?
In general, we had only two main complaints about Scribe:
- Instability and data loss when the daemon crashes.
- If a receiving server goes down, traffic is not automatically redistributed among the remaining servers; manual intervention is required.
LSD solves these two problems and satisfies the requirements for event delivery that we listed at the beginning.
Protection against data loss when the daemon crashes
No software is free of bugs, so instead of trying to make LSD "unkillable" and always responsive within adequate time, we decided to take another approach: applications always write events to files, and the LSD client reads these files and delivers the events to the target machines. This approach is also convenient because no Thrift, Protocol Buffers, or other drivers are required; events can be sent even from a shell script.
To send an event, you write a line with that event to the end of a file of the form <category>/<filename>.log, where <category> is the name of the event category. The <filename> can be any monotonically increasing string based on the current date and time. This format was not chosen at random: it allows events delivered by LSD or Scribe to be forwarded on to other servers. As <filename>, we recommend using the date and time in the YYYYMMDDHHII format (for example, 201609131714). With this format, files are created at most once a minute and their names increase monotonically.
If you need to send events larger than 4 KB (see stackoverflow.com/questions/1154446/is-file-append-atomic-in-unix ) from several processes, you need to take a file lock before writing an event so that lines do not get interleaved. You can add the _big suffix to the file name and write large events to a separate file, so that the lock is not taken for small events. A sketch of such a writer follows below.
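As a sketch of what a client writer could look like under these rules, here is a hypothetical Go example; the writeEvent helper, the spool root path, and the file-mode choices are our assumptions for illustration, not part of LSD's API:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"syscall"
	"time"
)

// writeEvent appends one event line in the format described above:
// <category>/<YYYYMMDDHHII>.log for small events, and a separate
// <YYYYMMDDHHII>_big.log file, guarded by flock, for events over 4 KB,
// whose appends are not guaranteed to be atomic.
func writeEvent(root, category, event string) error {
	name := time.Now().Format("200601021504") // YYYYMMDDHHII
	big := len(event)+1 > 4096                // +1 for the trailing newline
	if big {
		name += "_big"
	}
	dir := filepath.Join(root, category)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return err
	}
	f, err := os.OpenFile(filepath.Join(dir, name+".log"),
		os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return err
	}
	defer f.Close()
	if big {
		// Large appends may interleave with other writers, so lock the file.
		if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
			return err
		}
		defer syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
	}
	_, err = fmt.Fprintln(f, event)
	return err
}

func main() {
	if err := writeEvent("/var/spool/lsd", "error_log",
		`{"ts":1473772800,"user_id":42,"type":"login"}`); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```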
A plain format of the form <category>.log is also supported; in this case, no subdirectory needs to be created. This format is useful for sending events from shell scripts and for collecting logs.
Automatic redistribution of the event flow
If one of the servers to which events are delivered goes down, the events are automatically redistributed among the remaining servers. If one of the servers is slower than the others, it simply receives only as much of the event flow as it is able to handle.
This also means that exactly-once delivery is not guaranteed, since server unavailability is determined by timeouts. Events may be delivered to a server successfully while the confirmations never arrive or arrive with a large delay. In that case, the LSD client re-sends the batch of events for which no confirmation arrived within the timeout (30 seconds by default), as sketched below.
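The following minimal Go sketch models this at-least-once behaviour; the sendWithRetry helper and its transport callback are hypothetical, and only the 30-second default timeout comes from the text:

```go
package main

import (
	"fmt"
	"time"
)

// sendWithRetry re-sends the same batch whenever no confirmation arrives
// within ackTimeout, so the receiver may occasionally see duplicates.
func sendWithRetry(batch []string, send func([]string) <-chan struct{}) {
	const ackTimeout = 30 * time.Second // LSD's default
	for {
		ack := send(batch)
		select {
		case <-ack:
			return // confirmed
		case <-time.After(ackTimeout):
			fmt.Println("ack timed out, re-sending batch") // duplicate possible
		}
	}
}

func main() {
	// Fake transport that acknowledges immediately.
	send := func(batch []string) <-chan struct{} {
		ch := make(chan struct{})
		go func() { close(ch) }()
		return ch
	}
	sendWithRetry([]string{"event1", "event2"}, send)
	fmt.Println("delivered")
}
```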
Real-time event delivery
Since we chose the name Live Streaming Daemon, we have to live up to it :). When there is enough network bandwidth and enough server capacity on the receiving side, events are delivered in real time; no artificial delivery delays are introduced. This is useful when delivering logs or when many intermediate nodes are used for sending events. On the other hand, real-time delivery requires more resources than accumulating events and sending them every few seconds (we used Scribe with such settings). The CPU consumption of LSD is therefore somewhat higher on average than Scribe's, but the difference is not very significant.
Performance
Unfortunately, we could not measure Scribe's performance on our event stream for UDS, our internal analytics system, because the Scribe clients crashed under load (Alexander Krashennikov talked about UDS not long ago).
One LSD server can easily handle a 2 Gbit/s (400k events/s) stream of events from thousands of servers. Accordingly, to receive a stream of 1 million events per second, only 3 servers are needed, each equipped with two gigabit network cards.
Open source
The LSD sources are on GitHub (run go get github.com/badoo/lsd to install). The daemon works under Linux and macOS, but Linux is recommended for production use.
In addition to LSD, we have open-sourced a large number of other projects, which you can explore in our technical blog.
Yuri Nasretdinov, Senior Developer, Badoo