How do you count hits on the google.com page? And how do you store the likes of very popular users? This article proposes solving these tasks with CRDTs (Conflict-free Replicated Data Types) and, more generally, looks at replica synchronization in a distributed system with several master nodes.
1. Introduction
We have long been accustomed to applications such as calendars or note-taking services like Evernote. What they have in common is that they allow working offline, from several devices, and by several people at the same time (on the same data). The challenge facing the developers of each such application is how to synchronize data changed simultaneously on several devices as smoothly as possible. Ideally, the user should not have to take part in resolving merge conflicts at all.
The previous article considered one approach to such problems, Operational Transformation. Here a very similar method is described, which has both advantages and disadvantages (for example, a CRDT for JSON has not yet been invented. Upd: thanks to the user msvn for the link, here is a project from the authors of a research article on implementing JSON as a CRDT).
2. Strong eventual consistency
Recently, a lot of papers have been written and a lot of research has been done in the field of eventual consistency. In my opinion, there is now a strong trend to shift from strong consistency to other consistency models, to study which model is more advantageous in which situations and systems, and to rethink existing definitions. This leads to some confusion, for example when the authors of some papers, talking about consistency, mean eventual consistency with some additional property, while other authors use a dedicated term for it.
The authors of one of the articles criticize the current definition of eventual consistency: according to them, a system that always responds to every request with "42" is fine under that definition, since it is consistent in the end.
Without disputing the correctness of that article, I will follow the authors of the original papers and use the following terminology (please note, these are not strict definitions but distinguishing features):
Strong consistency (SC): all write operations are strictly ordered, and a read on any replica returns the same, most recently written result. Resolving conflicts requires real-time consensus (with the ensuing consequences); the system tolerates the failure of up to n/2 - 1 nodes.
Eventual consistency (EC): a replica updates the data locally and sends the update on. Reads on different replicas can return stale data. In case of a conflict, we either roll back or somehow decide what to do. So consensus is still needed, but not in real time.
Strong eventual consistency (SEC): EC plus a predefined algorithm that replicas use to resolve conflicts. So no consensus is needed, and the system tolerates the failure of up to n - 1 nodes.
Note that SEC (in a sense) solves the problem of the CAP theorem: all three properties are satisfied.
So, we are ready to sacrifice SC and want a set of basic data types for our potentially unstable distributed system that will resolve write conflicts for us automatically (no user interaction or request to an arbiter required).
3. Tasks about likes and hits
Undoubtedly, there are several algorithms for solving such problems. CRDTs offer a rather elegant and easy way.
Google.com hit counter:
google.com processes approximately 150,000 requests per second from all over the planet. Obviously, the counter needs to be updated asynchronously. Queues solve the problem only partially: for example, if we expose an external API for reading this value, we will have to replicate it so that read requests do not overload the storage. And once there is replication, can we do without global queues?
User like counter:
The task is very similar to the previous one, except that now unique hits must be counted.
4. Terminology
To understand the article more fully, you need to know the following terms:
Idempotency: applying the operation several times does not change the result. Examples are the GET operation or adding zero: f(x) = x + 0
Partial order: a relation that is reflexive, transitive, and antisymmetric
Semilattice: a partially ordered set in which any two elements have a least upper bound (or, dually, a greatest lower bound)
Version vector: a vector whose dimension equals the number of nodes; each node increments its own entry when a particular event occurs. During synchronization the data is transmitted together with this vector, which introduces an order relation and makes it possible to determine which replicas hold older or newer data.
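To make the order relation on version vectors more concrete, here is a minimal sketch. The function name compareVersionVectors and the returned labels are assumptions made for illustration; they do not come from any particular library.

```javascript
// Compare two version vectors of equal length (one entry per node).
// Returns "equal", "before" (a is older than b), "after" (a is newer than b),
// or "concurrent" (neither replica has seen all updates of the other).
function compareVersionVectors(a, b) {
  let aBehind = false; // some entry of a is smaller than in b
  let bBehind = false; // some entry of b is smaller than in a
  for (let i = 0; i < a.length; i++) {
    if (a[i] < b[i]) aBehind = true;
    if (a[i] > b[i]) bBehind = true;
  }
  if (aBehind && bBehind) return "concurrent";
  if (aBehind) return "before";
  if (bBehind) return "after";
  return "equal";
}
```

The "concurrent" case is exactly the one in which a conflict has to be resolved.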
5. Synchronization models
State-based (synchronization by state):
Also called passive synchronization; it forms the Convergent Replicated Data Type (CvRDT). It is used in file systems such as NFS, AFS, and Coda, and in the KV stores Riak and Dynamo. In this case the replicas exchange states directly, and the receiving replica merges the received state with its current state.
For replicas to converge with this kind of synchronization, the following is required:
The data must form a semilattice
The merge function must produce the least upper bound
The replicas must form a connected graph
Example:
Data set: the natural numbers $\mathbb{N}$
Minimum element: $-\infty$
$merge(x, y) = \max(x, y)$
Such requirements give us a commutative and idempotent merge function that grows monotonically on the given data set.
This guarantees that the replicas will converge sooner or later and lets us not worry about the data transfer protocol: messages with a new state may be lost, sent several times, or even delivered in any order.
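The example with max can be checked directly. The sketch below is only an illustration; the helper receiveAll and the sample values are made up for this purpose.

```javascript
// merge is the least upper bound on numbers ordered by <=.
function merge(x, y) {
  return Math.max(x, y);
}

// A replica starts from the minimum element and merges every state it receives.
function receiveAll(states) {
  return states.reduce((current, incoming) => merge(current, incoming), -Infinity);
}

const inOrder = receiveAll([3, 5, 7]);
// The same states, reordered and with duplicates.
const reorderedWithDuplicates = receiveAll([5, 7, 7, 3, 3]);
// States 3 and 5 were lost; only the newest one arrived.
const withLoss = receiveAll([7]);
```

All three replicas end up with the same value, because merge is commutative and idempotent and a later state already contains everything an earlier state did.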
Operation-based (synchronization by operations):
Also called active synchronization; it forms the Commutative Replicated Data Type (CmRDT). It is used in cooperative systems such as Bayou, Rover, IceCube, and Telex.
In this case the replicas exchange state update operations. When updating data, the source replica:
Calls the generate() method, which returns the effector() method to be executed on the other replicas. In other words, effector() is a closure that changes the state of the other replicas
Applies the effector to its local state
Sends the effector to all other replicas
For replicas to converge, the following conditions must be met:
A reliable delivery protocol
If effectors are delivered to all replicas in the order defined for the given type, then concurrent effectors must commute, or
If effectors are delivered to all replicas without regard to order, then all effectors must commute
If an effector can be delivered several times, it must be idempotent
Some implementations use queues (Kafka) as part of the delivery protocol.
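The generate / apply / send sequence might look roughly like this. It is a sketch under strong assumptions: the class OpCounterReplica and its methods are hypothetical, and the "network" is a direct in-process call that delivers every effector exactly once.

```javascript
// A replica of an op-based counter. All names here are hypothetical.
class OpCounterReplica {
  constructor() {
    this.state = { counter: 0 };
    this.peers = [];
  }

  // generate(): builds the effector, a closure that changes a replica's state.
  generate(amount) {
    return (state) => {
      state.counter += amount;
    };
  }

  // A local update: generate the effector, apply it locally, send it to the peers.
  update(amount) {
    const effector = this.generate(amount);
    effector(this.state);
    for (const peer of this.peers) peer.deliver(effector);
  }

  // Called by the transport, which is assumed to be reliable and exactly-once.
  deliver(effector) {
    effector(this.state);
  }
}

const a = new OpCounterReplica();
const b = new OpCounterReplica();
a.peers.push(b);
b.peers.push(a);
a.update(1);
b.update(-1);
a.update(5);
```

Since addition commutes, the order in which the effectors arrive does not matter here; a duplicated delivery, however, would break the result, because this effector is not idempotent.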
Delta-based:
Looking at the state-based and op-based approaches, it is easy to notice that if an update changes only part of the state, there is no point in sending the whole state, and that if a large number of changes affect one piece of state (for example, a counter), a single aggregated change can be sent instead of every individual operation.
Delta synchronization combines both approaches and sends delta-mutators, which bring the state up to date relative to the last synchronization. The initial synchronization has to send the full state, and some implementations in such cases already take the state of the other replicas into account when building delta-mutators.
A further optimization is compression of the op-based log, if delays are acceptable.
Pure operation-based (synchronization by pure operations):
In op-based synchronization, creating the effector introduces a delay. In some systems this may be unacceptable; then the original change has to be sent as is, at the cost of a more complex protocol and additional metadata.
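As an illustration of the delta idea, consider a grow-only counter stored as a map from replica id to count (this representation and the names join, incDelta, and value are assumptions chosen for brevity). A delta is just a small state that is merged with the same join as a full state.

```javascript
// Join of two states (or of a state and a delta): per-key maximum.
function join(a, b) {
  const result = { ...a };
  for (const [id, n] of Object.entries(b)) {
    result[id] = Math.max(result[id] ?? 0, n);
  }
  return result;
}

// Delta-mutator: returns only the entry that changed, not the whole state.
function incDelta(state, id) {
  return { [id]: (state[id] ?? 0) + 1 };
}

function value(state) {
  return Object.values(state).reduce((sum, n) => sum + n, 0);
}

let a = { a: 4, b: 2 };
let b = { a: 4, b: 2 };

// Replica "a" increments twice and keeps the deltas.
const d1 = incDelta(a, "a");
a = join(a, d1);
const d2 = incDelta(a, "a");
a = join(a, d2);

// Instead of two messages (or the full state), one aggregated delta is sent.
const aggregated = join(d1, d2);
b = join(b, aggregated);
```

The aggregated delta contains a single entry, yet after merging it replica b holds the same value as replica a.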
Typical usage:
If updates must be sent immediately, state-based is a poor choice, since sending the whole state is more expensive than sending just an update operation. Delta-based is better, but in this particular case the difference from state-based will be small.
If a replica must be synchronized after a failure, state-based and delta-based are the ideal choice. If you have to use op-based, the options are:
1) Replay all operations missed since the failure 2) Take a complete copy of one of the replicas and replay the missed operations on top of it.
As noted above, op-based requires that updates be delivered exactly once to each replica. The exactly-once requirement can be dropped if the effector is idempotent. In practice, the first is much easier to implement than the second.
Relationship between op-based and state-based:
The two approaches can be emulated through each other, so from here on we consider CRDTs without reference to any particular synchronization model.
6. CRDT
6.1 Counter
An integer that supports two operations: inc and dec. As an example, consider possible implementations for op-based and state-based synchronization:
Op-based counter:
Quite obviously, it is enough to just send the updates. Example for inc:
// The effector mutates the replica state it is given, assumed here to be an object like { counter: 0 }.
function generator() { return function (state) { state.counter += 1 } }
State-based counter:
The implementation is no longer so obvious, since it is not clear what the merge function should look like.
Consider the following options:
Monotonically increasing counter (increment-only counter, G-Counter):
The data is stored as a vector whose dimension equals the number of nodes (a version vector), and each replica increments the value at the position given by its id.
The merge function takes the maximum at each position, and the final value is the sum of all elements of the vector.
\begin{align}
inc() &: V[id()] = V[id()] + 1 \\
value() &: \sum_{i=1}^{n} V[i] \\
merge(C_1, C_2) &: \forall i \in [1..n]:\ Result.V[i] = \max(C_1.V[i], C_2.V[i])
\end{align}
You can also use the G-Set (see below)
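A state-based G-Counter following the formulas above could be sketched like this. The class name GCounter is an assumption, and the vector is indexed from 0 rather than from 1, as is usual for JavaScript arrays.

```javascript
class GCounter {
  // id: index of this replica (0..n-1); n: total number of replicas.
  constructor(id, n) {
    this.id = id;
    this.v = new Array(n).fill(0);
  }

  // Each replica only ever increments its own entry.
  inc() {
    this.v[this.id] += 1;
  }

  // The counter value is the sum of all entries.
  value() {
    return this.v.reduce((sum, x) => sum + x, 0);
  }

  // Merge takes the maximum at each position.
  merge(other) {
    this.v = this.v.map((x, i) => Math.max(x, other.v[i]));
  }
}

const a = new GCounter(0, 2);
const b = new GCounter(1, 2);
a.inc();
a.inc();
b.inc();
a.merge(b);
b.merge(a);
a.merge(b); // repeated merge changes nothing
```

After the exchange both replicas report three hits, regardless of how many times or in what order the states were merged.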
Application:
Counting the number of clicks / hits (sic!)
Counter with decrement support (PN-Counter):
We keep two G-Counters: one for increment operations and one for decrement operations
Application:
The number of logged-in users in a p2p network such as Skype
Non-negative counter:
A simple implementation does not exist yet. Share your ideas in the comments and we will discuss them.
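The PN-Counter can be sketched as a pair of such vectors. As before, the class name and the 0-based indexing are assumptions of this example.

```javascript
class PNCounter {
  constructor(id, n) {
    this.id = id;
    this.p = new Array(n).fill(0); // increments (a G-Counter)
    this.n = new Array(n).fill(0); // decrements (a second G-Counter)
  }

  inc() {
    this.p[this.id] += 1;
  }

  dec() {
    this.n[this.id] += 1;
  }

  // Value: all increments minus all decrements.
  value() {
    const sum = (v) => v.reduce((s, x) => s + x, 0);
    return sum(this.p) - sum(this.n);
  }

  // Both vectors are merged independently with a per-position maximum.
  merge(other) {
    this.p = this.p.map((x, i) => Math.max(x, other.p[i]));
    this.n = this.n.map((x, i) => Math.max(x, other.n[i]));
  }
}

const a = new PNCounter(0, 2);
const b = new PNCounter(1, 2);
a.inc(); // a user logs in via replica a
a.inc(); // another user logs in via replica a
b.dec(); // one of them logs out via replica b
a.merge(b);
b.merge(a);
```

Note that nothing here prevents the value from going below zero, which is why the non-negative counter is a separate problem.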
6.2 Register
A memory cell with two operations: assign (write) and value (read). The problem is that assign is not commutative. There are two approaches to solving it:
Last-Write-Wins Register (LWW-Register):
We introduce a total order by generating a unique id for each operation (a timestamp, for example).
An example of synchronization is the exchange of (value, id) pairs.
Application:
Columns in Cassandra
NFS: a whole file or part of one
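A minimal sketch of an LWW-Register follows. It assumes that the id is a pair of a timestamp and a replica id, with the replica id breaking ties between equal timestamps; the timestamp is passed in explicitly to keep the example deterministic. Neither choice comes from the article.

```javascript
class LWWRegister {
  constructor(replicaId) {
    this.replicaId = replicaId;
    this.value = undefined;
    this.id = [0, ""]; // [timestamp, replicaId]
  }

  // assign stores the value together with a new unique id.
  assign(value, timestamp) {
    this.value = value;
    this.id = [timestamp, this.replicaId];
  }

  // merge keeps the (value, id) pair with the greater id.
  merge(other) {
    if (LWWRegister.newer(other.id, this.id)) {
      this.value = other.value;
      this.id = other.id;
    }
  }

  // Total order on ids: by timestamp first, then by replica id.
  static newer(x, y) {
    return x[0] > y[0] || (x[0] === y[0] && x[1] > y[1]);
  }
}

const a = new LWWRegister("a");
const b = new LWWRegister("b");
a.assign("x", 1);
b.assign("y", 2);
a.merge(b);
b.merge(a);
```

Both replicas end up with "y", and the write of "x" is silently lost; that loss is the price of LWW semantics.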
Multi-Value Register (MV-Register):
The approach is similar to the G-Counter: we store a set of (value, version vector) pairs. The value of the register is all the stored values; on merge, LWW is applied separately to each value in the vector.
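One possible reading of this description is sketched below: on merge, a (value, version vector) pair is dropped only if some other pair has a strictly newer vector, so concurrent writes are all kept. The names MVRegister and dominates are assumptions of this example.

```javascript
// True if vector a is newer than b: a >= b at every position and a != b.
function dominates(a, b) {
  let strictlyGreater = false;
  for (let i = 0; i < a.length; i++) {
    if (a[i] < b[i]) return false;
    if (a[i] > b[i]) strictlyGreater = true;
  }
  return strictlyGreater;
}

class MVRegister {
  constructor(id, n) {
    this.id = id; // index of this replica
    this.n = n;   // number of replicas
    this.entries = []; // list of { value, vv }
  }

  // assign replaces everything seen so far with one new entry.
  assign(value) {
    const vv = new Array(this.n).fill(0);
    for (const e of this.entries) {
      e.vv.forEach((x, i) => { vv[i] = Math.max(vv[i], x); });
    }
    vv[this.id] += 1;
    this.entries = [{ value, vv }];
  }

  values() {
    return this.entries.map((e) => e.value);
  }

  // Keep every entry that is not dominated by another one; drop exact duplicates.
  merge(other) {
    const all = [...this.entries, ...other.entries];
    const kept = [];
    for (const e of all) {
      if (all.some((o) => dominates(o.vv, e.vv))) continue;
      if (kept.some((k) => k.vv.every((x, i) => x === e.vv[i]))) continue;
      kept.push(e);
    }
    this.entries = kept;
  }
}

const a = new MVRegister(0, 2);
const b = new MVRegister(1, 2);
a.assign("x");
b.assign("y"); // concurrent with the write of "x"
a.merge(b);
const concurrentValues = a.values().slice().sort();
a.assign("z"); // overwrites both values that a has seen
b.merge(a);
```

After the first merge the register holds both "x" and "y"; choosing between them is left to the application.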
Application:
The shopping cart at Amazon. A well-known bug is connected with it: an item removed from the cart appears there again. The reason is that although the register stores a set of values, it is not a set (see the figure below). Amazon, by the way, does not even consider this a bug; in fact it boosts sales.
Riak. In the more general case, we shift the problem of choosing the actual values (note that there is no conflict!) onto the application.
[Figure: explanation of the Amazon cart bug]
6.3 Set
The set is the base type for building containers, maps, and graphs. It supports the operations add and rmv, which do not commute.
Consider a naive implementation of an op-based set, in which add and rmv are executed as they arrive (for example, an add and an rmv of the same element issued at the same time).
As a result, the replicas end up diverged. Consider the various ways of constructing conflict-free sets:
Growing Set (G-Set):
The simplest solution is to forbid removing elements. Only the add operation remains, and it is commutative. The merge function is the union of the sets.
Two Phase Set (2P-Set):
Removal is allowed, but an element cannot be added again once it has been removed. To implement this, we keep a separate G-Set of removed elements (such a set is called a tombstone set). Example for state-based:
\begin{align}
lookup(e) &: e \in A \land e \notin R \\
add(e) &: A = A \cup \{e\} \\
rmv(e) &: R = R \cup \{e\} \\
merge(S_1, S_2) &: \\
Result.A &= S_1.A \cup S_2.A \\
Result.R &= S_1.R \cup S_2.R
\end{align}
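The same definitions, written as a small class. The name TwoPhaseSet and the field names added and removed (for A and R) are assumptions of this sketch.

```javascript
class TwoPhaseSet {
  constructor() {
    this.added = new Set();   // A: everything ever added
    this.removed = new Set(); // R: tombstones
  }

  lookup(e) {
    return this.added.has(e) && !this.removed.has(e);
  }

  add(e) {
    this.added.add(e);
  }

  rmv(e) {
    this.removed.add(e);
  }

  // Merge is the union of both underlying grow-only sets.
  merge(other) {
    for (const e of other.added) this.added.add(e);
    for (const e of other.removed) this.removed.add(e);
  }
}

const a = new TwoPhaseSet();
const b = new TwoPhaseSet();
a.add("x");
b.merge(a);
b.rmv("x");
a.add("x"); // an attempt to add the element again
a.merge(b);
```

After the merge the element is gone on replica a as well: the tombstone wins forever, so the repeated add has no effect.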
LWW-element Set:
The next way to implement a conflict-free set is to introduce a total order; one option is to generate a unique timestamp for each element.
We keep two sets, an add-set and a remove-set. A call to add() puts (element, unique_id()) into the add-set, and rmv() does the same for the remove-set. To check whether an element is in the set, we look at where its timestamp is greater: in the remove-set or in the add-set.
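A sketch of such a set, under the assumption that timestamps are plain numbers passed in by the caller and that a tie between add and remove goes to remove. Only the latest timestamp per element is kept in each set, which is enough for lookup.

```javascript
class LWWElementSet {
  constructor() {
    this.addSet = new Map();    // element -> latest add timestamp
    this.removeSet = new Map(); // element -> latest remove timestamp
  }

  add(e, ts) {
    LWWElementSet.bump(this.addSet, e, ts);
  }

  rmv(e, ts) {
    LWWElementSet.bump(this.removeSet, e, ts);
  }

  // The element is present if its latest add is newer than its latest remove.
  lookup(e) {
    if (!this.addSet.has(e)) return false;
    const removedAt = this.removeSet.has(e) ? this.removeSet.get(e) : -Infinity;
    return this.addSet.get(e) > removedAt;
  }

  merge(other) {
    for (const [e, ts] of other.addSet) LWWElementSet.bump(this.addSet, e, ts);
    for (const [e, ts] of other.removeSet) LWWElementSet.bump(this.removeSet, e, ts);
  }

  // Store ts for e unless a newer timestamp is already recorded.
  static bump(map, e, ts) {
    if (!map.has(e) || map.get(e) < ts) map.set(e, ts);
  }
}

const a = new LWWElementSet();
const b = new LWWElementSet();
a.add("x", 1);
b.merge(a);
b.rmv("x", 2);
a.add("x", 3); // later than the remove, so it wins
a.merge(b);
b.merge(a);
```

Unlike in the 2P-Set, an element can be added again here, as long as the new add carries a later timestamp than the remove.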
PN-Set:
A variation with counting: we keep a counter for each element, incrementing it when the element is added and decrementing it when the element is removed. An element is in the set if its counter is positive.
Note an interesting effect: on the third replica, adding the element does not make it appear.
Observed-Remove Set (OR-Set, Add-Wins Set):
In this type, add takes precedence over remove. An example implementation: each newly added element is assigned a unique tag (unique per element, not per set). rmv removes the element from the set and sends all the (element, tag) pairs it has seen to the other replicas for removal.
Remove-Wins Set:
Similar to the previous one, but when add and rmv are concurrent, rmv wins.
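The effect is easy to reproduce. The sketch below models a single replica of an op-based PN-Set that receives operations from others; the class name and the shape of the operation objects are assumptions, and every operation is assumed to be delivered exactly once.

```javascript
class PNSet {
  constructor() {
    this.counts = new Map(); // element -> counter
  }

  // op: { type: "add" | "rmv", element }
  apply(op) {
    const delta = op.type === "add" ? 1 : -1;
    this.counts.set(op.element, (this.counts.get(op.element) ?? 0) + delta);
  }

  lookup(e) {
    return (this.counts.get(e) ?? 0) > 0;
  }
}

const replica = new PNSet();
replica.apply({ type: "add", element: "x" }); // counter: 1
// Two other replicas remove "x" concurrently; both removals arrive here.
replica.apply({ type: "rmv", element: "x" }); // counter: 0
replica.apply({ type: "rmv", element: "x" }); // counter: -1
// A local add only brings the counter back to 0.
replica.apply({ type: "add", element: "x" });
```

After the last add the counter is 0, so lookup("x") still reports the element as absent.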
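For illustration, here is a state-based variant of the same idea (not the op-based protocol described above): removed tags are kept as tombstones and merged by union. The class name ORSet and the tag format are assumptions of this sketch.

```javascript
class ORSet {
  constructor(replicaId) {
    this.replicaId = replicaId;
    this.nextTag = 0;
    this.added = new Map();   // tag -> element
    this.removed = new Set(); // tags that were observed and then removed
  }

  // Every add gets a fresh, globally unique tag.
  add(e) {
    const tag = `${this.replicaId}:${this.nextTag++}`;
    this.added.set(tag, e);
  }

  // rmv only affects the tags this replica has observed so far.
  rmv(e) {
    for (const [tag, element] of this.added) {
      if (element === e) this.removed.add(tag);
    }
  }

  lookup(e) {
    for (const [tag, element] of this.added) {
      if (element === e && !this.removed.has(tag)) return true;
    }
    return false;
  }

  merge(other) {
    for (const [tag, element] of other.added) this.added.set(tag, element);
    for (const tag of other.removed) this.removed.add(tag);
  }
}

const a = new ORSet("a");
const b = new ORSet("b");
a.add("x");
b.merge(a);
b.rmv("x"); // removes only the tag b has seen
a.add("x"); // concurrent add with a new tag
a.merge(b);
b.merge(a);
```

The concurrent add survives on both replicas, because its tag was never observed by the remove.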
6.4 Graph
This type is built on top of the set. The problem is this: what should happen when addEdge(u, v) and removeVertex(u) are executed concurrently? The following options are possible:
removeVertex takes priority: all edges incident to the vertex are removed
addEdge takes priority: removed vertices are restored
The execution of removeVertex is postponed until all concurrent addEdge operations have been executed
The first option is the easiest. To implement it (2P2P-Graph), two 2P-Sets are enough: one for the vertices and one for the edges.
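A sketch of the first option follows. The class names are assumptions, and so is the encoding of an edge as a JSON string of its two endpoints. An edge counts as present only while both of its vertices are present, which is what gives removeVertex its priority.

```javascript
// A minimal 2P-Set: elements can be added and removed, but never re-added.
class TwoPhaseSet {
  constructor() {
    this.added = new Set();
    this.removed = new Set();
  }
  has(e) { return this.added.has(e) && !this.removed.has(e); }
  add(e) { this.added.add(e); }
  rmv(e) { this.removed.add(e); }
  merge(other) {
    for (const e of other.added) this.added.add(e);
    for (const e of other.removed) this.removed.add(e);
  }
}

class TwoPTwoPGraph {
  constructor() {
    this.vertices = new TwoPhaseSet();
    this.edges = new TwoPhaseSet();
  }
  static key(u, v) { return JSON.stringify([u, v]); }

  addVertex(v) { this.vertices.add(v); }
  removeVertex(v) { this.vertices.rmv(v); }
  hasVertex(v) { return this.vertices.has(v); }

  // An edge may only be added between vertices that currently exist.
  addEdge(u, v) {
    if (this.hasVertex(u) && this.hasVertex(v)) this.edges.add(TwoPTwoPGraph.key(u, v));
  }
  // An edge is visible only while both endpoints are still present.
  hasEdge(u, v) {
    return this.hasVertex(u) && this.hasVertex(v) && this.edges.has(TwoPTwoPGraph.key(u, v));
  }
  merge(other) {
    this.vertices.merge(other.vertices);
    this.edges.merge(other.edges);
  }
}

const a = new TwoPTwoPGraph();
const b = new TwoPTwoPGraph();
a.addVertex("u");
a.addVertex("v");
b.merge(a);
a.addEdge("u", "v");  // concurrently...
b.removeVertex("u");  // ...the vertex is removed elsewhere
a.merge(b);
b.merge(a);
```

After the merge neither replica reports the edge, although it is still stored in the edge set.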
6.5 Map
Map of literals:
Two problems need to be solved:
What to do with concurrent put operations? By analogy with registers, either LWW or MV semantics can be chosen
What to do with concurrent put / rmv? By analogy with sets, either put-wins, rmv-wins, or last-put-wins semantics can be chosen
Map of CRDTs:
A more interesting case, because it allows nested maps to be built. We do not consider changes to the nested types themselves; those should be handled by the nested CRDT.
Remove-as-recursive-reset map
The remove operation “resets” the value of the type to some initial state. For a counter, for example, that is zero.
Consider an example: a shared shopping list. One user adds flour while another checks out (which calls the delete operation on all elements). As a result only the flour remains in the list, which looks logical.
Remove-wins map
The rmv operation takes precedence.
Example: in an online game, the player Alice has 10 coins and a hammer. Then two events occur concurrently: on replica A she produces a nail, and on replica B her character is deleted together with all of its items.
Note that with remove-as-recursive-reset a nail would be left over in the end, which is not a correct state once the character has been deleted.
Update-wins map
Updates take precedence; more precisely, they cancel concurrent rmv operations.
Example: in an online game, the character Alice is deleted on replica B due to inactivity, but at the same time there is activity on replica A. Obviously, the delete operation must be undone.
There is one interesting effect when working with this implementation. Suppose we have two replicas, A and B, that store a set under some key k. If A removes the key k while B removes all the elements of the set, both replicas end up with an empty set under the key k.
Note that a naive implementation will not work correctly: you cannot simply undo all previous deletions. In the following example, such an approach would leave the final state equal to the original one, which is wrong:
6.6 List
The problem with this type is that the indexes of elements on different replicas will differ after a local insert / delete operation. To solve this, the Operational Transformation approach is used: when applying a received change, the index of the element on the source replica must be taken into account.
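The index adjustment can be illustrated for the simplest case, two concurrent inserts. This is only a sketch: the operation format ({ index, value, site }), the function names, and the rule that the lower site id goes first on equal indexes are assumptions, and deletes are not handled.

```javascript
// Transform a remote insert against a local insert that has already been applied.
function transformInsert(remote, local) {
  const localGoesFirst =
    local.index < remote.index ||
    (local.index === remote.index && local.site < remote.site);
  // If the local element landed before the remote position, shift it by one.
  return localGoesFirst ? { ...remote, index: remote.index + 1 } : remote;
}

// Apply an insert without mutating the original list.
function applyInsert(list, op) {
  const copy = list.slice();
  copy.splice(op.index, 0, op.value);
  return copy;
}

const base = ["a", "b", "c"];
const opA = { index: 1, value: "X", site: 1 }; // issued on replica A
const opB = { index: 2, value: "Y", site: 2 }; // issued concurrently on replica B

// Replica A applies its own operation, then the transformed remote one.
const onA = applyInsert(applyInsert(base, opA), transformInsert(opB, opA));
// Replica B does the same in the opposite order.
const onB = applyInsert(applyInsert(base, opB), transformInsert(opA, opB));
```

Both replicas arrive at the same list, although each applied the two operations in a different order.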