
The book "Kafka Streams in Action. Real-time applications and microservices"

Hi, Habrozhiteli! This book is suitable for any developer who wants to understand stream processing. An understanding of distributed programming will help you get more out of Kafka and Kafka Streams. Prior knowledge of Kafka itself is useful but not required: I will cover everything you need. With this book, experienced Kafka developers and beginners alike will learn to build interesting stream processing applications with the Kafka Streams library. Intermediate and advanced Java developers who are already familiar with concepts such as serialization will learn to apply their skills to Kafka Streams applications. The book's source code is written in Java 8 and makes heavy use of Java 8 lambda expressions, so experience with lambda functions (even in another programming language) will come in handy.

Excerpt 5.3. Aggregation and window operations


In this section, we proceed to explore the most promising parts of Kafka Streams. So far we have considered the following aspects of Kafka Streams:


In the following examples, we will put all these elements together. You will also learn about window operations, another powerful feature of streaming applications. Our first example is a simple aggregation.

5.3.1. Aggregation of stock sales by industry


Aggregation and grouping are vital tools when working with streaming data. Examining individual records as they arrive is often not enough. To extract additional information from the data, you need to group and combine records.

In this example, you will put yourself in the shoes of a day trader who needs to track the sales volume of company stocks across several industries. In particular, you are interested in the five companies with the largest share sales in each industry.

In general terms, this aggregation requires the following steps to get the data into the desired form.

  1. Create a source from the topic that publishes raw stock trading information. We need to map each StockTransaction object to a ShareVolume object, because StockTransaction carries sales metadata and we only need the number of shares sold.
  2. Group the ShareVolume data by stock symbol. Once grouped by symbol, the data can be reduced to running totals of share sales volume. Note that KStream.groupBy returns a KGroupedStream instance, and calling KGroupedStream.reduce in turn gives you a KTable.

What is the KGroupedStream interface?

The KStream.groupBy and KStream.groupByKey methods return an instance of KGroupedStream. KGroupedStream is an intermediate representation of an event stream after it has been grouped by key. It is not meant to be worked with directly. Instead, KGroupedStream is used for aggregation operations, which always result in a KTable. And because aggregation operations produce a KTable and use a state store, not every update is necessarily forwarded further down the pipeline.

The KTable.groupBy method returns a similar KGroupedTable, an intermediate representation of the update stream regrouped by key.

Let's take a short break and look at Fig. 5.9, which shows what we have achieved so far. This topology should already look familiar.

image

Now let's take a look at the code for this topology (it can be found in the src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java file) (Listing 5.2).

image

This code is short, yet a lot happens in just a few lines. In the first parameter of the builder.stream method you may notice something new: the enum value AutoOffsetReset.EARLIEST (there is also LATEST), set via the Consumed.withOffsetResetPolicy method. This enum lets you specify an offset reset strategy for each individual KStream or KTable, and it takes precedence over the offset reset setting from the configuration.

groupByKey and groupBy

The KStream interface has two methods for grouping records: groupByKey and groupBy. Both return a KGroupedStream, so you may reasonably ask: what is the difference between them, and when should you use which?

The groupByKey method is used when the keys in the KStream are already non-null and, most importantly, the “requires re-partitioning” flag has never been set.

The groupBy method assumes that you have changed the keys for grouping, so the re-partitioning flag is set to true. Performing joins, aggregations, and so on after groupBy results in automatic re-partitioning.

Bottom line: use groupByKey rather than groupBy whenever you can.

What the mapValues and groupBy methods do is clear, so let's take a look at the sum() method (you can find it in the src/main/java/bbejeck/model/ShareVolume.java file) (Listing 5.3).

image

The ShareVolume.sum method returns the running total of share sales volume, and the result of the entire chain of calculations is a KTable<String, ShareVolume> object. Now you can see what role the KTable plays. As ShareVolume objects arrive, the corresponding KTable stores the latest update for each key. It is important to remember that every update is reflected in shareVolumeKTable, but not every update is forwarded downstream.
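The listing itself is not reproduced in this excerpt, so the following is only a minimal sketch in plain Java of how a sum-style reducer keeps one running total per key. ShareVolumeSketch is a hypothetical, stripped-down stand-in for the book's ShareVolume class (the real class may have more fields), and a HashMap stands in for the state store behind the KTable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the book's ShareVolume class.
final class ShareVolumeSketch {
    final String symbol;
    final int shares;

    ShareVolumeSketch(String symbol, int shares) {
        this.symbol = symbol;
        this.shares = shares;
    }

    // Combines two volumes for the same symbol into a running total.
    static ShareVolumeSketch sum(ShareVolumeSketch a, ShareVolumeSketch b) {
        return new ShareVolumeSketch(a.symbol, a.shares + b.shares);
    }

    // Emulates reduce: the map keeps only the latest running total per symbol.
    static Map<String, ShareVolumeSketch> reduceBySymbol(ShareVolumeSketch... trades) {
        Map<String, ShareVolumeSketch> table = new HashMap<>();
        for (ShareVolumeSketch trade : trades) {
            table.merge(trade.symbol, trade, ShareVolumeSketch::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, ShareVolumeSketch> table = reduceBySymbol(
                new ShareVolumeSketch("ABC", 100),
                new ShareVolumeSketch("XYZ", 50),
                new ShareVolumeSketch("ABC", 25));
        System.out.println("ABC total: " + table.get("ABC").shares);
        System.out.println("XYZ total: " + table.get("XYZ").shares);
    }
}
```

The map holds a single entry per symbol, which mirrors the idea that the table always reflects the latest total even if intermediate totals are never forwarded.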

Then we use this KTable to perform another aggregation (by number of shares sold) to get the five companies with the largest share sales in each industry. The steps are similar to those of the first aggregation.

  1. Perform another groupBy operation to group the individual ShareVolume objects by industry.
  2. Start adding up the ShareVolume objects. This time the aggregation object is a fixed-size priority queue that retains only the five companies with the largest number of shares sold.
  3. Map the queues from the previous step to a string value and return the top five stocks by shares sold for each industry.
  4. Write the results, as strings, to a topic.

Fig. 5.10 shows the data flow graph of this topology. As you can see, the second round of processing is quite simple.

image

Now that the structure of this second round of processing is clear, you can turn to its source code (you will find it in the file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4).

This initializer contains a fixedQueue variable. It is a custom object that wraps java.util.TreeSet and is used to track the top N results in descending order of shares sold.

image

You have already seen the groupBy and mapValues calls, so we will not dwell on them (we call the KTable.toStream method because the KTable.print method is deprecated). But you have not yet seen the KTable version of the aggregate() method, so we will spend some time discussing it.

As you remember, what sets a KTable apart is that records with the same key are treated as updates. The KTable replaces the old record with the new one. Aggregation works the same way: the latest records for each key are aggregated. When a record arrives, it is added to the FixedSizePriorityQueue instance by the adder (the second parameter of the aggregate call), but if a record with the same key already exists, the old record is removed by the subtractor (the third parameter of the aggregate call).

All this means that our aggregator, FixedSizePriorityQueue, does not aggregate all the values for a key; instead, it keeps a moving tally of the top N most-traded stocks. Each incoming record contains the total number of shares sold so far. The KTable tells you which companies' shares are the most traded at the moment; a rolling aggregation of every update is not needed.
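To illustrate the adder and subtractor roles, here is a minimal sketch in plain Java of a fixed-size, TreeSet-backed top-N structure. It is not the book's FixedSizePriorityQueue: the class name TopNSketch, the Volume type, and the tie-breaking rule are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

final class TopNSketch {
    // Hypothetical value type: the running share total for one symbol.
    static final class Volume {
        final String symbol;
        final int shares;

        Volume(String symbol, int shares) {
            this.symbol = symbol;
            this.shares = shares;
        }
    }

    // Largest totals first; ties are broken by symbol so TreeSet keeps both entries.
    private static final Comparator<Volume> BY_SHARES_DESC = (a, b) -> {
        int byShares = Integer.compare(b.shares, a.shares);
        return byShares != 0 ? byShares : a.symbol.compareTo(b.symbol);
    };

    private final TreeSet<Volume> inner = new TreeSet<>(BY_SHARES_DESC);
    private final int maxSize;

    TopNSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    // Adder role: insert the new total, then evict the smallest if over capacity.
    TopNSketch add(Volume v) {
        inner.add(v);
        if (inner.size() > maxSize) {
            inner.pollLast();
        }
        return this;
    }

    // Subtractor role: drop the previous total of a key that is being updated.
    TopNSketch remove(Volume v) {
        inner.remove(v);
        return this;
    }

    // Symbols in descending order of shares sold.
    List<String> symbols() {
        List<String> out = new ArrayList<>();
        for (Volume v : inner) {
            out.add(v.symbol);
        }
        return out;
    }

    public static void main(String[] args) {
        TopNSketch top = new TopNSketch(2);
        top.add(new Volume("ABC", 100)).add(new Volume("XYZ", 300)).add(new Volume("QRS", 200));
        System.out.println(top.symbols());
        // An update for QRS: remove the old total, then add the new one.
        top.remove(new Volume("QRS", 200)).add(new Volume("QRS", 400));
        System.out.println(top.symbols());
    }
}
```

An update is modeled as a remove of the old value followed by an add of the new one, which is the same order of operations the text describes for records that share a key.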

We learned to do two important things:


The ability to perform these operations is important for understanding the meaning of the data moving through the Kafka Streams application and finding out what information they carry.

We also brought together some of the key concepts discussed earlier in this book. In Chapter 4, we talked about how important fault-tolerant local state is for a streaming application. The first example in this chapter showed why local state matters so much: it lets you keep track of what information you have already seen. Local access avoids network latency, which makes the application more performant and more resilient to errors.

Whenever you perform a reduce or aggregation operation, you must specify the name of the state store. Reduce and aggregation operations return a KTable instance, and the KTable uses the state store to replace old results with new ones. As you have seen, far from all updates are forwarded down the pipeline, and this matters because aggregation operations are meant to produce summary information. Without local state, the KTable would forward every aggregation and reduce result downstream.

Next, we look at performing operations such as aggregation within a specific period of time, the so-called windowing operations.

5.3.2. Window operations


In the previous section, we looked at a "rolling" reduce and aggregation. The application continuously reduced the share sales volume and then aggregated the five best-selling stocks on the exchange.

Sometimes such continuous aggregation and reduction of results is exactly what you need. And sometimes you need to perform operations only over a given period of time. For example, you might calculate how many stock transactions involved a particular company's shares in the last 10 minutes, or how many users clicked on a new ad banner in the last 15 minutes. An application can perform such operations repeatedly, but with results that apply only to the given periods of time (time windows).

Counting stock transactions by customer


In the following example, we will track stock transactions for several traders, either large organizations or savvy individual financiers.

There are two possible reasons for such tracking. One is the need to know what the market leaders are buying and selling. If these big players and sophisticated investors see an opportunity, it makes sense to follow their strategy. The second reason is the desire to spot any signs of illegal insider trading. To do that, you would need to analyze how large spikes in sales correlate with important press releases.

This tracking consists of steps such as:


The topology of this application is simple, but a diagram of it does not hurt. Take a look at Fig. 5.11.

Next we look at the functionality of window operations and the corresponding code.

image

Types of windows


There are three types of windows in Kafka Streams: session windows, “tumbling” windows, and sliding (“hopping”) windows.


Which one to choose depends on the business requirements. “Tumbling” and “hopping” windows are bounded by time, while session windows are bounded by user activity: the length of a session is determined solely by how active the user is. Above all, remember that all window types are based on the timestamps of the records, not on system time.

Next, we implement our topology with each of the window types. The full code is shown only in the first example; for the other window types nothing changes except the type of window operation.

Session windows


Session windows are very different from all other types of windows. They are limited not so much by time as by the user's activity (or the activity of the entity that you would like to monitor). Session windows are delimited by periods of inactivity.

Figure 5.12 illustrates the concept of session windows. The smaller session will merge with the session to its left, while the session on the right stays separate because it follows a long period of inactivity. Session windows are based on user activity, but they use the timestamps from the records to determine which session a record belongs to.

image


Using session windows to track exchange transactions


We use session windows to capture information about stock transactions. The implementation of the session windows is shown in Listing 5.5 (which can be found in the src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java file).

image

You have already seen most of the operations in this topology, so there is no need to cover them again. But there are some new elements here that we will discuss now.

Any groupBy operation is usually followed by some kind of aggregation operation (aggregate, reduce, or count). You can perform either a cumulative aggregation with a running total or a windowed aggregation, which takes into account only the records within a specified time window.

The code in Listing 5.5 counts the number of transactions within session windows. Fig. 5.13 breaks these actions down step by step.

With the call windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) we create a session window with an inactivity gap of 20 seconds and a retention period of 15 minutes. An inactivity gap of 20 seconds means that the application will add to the current (active) session any record that arrives within 20 seconds of the end or start of that session.

image

Next, we specify which aggregation operation to perform in the session window, in this case count. If an incoming record falls outside the inactivity gap (on either side of the timestamp), the application creates a new session. The retention period means that a session is kept for a certain time, which allows late-arriving data that falls outside the session's inactivity gap to still be merged in. In addition, the start and end of a new session produced by a merge correspond to the earliest and latest timestamps.

Consider several records from the count method to see how sessions work (Table 5.1).

image

When a record arrives, we look for existing sessions with the same key whose end time is no earlier than the current timestamp minus the inactivity gap and whose start time is no later than the current timestamp plus the inactivity gap. With this in mind, the four records from Table 5.1 are merged into sessions as follows.

1. Record 1 arrives first, so the start time equals the end time: 00:00:00.

2. Next comes record 2, and we look for sessions that end no earlier than 23:59:55 and start no later than 00:00:35. We find record 1 and merge sessions 1 and 2. We take the start time of session 1 (the earlier one) and the end time of session 2 (the later one), so the new session starts at 00:00:00 and ends at 00:00:15.

3. Record 3 arrives; we look for sessions between 00:00:30 and 00:01:10 and find none. We add a second session for the key 123-345-654, FFBE, starting and ending at 00:00:50.

4. Record 4 arrives, and we look for sessions between 23:59:45 and 00:00:25. This time both sessions 1 and 2 are found. All three sessions are merged into one, with a start time of 00:00:00 and an end time of 00:00:15.
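The merging steps above can be sketched in plain Java. This is only an illustration of the search-and-merge rule, not the Kafka Streams implementation. Table 5.1 is not reproduced here, so the record timestamps (0, 15, 50, and 5 seconds after 00:00:00) are inferred from the search ranges in the steps and should be treated as assumptions, as should the names SessionMergeSketch and Session.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionMergeSketch {
    // One session for a single key: first and last timestamps (seconds) and a record count.
    static final class Session {
        final long start;
        final long end;
        final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + ".." + end + "] x" + count;
        }
    }

    // Adds one record, merging every session that lies within the inactivity gap of it.
    static List<Session> add(List<Session> sessions, long ts, long gap) {
        long start = ts;
        long end = ts;
        int count = 1;
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            // Ends no earlier than ts - gap and starts no later than ts + gap.
            boolean withinGap = s.end >= ts - gap && s.start <= ts + gap;
            if (withinGap) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count;
            } else {
                result.add(s); // untouched sessions are kept as they are
            }
        }
        result.add(new Session(start, end, count));
        return result;
    }

    public static void main(String[] args) {
        long gap = 20; // inactivity gap in seconds
        long[] timestamps = {0, 15, 50, 5}; // assumed arrival order and timestamps
        List<Session> sessions = new ArrayList<>();
        for (long ts : timestamps) {
            sessions = add(sessions, ts, gap);
        }
        System.out.println(sessions);
    }
}
```

Under these assumed timestamps, the records at 0, 15, and 5 seconds end up in one session spanning 00:00:00 to 00:00:15, while the record at 50 seconds stays in a session of its own because it is more than 20 seconds away from the others.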

The following important points should be remembered from this section:


Next, we discuss the next type of window, “tumbling” windows.

“Tumbling” windows


"Tumbling" windows capture events that fall within a certain period of time. Imagine that you need to capture all the stock transactions of a company every 20 seconds, so that you collect all the events during this period of time. At the end of the 20-second interval, the window “tumbles” and moves to a new 20-second observation interval. Figure 5.14 illustrates this situation.

image

As you can see, all events received in the last 20 seconds are included in the window. At the end of this period, a new window is created.

Listing 5.6 shows code that demonstrates the use of “tumbling” windows to capture stock transactions every 20 seconds (you can find it in the src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java file).

image

With this small change to the TimeWindows.of call, you get a “tumbling” window. This example has no call to the until() method, so the default retention period of 24 hours is used.
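To make the bucketing concrete, here is a minimal sketch in plain Java rather than the Kafka Streams API. It assumes that windows are aligned to time zero and that timestamps are non-negative; windowStart is a hypothetical helper, not a library method.

```java
final class TumblingWindowSketch {
    // Start of the tumbling window that contains the timestamp (same unit as size).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 20; // window size in seconds
        for (long ts : new long[] {3, 19, 20, 45}) {
            long start = windowStart(ts, size);
            System.out.println("ts=" + ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Every timestamp falls into exactly one window, and a record with timestamp 20 already belongs to the next window because the window end is exclusive in this sketch.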

Finally, it is time to move on to the last of the window options, hopping windows.

Sliding ("hopping") windows


Sliding (hopping) windows are similar to “tumbling” windows, with one small difference. A sliding window does not wait for the end of the time interval before creating a new window to process recent events. Instead, it starts a new calculation after an advance interval that is shorter than the window duration.

To illustrate the difference between “tumbling” and “hopping” windows, let us return to the example of counting stock transactions. Our goal is still to count the number of transactions, but we do not want to wait for the entire interval before updating the counter. Instead, we will update the counter at shorter intervals. For example, we will still count the transactions over 20 seconds, but update the counter every 5 seconds, as shown in Fig. 5.15. In this case, we get three result windows with overlapping data.

image

Listing 5.7 shows the code for specifying sliding windows (you can find it in the src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java file).

image

A tumbling window can be turned into a hopping one by adding a call to the advanceBy() method. In this example, the retention period is 15 minutes.
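The overlap between hopping windows can be shown with a small plain-Java sketch. It is not the Kafka Streams API: windowStarts is a hypothetical helper, and the sketch assumes windows aligned to time zero, non-negative timestamps, and an exclusive window end.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindowSketch {
    // Start times of every window [start, start + size) that contains the timestamp.
    static List<Long> windowStarts(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Most recent window that opened at or before the timestamp.
        long latest = timestamp - (timestamp % advance);
        for (long start = latest; start > timestamp - size && start >= 0; start -= advance) {
            starts.add(0, start); // insert at the front to keep ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 20;   // window size in seconds
        long advance = 5; // a new window opens every 5 seconds
        System.out.println("ts=7  -> windows starting at " + windowStarts(7, size, advance));
        System.out.println("ts=22 -> windows starting at " + windowStarts(22, size, advance));
    }
}
```

With a 20-second window advancing every 5 seconds, a single record can be counted in several overlapping windows, which is why the counts are refreshed more often than once per window.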

You saw in this section how to limit the aggregation results to time windows. In particular, I would like you to remember the following three things from this section:


Next, we will learn how to convert a KTable back to a KStream for a join.

5.3.3. Joining KStream and KTable Objects


In Chapter 4, we discussed joining two KStream objects. Now we need to learn how to join a KTable and a KStream. The reason is simple. A KStream is a stream of records and a KTable is a stream of record updates, but sometimes you want to add extra context to the stream of records using updates from the KTable.

Let's take the stock transaction counts and join them with financial news for the relevant industries. Given the existing code, here is what you need to do.

  1. Convert the KTable of stock transaction counts to a KStream, then replace the key with one that identifies the industry for the given stock symbol.
  2. Create a KTable that reads data from a financial news topic. This new KTable will be categorized by industry.
  3. Join the news updates with the stock transaction counts by industry.

Now let's see how to implement this action plan.

Convert KTable to KStream


To convert a KTable to a KStream, do the following.

  1. Call the KTable.toStream() method.
  2. Using a call to KStream.map, replace the key with the industry name and extract the TransactionSummary object from the Windowed instance.

We chain these operations together as follows (the code can be found in the file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

image

Because we perform a KStream.map operation, the returned KStream instance is re-partitioned automatically when it is used in a join.

With the conversion done, we now need to create a KTable object for reading the financial news.

Create KTable for stock news


Fortunately, a single line of code is enough to create a KTable object (this code can be found in the src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java file) (see Listing 5.9).

image

Note that no Serde objects need to be specified, because the string Serde from the configuration is used. Also, thanks to the EARLIEST enum value, the table is populated with records from the very beginning.

Now we can move on to the final step, the join.

Join news updates with transaction count data


Creating the join is easy. We will use a left join in case there is no financial news for the relevant industry (the code can be found in the file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

image

This leftJoin operator is quite simple. Unlike the joins in Chapter 4, no JoinWindow is used, because in a KStream-KTable join the KTable holds only one entry per key. Such a join is not bounded in time: the record is either present in the KTable or it is not. The main takeaway: KTable objects let you enrich a KStream with reference data that is updated less frequently.
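The lookup semantics of such a left join can be sketched in plain Java, with a Map standing in for the table and a list for the stream. This is an illustration only, not the Kafka Streams API; the class name, the record layout, and the sample data are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamTableLeftJoinSketch {
    // Left join: every stream record is emitted; the table side may be missing (null).
    // Each stream record is a two-element array: {industry, transaction summary}.
    static List<String> leftJoin(List<String[]> stream, Map<String, String> table) {
        List<String> joined = new ArrayList<>();
        for (String[] record : stream) {
            String industry = record[0];
            String summary = record[1];
            String news = table.get(industry); // latest value for the key, or null
            joined.add(industry + ": " + summary + " | news=" + news);
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        Map<String, String> newsByIndustry = new HashMap<>();
        newsByIndustry.put("energy", "sample headline");

        List<String[]> counts = new ArrayList<>();
        counts.add(new String[] {"energy", "17 transactions"});
        counts.add(new String[] {"retail", "4 transactions"});

        for (String line : leftJoin(counts, newsByIndustry)) {
            System.out.println(line);
        }
    }
}
```

Because the table holds at most one value per key, the join is a plain lookup with no time window, and a stream record without a match is still emitted, just without news.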

And now we will consider a more efficient way of enriching events from KStream.

5.3.4. GlobalKTable objects


As you have seen, there is often a need to enrich a stream of events or add context to it. In Chapter 4 you saw joins between two KStream objects, and in the previous section a join between a KStream and a KTable. In all these cases, the data stream has to be re-partitioned when the keys are mapped to a new type or value. Sometimes re-partitioning is done explicitly, and sometimes Kafka Streams does it automatically. Re-partitioning is necessary because the keys have changed and the records must end up in new partitions; otherwise the join is impossible (this was discussed in Chapter 4, in section 4.2.4, “Re-partitioning of data”).

Re-partitioning has its price


Re-partitioning comes at a cost: additional resources for creating intermediate topics and storing duplicate data in another topic, as well as increased latency from writing to and reading from that topic. In addition, if you need to join on more than one aspect or dimension, you have to chain the joins, map the records to new keys, and go through re-partitioning again.

Joining with smaller data sets


In some cases, the amount of reference data to join against is relatively small, so that a complete copy of it easily fits locally on each node. For such situations, Kafka Streams provides the GlobalKTable class.

GlobalKTable instances are unique in that the application replicates all the data to each of the nodes. And since each node holds all the data, there is no need to partition the event stream by the reference data key to make it available to all partitions. GlobalKTable objects also let you perform keyless joins. Let's return to one of the previous examples to demonstrate this feature.

Joining KStream objects with GlobalKTable objects


In subsection 5.3.2, we performed a windowed aggregation of stock transactions by customer. The results of this aggregation looked like this:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Although these results served the purpose, it would be more convenient if they also showed the customer's name and the full company name. To add them you could use regular joins, but that would require two key mappings and re-partitioning. GlobalKTable lets you avoid the cost of those operations.

To do this, we will take the countStream object from Listing 5.11 (the corresponding code can be found in the src/main/java/bbejeck/chapter_5/GlobalKTableExample.java file) and join it with two GlobalKTable objects.

image

We have discussed this before, so I will not repeat myself. Note, however, that the code in the toStream().map function has been abstracted into a function object instead of an inline lambda expression for readability.

The next step is to declare two GlobalKTable instances (the code can be found in the file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

image


Note that topic names are described using enumerated types.

Now that all the components are ready, all that remains is to write the code for the join (which can be found in the src/main/java/bbejeck/chapter_5/GlobalKTableExample.java file) (Listing 5.13).

image

Although this code contains two joins, they are chained, since neither of their results is used separately. The results are printed at the end of the whole operation.

When you run this join operation, you will get results like this:

 {customer='Barney, Smith' company="Exxon", transactions= 17} 

The substance has not changed, but these results are easier to read.
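The idea of chaining two lookups against fully replicated tables can be sketched in plain Java. This is not the Kafka Streams API: two Map objects stand in for the GlobalKTable instances, a key-mapper function derives the lookup key from the value, and the class names, the Summary fields, and the customer and company names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

final class GlobalTableJoinSketch {
    // Hypothetical, simplified summary: IDs from the stream plus looked-up names.
    static final class Summary {
        final String customerId;
        final String stockTicker;
        final long count;
        final String customerName;
        final String companyName;

        Summary(String customerId, String stockTicker, long count,
                String customerName, String companyName) {
            this.customerId = customerId;
            this.stockTicker = stockTicker;
            this.count = count;
            this.customerName = customerName;
            this.companyName = companyName;
        }

        Summary withCompany(String name) {
            return new Summary(customerId, stockTicker, count, customerName, name);
        }

        Summary withCustomer(String name) {
            return new Summary(customerId, stockTicker, count, name, companyName);
        }
    }

    // Inner join against a replicated table: the lookup key is derived from the
    // value itself, so the record's own key never has to change.
    static Summary join(Summary value, Function<Summary, String> keyMapper,
                        Map<String, String> table,
                        BiFunction<Summary, String, Summary> joiner) {
        if (value == null) {
            return null; // dropped by an earlier join in the chain
        }
        String match = table.get(keyMapper.apply(value));
        return match == null ? null : joiner.apply(value, match);
    }

    // Two joins chained one after the other, as described in the text.
    static Summary enrich(Summary s, Map<String, String> companies,
                          Map<String, String> customers) {
        Summary withCompany = join(s, v -> v.stockTicker, companies, Summary::withCompany);
        return join(withCompany, v -> v.customerId, customers, Summary::withCustomer);
    }

    public static void main(String[] args) {
        Map<String, String> companies = new HashMap<>();
        companies.put("GUTM", "Example Corp");         // hypothetical company name
        Map<String, String> customers = new HashMap<>();
        customers.put("074-09-3705", "Example Customer"); // hypothetical customer name

        Summary out = enrich(new Summary("074-09-3705", "GUTM", 17, null, null),
                companies, customers);
        System.out.println(out.customerName + ", " + out.companyName + ", " + out.count);
    }
}
```

Since every node would hold both lookup tables in full, neither lookup requires re-keying the stream, which is the cost that this kind of join avoids.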

If you read Chapter 4, you have already seen several types of joins in action. They are listed in Table 5.2. The table reflects the join options available as of Kafka Streams 1.0.0; this may change in future releases.

image

In conclusion, let me remind you of the main point: you can join event streams (KStream) and update streams (KTable) using local state. In addition, if the reference data is not too large, you can use a GlobalKTable. A GlobalKTable replicates all partitions to each node of the Kafka Streams application, making all the data available regardless of which partition a key maps to.

Next, we will look at a Kafka Streams feature that lets us observe state changes without consuming data from a Kafka topic.

5.3.5. Queryable state


We have already performed several stateful operations and always printed the results to the console (for development) or wrote them to a topic (for production). When results are written to a topic, you have to use a Kafka consumer to view them.

Reading data from these topics can be considered a kind of materialized view. For our purposes, we can use the Wikipedia definition of a materialized view: “... a physical database object containing the results of a query. For example, it may be a local copy of remote data, a subset of the rows and/or columns of a table or join result, or a summary table obtained by aggregation” (https://en.wikipedia.org/wiki/Materialized_view).

Kafka Streams also lets you run interactive queries against state stores, so you can read these materialized views directly. It is important to note that a query against a state store is a read-only operation. Because of this, you need not worry about accidentally making the state inconsistent while the application is processing data.

The ability to query state stores directly is significant. It means you can build dashboard applications without first having to consume the data with a Kafka consumer. It also makes the application more efficient, because the data does not have to be written out again:


The main thing I would like you to remember: you can query the application's state directly. The possibilities this opens up are hard to overestimate. Instead of consuming data from Kafka and storing records in a database for the application, you can query the state stores and get the same result. Direct queries to state stores mean less code (no consumer) and less software (no need for a database table to store the results).

We have covered a lot of ground in this chapter, so we will stop our discussion of interactive queries to state stores here for now. But don't worry: in Chapter 9 we will build a simple dashboard application with interactive queries. To demonstrate interactive queries and how to add them to Kafka Streams applications, it will use some of the examples from this and previous chapters.

Summary



So far, we have focused on building Kafka Streams applications with the high-level KStream DSL. Although the high-level approach lets you write neat and concise programs, it involves a trade-off. Working with the KStream DSL means gaining conciseness at the cost of some control. In the next chapter, we will look at the low-level Processor API and try out different trade-offs. The programs will be longer than they have been so far, but we will be able to create almost any processor node we might need.

Source: https://habr.com/ru/post/457756/

