Whenever a user performs an action in a Spotify client, such as listening to a song or searching for an artist, a small piece of information, an event, is sent to our servers. Event delivery, the process of safely and securely transporting this information from clients all over the world to our central processing system, is an interesting problem. In this series of articles we will look at some of the solutions we have built in this area. Specifically, we will look at the architecture of our new event delivery system and explain why we decided to run it on Google Cloud.
In this first article we explain how our current event delivery system works and share some of the lessons we learned from operating it. In the next one we will cover the design of the new system and why we chose Cloud Pub/Sub as the transport mechanism for all events. In the third and final article we will explain how we process all of the delivered events with Dataflow, and how quickly it all happens.

Events distributed through our delivery system have many uses. Most of our product design decisions are based on the results of A/B tests, and those, in turn, must be based on large volumes of accurate data. The Discover Weekly playlist, launched in 2015, quickly became one of the most used features in Spotify; it is built on top of song-play data. Year in Music, Spotify Party and many other Spotify features are also data driven. In addition, Spotify data is one of the sources used to compile the Billboard charts.
Our event delivery system is a foundational piece of Spotify's data infrastructure. Its key requirement is to deliver complete data with predictable latency and to expose it to our developers through a well-defined interface. Usage data can be described as a set of structured events generated at some point in time in response to predefined actions.
Most of the events used at Spotify are generated directly by Spotify clients in response to user actions. Every time an event occurs in a client, information about it is sent to one of the Spotify gateways, which writes it to the system log. There the event is assigned a timestamp, which the delivery system then uses. To be able to guarantee latency and completeness of delivery, we decided to key events on the syslog timestamp rather than a client-side one, since we have no control over an event before it reaches our servers.
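As a rough illustration of this flow, here is a minimal Python sketch of a gateway handler stamping an incoming event on the server side and handing it to syslog. The function name, event fields and payload are hypothetical; this is not Spotify's actual gateway code.

```python
import json
import syslog
from datetime import datetime, timezone

def log_event(event_type, payload):
    """Stamp an incoming client event with a server-side timestamp and
    write it to the local syslog. Illustrative sketch only."""
    record = {
        "type": event_type,                              # e.g. "song_played"
        "server_time": datetime.now(timezone.utc).isoformat(),
        "payload": payload,                              # whatever the client sent
    }
    # The syslog daemon adds its own timestamp as well; that syslog
    # timestamp is the one the delivery system keys on.
    syslog.syslog(syslog.LOG_INFO, json.dumps(record))

log_event("song_played", {"track_id": "hypothetical-track-id", "ms_played": 215000})
```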
At Spotify, all data must be delivered to a central Hadoop cluster. The servers on which we collect data are located in several data centers on two continents. Bandwidth between our data centers is a scarce resource, so data transfers have to be handled with care.
The data interface is defined by where the data lives in Hadoop and the format in which it is stored. All data delivered by our service is written to HDFS in Avro format. The delivered data is partitioned into hourly buckets. This is a relic of the past, when the first delivery system was based on scp and hourly copies of syslog files from every server to Hadoop. Since all data processing at Spotify today is tied to these hourly buckets, this interface will stay with us for the foreseeable future.
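To make the hourly-bucket interface concrete, here is a small sketch, under an assumed path convention, of how an event's syslog timestamp could map to an hourly HDFS location. The directory layout shown is invented for illustration.

```python
from datetime import datetime, timezone

def hourly_bucket_path(event_type, ts):
    """Map an event's syslog timestamp to the hourly HDFS bucket it belongs to.
    The path layout is illustrative, not Spotify's actual convention."""
    hour = ts.replace(minute=0, second=0, microsecond=0)
    return f"/data/{event_type}/{hour:%Y/%m/%d/%H}/part.avro"

print(hourly_bucket_path("song_played", datetime(2016, 2, 25, 14, 37, tzinfo=timezone.utc)))
# /data/song_played/2016/02/25/14/part.avro
```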
Most data processing jobs at Spotify read an hourly bucket only once. The output of some jobs serves as input to others, forming long chains of transformations. Once a job has processed an hour of data, it never checks that hour again for changes. If the data does change, the only way to propagate the change is to manually re-run the affected job, and every job downstream of it, for that particular hour. This is costly and time-consuming, which is why we require the delivery service never to append data to an hourly bucket after it has been handed over. This requirement, known as data completeness, is in tension with the requirement of minimal delivery latency. An interesting perspective on the completeness problem is presented in Google's Dataflow paper.
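The sketch below illustrates what manually re-running all relevant and downstream tasks implies: given a dependency graph of hourly jobs, every transitive consumer of the changed output has to be re-run for that hour. The job names and graph are hypothetical.

```python
def jobs_to_rerun(dag, changed_job, hour):
    """Given a dependency graph of hourly jobs (job -> jobs that consume
    its output), return every job that must be re-run for `hour` if the
    output of `changed_job` changes. Illustrative sketch only."""
    to_rerun, stack = set(), [changed_job]
    while stack:
        job = stack.pop()
        if job in to_rerun:
            continue
        to_rerun.add(job)
        stack.extend(dag.get(job, []))
    return {(job, hour) for job in to_rerun}

# Hypothetical chain: cleaned events feed playlist features, which feed A/B reports.
dag = {"clean_events": ["playlist_features"], "playlist_features": ["ab_test_report"]}
print(jobs_to_rerun(dag, "clean_events", "2016-02-25T14"))
```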
Initial Message Delivery System
System architecture
Our original message delivery system was built on top of Kafka 0.7.

The system is built around the abstraction of hourly files. It is designed to stream log files containing events from service hosts to HDFS. Once all the log files for a given hour have been transferred to HDFS, they are converted from tab-separated text to Avro format.
When the system was built, Kafka 0.7 lacked the ability for a Kafka broker cluster to act as reliable persistent storage. This drove an important design decision: no persistent state is kept between the data producer, the Kafka Syslog Producer, and Hadoop. An event is considered reliably persisted only once it has been written to a file on HDFS.
Treating an event as reliably persisted only once it exists in Hadoop has a downside: the Hadoop cluster becomes a single point of failure for the delivery system. If Hadoop goes down, the entire delivery pipeline stalls. To cope with this, we have to make sure there is enough disk space on every host from which we collect events. When Hadoop comes back up, we need to "catch up" by transferring all the buffered data as quickly as possible. Recovery time is mostly limited by the bandwidth available between our data centers.
The Producer is a daemon that runs on every host from which we want to send events to Hadoop. It tails log files and sends batches of log lines to the Kafka Syslog Consumer. The Producer knows nothing about the type of an event or the properties it may have. From its point of view, an event is just a set of lines in a file, and all lines are forwarded through the same channel. This means that events of all types contained in one log file travel over a single channel; in this system, Kafka topics serve as those channels. After the Producer sends log lines to the Consumer, it must wait for an acknowledgement (ACK) that the Consumer has successfully persisted them to HDFS. Only after receiving the ACK does the Producer consider the lines safely stored and move on to the next batch.
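A minimal sketch of that ack-driven loop might look like the following. The wire protocol, addresses and batch sizes are invented here; the real Producer is a more involved daemon.

```python
import socket
import time

def send_with_ack(conn, batch, retry_delay=5):
    """Send a batch of log lines and block until the consumer acknowledges
    that they are stored in HDFS; retry the same batch until that happens."""
    data = "".join(batch).encode("utf-8")
    while True:
        conn.sendall(data)
        if conn.recv(3) == b"ACK":    # consumer replies only after an HDFS write
            return
        time.sleep(retry_delay)

def producer_loop(log_path, consumer_addr, batch_size=100):
    """Tail a log file and forward lines in batches, advancing only after
    each batch has been acknowledged (at-least-once delivery)."""
    with open(log_path, "r") as log, socket.create_connection(consumer_addr) as conn:
        batch = []
        while True:
            line = log.readline()
            if line:
                batch.append(line)
                if len(batch) >= batch_size:
                    send_with_ack(conn, batch)
                    batch = []
            else:
                if batch:
                    send_with_ack(conn, batch)
                    batch = []
                time.sleep(1)          # at EOF: wait for new lines to appear
```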
To get from the Producer to the Consumer, events pass through Kafka Brokers and then Kafka Groupers. The Kafka Broker is a standard Kafka component, while the Kafka Grouper is a component we wrote ourselves. The Groupers consume all event streams in their local data center and re-publish them, compressed and efficiently batched, into a single topic, which the Consumer then pulls.
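Conceptually, a Grouper does something like the sketch below: consume the local per-source topics and re-publish the events compressed and batched into one grouped topic. The sketch uses the modern kafka-python client with made-up topic and broker names purely for illustration; Kafka 0.7's API, and our in-house Grouper, looked quite different.

```python
from kafka import KafkaConsumer, KafkaProducer  # modern kafka-python client

# Pull events from the per-source topics in the local data center and
# republish them, compressed and batched, into a single grouped topic.
consumer = KafkaConsumer(
    "events.raw.dc1", "events.raw.dc2",
    bootstrap_servers="broker.local:9092",      # hypothetical broker address
    group_id="grouper",
)
producer = KafkaProducer(
    bootstrap_servers="broker.local:9092",
    compression_type="gzip",                    # compress before re-publishing
    batch_size=64 * 1024,
)

for message in consumer:
    producer.send("events.grouped", message.value)
```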
An Extract, Transform and Load (ETL) job converts the data from the simple tab-separated format to Avro. It is an ordinary Hadoop MapReduce job, implemented with the Crunch framework, that operates on hourly buckets. Before it starts processing a given hour, it needs to make sure that all files for that hour have been fully transferred.
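In spirit, the conversion is as simple as the sketch below, which turns one hour of tab-separated lines into an Avro file. The schema and the use of fastavro are illustrative stand-ins for the real Crunch MapReduce job.

```python
import fastavro

# A toy schema for a played-song event; the real event schemas are not shown here.
schema = fastavro.parse_schema({
    "type": "record",
    "name": "SongPlayed",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "track_id", "type": "string"},
        {"name": "ms_played", "type": "long"},
    ],
})

def convert_hour(tsv_path, avro_path):
    """Convert one hour of tab-separated event lines into an Avro file."""
    def records():
        with open(tsv_path) as tsv:
            for line in tsv:
                user_id, track_id, ms_played = line.rstrip("\n").split("\t")
                yield {"user_id": user_id, "track_id": track_id,
                       "ms_played": int(ms_played)}
    with open(avro_path, "wb") as out:
        fastavro.writer(out, schema, records())
```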
All Producers continuously send checkpoints, which can contain end-of-file markers. Such a marker is sent exactly once, when the Producer has concluded that an entire file is safely stored in Hadoop. A liveness monitor constantly polls our service discovery systems in all data centers to learn which service machines were running during a specific hour. To check whether all files for that hour have been transferred, the ETL job compares the list of servers it should expect data from against the end-of-file markers it has received. If the ETL job detects missing data, it delays processing of that hour.
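The completeness check itself boils down to a set comparison, roughly as in this sketch; the host names and function are hypothetical, but the logic mirrors the description above.

```python
def hour_is_complete(expected_hosts, end_of_file_markers):
    """Decide whether an hourly bucket can be handed to the ETL job.
    `expected_hosts` comes from service discovery (which machines were
    serving during the hour); `end_of_file_markers` lists the hosts whose
    producers have confirmed their last line is safely in HDFS."""
    missing = set(expected_hosts) - set(end_of_file_markers)
    if missing:
        # Incomplete: delay processing of this hour and report the stragglers.
        return False, sorted(missing)
    return True, []

complete, missing = hour_is_complete(
    {"gw1.lon", "gw2.lon", "gw1.ash"},          # hypothetical gateway hosts
    {"gw1.lon", "gw1.ash"},
)
print(complete, missing)   # False ['gw2.lon']
```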
To make the best use of the available mappers and reducers, the ETL job, being an ordinary Hadoop MapReduce job, needs to know how to shard its input data. The numbers of mappers and reducers are derived from the size of the input, and the optimal sharding is computed from the event counts continuously reported by the Consumer.
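For intuition, a sizing heuristic of that kind might look like the sketch below; the target shard size and formula are made up for illustration, not the values the ETL job actually used.

```python
def plan_reducers(input_bytes, target_shard_bytes=512 * 1024 * 1024):
    """Pick a reducer count from the size of the hourly input so that each
    reducer handles roughly one target-sized shard (hypothetical heuristic)."""
    return max(1, -(-input_bytes // target_shard_bytes))   # ceiling division

print(plan_reducers(10 * 1024**3))   # a 10 GiB hour -> 20 reducers
```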
Lessons
One of the main problems with this design is that local Producers must confirm that data has been stored centrally in HDFS before it can be considered reliably delivered. A Producer on a server on the west coast of the United States needs to know when its data has been written to disk in London. Most of the time this works fine, but when transfers slow down it causes delivery delays that are hard to recover from.
Compare this with a design in which the point of delivery is in the local data center. That simplifies the Producer considerably, since the network between hosts within a data center is usually very reliable.
Problems aside, we were quite pleased with a system that could reliably deliver more than 700,000 events per second from all over the world. Redesigning the system also gave us an opportunity to improve our software development process.
By sending all events together through one channel, we lost the flexibility to manage event streams with different quality of service (QoS). It also limited real-time use cases: any real-time consumer had to tap the single channel carrying the entire stream and filter out just the events it needed.
Transferring unstructured data adds unnecessary latency, since it requires an extra ETL conversion step. Today the ETL job adds roughly 30 minutes to event delivery. If the data were sent in Avro format from the start, it would be available as soon as it was written to HDFS.
Having the sender track the end of an hour also caused problems. For example, if a machine dies, it cannot send its end-of-file marker. If that marker never arrives, we wait forever unless someone intervenes manually. As the number of machines grows, this problem becomes more pressing.
Next steps
The number of events delivered at Spotify keeps growing. As the load increased, we started experiencing more and more problems, and over time the number of outages began to alarm us. We realized that neither we nor the system could handle the increased load much longer. In the next article we will describe how we decided to change the system.
Figure: the number of messages handled by our system at any given point in time.