
CubeDB: a minimalistic store for counters with multidimensional keys


Hi, Habr! My name is Dima Stanko, and I work on the BI team in Badoo's London office. At our company we try to measure as much user activity as possible. Many specialists need this: developers check how efficient their code is, colleagues from the product teams convince themselves of the genius of their ideas, admins make sure that entropy will not win tonight, and colleagues from the antispam department make sure that good is still defeating evil in an eternal and epic battle.


We have written about all this many times and will write again, because we believe it is wrong to step on rakes and not tell others about them.


Mobile analytics


A few years ago, Badoo needed to analyze user actions in its mobile application. Every time someone pressed a button, loaded a screen, opened the application or wrote a message, we received a notification about it and rejoiced. A couple of hundred million times a day.


Soon we needed to rejoice over more specialized cases, such as a user scrolling a screen to the end, viewing another user's profile, or sending a gift. But even this was not enough for our colleagues from the product team. This is how we got Hotpanel, a tracking system for mobile applications, and today we receive about 320 types of notifications, 12 billion in total per day. We will definitely write about it; it would be a sin to hide such beauty.


One of the main components of the Hotpanel interface is a set of hourly and daily charts with different breakdowns. There is no point in spending a thousand words on what a picture can show, so here it is:



You can filter by any combination of fields:



Prehistory


At the dawn of this project, when I was the only developer and there were few messages, I decided to solve the problem quickly, cheaply and crudely, but, alas, not very efficiently. The front end was dc.js , and the entire back end was Redis, where each message type and each unit of time had its own hash. The keys of the hash were the fields and their values, and the value was the number of times a message with that combination of fields had come to us.


Example:



All this meant that on September 17, 2016, a male user opened version 1.2.3 of our application on his Android device and rushed to look at a picture.


If on the same day another male user opened version 1.2.3 of our application on an Android device and also rushed to look at a picture, we issued an HINCRBY , and the value became 1500001, to the great joy of our product manager responsible for pictures in version 1.2.3 on Android.
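The scheme above can be sketched without Redis as a plain in-memory map of maps. This is only an illustration, not the original code: the class and method names are hypothetical, `Map.merge` stands in for HINCRBY, and the hash name and field string simply follow the naming used elsewhere in this article.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the Redis layout described above (illustrative only).
// Outer key: one hash per message type and time unit.
// Inner key: the full combination of field values; value: how often it was seen.
class HashCounters {
    private final Map<String, Map<String, Long>> hashes = new HashMap<>();

    // Plays the role of HINCRBY: adds delta and returns the new value.
    long incrementBy(String hashName, String combination, long delta) {
        return hashes.computeIfAbsent(hashName, k -> new HashMap<>())
                     .merge(combination, delta, Long::sum);
    }

    // Plays the role of HGETALL: every combination and its counter.
    Map<String, Long> getAll(String hashName) {
        return hashes.getOrDefault(hashName, Map.of());
    }

    public static void main(String[] args) {
        HashCounters counters = new HashCounters();
        String hash = "hourly:view_screen:2016-09-17";
        String combination =
            "screen_name=view_photo,prev_screen=welcome,platform=android,app_version=1.2.3,gender=male";
        counters.incrementBy(hash, combination, 1_500_000L);
        System.out.println(counters.incrementBy(hash, combination, 1L)); // prints 1500001
    }
}
```

Note that every distinct combination of field values becomes its own entry, which is why this layout grows quickly as fields and values are added.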


Next to Redis there was a service written in Python and Flask, which connected to Redis, ran HGETALL on all the hashes from hourly:view_screen:2016-07-17 to hourly:view_screen:2016-10-17 , assembled one wonderful JSON structure and sent it all to the dc.js client. There were, of course, many optimizations, but I will not talk about them, because all this is, as they say, a thing of days long gone **.


Everything was just wonderful while there were few combinations. The speed was amazing ( Crossfilter , which is the basis of dc.js , was written by the author of the d3 package and has a response time of less than 30 ms). In general, it was a success. A short-lived one.


This success ruined us. As the number of message types increased and new fields and values appeared, the number of combinations grew by leaps and bounds. Using the interface turned into torture. We hit such exotic ceilings as, for example, the limit on the maximum size of a received JSON object . But we ignored this "sign from above" and came up with a clever (as it seemed to us) solution: splitting the JSON into pieces and then "gluing" it back together on the client. A separate torture was the questions from our colleagues in the web development department: "In two minutes, your page ate one and a half gigabytes of RAM and killed Chrome, congratulations! How did you do it? And why are you still working with us?!"


On top of this shame and disgrace came the white and fluffy guests from the Arctic , who started brawling with Redis at night. Memory consumption grew, and it turned out that even 192 GB is not that much. Calls from the monitoring guys at three o'clock in the morning were completely unwelcome (even my one-and-a-half-year-old son never did that!).


In general, the classic situation had ripened in which "the lower classes do not want to live in the old way and the upper classes cannot carry on in the old way." It was time to act.


System requirements


We had to find or invent a miracle contraption that would sit on the back end and be able to do the following:


  1. Store counter data for 120 days (about 100 million distinct combinations, or about 27 GB of uncompressed data).
  2. Filter by any combination of fields and by date range. The format is field1 in ('val11', 'val12' ... ) AND field2 in ('val21', 'val22', ...) .... AND dt between x and y , and from this it is clear that indexes, unfortunately, would not help us.
  3. Present the results as facets. If a message has eight fields, eight dictionaries should be returned, one for each field. Each dictionary must contain a counter for every possible value of its field. To be very pedantic, in SQL it should look like this:


     select 'G1' as name, G1, SUM(M) from T
       WHERE D2 in (DQ2) and D3 in (DQ3) ... -- skip all filters related to G1
         and p between PFrom and PTo
       group by name, G1
     UNION ALL
     select 'G2' as name, G2, SUM(M1) from T
       WHERE D1 in (DQ1) and D3 in (DQ3) ... -- skip all filters related to G2
         and p between PFrom and PTo
       group by name, G2
     UNION ALL
     ...
     UNION ALL
     select 'GN' as name, GN, SUM(M1) from T
       WHERE D1 in (DQ1) ... and D(N-1) in (DQ(N-1)) ... -- skip all filters related to GN
         and p between PFrom and PTo
       group by name, GN
     UNION ALL
     select 'p' as name, p, SUM(M1) from T
       WHERE D1 in (DQ1) ... and Dn in (DQn) ...
       group by 'name', p

  4. Cope without effort when new fields and new field values are added.
  5. Cope without effort when new message types appear.
  6. Return the result almost instantly, that is, within 100 ms on average including the network, and within 2 seconds in the worst case (for that case we will put a spinner on the page).
  7. Be able to insert 3 million new combinations in a minute at most.
  8. Be able to quickly delete data for past days.
  9. All this should work on the existing infrastructure, that is, either on the same machine (192 GB of memory, 48 cores), or on the Hadoop cluster, or on the Exasol cluster that we happen to have at hand.
  10. All this should be easy to maintain and easy to monitor, should not act up, and should not describe its ailments in riddles.

Facilitating circumstances:


  1. We did not need real persistence, where data is saved immediately after each change. New data was added once an hour, so it was enough to save everything to disk once, right after loading. However, saving had to be non-blocking.
  2. The number of values of each field is no more than 1000.
  3. The number of fields is no more than 100 (they are all of type String).
  4. No ACL (yet).
  5. No transactions and so on. Naturally, the counters must be updated atomically.

Intermediate conclusions:


  1. You can process 27 GB in 100 ms only with clever compression or cunning indexing, and by using all the CPUs.
  2. Key-value stores will not help us. The scripting capabilities available through Lua in Redis and Tarantool would help, but they are still single-threaded and are unlikely to process that much data in time.
  3. Relational databases are also ruled out, because of requirements 4 and 5.
  4. Presto, Impala and the like are great, of course, but they will not manage anything in 100 ms. Besides, 100 million records for them is like shooting sparrows with a cannon. Hadoop and MapReduce are not even worth mentioning.
  5. Clever and interesting things like Druid and InfluxDB would probably solve this problem, but they are too complicated. We simply had no way of setting up a separate cluster for this. My more capable and less lazy colleagues have already written about this.
  6. You have probably noticed that all this looks like a time series. It almost is, just seen from a different angle. In fact, each of our charts is not one time series but the sum of millions of combinations. So we also dropped the time-series stores.

Out of my natural mischief and lack of time to test every option, I liked to ask candidates at interviews, in a naive tone, which tool they would use to solve this problem. But even good candidates could not tell me anything encouraging. And so the last hope fell away too: Elasticsearch, which seems to be designed for faceted search, was too slow for our nuclear icebreaker.


By that time, the constant humiliation from colleagues, the lack of normal sleep because of Redis's nightly riots, the ever-fading hope of finding something ready-made and free, and, admittedly, a purely geeky interest in solving an exciting problem had taken their toll: my eyes filled with blood, my mind clouded, and I decided to build a solution myself.


I wrote it at home, and my one-and-a-half-year-old son, already mentioned above, covered for me. We agreed that while he slept peacefully after seven in the evening, I would sit in his nursery and write code under the guise of a paternal vigil. I still remember those couple of weeks as the happiest of my fatherhood.


A couple of years ago, when we were choosing a new analytical database , I was amazed by the speed of the solutions that operated purely in memory. Proposing a data structure that returns results in O(N) time may be a terrible idea at an interview, but in practice it is not so bad at all (provided that all N elements live together in memory).


There is a neon lamp inside


In fact, there were two problems: how to insert data and how to retrieve it.


Retrieving counters


An elegant structure was devised for data storage. All aggregates for a given message type are stored in a separate object (let's call it a cube). At the very top level we simply have a map of these cubes, with the cube name as the map key.


Each cube is partitioned by time (in our case, by day and hour). Each partition stores its aggregates as columns, one per field. Adding a new field comes down to the trivial addition of a new element to the map of these fields.


Since we know that the number of possible values for each field is small, we can encode each value as a number. In our case 10 bits would suffice, but to keep things simple and leave some headroom I made it 16 bits. This not only saves memory but also makes searching faster, since now you only need to compare 16 bits rather than an entire string. In our case there are two types of columns: the columns that are searched by value are 16-bit, and the counters themselves are 64-bit.


Thus the already familiar key screen_name=view_photo,prev_screen=welcome,platform=android,app_version=1.2.3,gender=male with a value of 1500000 turns into the following data: 1:1:1:1:1 1500000. And all this now takes 5 x 2 + 8 = 18 bytes. The savings are obvious.
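The encoding described above can be sketched roughly as follows. This is a minimal illustration, not CubeDB's actual code: the class `FieldDictionary` and its methods are hypothetical, and reserving id 0 for "no value" is an assumption made here so that the first real value gets id 1, as in the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One dictionary per field: maps each distinct string value to a 16-bit id.
class FieldDictionary {
    private final Map<String, Short> idByValue = new HashMap<>();
    private final List<String> valueById = new ArrayList<>();

    FieldDictionary() {
        valueById.add(null); // assumption of this sketch: id 0 means "no value"
    }

    // Returns the existing id of the value, or assigns the next free one.
    short encode(String value) {
        Short id = idByValue.get(value);
        if (id == null) {
            if (valueById.size() > Short.MAX_VALUE) {
                throw new IllegalStateException("dictionary is full");
            }
            id = (short) valueById.size();
            idByValue.put(value, id);
            valueById.add(value);
        }
        return id;
    }

    String decode(short id) {
        return valueById.get(id);
    }

    // Row size: one 16-bit id per field plus one 64-bit counter.
    static int bytesPerRow(int fieldCount) {
        return fieldCount * Short.BYTES + Long.BYTES;
    }

    public static void main(String[] args) {
        String[] values = {"view_photo", "welcome", "android", "1.2.3", "male"};
        StringBuilder encoded = new StringBuilder();
        for (String value : values) {
            FieldDictionary dictionary = new FieldDictionary(); // a fresh dictionary per field
            if (encoded.length() > 0) encoded.append(':');
            encoded.append(dictionary.encode(value));
        }
        System.out.println(encoded + " 1500000");        // prints 1:1:1:1:1 1500000
        System.out.println(bytesPerRow(values.length));  // prints 18
    }
}
```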


The search is done with a simple full scan: for each row, the counter of the corresponding value of each field is increased by the value stored in the row's counter column.


The speed is not bad either. The current implementation in Java can comfortably scan 20 million values per second on a single processor on a Mac. Since we need a faceted search , we have to read the values of all the fields, not just those that have filters, so with five fields we still get 4 million counters per second.
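The full scan over encoded columns might look something like the sketch below. It is an illustration under assumptions, not the actual CubeDB code: the `FacetScan` class and the array layout are hypothetical, and the rule that a facet ignores the filter on its own field is taken from the SQL formulation in the requirements.

```java
import java.util.Arrays;

class FacetScan {
    // columns[f][r] is the dictionary id of field f in row r; counters[r] is the row's counter.
    // allowed[f] == null means "no filter on field f"; otherwise allowed[f][id] tells
    // whether that id passes the filter.
    // Returns facets[f][id]: the sum of counters over the rows that pass the filters
    // on every field other than f.
    static long[][] scan(short[][] columns, long[] counters, boolean[][] allowed, int dictSize) {
        int fields = columns.length;
        long[][] facets = new long[fields][dictSize];
        for (int r = 0; r < counters.length; r++) {
            int failed = 0;        // how many fields reject this row (we stop counting at 2)
            int failedField = -1;  // the field that rejected it, if there is exactly one
            for (int f = 0; f < fields && failed < 2; f++) {
                boolean[] a = allowed[f];
                if (a != null && !a[columns[f][r]]) {
                    failed++;
                    failedField = f;
                }
            }
            if (failed == 0) {
                // The row passes everything: it contributes to every facet.
                for (int f = 0; f < fields; f++) facets[f][columns[f][r]] += counters[r];
            } else if (failed == 1) {
                // Rejected only by the filter on one field: it still counts in that field's facet.
                facets[failedField][columns[failedField][r]] += counters[r];
            }
        }
        return facets;
    }

    public static void main(String[] args) {
        // Field 0 is platform (1 = android, 2 = ios), field 1 is gender (1 = male, 2 = female).
        short[][] columns = {{1, 1, 2, 2}, {1, 2, 1, 2}};
        long[] counters = {10, 20, 30, 40};
        boolean[][] allowed = {{false, true, false}, null}; // platform in ('android')
        long[][] facets = scan(columns, counters, allowed, 3);
        System.out.println(Arrays.toString(facets[0])); // prints [0, 30, 70]
        System.out.println(Arrays.toString(facets[1])); // prints [0, 10, 20]
    }
}
```

In the example, the platform facet still shows both platforms (so the user can switch the filter), while the gender facet only counts Android rows.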


Insert


Unfortunately, such an elegant data structure cannot cope with insertion at all. If inserting each element requires combing through the whole memory (which takes, say, 100 ms), then inserting 3 million records (that is how many combinations we accumulate every hour) would take 3,000,000 x 0.1 s = 300,000 s, or approximately 83 hours, which is completely unacceptable. It would even be a pity about the wasted CPU cycles.


On the other hand, we know that data is only inserted for the last few hours or days. We also know that whenever we insert something, we have all of its fields. Therefore nothing prevents us from creating a reverse index, where the key is the numeric values of the fields (1:1:1:1:1) and the value is the number of the row where this combination is located. Such a structure is built in a fraction of a second, and a key lookup takes a fraction of a millisecond.


In our case, we managed to reach a speed of about 150,000 records per second. This is, of course, for an internal call, not counting the deserialization of the REST request and the network. The reverse index lives in a cache and is created per partition. If it has not been accessed for several days, it is deleted and its memory is freed.
Most of our data is inserted on the same day it was sent (or the day after), so there are really few partitions with live reverse indexes.
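A reverse index of this kind could be sketched as below. Everything here is a hypothetical illustration rather than CubeDB's code: the `IndexedPartition` class, its row-wise storage (kept that way for brevity) and the `dropIndex` method are assumptions; the only things taken from the text are the key (the encoded field values), the value (the row number) and the fact that the index can be discarded and rebuilt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IndexedPartition {
    // Wraps an array of field ids so it can serve as a hash map key.
    private static final class Key {
        final short[] ids;
        Key(short[] ids) { this.ids = ids.clone(); }
        @Override public boolean equals(Object o) {
            return o instanceof Key && Arrays.equals(ids, ((Key) o).ids);
        }
        @Override public int hashCode() { return Arrays.hashCode(ids); }
    }

    private final List<Key> rows = new ArrayList<>();      // encoded combination of each row
    private final List<Long> counters = new ArrayList<>(); // counter of each row
    private Map<Key, Integer> rowByKey;                     // reverse index, built lazily

    // Builds the reverse index from the stored rows if it is not there.
    private Map<Key, Integer> index() {
        if (rowByKey == null) {
            rowByKey = new HashMap<>();
            for (int r = 0; r < rows.size(); r++) rowByKey.put(rows.get(r), r);
        }
        return rowByKey;
    }

    // Adds delta to the row holding this combination, appending a new row if needed.
    // Returns the row number.
    int upsert(short[] ids, long delta) {
        Key key = new Key(ids);
        Integer row = index().get(key);
        if (row == null) {
            row = rows.size();
            rows.add(key);
            counters.add(delta);
            index().put(key, row);
        } else {
            counters.set(row, counters.get(row) + delta);
        }
        return row;
    }

    long counterAt(int row) { return counters.get(row); }

    int rowCount() { return rows.size(); }

    // Frees the index memory; the next upsert rebuilds it.
    void dropIndex() { rowByKey = null; }
}
```

With the index in place, an insert is a hash lookup plus one addition instead of a scan of the whole partition.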


As I said, our search is trivial, and therefore it is trivially parallelizable. Partitions are scanned in parallel, in separate threads. With an average of eight fields and 48 processors on the server, we are able to reach a scan speed of 120 million rows per second. That is, as long as the number of distinct combinations does not exceed 12 million, we still fit into the 100 ms we set for ourselves. That is in an ideal world, of course, but we are almost there already.
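The parallel scan can be illustrated with a short sketch. It is not the CubeDB implementation: it uses Java parallel streams instead of explicitly managed threads, handles a single field for brevity, and the `ParallelScan` and `Partition` names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class ParallelScan {
    // One partition of a single-field cube: ids[r] is the field value id, counters[r] its counter.
    static final class Partition {
        final short[] ids;
        final long[] counters;
        Partition(short[] ids, long[] counters) {
            this.ids = ids;
            this.counters = counters;
        }
    }

    // Scans every partition on its own worker, then merges the per-partition facets.
    static long[] facet(List<Partition> partitions, int dictSize) {
        return partitions.parallelStream()
            .map(p -> {
                long[] local = new long[dictSize];
                for (int r = 0; r < p.counters.length; r++) {
                    local[p.ids[r]] += p.counters[r];
                }
                return local;
            })
            .reduce(new long[dictSize], (a, b) -> {
                // Merge without mutating either input, so the reduction stays safe in parallel.
                long[] sum = new long[dictSize];
                for (int i = 0; i < dictSize; i++) sum[i] = a[i] + b[i];
                return sum;
            });
    }

    public static void main(String[] args) {
        List<Partition> partitions = List.of(
            new Partition(new short[]{1, 2, 1}, new long[]{10, 20, 30}),
            new Partition(new short[]{2, 2}, new long[]{5, 5}));
        System.out.println(Arrays.toString(facet(partitions, 3))); // prints [0, 40, 30]
    }
}
```

The numbers in the text fit together as follows: 20 million values per second per processor divided by 8 fields is 2.5 million rows per second, times 48 processors gives 120 million rows per second, which is 12 million rows in 100 ms.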


Saving to disk


It makes sense to write the data to disk in the same compact form. The dictionaries are written at the beginning of the file, followed by the data columns with numbers instead of strings. And all of this is compressed as well. Oddly enough, in the case of Java the slowest component turned out to be the processor (and gzip compression). Switching to Snappy reduced the save time from 60 s to 8 s.
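A file layout of this kind could be sketched as follows. The exact CubeDB format is not described here, so the layout below (dictionary, then the id column, then the counter column, for a single field) is an assumption, as is the `PartitionFile` class. The sketch uses gzip from the JDK only because it needs no extra dependency; the text above says Snappy turned out to be much faster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class PartitionFile {
    final String[] dictionary; // dictionary[id] is the string value behind an id
    final short[] ids;         // one id per row
    final long[] counters;     // one counter per row

    PartitionFile(String[] dictionary, short[] ids, long[] counters) {
        this.dictionary = dictionary;
        this.ids = ids;
        this.counters = counters;
    }

    // Dictionary first, then the id column, then the counter column, all gzip-compressed.
    byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bytes))) {
            out.writeInt(dictionary.length);
            for (String value : dictionary) out.writeUTF(value);
            out.writeInt(ids.length);
            for (short id : ids) out.writeShort(id);
            for (long counter : counters) out.writeLong(counter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the same layout back.
    static PartitionFile fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(bytes)))) {
            String[] dictionary = new String[in.readInt()];
            for (int i = 0; i < dictionary.length; i++) dictionary[i] = in.readUTF();
            int rows = in.readInt();
            short[] ids = new short[rows];
            for (int r = 0; r < rows; r++) ids[r] = in.readShort();
            long[] counters = new long[rows];
            for (int r = 0; r < rows; r++) counters[r] = in.readLong();
            return new PartitionFile(dictionary, ids, counters);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```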


Just in case, saving in JSON format is also provided, so that the data can be reloaded manually if future versions lose compatibility.


Communication with the world


The whole interface is exposed through REST. This is probably the most boring part of the software, so I will not even write anything about it. Inserts go through PUT, queries through GET, deletes through DELETE, and so on.


Java


As I mentioned, I wrote it all in Java. Of all the so-called fast languages, it is the only one I can program in. But I strongly suspect that C would be even faster. Goodies like SIMD, I think, would really speed up the system. The dream of my life is to rewrite all this in Rust. But since the performance suits us at this stage, and my son has grown up and no longer agrees to go to bed at seven in the evening, that will have to wait a bit ***.


In general, Java both pleased and upset me. Full scan performance is very good; I did not even expect that. What upset me was the garbage collector, which constantly panics when a lot is allocated and nothing is ever released. So all these data structures had to be written off-heap, using allocateDirect and Unsafe. All this is very cool, of course, but it leaves the impression that the code is written in C, not Java. I am not sure that when the universe created the Java language, it had exactly this turn of events in mind.
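To give an idea of what off-heap columns look like, here is a minimal sketch using only `ByteBuffer.allocateDirect`. It is an illustration, not the actual CubeDB code (which, as mentioned, also uses Unsafe); the `OffHeapColumn` class and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// A fixed-capacity pair of columns stored outside the Java heap,
// so the garbage collector never has to trace their contents.
class OffHeapColumn {
    private final ByteBuffer ids;      // 2 bytes per row
    private final ByteBuffer counters; // 8 bytes per row
    private int rows;

    OffHeapColumn(int capacity) {
        ids = ByteBuffer.allocateDirect(capacity * Short.BYTES).order(ByteOrder.nativeOrder());
        counters = ByteBuffer.allocateDirect(capacity * Long.BYTES).order(ByteOrder.nativeOrder());
    }

    // Appends one row; throws IndexOutOfBoundsException once the capacity is exceeded.
    void append(short id, long counter) {
        ids.putShort(rows * Short.BYTES, id);
        counters.putLong(rows * Long.BYTES, counter);
        rows++;
    }

    // Full scan: sums the counters of all rows whose id matches.
    long sumWhere(short id) {
        long sum = 0;
        for (int r = 0; r < rows; r++) {
            if (ids.getShort(r * Short.BYTES) == id) {
                sum += counters.getLong(r * Long.BYTES);
            }
        }
        return sum;
    }

    int rowCount() { return rows; }
}
```

The manual offset arithmetic (`r * Short.BYTES`) is exactly the part that makes such code feel more like C than Java.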


The garbage collector upset me even more when it came to creating the counters for the facets themselves. When 48 processors simultaneously create HashMaps with a couple of thousand elements each, the collector behaves as if this were the first time in its life. As a result, a full scan of a couple of million rows takes less time than converting the results from numbers back to strings and then consolidating the data from all partitions.


The present


At the moment, our only instance holds about 600 cubes, which consist of about 500 million records. All this takes about 80 GB of resident memory, and the compressed (Snappy) backup takes about 5 GB on disk. Saving to disk takes about 30 seconds.
The system is very stable, and since the MIN_VALUE fix it has not crashed even once.


Future


Talking about the future is my favorite part, since you can say anything and do not have to show any numbers.


So, there are several options to improve the world:


  1. First, as I mentioned, rewrite the solution in a language better suited to fast, multi-threaded data processing, where the garbage collector will not embarrass itself and everyone else with its presence.
  2. Second, it would be cool to teach CubeDB to communicate with its own kind and grow into a whole cluster.
  3. Third, since everything is so fast in memory, it makes sense to bring some clever algorithms closer to the data. For example, some form of anomaly detection, as was done in Facebook Gorilla .

Since I clearly have more ideas than time, we have made our solution publicly available . And since you have read this far, you are probably interested. Come, look, play, use it. I believe the idea is very simple but interesting, and there is still a lot that can be improved. Go for it!


P.S. Data without visualization is like a disco without music, so we simultaneously released a set of front-end components for working with the CubeDB API, and also made a simple page to demonstrate the possibilities. I would like to warn you, however, that the demo runs in the cloud on a single-core machine, so it is hard to judge the real processing speed from it: on our internal system, on real hardware with forty-eight cores, the speed is dramatically different.


* Actually, no - we stored everything in JSON, which is terribly inefficient.
** Actually, no. I just looked: we forgot to tear it down and simply stopped using it. I turned it off, and the guys from the monitoring department immediately called me. Well done!
*** Why don't you try?



Source: https://habr.com/ru/post/342564/

