Hi, Habr! My name is Mikhail Golovanov, in Sbertech I am engaged in technical architecture and advanced developments. We, like any modern bank, have a lot of systems that support different aspects of the bank’s operations: deposits, accounts, crediting money, lending, financial markets, stocks, etc. Whenever a new system appears, we start the next level of an exciting game called Integration. And each next level is more complicated than the previous one - after all, the systems need to be covered more and more. This post is what is called walkthrough in gaming circles: first we will go over local networks and then go through the message queues to the large-scale stage of streaming computing through Apache Kafka in widely distributed networks.

First, a bit of theory - we will list what we value in integration, taking into account banking specifics:
- Performance . It's simple: the higher the better.
- Latency - information transfer delays. Acceptable latency depends on the systems we are dealing with. If you come to an ATM to withdraw money from the card, then an extra second of weather will not do. And if they make you wait 30 seconds, you will hardly like it. There are operations in which latency is not so important. If you are applying for a loan, then you can wait for the decision for ten minutes - and then 30 seconds are not crucial. But in principle, the lower the better.
- Scaling . Scaling is of two kinds. With vertical scaling, you add capacity on a single machine, and your performance increases. With horizontal - put next to the machine even the nth number of the same.
- Fault tolerance . This is very important for us. If something fails in the bank and customers are not served - this is very bad for everyone. This can be attributed to another important indicator - the recovery time.
- Consistency Suppose the transfer of money has passed, but there is no write-off. And you need to tamper balance. The second example: you send a transfer, and your money has been deducted from the card, and the person to whom you transfer them is not credited. This means that the system is in inconsistent state. And causes a lot of inconvenience. It is highly desirable that all data be in a consistent state.
Beginning of the story
The first was the stage of local networks - the formation of the classical two-tier architecture and the dawn of database servers (MS SQL, Oracle and others). Sberbank had a large, powerful database server that served the entire organization. Client machines on the local network connected to it, received and recorded information.
')
Then began the active spread of Internet technology. The number of users of business applications began to grow very quickly. We rested on the capabilities of the database server and moved to the three-tier scheme. The database served as the repository. The application server contained business logic — rules for the manipulation of information. The thin client — the browser — connected to the application server and interacted with the end user.

This architecture has many advantages:
- There is no need to install client software on the machines and update the programs - it is enough to update the application server and the database, and immediately a new version becomes available to all customers.
- The database relationally stores all the data of the organization - due to the verification of keys and support of transactions, we automatically obtain consistency.
- JEE application servers are well clustered, scaled, and take over most of the business logic work.
- Interfaces of JavaScript-based web applications are getting closer to the native ones in terms of saturation and capabilities.
10-15 years ago it was very cool and made life much easier.
The next stumbling block was the synchronous interaction of the system components. The client requested information from the application server and blocked - waited until he received a response. And server components also expected responses from each other. Here the performance seriously subsided.
To solve problems with synchronous exchange, middleware is used to create message queues. After writing to such a queue, the calling component does not wait for a response and can perform other useful work. The processing component reads incoming messages to the queue and generates responses. Which caller listens in a separate thread.

