📜 ⬆️ ⬇️

Event Registration with Kafka

Hi, Habr!

We uncorked the last reserves of the book " Apache Kafka. Stream processing and data analysis " and sent it to a reprint. Moreover, we received a contract for the book " Kafka Streams in Action " and proceed to translate it literally next week.


')
To show an interesting use case for the Kafka Streams library, we decided to translate an article about the Event Sourcing paradigm in Kafka from the very Adam Worsky, whose article on the Scala language was published with us two weeks ago. All the more interesting is that Adam Worsky’s opinion is not indisputable: here , for example, it is argued that this paradigm is decisive for Kafka. All the more memorable, we hope, we get the impression of the article.

The term “Event Sourcing” is translated as “Event Registration” in both our Martin Pure Architecture edition and in this article. If someone is impressed by the translation of "pumping events" - let me know please.

Creating a system in which event sourcing is provided for, we sooner or later encounter the problem of persistence - and here we have a couple of options. First, there is an EventStore , a mature implementation, battle-hardened. Alternatively, you can use akka-persistence to take full advantage of the scalability of Cassandra , as well as rely on the performance of the actor model. Another option is a good old relational database , where the CRUD approach is combined using events, and the maximum benefit is squeezed out of transactions.

In addition to these (and, perhaps, many other) opportunities that have emerged due to several recently implemented things, today it has become very easy to organize the registration of events on top of Kafka . Let's look at how.

What is event registration?

There are a number of excellent introductory articles on this subject, so I will limit myself to a brief introduction. When registering events, we save not the “current” state of the entities used in our system, but the stream of events related to these entities. Each event is a fact describing a change of state (already!) Occurred with the object. As you know, the facts are not discussed and unchanged .

When we have a stream of such events, the actual state of the entity can be found out by winding up all the events related to it; However, please note that the opposite is impossible - keeping only the “actual” state, we discard a lot of valuable chronological information.

Event logging can peacefully coexist with more traditional state storage methods. As a rule, the system processes a number of entity types (for example: users, orders, goods, ...) and it is quite possible that event registration will be expedient only for some of these categories. It is important to note that here we are not faced with the choice of “all or nothing”; it's just about the additional ability to manage the state in our application.

Storing events in Kafka

The first problem that needs to be solved: how to store events in Kafka? There are three possible strategies:


The third strategy (on-topic-on-essence) is practically impracticable. If, when each new user appeared in the system, he would have to start a separate topic, soon the number of topics would become unlimited. Any aggregation in this case would be very difficult, for example, it would be difficult to index all users in a search engine; not only that at the same time would have to consume a huge number of topics - so also not all of them would be known in advance.

Therefore, it remains to choose between 1 and 2. Both options have their advantages and disadvantages. Having a single topic, it's easier to get a global view of all events. On the other hand, highlighting the topic for each type of entity, you can scale and segment the flow of each entity separately. The choice of one of two strategies depends on the specific use case.

In addition, you can implement both strategies at once, if you have additional storage space: to produce topics by the type of entities from one comprehensive topic.



In the rest of the article, we will work with only one type of entity and a single topic, although the stated material is easy to extrapolate and apply to work with many topics or types of entities.

(READ: as Chris Hunt noted, there is an excellent article by Martin Kleppman , which discusses in detail how to distribute events across topics and segments).

Simplest storage operations in the event logging paradigm

The simplest operation that is logical to expect from a repository that supports event registration is to read the “current” (minimized) state of a particular entity. As a rule, each entity has one or another id . Accordingly, knowing this id , our storage system must return the current state of the object.

The event log will serve us as the ultimate truth: the current state can always be derived from the stream of events associated with a particular entity. For this, the database engine will require a pure function (without side effects), which accepts the event and the initial state and returns the changed state: Event = > State => State . With such a function and the values ​​of the initial state, the current state is a convolution of the event flow (the state change function must be clean so that it can be repeatedly applied to the same events.)

