📜 ⬆️ ⬇️

Riak usage patterns

Riak is a NoSQL solution, an honest DHT (key / value storage) with additional conflict resolution features.

A distributed hash table has both advantages and disadvantages. DHT scales well, but data may be lost due to concurrent access conflicts, consider the following example:

client a: def o-value = DHT.get("some-key");
client a: def a-value = changeValue(o-value);
client b: def o-value = DHT.get("some-key");
client a: DHT.put("some-key", a-value);
client b: def b-value = changeValue(o-value);
client b: DHT.put("some-key", b-value);


It turned out that client b rewrote client data a and no one knows about it (neither a, nor b, nor the one who reads the data on this key later).
')
Since many NoSQL databases are basically DHT, it is interesting to see how they are trying to solve the problem of competitive access.

For example, MongoDB uses the compare-and-swap strategy : its version is stored with each document (value), the update indicates the version of the “ancestor” of the changed document, if the base contains an ancestor at the time of the update, then the update passes, if not, then not: the update party receives the message, and tries to update again - an analogue STM. This approach works well with shards, but poorly with replication.

Riak solves the problem of competitive access, like version control systems, he, as it were, keeps conflicting versions in different branches, giving the program a merge for the next sample. This approach allows us to resolve conflicts related not only with competitive access, but also with the temporary isolation of a part of the cluster (partition tolerance: a cluster of machines can split into two parts, both parts will work and will be able to unite in the future without problems).

Riak imposes more conditions on the development, but provides scalability and reliability of data when working with a large amount of information. The article will describe how to get around the limitations of Riak when developing typical web applications.

Blog


Consider the first primitive, implemented on the basis of Riak - append-only list of small size.