The advantages of this architecture:
- Asynchronous exchange significantly improves system performance.
- If your server is stopped for a while, the client will not know about it. It will simply throw requests for processing while the queue is available. At this time, you can quickly and quietly raise the server part, deduct from the queue and process what has been received in the last few minutes. As a result, the client will notice almost nothing.
- If you centralize the queue in the message broker, you can get a single point of information flow management in the enterprise.
So we built our internal information processing architecture about seven years ago. Everything was great.
We meet Kafka
Two years ago, we decided to switch from paid products of large vendors to a more functional open source. We looked at what can be done with message queues, decided to leave the integration architecture unchanged, and transfer message queues to open source. We scanned the market and stumbled upon Apache Kafka, a distributed open source messaging software broker written in Scala and Java (in the bank this is our main technological stack). Then Kafka versions 0.8–0.9 were relevant.
The pilot was quickly deployed:
Kafka's performance was at least several times higher than our solution, tens of thousands of messages per second, or even more, about a hundred. Existing lines on the same equipment pulled out at best 5-7 thousand.
In our previous Message Queue (MQ), building a cluster required a lot of non-trivial actions. The topology turned out to be complicated: there were gateways that distributed the load, ensured the operation of the Message Queue cluster, etc. With Kafka, everything turned out to be simpler: we put a new machine, raise Kafka on it, register the Kafka node number in the cluster, and the node connects to the cluster itself. If you need to turn off the machine, it is enough just to stop Kafka on the node - the broker will exit the cluster himself. This way you can easily scale in both directions.
At the same time, the scaling is close to linear : put a second broker - it will process twice as much, if the third is three. Ten knots were put on the pilot, and this dependence persisted.
Kafka supports two unified interaction styles at once.
- Point-to-point - someone lays out the handler information, the handler takes it, and only these two sides interact with each other.
- Publish-subscribe - when someone puts out information, and many consumers read it at once.
In the old JMS paradigms, these were two different interfaces, but in Kafka everything is unified, and there is no difference between Point-to-point and Publish-subscribe, including in the API for the programmer. This is a big simplification.
In Kafka,
all messages are persistent , that is, written to disk. In queues, it works differently. When all messages in the queue are stored in RAM, everything works quite quickly - several thousand messages per second. As soon as we turn on the persistence mode - writing messages to the disk - the performance drops several times. However, this mode is indispensable, because the information in the RAM is erased as soon as the machine is turned off. And we have a lot of data that you don’t want to lose - for example, data on money transfer. In Kafka, messages are persistent out of the box, and everything works quickly.
Moving from JMS to Kafka
Faster, more convenient, and even free. Hurray, throw JMS, moving to Kafka! Previously, we had the usual queues, and now Kafka-topics. The essence is the same: they wrote it down in a topic and forgot it, but someone on the other side reads asynchronously.
How is all this inside? Kafka, in fact, is an append-only distributed log, that is, a log whose entry always goes to the end. To ensure scaling, the topic is divided into partitions. In each partition there is always a start offset (the number of the first recorded message) and end offset (the number of the last recorded message). The recording always occurs at the end of the log, and the numbers of the messages continuously monotonically increase. Sequential recording to disk is carried out with good speed and provides persistence - in contrast to writing to an arbitrary part of the file, especially slow on the HDD.
What happens on the side of the reader? When creating a reader, a group is assigned at creation, according to which the reader subscribes to topics - sets of partitions (logs).
The reader endlessly carries out a call (poll), that is, requests data from Kafka. If something was recorded, Kafka gives this data. The reader processes them, commit reports in response, and the pointer moves forward one message. So in the topic you can make a lot of partitions and put a lot of readers. One partition in one group is read by one reader, and this is a fairly simple and clear scaling scheme. If we want everything to work faster, we increase the number of partitions in the topic and readers in the group, and due to the parallelization, everything works faster. In Kafka, writers are called producers, and readers are consumer salesmen.
Problems with message bundles
An experienced booth at Kafka pleased everyone, except for some unpleasant moment: the same messages began to be duplicated. Immediately thought that the point is who writes. But no, the recording went once, and the reading - two, three, sometimes even four times. As a result, performance fell in Kafka and a large number of duplicates appeared.
It turns out that groups of readers work a little differently. Within a single polling cycle, the handler immediately receives a stack of messages. Then he begins to process it and must commit message processing. In our case, in processing there were not only single messages - some collected a bundle of information in themselves, many business events. The handler receives such a bundle, for example, from 200 events and sequentially starts processing each of them.
Meanwhile, the broker starts counting a timeout - a certain time interval after which, without receiving a message processing commit, he starts to consider the handler dead, throws him out of the group and puts one of the living instead. If during one poll two or three large bursts of messages were received, the time-out time often expired and the consumer threw out. A rebalance started in Kafka - rebuilding a group of consyumers when they added a new one or, as in our case, threw away the old one. Instead of the supposedly dead, Kafka substituted the neighboring, supposedly lively concierge. Bundles of messages began to kill the whole group in a circle. After some time, in the server’s opinion, there were no live users at all, and the reading stopped.
What to do? The first option: let's not send packs. But the system that transmitted them, otherwise could not work, because it was not very online. Maybe create a separate message pool? But then we break the reading and processing of messages, and the scheme will become fragile.
At this moment, the tenth version of Kafka came out with some very useful features for us:
- KIP-62 - heartbeat in a separate thread. It used to be a confirmation that the handler was live and the processing itself was going in one thread. In the tenth, Kafka introduced a separate message “I am alive”, which can be thrown not mainly in a separate background thread. And these messages go much faster than processing the main bundle. Rebalance does not occur, and we can scroll through large messages for quite a long time.
- KIP-41 - the maximum number of messages in one poll. Previously, it was limited only by available memory. If someone wrote a lot, the handler could immediately take 10, 30, 50 messages. With the tenth of Kafka, you can set the exact number: messages to be read for one poll.
- Setting timeout settings.
With the new settings, the system began to work stably, and the massive doubles stopped. But still not to the end. At this moment we understood: Kafka is not exactly a queue. This is another data structure,
partitioned log .
What is the difference? In the queue, all readers are competitive in reading messages, and therefore it is not always orderly. In Kafka, within the framework of partitions, the reading proceeds sequentially, and the partition is always streamlined. The messages in the queue are deleted after reading, but not in Kafka - the pointer of the read messages simply moves. After some time (timeout) in Kafka, the entire file is deleted, and writing to the new file (segment) begins. Messages are deleted in batches, like files from the file system - it turns out to be much less expensive than with queues that delete each message. And, as we described above, turning on / off one of the consumer meters in reading mode affects the rest. There is a rebalance, and for some time the broker does not give readers any messages until the rebalance occurs.
We achieve doubles
The lesson was learned, the system was stabilized, the doubles were reduced to fractions of a percent, but we didn’t get rid of them completely. The duplicates arose because of the rebalances that could not be avoided - at least they occurred when a new customer clerk entered the topology, or if Kafka decided that some optimization was needed.
We began to think what to do with the remaining takes. There were three options:
- Nothing to do. There are such tasks where doubles are not at all terrible. For example, you monitor a business process for a certain stage. If you twice received information about this, do not worry. Or, for example, if a client requests a balance on his card and receives it twice.
- Fight with the consequences. There are two main approaches: deduplication and compensation logic.
- Eliminate the cause - to make such a system where doubles will not arise. This is the most correct, but the most difficult way.
If you do nothing, then the key concept is
idempotency . Idempotency is if the operation is repeated several times, and this does not affect the system from the point of view of an outside observer, from the point of view of the data state.
Idempoddentnymi can be read or monitor operations. There are ways to make business transactions idedentary, even if they are not. For example, when transferring money, we generate a unique ID — enter a unique identifier for the operation. After the first processing of the ID, we changed the balance on the account and saved information about it. In the future, by ID, we determine that the transfer has already been made, and we do not move money. However, for this, it is necessary to rework logic every time, to do an analysis of the tasks - in general, the method is expensive.
The second approach is the creation of a deduplicator. You can put the shared storage, and when the message comes a second time, ignore it. But here it is necessary to build additional storage for call traffic — on large volumes it can become a point of failure and a cause of a drop in performance. And since usually the storage is remote, we receive an additional one network call and an increase in latency. At low loads, the deduplicator is quite a working scheme, but this is not our case.
The third approach is to make compensation at the level of business logic. Our application programmers will have to remember all the time that the operation can be repeated. How to determine whether it was repeated due to integration, or does the user really try to transfer five rubles to someone every second? This is time consuming and can cause many errors, so compensation logic is an extreme option.
The idea was to add transactions to transactions. Then repeated operations will be rejected, because there will be a repeat of the transaction. Java even has a distributed transaction technology (XA transaction). However, in Kafka, it is not supported and is unlikely to be supported.
As expected, it remains to deal with the cause.
We transfer commit before processing
When the group is rebalanced, then sending the message to the old consumer is no longer possible. There is an error stating that this consumer no longer works with this partition. We have always done commit after processing, but you can transfer commit before processing. Then the consumer will read the message from Kafka and immediately confirm the reading.
But what if at the moment when we are already commited, but not yet processed, there will be a failure in the handler? In this case, we will lose this message, because Kafka believes that we have already given it to us, but the handler has not yet worked to the end. This message processing guarantee is called
at most once , that is, “
at most once ”. It can be used for some not very important operations. But not for operations related to money, because no one wants to lose a transfer.
We assign topics to handlers
It is possible not to use the mechanism of auto-balancing groups of readers, but to explicitly assign a handler to each partition by calling the assign method. This is when we explicitly say: you are a handler, here is your topic, here is your partition, work with it. In this case, you can make an early commit - at most once, or you can, for a guarantee, a late commit at least once. Due to the fact that only one handler makes commits and processing, if you try hard, you can do it exactly once - that is, exactly once.
But what is bad assign? You nailed the handler to the partition. Now, if he died, something needs to be done with him: restart, watch what he last handled, and so on. For the system administrator, this is quite laborious: you need to make sure that the handlers are alive, manually restart them, and so on. That is, we begin to do the work of a consumer group. And if a person appears in the process, you can forget about the quick recovery time of the system, since he immediately has a desire to figure out what was processed and what was not. People react at best in minutes, computers in a split second. We get exactly once, but we lose a lot in fault tolerance. And we will have to spend a lot on operation.
Revaluation of distributed networks
As a result, we then postponed the final conquest of Kafka. Returned to the question after a year and a half. We were satisfied with performance, scalability, fault tolerance. That's just the consistency of trouble - damned doubles. It is unlikely that experienced Kafka developers could ignore such a problem. Perhaps we misused it? Not. The decision was hidden at an even deeper level than could have been supposed.
As it turned out, in a large distributed environment, some principles on which our design of IT systems used to be held simply do not work. These principles are dedicated to the work of L. Peter Deutsch “Fallacies of distributed computing”, written in 1994-1997.
- The network is no longer as reliable as it used to be. Due to the large number of elements, it cannot always work quickly and reliably.
- Information transfer delays are no longer zero, as in a local network. Yes, the speed of access between the memory is the highest. If we contact disks several dozen times, the performance drops in the same way. And if we also connect with the network, the slowdown happens a hundred times. We can not neglect the delay in the interaction between the distributed components.
- The throughput is finite. With large volumes of network, we quickly rest on the ceiling, especially when interacting with remote servers.
- The network is no longer secure. When working via the Internet, it is impossible to control everything, I can hack something somewhere.
- Topology is changing all the time. Some cars are always on or off. Among the thousands of Google servers, about a dozen are always in inoperable condition.
- The administrator is no longer alone. There may be hundreds of them; each manages his own part of the system in his own way.
Having accepted these truths, we formulated three main characteristics of a distributed system:
1. Failures are the norm.If the failure of one machine was an extraordinary event, then when you have a lot of machines, something is constantly not working all the time. The system's inoperability is a deviation from some characteristics or a complete failure. A malfunction is a failure. Failure is a self-eliminating failure. And we need to make such systems so that we experience failures. The larger the system, the more failures in it are more diverse and more frequent. It is necessary to ensure that failures turn into failures, so that failures can be eliminated. Because you can run around the whole big system and fix everything with your hands - you can go crazy.
2. Coordination is difficultThe more machines, the harder it is to ensure coordination, especially through the network. Coordination is difficult. Network latency between nodes, unreliability of communication, changeable topology — you cannot fight this; you just need to try to avoid it. The smaller the different parts of the system are coordinated with each other, the better. If they manage independently, this is ideal.
3. Time is not uniform.In different parts of the system due to delays different times. And on different computers, too, at different times. Very often, when designing, three things are confused: time, duration and order of messages. For example, if you are doing a chat, it is not the specific time, but the order that matters. If you sent a question, and you answered it, it is important that everyone sees the question first, and the second - the answer. It happens that duration is important, not order. For example, if you measure timeout.
But the worst thing is that time floats. Even the most accurate atomic clocks also float. Ordinary quartz watches on a local machine can go astray for a couple of milliseconds due to the heating of the machine and other physical causes. Synchronizing the time between machines gives tens of milliseconds. With this you need to accept and understand that in different parts of your system time is different.
Given the new conditions, we had to overestimate the issue of information processing. Up to this point, we had two main options:
- Batch. You accumulate a certain array of information and run the “thresher”, which performs calculations offline and gives the result. The calculation may take minutes, hours, days, but we can safely process large volumes. If something broke or we realized that we had an error in the algorithm, the input array of information does not change - that is good. We can eliminate errors, run again and get an answer that suits us. This is not online and the result is always deterministic. If the input array and the algorithm have not changed, you will certainly get the same results.
- Request-Reply. This online option is used when you need to get results quickly, as in a web browser, for example. You give some database queries and quickly get an answer. But since these calls are in no way ordered, you can no longer reproduce this. In the next second, the state of the database may change, and, throwing the same query, you will get a completely different result. That is, the result is non-deterministic, but it can be obtained quickly.
In each case, one has to make sacrifices. Is it possible and accurate and fast? For a number of cases we found a way.
Stream architecture
So, we live in a distributed system with its own characteristics. Kafka has limitations related to doubles. A centralized relational database cannot be inflated forever - there are limitations in scaling. What to do? Let's try to implement some of the tasks in the stream architecture. Our acquaintance with her began with the article “Introducing Kafka Streams: Stream Processing Made Simple” by Jay Kreps, the current CEO of Confluent, the developer of Kafka.
Streaming architecture is based on the concept of stream. A stream is a time-ordered set of immutable messages. Everything that happens in our system is consistently written to the journal as events occur over time. If the event is recorded in the log, it can no longer be changed. If you have created a user, you can not go back and cancel the action. You can only post a new user adjustment event or something like that. In general, it is very similar to our lives. When something happens, we can no longer go back and change what happened. We can only respond to what happened and create a new event.
Business event modules exchange messages asynchronously through event streams. Accordingly, if a module wants to interact with new modules, it sends an event to their event logs. Thus, the entire system is the modules and the relationship between them in the event logs.
The flow of events is infinite, they are strictly ordered in time, and this order after recording never changes. Module state is the result of processing a specific stream. If we bring the module to the initial state X and lose a certain flow, we will get the state Y. Doing this each time we will get the same final state, since the initial state is fixed, the flow of events is the same, the processing algorithm is the same.
How to scale such a system? With the help of partitions.

