By the nature of my work, I often have to participate in projects in which high-availability, high-performance systems are created for various markets - advertising, fintech, services of the SaaS, PaaS classes. Such systems use a well-established set of architectures and components that allow you to effectively meet product requirements, for example, lambda-based architecture for streamline data processing, scalable micro-service software design, oriented to horizontal scaling, noSQL DBMS (Redis, Aerospike, Cassandra, MongoDB ), message brokers (Kafka, RabbitMQ), distributed coordination and detection servers (Apache Zookeeper, Consul). Such basic infrastructure blocks most often allow to successfully solve most of the tasks and the development team does not encounter the tasks of developing components of the middle level (middleware), which, in turn, will be used by the business-oriented part of the developed system.
In some cases, there are tasks that the project team cannot implement with existing solutions that are standard, then there is a need to search for or develop highly specialized products designed to solve specific problems.
In one of the projects, the team had a rather specific task, within the framework of which there was a need to organize a strictly-ordered data queue that would allow the data to be “played”, supported replication, horizontal scaling and was fault tolerant. This is a classic data structure, which is called a transaction log (Commit Log) and is used in almost all more or less complex DBMS.
The basic requirements for the transaction log are that the changes are recorded in a strict order, thus the structure ensures that the client who initiated the recording is first guaranteed to receive a lower sequence number of the log entry, even if the record took longer than some other client. Operations logs are widely used in systems that have competitive access to objects, which should be streamlined. A typical concurrent access queue is an example of such a journal (for example, LinkedBlockingQueue in Java).
The value of such a structure is that it guarantees the immutability of the final state during repeated passes and is often used precisely to correctly reflect the final state of the object during multiple changes.
Difficulties begin when the set of agents must propagate an ordered change of states on the set of nodes for a set of objects. The obvious and lowest-performing approach is based on the use of a distributed lock service, for example, Apache Zookeeper, with the help of which an ordered access to some storage is provided. The variant is working, if changes of objects rarely occur, however, for the implementation of a high-performance system in which the states of objects change frequently, this option is not suitable. A special case with two agents and two nodes is shown in the figure.
If you look closely at the introductory image with Rick, Morty and Summer, then you can just notice it ... Everything seems to be similar, but there are already some minor mismatches.
We needed to develop a replicable Leader / Followers system, each node of which would maintain the current state of the objects in RocksDB, and if you lost the node, you could easily restore it from another node and the existing activity log, an abstract view is shown in the diagram:
The main engineering problem that exists within this task is the development of a distributed replicated transaction log .
At first, as usual, the thought went on the way - " we need our bar ... we are cool developers - let's do everything ourselves! ". We had an implementation for a local magazine and we thought that we could probably expand it to a networked and replicable implementation. Honestly, the amount of work caused a dull toothache, and this was not the main product, but a component of the underlying layer.
Wait, but there is Apache Kafka, the surprised reader will say! And it will be almost right. Apache Kafka is a great thing, but as part of this task it lacks the following functions:
- Confirmation of completion of the operation
- Guarantees of the order and uniqueness of data
In most cases, Apache Kafka will work as it should, but if you lose TCP packets or drop the wizard, you have no guarantees that your message will not be duplicated. This is due to the fact that messages in Kafka are sent on the principle of "fire-and-forget", and the client does not control the order of entries on the server, which is logical, since Apache Kafka is optimized for bandwidth.
However, having started the analysis and reflection on the details of the decision, I found that there was already a solution, we just did not know about it. And this is the Apache BookKeeper . Moreover, it is implemented ideologically and technologically almost in the way we would do it ourselves - Apache Zookeeper, Java, Netty (our project on Scala, but the Java stack was very pleased). As a result, a new phase was initiated, during which we tested Apache BookKeeper to meet our needs.
Next, I will try to talk about the principles and approach that is used in the Apache BookKeeper system for solving the problem of replicable, scalable transaction logs.
So, the product is not so well known, but it has users. For example:
Perhaps it was necessary to simply translate the first article, but in the soul of the Chukchi a writer.
First, look at the Apache BookKeeper conceptual architecture, which is shown in the following figure:
Key elements of the scheme:
Now let's look at the key elements in more detail, in order to understand how the model of interaction with Apache BookKeeper and its data management architecture are arranged.
Storage Server As part of BookKeeper, we deploy and run multiple Bookie servers, in the field of which a replicable storage medium appears. The data is distributed within the framework of ledgers, the distribution method is flat, the system does not allow you to set any topological labels (which is a pity). Adding Bookie servers allows system horizontal scaling.
Organizes a set of strictly ordered records replicated to the same servers. Ledgers ordered by increasing identifiers. The replica server data is stored in the ledger record in Zookeeper.
An important limitation is 1 . Only one writer can write to the ledger. Thus, the ledger is a scaling unit. If a series of operations does not allow spreading them across different ledgers, then the performance limit is the write speed to the ledger, which depends on the replication factor (although not linearly, since BK Client writes in parallel).
When writing to the ledger each record is given a unique increasing ordinal integer number issued by the client, which allows you to organize an asynchronous approach to recording. At the same time, BookKeeper guarantees that the write operation with the number (n + 1) is completed only when all the write operations with the lower numbers are completed. This is a very useful feature that allows you to significantly improve the performance of write operations in some cases, even if you need an orderly notification of clients about the completion of operations.
Important limitation 2 . An unlimited number of readers can read from a ledger in parallel, while reading is allowed when the ledger is closed, that is, at the moment no one is writing.
In general, the above restrictions for ledgers are significant limitations that in some cases can significantly narrow the scope of application.
For example, you open a new ledger every second, write to it and close it. This means that reading processing will be 1 second behind. Since the ledger has the Long identifier type, you can open every 1ms without any problems, but then you will encounter a situation that will increase the load on the Zookeeper, for which this will be a significant load. In the problem we are solving, this restriction was valid.
Another example is that the write intensity is such that the performance of servers within a single ledger is not enough, and the logic of the application is such that it is not possible to partition a record by several ledgers. Everything, as they say, arrived. This restriction within our task also managed, we were able to partition all the objects in this way, and we only have one data stream left for which we could not get around this restriction.
Ledger IDs are stored in Zookeeper, which allows for iteration over them, however, not everything is so simple. If we want to have several parallel chains of ledgers, then it is necessary to keep the ledger identifiers that relate to a specific chain somewhere separate, BookKeeper will not help us with this. Another option is to deploy Bookie's isolated servers for each task separately, so that they have a separate path to the Zookeeper (which did not suit us).
It should be noted that the Apache Zookeeper itself imposes additional restrictions that also need to be considered:
The creators of Apache BookKeeper provide a solution for problem 2 by introducing the Hierarchical Ledger Manager , which organizes the flat identifier space into a hierarchical tree by splitting Long into levels. The default is Flat Ledger Manager , which is clearly inapplicable with the frequent generation of new ledgers.
The record is an element of the ledger. Each entry within the framework of the ledger has a sequential, increasing identifier - from 0 ... The entry contains arbitrary data in the form of a byte array. Adding an entry can be done in synchronous and asynchronous mode. In asynchronous mode, as usual, you can achieve higher performance for multi-user applications. As previously mentioned, the call to the asynchronous part occurs when the record is added to the ledger, that is, in an orderly manner.
Perhaps these three concepts - Bookie, Ledger (ledger) and Ledger Entry (writing) are essential elements for understanding the work of Apache BookKeeper.
I must say that the Apache BookKeeper does not look like a silver bullet or a magic tablet, it is a very specific solution that does not contradict the CAP theorem and imposes a significant amount of restrictions on the problem being solved, which are often impossible to get around. Fortunately for us, we were able to provide with this component horizontal scaling of the system, but to support our requirements, we had to solve several engineering tasks along the way, for example, how to read data from two ledgers correctly and how to store “sifted” lists for non-intersecting ledgers in Zookeeper.
This article is introductory, introductory. Making Apache BookKeeper Hello World is easy enough; the authors provide a detailed start-up guide , while we were sorting out, rewriting its implementation on Scala .
RocksDB was chosen due to the fact that it has high performance and supports atomic packet writing (all or nothing), which ensures the correct behavior in case of emergency situations. As part of our task, we loaded the previous state from RocksDB, read all the actions from the ledger (s), formed the final state in RAM, and recorded it atomically in RocksDB, while the numbers of the ledger we also processed went to the record.
Source: https://habr.com/ru/post/332686/
All Articles