The simplified implementation of the “read the current state” operation in Kafka collects a stream from all events from the topic, filters them, leaving only the events with the given id and collapses with the help of the specified function. If there are a lot of events (and over time the number of events only grows), this operation can become slow and consume a lot of resources. Even if its result will be cached in memory and stored on the service node, this information will still have to be periodically recreated, for example, due to node failures or due to crowding out the cache data.



Therefore, we need a more rational way. This is where kafka-streams and state stores will come in handy. Kafka-streams applications run on a whole cluster of nodes that consume certain topics together. Each node is assigned a number of segments of consumed topics, just as is the case with the conventional Kafka consumer account. However, kafka-streams provides higher-level operations on data, with which it is much easier to create derived streams.

One of these operations in kafka-streams is a convolution of the stream in the local storage. Each local storage contains data only from those segments that are consumed by a given node. Out of the box, two local storage implementations are available: in RAM and based on RocksDB .

Returning to the event registration topic, we note that it is possible to minimize the flow of events in the state store , keeping the “current state” of each entity in the local node from the segments assigned to the node. If we use the RocksDB-based state storage implementation, it depends only on the amount of disk space how many entities we can track on a single node.

Here is what the convolution of events in the local storage looks like when using the Java API (serde means "serializer / deserializer"):

 KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde) //  :     .reduce((currentState, event) -> ..., "my_entity_store"); .toStream(); //     return builder; 

A complete example with microservice-based order processing is available on the Confluent website.

(READ: as noted by Sergey Egorov and Nikita Salnikov on Twitter, for the system with event registration, you will probably need to change the default data storage settings in Kafka so that no limits can be applied either in time or in size, as well as optional , enable data compression.)

View current status

We have created a state repository, where the actual states of all the entities coming from the segments assigned to the node are located, but how now to request this repository? If the request is local (that is, it originates from the same node on which the storage is located), then everything is quite simple:

 streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId); 

But what if we want to request data located on another node? And how to figure out what kind of node? Here we have another opportunity that has recently appeared in Kafka: interactive requests . With their help, you can access the Kafka metadata and find out which node processes the topic segment with the given id (in this case, the tool is used implicitly for the topic segmentation):

 metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde) 

Next, you need to somehow redirect the request to the correct node. Please note: the specific way in which inter-node communication is implemented and processed — be it REST, akka-remote, or any other — is not the responsibility of kafka-streams. Kafka simply provides access to the state store and provides information on which node the state store for the given id .

Recovery after failure

State repositories look pretty, but what happens if a node fails? Recreating a local state repository for a given segment can also be a costly operation. It can for a long time provoke increased latency or loss of requests, since kafka-streams will need rebalancing (after adding or deleting a node).

That is why, by default, long-term state stores are logged: that is, all changes made to the store are additionally recorded in the changelog-topic. This topic is compressed (after all, for each id we are only interested in the last entry, without a history of changes, since the history is stored in the events themselves) - therefore, it is as small as possible. That is why recreating a repository on another node can occur much faster.

However, with rebalancing in this case, delays are still possible. To reduce them even more, kafka-streams provides the ability to keep multiple backup replicas ( num.standby.replicas ) for each repository. These replicas apply all updates retrieved from topics with change logs as they arrive, and are ready to switch to the main state storage for a given segment as soon as the current main storage fails.

Consistency

With the default settings, Kafka provides at least one-time delivery. That is, in case of node failure, some messages may be delivered several times. For example, it is possible that a specific event will be applied twice to the state store if the system crashes after the state store has recorded changes in the state store, but before the offset has been made for that particular event. Perhaps this will not cause any difficulties: our state update function ( Event = > State => State ) can handle such situations quite normally. However, it may not cope: in such a case, you can use the guarantees of strictly single delivery in Kafka. Such guarantees apply only when reading and writing Kafka topics, but this is exactly what we are doing here: in the background, all entries in Kafka topics come down to updating the change log for the state store and performing offsets. All this can be done in the form of transactions .