Imagine that we are writing a blog, for each post of which there will be not very many comments and a comment cannot be changed after adding. In this case, it is reasonable to store the entire post with comments as a value, so the read operation will be past O (1). We describe the scheme for the data:

 public class Post { public static class Comment implements Comparable<Comment> { public String text; public int ts; //timestamp /* equals, hashCode, compareTo*/ } public String text; public List<Comment> comments; } 

Now imagine that two users "simultaneously" added a comment:

client a: def o-post = DHT.get("post/13");
client a: def a-post = addComment(o-value, " , ");
client b: def o-post = DHT.get("some-key");
client a: DHT.put("post/13", a-post);
client b: def b-post = addComment(o-post, " ");
client b: DHT.put("post/13", b-post);


Now the database with the id “post / 13” stores two entries; and the first person to apply for this key will receive both of them and will have to sgdjit on their own. For simplicity, suppose that a post cannot be edited, so a post from any “branch” is suitable, and since comments can only be added, the comment lists of both posts have a common prefix, therefore, you need to select it and create a new list from the prefix, its additions of the first list and its additions of the second list. The operation merge will be as follows:

 public static Post merge(Post a, Post b) { Post c = new Post(); c.text = a.text; c.comments = Mergers.mergeLists(a.comments, b.comments); return c; } 

Where mergeLists is defined as follows:

 public static <T extends Comparable<T>> List<T> mergeLists(List<T> a, List<T> b) { List<T> result = new ArrayList<T>(); List<T> rest = new ArrayList<T>(); int max = Math.min(a.size(), b.size()); int i=0; //    for(;i<max && a.get(i).equals(b.get(i));i++) { result.add(a.get(i)); } //   for(int j=i;j<a.size();j++) { rest.add(a.get(j)); } for(int j=i;j<b.size();j++) { rest.add(b.get(j)); } //   Collections.sort(rest); //   for(T item : rest) { result.add(item); } return result; } 

It is obvious that mergeLists is very similar to the union of sets, and therefore, if some element was in a or b, then it will be in the resulting list, therefore, there is no data loss during a merge. It turns out that now we have learned to write to the list in Riak, avoiding the problems of competitive access.

If you need several posts, then use the merge inside the fold of the combinator.

Alerts (messages, updates)


The following primitive, which we consider will be a fixed-size list with the possibility of deletion. In the case when it reaches the maximum size, some element is thrown out of it (for example, the oldest, although in distributed systems this concept is rather conditional). Since the size of the list is fixed, we will store the entire list again as one value.

All sorts of notifications fit this primitive well. First, event notification is much less important than the event itself; secondly, it is unlikely that the user wants to see a notification about the old event if he has not logged in for a long time; thirdly, if the user is already studying the information about the event, then the notification should be deleted.

Unlike the previous scheme, where one list of objects was stored as a value, two lists will be stored here: the list of “alerts” and the list of deleted “alerts”. In the case of merge, the corresponding lists will be merged and, thus, the deleted object will remain in the list of deleted objects, and the added one in the list of added ones (of course, after the merge, the deleted ones will have to be subtracted). We write more formally:

image

The problem is that with such a definition of operations, our lists grow indefinitely, although they implement the necessary primitive. Let's try to limit these lists: if the list of added objects has grown to the maximum - transfer any object to the list of deleted objects, if the list of deleted objects has grown - remove some object from it, again a little more formally:

image

After each add, delete, and merge operation, you must perform a ram operation. It is clear that having bordered the length of the lists we have lost something and, most likely, undesirable behavior. Let's try to measure it. In our case (when objects are lost), the only undesirable behavior is the appearance of a remote alert. To measure this indicator, I modeled the process and made a series of observations. It is quite obvious that the number of errors should depend on the length of the list and on the product of the duration of processing the request for the request frequency (this is true, I checked), let's call this parameter key. Below are a few graphs by which you can understand the dynamics:

The percentage of write errors from the list length

The graph reflects the proportion of changes in the list, after which the remote "alert" reappeared as new, depending on the maximum number of items in the list. The key parameter was fixed (0.8 and 2), which corresponds to approximately 8 requests per second and 20 requests per second. Below it will be written that it is not as small as it seems.
image

The percentage of write errors from the key parameter

The graph shows the dynamics of the percentage of errors depending on the key parameter at a fixed list length (30 and 130 elements, respectively).
image

1% error zone

The key parameter is plotted on the abscissa, the red line is responsible for the value 1; the ordinate is the length of the list, the red lines are responsible for 100, 200 and 300 elements. Black marks the zone of parameters for which the error is less than 1%.

image

Why 10 requests per second is not as small as it seems. Firstly, only write requests are taken into account, and secondly, it is not the total number of requests, but the number of requests to a single object. In case we design, for example, google +, then 10 requests per second is not the number of calls to all Google, but the predicted frequency of comments to one record.

Flow (wall in VKontakte or tape to twitter)


The last pattern in this article is a large list, with the ability to add records to the end, read records both from the end and from the beginning, as well as receive several records for one request. It is assumed that there will be very few deletions from the list.

This primitive well describes the tape on Twitter, as well as the walls on social networks, but you can think of other applications for it.

Unlike the previous schemes, which were described by one key-value pair, this pattern is described by several - a key-value service pair with information on the beginning and end of the list, as well as data chunks in which a fixed-size segment is stored from the list. We define the date model, as well as the merge operation for each data type:

 class Info { public String key; public String prefix; public int lastChunk = 0; public String getChunkKey(int chunk) { return prefix + chunk; } public Info mergeWith(Info brother) { Info info = new Info(); info.key = key; info.prefix = prefix; info.lastChunk = Math.max(lastChunk, brother.lastChunk); return info; } } class Chunk<T extends Comparable<T>> { public String key; List<T> added = new ArrayList<T>(); List<T> deleted = new ArrayList<T>(); public void add(T obj) { added.add(obj); } public void delete(T obj) { deleted.add(obj); } public Iterable<T> getData() { List<T> data = new ArrayList<T>(added); data.removeAll(deleted); return data; } public Chunk<T> mergeWith(Chunk<T> brother) { Chunk<T> chunk = new Chunk(); chunk.key = key; chunk.added = mergeLists(added, brother.added); chunk.deleted = mergeLists(deleted, brother.deleted); return chunk; } } 


I think the operations are obvious. If you want to add an item to the list:Since the flow traversal occurs sequentially (twitter, wall ...) chunk after chunk, with frequent removal, it is possible to find empty chunks; then you will need to take the next chunk (or the following) to return the data. It turns out that with very frequent deletion the duration of the data acquisition operation cannot be estimated, but with a rare deletion, it is O (1).

Conclusion


As can be seen from the article, on Riak, you can put data management schemes that are common to the web and subsequently get a painless distribution of this data across several nodes. This is achieved through the set of primitives that Riak provides the programmer.

I liked the Riak approach because of the transparent approach to conflict resolution and flexibility in choosing constraints (CAP) with each request.

Source: https://habr.com/ru/post/127228/


All Articles