In the example above, three partitions are created. Events are distributed according to them according to the keys that we assign to events. K1 – K3 in the first partition, K4 – K6 in the second and K7 – K9 in the third. Events within one partition are ordered by time. A handler is attached to each partition, which sequentially processes the events. Processed one - went to the next. That is, he manages his local database. The state of the handler is determined by the initial state and the flow. The overall speed of the system depends on the number and speed of handlers.
In this scheme there is no central base and coordination, because each processor handles its own partition and knows nothing about the others. We just throw in the event stream. Different events may relate to the change of one entity and be logically related. If such events fall into different partitions, then due to the independence of the handlers in time, we will have a run-up and the results will fall into the wrong handlers and everything will be bad.
All logically related events must appear in the same partition in order to continue to go to the same developer instance. How to do it? There are two approaches.
The first approach is partitioning the stream while writing. This method is used in Kafka Streams. We must generate some key from the recorded information by which the partition is determined. Writing this key, we can safely work on and be sure that logically related events will fall into one handler. The disadvantage of the method is that if the topology of the topic changes, it is necessary to carry out repartitioning - the redistribution of data by partitions, which is very resource-intensive.
The second approach is partitioning the stream while reading. It can be implemented using the operator available in Apache Flink and similar engines. In it, logically related events can fall into different partitions. But then the entire topic is read in one cluster, which, when reading each event, calculates its key. Having calculated the key, it understands where the required handler is in the cluster topology and sends the event there. : . , , .
, .
, , . . stateless processing.
— , — , . . key-value. , key-value .
? - — . offset , . , . , offset, , . :
- . , - , . , , , , — , .
- . . .
. . :
- . Kafka Streams. , key-value . , . , — , . , , , , .
- — , Hadoop . , . , .
. Kafka . , .
?
, , . - , - .
, : K2, K4 K7.- , . , , , . eventual consistency. , , . , .
— , . , . , , : . . , , .
? , , . , . , . , .
The idea is good. , . , , . .
. — — . , . , , , , - . - , , … .
? — . , . , , .
— tumbling window — . , , . ? . , , — .

K1. , K1. ( ), , . , . , .
, , , .
, . , , , . , , - , . , . , - , , , . : , , .
, , — . , , . , .
, . — — (watermarks).

, , . . , — , .. , , .
time-based . , . , — . , , .
: . :
, . . -, . , . .
, , . , ? . , , . , .

? - . . , , . . .
, . Processing Time — . Event Time — , . Ingestion Time – ( ).
, — . eventual consistency. , . .
.
— IT-. , . , :
- ad-hoc — .. — — . . . , , .
- - . push. — push, , . - — , .
, — . There are several approaches.
1.
Queryable state — . query.

, — Query Controller. , SQL - . . , , query controller. , . — CQRS, Command-Query Responsibility Segregation, . , — , Query Controller.
, . , . , . , . . , , , , . queryable state.
2.
Copy by request
queryable state , . , , — , . . . queryable controller — , . — . .
— , , . , . — copy by request queryable state.
3.
Continuous distribution
— -. , . , , , . , - — key-value — . , . , - .
— , . — - , . - - , , , .
, . , , continuous distribution .
?
. :
- Kafka JMS .
- Kafka .
- Eventual consistency — .
- ,
, , -. . , . , , .