Hello! My name is Yuri Lilekov, and I work on the Server Team at Badoo. The other day I came across a rather interesting article about exactly-once semantics in Apache Kafka, which I was glad to translate for you.
What the Kafka community has been waiting for so long has finally happened: Apache Kafka 0.11 introduces exactly-once semantics ("strictly one-time delivery"). In this post I will cover the following points:
- what is exactly-once semantics in Apache Kafka;
- why this problem is complex;
- how the new idempotence and transaction features enable exactly-once stream processing with the Kafka Streams API.
I can guess what each of you is thinking right now: exactly-once delivery is impossible, or at least too expensive for practical use (correct me if I am mistaken). You are not alone in thinking so. Some of my colleagues admit that exactly-once delivery is one of the hardest problems in distributed systems.
And some have made it clear that they consider exactly-once delivery most likely impossible!
I do not deny that exactly-once delivery semantics (and support for stream processing in the same mode) is a really hard problem. But for more than a year I have also watched talented Confluent engineers work hard, together with the open-source community, on solving this problem in Apache Kafka. So let's move on to an overview of messaging semantics.
The computers that make up a distributed system can fail independently of one another. In the case of Kafka, an individual broker may fail, or a network error may occur while the producer is sending a message to a topic. Depending on how the producer handles such a failure, you get different semantics.
At-least-once semantics. If the producer receives an acknowledgement from the Kafka broker and acks=all is set, the message has been written to the Kafka topic exactly once. But if the producer does not receive an acknowledgement before the timeout expires, or receives an error, it may resend the message, assuming it was not written to the Kafka topic. If the broker failed just before sending the acknowledgement but after the message was successfully written to the topic, this retry results in the message being written twice and delivered to the end consumer more than once. Persistence is admirable, but this approach leads to duplicated work and incorrect results.
At-most-once semantics. If the producer does not retry after a timeout or an error, the message may never be written to the Kafka topic and therefore never reach the consumer. In most cases messages will be delivered, but to rule out duplicates we accept that some messages will occasionally not get through.
Exactly-once semantics. Even if the producer retries sending a message, the message is delivered to the end consumer exactly once. Exactly-once is the most desirable guarantee, but also the least understood. The reason is that it requires cooperation between the messaging system itself and the application that produces and consumes the messages. For example, if after successfully consuming a message you rewind your Kafka consumer to an earlier offset, you will receive every message from that offset up to the latest one all over again. This shows why the messaging system and the client application must cooperate for exactly-once semantics to work.
Let's start describing the difficulties of supporting exactly-once semantics with a simple example.
Suppose a single-threaded producer application sends the message "Hello, Kafka" to a single-partition Kafka topic named "EoS". Suppose further that a single instance of a consumer application at the other end pulls data from the topic and prints the message. If you are lucky and there are no failures, everything works fine, and the message "Hello, Kafka" is written to the partition of the "EoS" topic exactly once. The consumer receives the message, processes it, and commits its offset to signal that processing is complete. The consumer application will not receive this message again, even if it crashes and restarts.
But we all know that you cannot always count on luck. At scale, even the most unlikely failure scenarios happen regularly.
Broker failure. Kafka is a highly available, durable, and reliable system in which every message written to a partition is persisted and replicated n times. Kafka can therefore tolerate n-1 broker failures, which means that a partition stays available as long as at least one broker holding a replica is available. The Kafka replication protocol guarantees that once a message has been successfully written to the leader replica, it will be replicated to all available replicas.
Producer-to-broker RPC failure. Durability in Kafka depends on the producer receiving an acknowledgement from the broker. Failing to receive an acknowledgement does not necessarily mean that the request itself failed. The broker may crash after the message has been written but before the acknowledgement is sent to the producer. It may also crash before writing the message to the topic. Since the producer has no way of knowing the cause of the failure, it is forced to assume that the message was not written successfully and retries. In some cases this duplicates the message in the Kafka partition log, and the end consumer receives the message more than once.
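The retry-after-lost-acknowledgement scenario can be reproduced with a small toy model. The sketch below is not Kafka client code: ToyBroker, append and sendWithRetries are hypothetical names, and the broker crash between the write and the acknowledgement is simulated with a flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a producer retrying after a lost acknowledgement (not Kafka client code). */
final class LostAckSketch {

    /** Hypothetical in-memory stand-in for a single partition log. */
    static final class ToyBroker {
        final List<String> log = new ArrayList<>();
        private boolean dropNextAck;

        ToyBroker(boolean dropFirstAck) {
            this.dropNextAck = dropFirstAck;
        }

        /** Appends the message, then reports whether the acknowledgement reached the producer. */
        boolean append(String message) {
            log.add(message);        // the write itself succeeds
            if (dropNextAck) {       // the broker fails before acknowledging
                dropNextAck = false;
                return false;
            }
            return true;
        }
    }

    /** Resends until an acknowledgement arrives; returns the attempt that was acknowledged, or -1. */
    static int sendWithRetries(ToyBroker broker, String message, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (broker.append(message)) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker(true);
        int attempts = sendWithRetries(broker, "Hello, Kafka", 3);
        System.out.println("attempts=" + attempts + ", log=" + broker.log);
    }
}
```

With one dropped acknowledgement the producer needs two attempts, and the toy log ends up holding the message twice, which is exactly the duplicate an at-least-once producer cannot avoid on its own.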
Prior to version 0.11.x, Apache Kafka supported at-least-once delivery and in-order delivery per partition. As the example above shows, this means that producer retries could lead to duplicate messages. With the new exactly-once semantics, we have strengthened Kafka's processing guarantees in three different but interrelated ways.
An idempotent operation is one that, when executed multiple times, produces the same result as a single execution. The producer's send operation is now idempotent. If an error forces the producer to retry, the message that the producer sent repeatedly is written to the Kafka broker log only once. Within a single partition, idempotent producer sends remove the possibility of duplicate messages caused by producer or broker errors.
To enable this feature and get exactly-once semantics per partition (that is, no duplicates, no data loss, and in-order delivery), simply set enable.idempotence=true in the producer configuration.
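As a minimal sketch of that setting, the configuration can be assembled with plain java.util.Properties. The class and method names, the broker address and the choice of string serializers are assumptions made for illustration; only the configuration keys come from Kafka itself.

```java
import java.util.Properties;

/** Builds producer settings with idempotence enabled (illustrative sketch). */
final class IdempotentProducerConfigSketch {

    static Properties idempotentProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: string keys and values; use whatever serializers your data needs.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // The switch discussed in the text.
        props.setProperty("enable.idempotence", "true");
        // Idempotence requires acknowledgements from all in-sync replicas.
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        idempotentProducerProps("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application these properties would be handed to the Kafka producer's constructor; the sketch stops short of that so it runs without the Kafka client library on the classpath.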
So how does this feature work? Under the hood, it works much like TCP: each batch of messages sent to Kafka carries a sequence number that the broker uses to eliminate duplicates. But unlike TCP, which guarantees deduplication only within a transient in-memory connection, the sequence number is persisted to the replicated log. So even if the leader replica fails, any broker that takes over as leader also knows whether resent data is a duplicate.
The overhead of this approach is relatively small: just a few extra numeric fields per batch of messages. As you will see later, this feature reduces performance only slightly compared with the non-idempotent producer.
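The deduplication idea can be sketched in a few lines. This is a simplified model, not the broker's actual implementation: the class name, the producerId parameter and the rule "ignore any sequence number that is not higher than the last one seen" are assumptions for illustration, and the handling of gaps or out-of-order batches is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of broker-side deduplication by sequence number. */
final class SequenceDedupSketch {
    final List<String> log = new ArrayList<>();
    // Last sequence number appended for each producer (hypothetical bookkeeping).
    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();

    /** Returns true if the batch was appended, false if it was recognised as a retry. */
    boolean append(long producerId, int sequence, String batch) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already in the log: acknowledge again without rewriting
        }
        log.add(batch);
        lastSequenceByProducer.put(producerId, sequence);
        return true;
    }

    public static void main(String[] args) {
        SequenceDedupSketch broker = new SequenceDedupSketch();
        broker.append(7L, 0, "Hello, Kafka");
        broker.append(7L, 0, "Hello, Kafka"); // retry after a lost acknowledgement
        System.out.println(broker.log);
    }
}
```

Because the real sequence numbers live in the replicated log rather than in one broker's memory, a newly elected leader can apply the same check.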
Kafka now supports atomic writes across multiple partitions through the new transactional API. This allows a producer to send a batch of messages to several partitions so that either all messages in the batch become visible to any consumer or none of them is ever visible. This feature also lets you commit your consumer offsets in the same transaction as the data you have processed, which makes end-to-end exactly-once semantics possible. Here is a snippet of sample code that demonstrates the use of the transactional API:
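    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(record1);
        producer.send(record2);
        // Both records become visible to read_committed consumers together, or not at all.
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // Another producer with the same transactional ID has taken over; this one must stop.
        producer.close();
    } catch (KafkaException e) {
        // Any other error: abort so that none of the records are exposed.
        producer.abortTransaction();
    }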
This example shows how you can use the new Producer API to send messages atomically to a set of topic partitions. Note that a Kafka topic partition can contain both messages that are part of a transaction and messages that are not.
Therefore, from the consumer's point of view, there are two ways to read transactional messages, expressed through the isolation.level consumer setting:
- read_committed: in addition to messages that are not part of a transaction, the consumer reads transactional messages once their transaction has been committed;
- read_uncommitted: all messages are read in offset order without waiting for transactions to commit; this option matches the existing Kafka consumer semantics.
To use transactions, you need to configure the consumer with the required isolation.level, use the new Producer API, and set the producer configuration parameter transactional.id to a unique ID (it is needed to preserve transaction state across application restarts).
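A hedged sketch of the two sides of that setup, again with plain java.util.Properties. The class and method names, the example IDs and addresses, and the decision to disable auto-commit are assumptions for illustration; the configuration keys and the read_committed value are Kafka's own.

```java
import java.util.Properties;

/** Producer and consumer settings for transactional use (illustrative sketch). */
final class TransactionalConfigSketch {

    static Properties transactionalProducerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Must be unique per producer instance and stable across restarts.
        props.setProperty("transactional.id", transactionalId);
        return props;
    }

    static Properties readCommittedConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only return transactional messages whose transaction has committed.
        props.setProperty("isolation.level", "read_committed");
        // Assumption: offsets are committed by the application, not automatically.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // All literal values below are placeholders.
        System.out.println(transactionalProducerProps("localhost:9092", "orders-producer-1"));
        System.out.println(readCommittedConsumerProps("localhost:9092", "orders-readers"));
    }
}
```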
Building on idempotence and atomicity, exactly-once stream processing is now available through the Streams API in Apache Kafka. To make your streaming application use these semantics, it is enough to set processing.guarantee=exactly_once in its configuration. As a result, all processing happens exactly once. This covers both the processing itself and the materialized state that the processing job creates and writes back to Kafka.
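For completeness, here is what that one-line change might look like when the Streams configuration is built in code. The class and method names, the application ID and the broker address are placeholders invented for this sketch.

```java
import java.util.Properties;

/** Streams application settings with exactly-once processing enabled (illustrative sketch). */
final class StreamsConfigSketch {

    static Properties exactlyOnceStreamsProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The setting named in the text.
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; the result would be passed to the Streams application as its configuration.
        System.out.println(exactlyOnceStreamsProps("event-counter", "localhost:9092"));
    }
}
```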
“That is why the exactly-once guarantees provided by Kafka's Streams API are the strongest offered by any stream processing system so far. They give a stream processing application end-to-end exactly-once guarantees, from reading data from Kafka, through any state the application materializes to Kafka, to the final result written back to Kafka. Stream processing systems that rely only on external data systems to support materialized state weaken the exactly-once guarantees. Even when they use Kafka as a source for processing and need to recover from a failure, they can only rewind their Kafka offsets to reconsume and reprocess messages. But they cannot roll back the associated state in the external system, which leads to incorrect results when the state update is not idempotent.”
Let me add some more detail. What usually matters is whether the streaming application produces the correct answer even if one of its instances crashes in the middle of processing. The right way to recover a failed instance is to resume processing in the same state as before the crash.
Stream processing, then, is simply a read-process-write operation on Kafka topics. The consumer reads messages from a topic, the processing logic transforms them or updates the state maintained by the processor, and the producer writes the result to another Kafka topic. Exactly-once stream processing is simply the ability to perform a read-process-write operation exactly once. In this setting, getting the correct answer means not missing any input message and not producing any duplicate output. And this is exactly the behaviour that users expect from an exactly-once stream processor.
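To make "read-process-write exactly once" concrete, here is an in-memory sketch in which the output and the consumer position are published together or not at all. It only imitates the idea of committing both in one transaction; the class, the upper-casing "processing" step and the crash flag are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** In-memory imitation of an atomic read-process-write loop (not Kafka client code). */
final class ReadProcessWriteSketch {
    final List<String> input;
    final List<String> output = new ArrayList<>();
    int committedOffset = 0;

    ReadProcessWriteSketch(List<String> input) {
        this.input = input;
    }

    /** Processes up to batchSize messages; a simulated crash discards the uncommitted work. */
    void runBatch(int batchSize, boolean crashBeforeCommit) {
        List<String> pending = new ArrayList<>();
        int offset = committedOffset;                              // read from the last committed position
        while (offset < input.size() && pending.size() < batchSize) {
            pending.add(input.get(offset).toUpperCase(Locale.ROOT)); // "process"
            offset++;
        }
        if (crashBeforeCommit) {
            return;                                                // nothing becomes visible, position unchanged
        }
        output.addAll(pending);                                    // "commit": output and position move together
        committedOffset = offset;
    }

    public static void main(String[] args) {
        ReadProcessWriteSketch job = new ReadProcessWriteSketch(Arrays.asList("a", "b", "c"));
        job.runBatch(2, false);
        job.runBatch(2, true);   // crash: the third message is neither lost nor half-written
        job.runBatch(2, false);  // restart resumes from the committed position
        System.out.println(job.output + " at offset " + job.committedOffset);
    }
}
```

If the output were published before the position was advanced, or the other way round, the crash in the middle would produce either a duplicate or a gap; moving both in one step is what removes those cases.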
In addition to the simple failure scenario considered above, there are many others:
- A stream processor can take input from several source topics, and the ordering across them differs from run to run. So if you rerun a stream processor that reads from several source topics, you may get different results.
- A stream processor can write output to several destination topics. If the producer cannot write atomically across multiple topics, its output may be incorrect when writes to some (but not all) partitions fail.
- A stream processor can aggregate or join data across multiple inputs using the state management facilities of the Streams API. If one instance of the stream processor fails, you need to be able to roll back the state materialized by that instance. When the instance restarts, you also need to be able to resume processing and recreate its state.
- A stream processor can look up enriching information in an external database or by calling a service that is updated from outside. Depending on an external service makes the stream processor fundamentally non-deterministic. If the external service changes its internal state between two runs of the stream processor, the results downstream will differ. Handled correctly, however, this should not produce entirely wrong results: the output of the stream processor will simply belong to the set of valid outputs.
Failures and restarts, especially in combination with non-deterministic operations and changes to the persistent state computed by the application, can lead not only to duplicates but also to incorrect results. For example, if one processing stage counts the number of events seen, a duplicate at an earlier stage leads to an incorrect count at the next one. We therefore need to define what the phrase "exactly-once stream processing" means.
It applies to consuming from a topic, materializing intermediate state in a Kafka topic, and producing to a topic, not to every possible computation performed on a message with the Streams API. Some computations (for example, those that depend on an external service or consume from several source topics) are fundamentally non-deterministic.
“The exactly-once stream processing guarantee for deterministic operations is that the output of the read-process-write operation is the same as it would be if the stream processor had seen each message exactly once, as it would in a failure-free run.”
All this makes sense for deterministic operations, but what does exactly-once stream processing mean when the processing logic itself is non-deterministic? Suppose the same stream processor that counts incoming events is modified to count only those events that satisfy a condition dictated by an external service. This operation is non-deterministic by nature, because the external condition can change between runs of the stream processor, potentially leading to different results. So how should we understand exactly-once guarantees for such operations?
"The exactly-once stream processing guarantee for non-deterministic operations is that the output of the read-process-write operation belongs to the subset of valid outputs that the combinations of valid values of the non-deterministic input would produce."
So, in our stream processor example, with a current counter value of 31 and an incoming event value of 2, the only valid outputs are 31 and 33: 31 if the external condition rejects the input event, and 33 if it does not.
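The arithmetic of this example is small enough to write down directly. The sketch below simply enumerates both branches of the external condition; the class and method names are illustrative.

```java
import java.util.Set;
import java.util.TreeSet;

/** Enumerates the valid outputs of the conditional counter from the example. */
final class ValidOutputsSketch {

    static Set<Integer> validOutputs(int counter, int eventValue) {
        Set<Integer> outputs = new TreeSet<>();
        outputs.add(counter);              // the external condition rejects the event
        outputs.add(counter + eventValue); // the external condition accepts the event
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(validOutputs(31, 2)); // prints [31, 33]
    }
}
```

Any other value, such as 35 from counting the event twice, falls outside this set and would indicate that the guarantee was violated.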
This article only scratches the surface of exactly-once stream processing in the Streams API. The next post on this topic will describe the guarantees in more detail and compare them with the exactly-once guarantees of other stream processing systems.
Any major piece of work like this always raises the question "Does this feature work as promised?" To answer it, let's look at its correctness (how we designed, built, and tested the feature) and its performance.
Correctness and performance start with a solid design. We started working on the design and on prototypes about three years ago at LinkedIn. We then spent more than a year at Confluent looking for an elegant way to bring the idempotence and transaction requirements together into one coherent package. We wrote a 60-page design document covering every aspect of the architecture, from the high-level message flow to the nitty-gritty implementation details of every data structure and RPC. This process went on for nine months under public scrutiny, and the design improved greatly thanks to community feedback.
For example, thanks to the open-source discussion, we replaced consumer-side buffering for transactional reads with smarter server-side filtering, thus avoiding a potentially large performance problem. We also refined the interaction of transactions with compacted topics and extended the security features.
As a result, we arrived at a simple design that relies largely on Kafka's robust primitives.
15 000 , , . : NTP- . : Kafka , , «» , , .
; , exactly-once - , . , . , (~1 ). . , 100 , :
- 3% relative to in-order at-least-once delivery (acks=all, max.in.flight.requests.per.connection=1);
- 20% relative to at-most-once delivery (acks=1, max.in.flight.requests.per.connection=5).
exactly-once . , KAFKA-5494 , , , at most once . , . , , .
, , exactly-once. Kafka exactly-once, Kafka, . , . . , 10 35% . Kafka , / : 20% 50% – . Kafka 0.11, exactly-once.
exactly-once Streams API. 100 ( ) 15% ( ) 30% ( ). 30 1 . , . .
, , , Kafka . – exactly-once Apache Kafka. , exactly-once, . Apache Kafka.
, . exactly-once – , , . API, , , .
. – , , , . . Kafka. exactly-once .
, exactly-once- Apache Kafka 0.11, , .
Source: https://habr.com/ru/post/333046/