Therefore, if our state update function requires this, we can enable the semantics of processing “one-time delivery” streams using a single configuration option: processing.guarantee . Because of this, productivity drops, but nothing comes for nothing.

Hearing of events

Now that we’ve covered the basics — querying the “current state” and updating it for each entity — what about triggering side effects ? At some point it will become necessary, for example, for:


All these tasks are blocking to one degree or another and are associated with I / O operations (this is natural for side effects), so perhaps not the best idea to perform them within the state updating logic: as a result, the frequency of failures in the main loop may increase events, and in terms of performance there will be a bottleneck.

Moreover, the function with the state update logic (E Event = > State => State ) can be run multiple times (in case of failures or restarts), and more often we want to minimize the number of cases in which side effects for a particular event are run repeatedly.

Fortunately, since we are working with Kafka tops, we have a fair amount of flexibility. At the flow stage where the state storage is updated, events can be emitted in an unmodified (or, if necessary, in a modified) form, and the resulting stream / topic (in Kafka, these concepts are equivalent) can be consumed as you like. Moreover, it can be consumed either before or after the state update stage. Finally, we can also control how we will trigger side effects: at least once or at most once. The first option is provided if you perform the offset of the consumed topic-event only after all the side effects have been successfully completed. Conversely, with a maximum of one-time launch, we perform offsets before triggering side effects.

There are several options for starting side effects, they depend on the specific practical situation. First of all, it is possible to determine the Kafka-streams stage, where side effects for each event are triggered as part of the stream processing function.
It is quite simple to set up such a mechanism, but this decision is inflexible when it comes to retrying, managing displacements and competing displacements at once for many events. In such more complex cases, it is more appropriate to determine processing using, say, a reactive-kafka or other mechanism that consumes Kafka topics "directly."

It is also possible that one event will trigger other events — for example, an order event may trigger the “preparation for dispatch” and “client notification” events. This can also be implemented at the kafka-streams stage.

Finally, if we wanted to save events or certain data extracted from events in a database or a search engine, say, in ElasticSearch or PostgreSQL, we could use the Kafka Connect connector, which will process all the details associated with the consumption of topics for us.

Creating views and projections

Normally, system requirements are not limited to requesting and processing only single entity streams. Also should be supported aggregation, a combination of multiple streams of events. Such combined streams are often referred to as projections , and when minimized they can be used to create representations of data . Is it possible to implement them with Kafka?



Again - yes! Remember that basically we are dealing simply with the Kafka topic, where our events are stored; Consequently, we have all the power of Kafka raw consumers / producers, kafka-streams combinator and even KSQL - all this will be useful to us for defining projections. For example, using kafka-streams, you can filter the stream, display, group by key, aggregate in temporal or session windows, etc. either at the code level, or using an SQL-like KSQL.

Such streams can be stored and provided for a long time for requests using state stores and interactive requests, just as we did with separate entity streams.

What's next

To prevent the infinite growth of the flow of events as the system develops, such a compression option, such as saving snapshots of the “current state”, can be useful. Thus, we can limit ourselves to storing only a few recent snapshots and the events that occurred after they were created.

Although, Kafka does not have direct support for snapshots (and in some other systems operating on the principle of event registration, it does exist), you can definitely add this kind of functionality by using some of the mechanisms mentioned above, such as threads, consumers, state stores, etc. d.

Summary

Although initially Kafka was not designed with an eye on the event registration paradigm, in fact it is a stream processing engine with support for topic replication , segmentation, state storage and streaming APIs , and is very flexible at the same time. Therefore, on top of Kafka, you can easily implement an event recording system. Moreover, since, against the background of everything that is happening, we will always have a Kafka topic, we will gain additional flexibility, as we can work with either high-level streaming APIs or low-level consumers.

Source: https://habr.com/ru/post/424739/


All Articles