⬆️ ⬇️

Spotify: Google Cloud event subsystem migration (part 3)

In the first article of this series, we talked about how the old message delivery system works and some of the conclusions we made following its work. In the second, we looked at the design of the new system and why we chose Cloud Pub / Sub as the transport mechanism for all events. In this third and final article, we will explain how we intend to work with all published events using Dataflow , and what we learned about this approach.



image



Export events from Pub / Sub to (at) hourly intervals using Dataflow



Most of the tasks performed today in Spotify are batch jobs. They require that events be reliably exported to persistent storage. As such permanent storage, we traditionally use the Hadoop Distributed File System (HDFS) and Hive . To match the growth of Spotify - which can be measured both by the size of the stored data and the number of engineers - we are slowly switching from HDFS to Cloud Storage , and from Hive to BigQuery .

')

Extract, Transform and Load (ETL) tasks are the components that we use to export data from HDFS and Cloud Storage. Hive and BigQuery exports are processed by batch jobs that convert data from hourly assemblies to HDFS and Cloud Storage.



All exported data is divided, according to time stamps, into hourly packets. This is an open interface, which was introduced in our very first event delivery system. The system was based on the scp command and it copied the watch syslog files from all servers to HDFS.



ETL tasks must determine with high certainty that all data for hourly assemblies are recorded in permanent storage. When more data for the hourly assembly is not expected, it is marked as complete.



Late to the already complete assembly data can not be added to it, as running tasks usually read the data from the assembly once. To solve this problem, the ETL task must process the late data separately. All late data is recorded in the current open time assembly, shifting the time stamp of the event into the future.



To write an ETL assignment, we decided to experiment with Dataflow . This choice was due to the fact that we wanted for ourselves as little operational responsibility as possible and for others to solve big problems for us. Dataflow is both a framework for pipelining data recording and a fully managed service in Google Cloud for executing such pipelines. It can work out of the box with Cloud Pub / Sub, Cloud Storage and BigQuery.



Writing pipelines in Dataflow is a lot like writing in Apache Crunch . This is not surprising, since both projects were inspired by FlumeJava . The difference is that Dataflow offers a unified model for streaming and batch work, while Crunch has only a batch model.



image



To achieve a good end-to-end delay, we wrote our ETL as a streaming task. Due to the fact that it is constantly running, we can incrementally fill in individual hourly assemblies as data arrives. This gives us less latency compared to batch work, which exported data once at the end of each hour.



The ELT task uses the windowing (window) concept of Dataflow to separate data into hourly assemblies based on time. In Dataflow, windows can be assigned both by the time of events and by the processing time. The fact that windows can be created based on a time stamp gives Dataflow advantages over other streaming frameworks. Until now, only Apache Flink supports windowing in both time and processing.



Each window consists of one or more blocks (pane), and each block contains a set of elements. The trigger that is assigned to each window determines how blocks are created. These blocks are allocated only after the data passes through GroupByKey . Since GroupByKey groups by key and window, all aggregated elements in one block have the same key and belong to the same window.



Dataflow provides a mechanism called a “watermark” (watermark, which here has a value rather than limits or borders, rather than the same as for images or bills), which can be used to determine when to close a window. It is used by the time of the incoming data flow events to calculate the point in time when there is a high probability that all events for a particular window have already arrived.



Deep immersion in the implementation of ETL



In this section, we will look at some of the problems we encountered in the process of creating a Dataflow ETL task for event delivery. They can be a bit difficult to understand if you have not had experience with Dataflow or a similar system. A good helper in understanding (if the concept and terminology is new to you) is a publication about Google 's DataFlow .



image



In our event delivery system, we have a 1: 1 mapping between event types and tops of Cloud Pub / Sub. One ETL task works with one stream of event types. We use independent ETL tasks to process data from all types of events.



In order to evenly distribute the load among all the available workers, the data stream is divided before it goes through a transformation that assigns a window to each event. The number of shards that we use is a function of the number of workers assigned to the task.



image



The "window" is a composite transformation . At the first stage of this transformation, we assign hourly fixed windows to all events in the incoming stream. Windows are considered closed when the watermark goes beyond the hour.



@Override public PCollection<KV<String, Iterable<Gabo.EventMessage>>> apply( final PCollection<KV<String, Gabo.EventMessage>> shardedEvents) { return shardedEvents .apply("Assign Hourly Windows", Window.<KV<String, Gabo.EventMessage>>into( FixedWindows.of(ONE_HOUR)) .withAllowedLateness(ONE_DAY) .triggering( AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterPane.elementCountAtLeast(maxEventsInFile)) .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(maxEventsInFile), AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_SECONDS)))) .discardingFiredPanes()) .apply("Aggregate Events", GroupByKey.create()); } 


When assigning windows, we have an early trigger that is set to select every N elements in blocks until the window closes. Thanks to the trigger, hourly packets are constantly filled as data arrives. So the configured trigger helps us not only to achieve a smaller delay in export, but also to bypass the limitations of GroupByKey. The amount of data collected in the panels must fit in memory on the workers' machines, since GroupByKey is a transformation in memory.



When the window is closed, block selection is controlled later by a trigger. This trigger creates a block from the data either after N elements or after 10 seconds of operation time. Events are discarded if they are more than one day late.



Materialization (creation of a temporary storage or table, for example) of blocks is performed in the “Event Aggregation” transformation, which is nothing more than a GroupByKey transformation.



image

Number of incoming events per second



In order to track the number of inbound events per second that go through the ETL task, we use the “Track Average RPS Of Timed or Late Events” (Monitor Average RPS Of Timely And Late Events) at the Assign Hourly Windows output. All conversion metrics are sent as custom metrics to Cloud Monitoring . Indicators are calculated on sliding five-minute windows that are transmitted every minute.



Information about the timeliness of an event can only be obtained after the event has been assigned to the window. Comparing the maximum timestamp of window elements with the current watermark gives us this information. Since watermark data is not synchronized between transformations, detecting timeliness in this way may not be accurate. The number of falsely detected late events that we are seeing now is rather low: less than one per day.



We can accurately accurately detect the timeliness of events if the monitoring transformation (or Monitor Average RPS Of Time And Late Events) is applied to the Aggregate Events output. The disadvantage of this approach would be the unpredictability of obtaining metrics, since the window is obtained on the basis of the number of elements and the time of events.



image



In the Write to HDFS / GCS conversion, we write data to either HDFS or Cloud Storage. The mechanics of recording in HDFS and Cloud Storage are the same. The only difference is which file system API is used. In our implementation, both APIs are hidden behind the IOChannelFactory interface.



To ensure that only one file is written to the block, even neglecting the possibility of failure, each block gets a unique ID. The block identifier is then used as a unique ID for all the files to be written. Files are written in Avro format with a scheme that follows the event ID scheme.



Timely blocks are written into packages (bucket) based on the time of the event. Late ones are written to the current hour packets, since the addition of closed assemblies is undesirable in working with data in Spotify. To understand whether a block is timely, we use the PaneInfo object. It is created when creating a block.



The completeness marker for the hourly assembly is written only once. For this, the main output action of the Write Pane action is re-processed (re-windowed) into the hour window and aggregated into Aggregated Write Successes.



image

The number of recorded files per second



image

Watermark Delay in milliseconds



Getting metrics is a by-product of Write Pane actions. We get data that shows how many files are recorded per second, the average delay of events and the lag of the watermark compared to the current time. All these metrics are calculated for a 5 minute window and transmitted every minute.



Since we measure the backlog of a watermark after recording to HDFS / Cloud Storage, it is directly related to the entire latency of the system. On the graph with a delay, you can see that the lag of the current character is generally less than 200 s (approximately 3.5 minutes). You can see occasional bursts of up to 1500 s (approximately 25 minutes) in the same figure. Such peaks are caused by fragmentation when writing to our Hadoop cluster via VPN. For comparison, the latency in our old system is two hours on the “best day” and three hours on average.



Next steps in the ETL task



The implementation of the ETL task is still in the prototyping stage. So far, we have four tasks running ETL (see the graph with the number of events per second). The smallest task consumes about 30 events per second, and the largest reaches peak values ​​of 100K events per second.



We have not yet found a good way to calculate the optimal number of workers for ETL tasks. Their number is still determined manually after trial and error. We use two workers for the smallest task and 42 for the greatest. It is interesting to note that the execution of tasks also depends on the memory. For one pipeline that processes about 20K events per second, we use 24 workers, while for the second one, processing events with the same speed, but with an average message size four times smaller, we only use 4. The management of conveyors can be much simpler when we implement the auto scaling function.



We must ensure that when the job is restarted (job), we will not lose any data. Now this is not the case if the job update does not work. We are actively cooperating with Dataflow engineers in finding a solution to this problem.



The behavior of the watermark is still a mystery to us. We need to check that its calculation is predictable both in the case of a failure and in the case of normal operation.



Finally, we need to define a good CI / CD model for fast and reliable ETL job updates. This is a nontrivial task - we need to manage one ETL task for each type of event, and we have more than 1000 of them.



Cloud Event Delivery System



We are actively working to launch a new production system. The preliminary numbers we received from the launch of the new system in the experimental phase are very encouraging. The worst total delay time in the new system is four times less than the total delay time of the old platform.



But increasing productivity is not the only thing we want to get from the new system. We want to significantly reduce transaction costs with cloud products. This, in turn, means that we will have much more time to improve Spotify products.

Source: https://habr.com/ru/post/323908/



All Articles