
VKontakte Product Analytics on ClickHouse



When developing any product, be it a video service or a news feed, stories or articles, you want to be able to measure the user's notional "happiness": to understand whether our changes make things better or worse, and to steer product development by relying not on intuition and gut feeling but on metrics and numbers you can trust.

In this article I will describe how we launched product statistics and analytics on a service with a monthly audience of 97 million while getting extremely fast analytical queries. I will cover ClickHouse, the engines we use, and the specifics of our queries. I will describe an approach to data aggregation that lets us compute complex metrics in a fraction of a second, and talk about data transformation and testing.

Right now we have about 6 billion product events per day, and in the near future we will reach 20-25 billion. After that, at a slower pace, we will grow to 40-50 billion by the end of the year, once we have described all the product events we are interested in.

1 rows in set. Elapsed: 0.287 sec. Processed 59.85 billion rows, 59.85 GB (208.16 billion rows/s., 208.16 GB/s.)

Details under the cut.

Foreword


VKontakte had analytical tools before. Unique users were counted, and you could plot events broken down by slices and thereby drill down into a service. However, all of this was about slices fixed in advance, aggregated data, HLL for unique users, a certain rigidity, and the inability to quickly answer questions even slightly more complicated than “how many?”.

Of course, we had, have, and will have Hadoop; a great many service usage logs were, are, and will be written to it. Unfortunately, HDFS was used only by some teams for their own tasks. Even more sadly, HDFS is not about fast analytical queries, and many fields raised questions whose answers had to be looked up in the code rather than in documentation accessible to everyone.

We concluded that we could not go on like this. Every team must have data, queries over it must be fast, and the data itself must be accurate and rich in useful parameters.

So we formulated clear requirements for the new statistics and analytics system:


In the kitchen


Experience suggested that we needed two databases: a slow one, where we would aggregate and enrich data, and a fast one, where we could work with this data and on top of which we would build charts. This is one of the most common approaches: in the slow database, for example in HDFS, various projections are built, such as unique counts and event counts by slice for a given period of time.

On a warm September afternoon, over a cup of tea in the kitchen overlooking Kazan Cathedral, we had the idea of trying ClickHouse as the fast database; at that time we already used it to store technical logs. There were many doubts, mostly about speed and reliability: the published performance benchmarks seemed unrealistic, and new releases of the database periodically broke existing functionality. So the proposal was simple: try it.

First trials


We deployed a cluster of two machines with this configuration:
2x E5-2620 v4 (32 cores in total), 256 GB RAM, 28 TB of disk space (RAID 10 with ext4).

Initially we used the near layout, but later switched to far. ClickHouse has many different table engines, but the main ones belong to the MergeTree family. We chose ReplicatedReplacingMergeTree with roughly these settings:

    PARTITION BY dt
    ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id)
    SAMPLE BY cityHash64(user_id)
    SETTINGS index_granularity = 8192;

Replicated means that the table is replicated, which addresses one of our reliability requirements.

Replacing means that the table supports deduplication by the sorting key: rows with equal keys are collapsed when data parts are merged. By default the primary key is the same as the sorting key, so the ORDER BY clause says exactly what the primary key is.

SAMPLE BY: I also wanted to try sampling. SAMPLE returns a uniformly pseudo-random sample.

index_granularity = 8192 is the magic number of rows between index marks (yes, the index is sparse). This is the default, and we did not change it.

Partitioning is by day (although the default is by month). Many queries were expected to be intraday, for example plotting video views per minute for a particular day.
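To put the settings above in context, here is a minimal sketch of a complete table definition. The table name, the column list, and the ZooKeeper path are hypothetical (the article does not show them), and the {shard} and {replica} macros are assumed to be defined in the server configuration.

```sql
-- Hypothetical raw-events table; only the PARTITION BY / ORDER BY /
-- SAMPLE BY / SETTINGS clauses come from the article.
CREATE TABLE video_events_local
(
    dt             Date,
    time           DateTime,
    user_id        UInt64,
    event_microsec UInt64,
    event_id       UInt64,
    event          String,
    platform       String
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/video_events_local',  -- assumed ZooKeeper path
    '{replica}'
)
PARTITION BY dt
ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id)
SAMPLE BY cityHash64(user_id)
SETTINGS index_granularity = 8192;

-- Because the sampling key is a hash of user_id, SAMPLE 1 / 10 reads
-- roughly a tenth of the users, so a count is scaled back up by 10.
SELECT 10 * count() AS approx_events
FROM video_events_local SAMPLE 1 / 10
WHERE dt = yesterday();
```

Note that the SAMPLE BY expression has to be part of the primary key, which is why cityHash64(user_id) appears in ORDER BY as well.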

Next, we took a chunk of technical logs and filled the table with about a billion rows. Excellent compression, grouping by Int* columns, counting unique values: everything worked incredibly fast!

By fast I mean that no query took longer than 500 ms, and most fit within 50-100 ms. And this was on two machines, of which, in fact, only one did the computation.

We looked at all this and imagined that the UInt8 column would hold a country id and the Int8 column would hold, say, the user's age. And we realized that ClickHouse suits us completely, if we do everything right.

Strong data typing


The benefits of ClickHouse begin precisely when the right data schema is in place. Example: platform String is bad, platform Int8 + dictionary is good, LowCardinality(String) is convenient and good (I will talk about LowCardinality a little later).

We created a special generator class in PHP which, on request, creates wrapper classes for events based on the tables in ClickHouse, plus a single entry point for logging. Let me explain using the workflow we ended up with:

  1. An analyst, data engineer, or developer writes the documentation: which fields, possible values, and events need to be logged.
  2. A table is created in ClickHouse according to the data structure from the previous step.
  3. Wrapper classes for events are generated from the table.
  4. The product team implements filling in the fields of an object of this class and sending it.

Changing the schema at the PHP level, or the type of the logged data, will not work without first changing the table in ClickHouse. And that, in turn, cannot be done without coordinating with the team and updating the documentation and event descriptions.

For each event you can set two settings that control the percentage of events sent to ClickHouse and to Hadoop, respectively. The settings are needed primarily for gradual rollout, with the ability to dial logging down if something goes wrong. Data is delivered to Hadoop in the standard way, via Kafka. To ClickHouse it travels through KittenHouse in persistent mode, which guarantees at-least-once delivery.

The event is delivered to a buffer table on the appropriate shard, chosen by the remainder of dividing a hash of user_id by the number of shards in the cluster. The buffer table then flushes the data to the local ReplicatedReplacingMergeTree table. On top of the local tables sits a table with the Distributed engine, which gives access to data from all shards.
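The three layers described here (buffer, local, distributed) could be declared roughly as follows. This is only a sketch: the table names, the database name default, the cluster name analytics, the Buffer thresholds, the shard count, and the choice of cityHash64 as the hash are all assumptions, and a local table video_events_local is assumed to exist already.

```sql
-- Buffer table: accumulates inserts in memory and flushes them to the
-- local table. Arguments: database, table, num_layers, min_time, max_time,
-- min_rows, max_rows, min_bytes, max_bytes (the values are illustrative).
CREATE TABLE video_events_buffer AS video_events_local
ENGINE = Buffer(default, video_events_local, 16, 10, 100, 10000, 1000000, 10000000, 100000000);

-- Distributed table: a read entry point that fans queries out to the
-- local tables on every shard of the (hypothetical) cluster 'analytics'.
CREATE TABLE video_events AS video_events_local
ENGINE = Distributed(analytics, default, video_events_local);

-- How a writer might pick the target shard for a user, assuming
-- 4 shards and cityHash64 as the hash function.
SELECT cityHash64(toUInt64(12345)) % 4 AS shard_index;
```

Routing by a hash of user_id keeps all events of one user on one shard, which is what later lets per-user aggregates be built locally.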

Denormalization


ClickHouse is a columnar DBMS. It is not about normal forms, which means it is better to have all the information right in the event than to join it in. JOINs exist too, but if the right-hand table does not fit in memory, the pain begins. So we made a firm decision: all the information we care about must be stored in the event itself. For example, the user's gender, age, country, city, and birthday, that is, all the public information that can be useful for audience analytics, as well as all useful information about the object of interaction. For a video, say, that is video_id, video_owner_id, the upload date, the duration, the quality at the time of the event, the maximum quality, and so on.

In total, each table has from 50 to 200 columns, and every table includes service fields. One example is the error log, error_log, where what we call an error is in fact a value outside the range of the column type. It is there in case strange values that exceed the size of the type start flowing into, say, the age field.

The LowCardinality(T) type


ClickHouse can use external dictionaries. They are stored in memory, updated periodically, and can be used effectively in various scenarios, including as classic lookup tables. For example, you want to log the operating system and have two alternatives: a string, or a number plus a dictionary. Of course, on large volumes of data and for high-performance analytical queries, it makes sense to write a number and get the string representation from the dictionary when you need it:

 dictGetString('os', 'os_name', toUInt64(os_id)) 

But there is a much more convenient way: the LowCardinality(String) type, which builds a dictionary automatically. When the set of values really does have low cardinality, LowCardinality performs dramatically better than String.

For example, we use LowCardinality(String) for the event types 'play', 'pause', 'rewind'. Or for the platform: 'web', 'android', 'iphone':

    SELECT vk_platform, count()
    FROM t
    WHERE dt = yesterday()
    GROUP BY vk_platform

    Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB (13.65 billion rows/s., 41.04 GB/s.)

The feature is still experimental, so to use it you must run:

 SET allow_experimental_low_cardinality_type = 1; 

But there is a feeling that before long it will no longer be hidden behind a setting.
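As a small illustration of how such columns are declared and queried, here is a sketch with a hypothetical table lc_demo. The table, its columns, and the sample rows are made up for the example.

```sql
-- On releases where the type is still experimental, first run:
-- SET allow_experimental_low_cardinality_type = 1;

-- Hypothetical demo table: strings are written as-is, and ClickHouse
-- maintains the dictionary encoding behind the scenes.
CREATE TABLE lc_demo
(
    dt          Date,
    user_id     UInt64,
    event       LowCardinality(String),
    vk_platform LowCardinality(String)
)
ENGINE = MergeTree()
PARTITION BY dt
ORDER BY (dt, user_id);

INSERT INTO lc_demo VALUES
    ('2019-03-20', 1, 'play',   'web'),
    ('2019-03-20', 2, 'pause',  'android'),
    ('2019-03-20', 3, 'rewind', 'iphone'),
    ('2019-03-20', 4, 'play',   'android');

-- Queries treat the column like an ordinary String.
SELECT vk_platform, count() AS events
FROM lc_demo
GROUP BY vk_platform
ORDER BY events DESC;
```

Compared with the Int8 plus external dictionary approach, nothing has to be kept in sync by hand: new values simply appear in the column's dictionary.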

VKontakte data aggregation


Since there are many columns and many events, the natural desire is to drop the “old” partitions, but first to build aggregates. Occasionally we need to analyze raw events (from a month or a year ago), so we do not delete data in HDFS: any analyst can go to the right Parquet files for any date.

As a rule, when aggregating over a time interval, we always run into the fact that the number of rows per unit of time equals the product of the slice cardinalities. This imposes restrictions: countries get lumped into groups such as Russia, Asia, Europe, Rest of the World, and ages into intervals, in order to bring the dimensionality down to a notional million rows per date.

Aggregation by dt, user_id


But we have ClickHouse! Can we push up to 50-100 million rows per date?
Quick tests showed that we could, and at that moment a simple idea arose: keep the user in the aggregate. That is, aggregate not by “date, slices” with Spark, but by “date, user” with ClickHouse itself, while “transposing” the data somewhat.

With this approach we keep users in the aggregated data, which means we can still compute audience figures, retention, and frequency metrics. We can join aggregates to compute the combined audience of several services, up to the entire VKontakte audience. All of this can be done on any slice present in the table, in roughly the same time.

I will illustrate with an example:



After aggregation (many more columns on the right):



Here the aggregation is done exactly by (dt, user_id). For fields with user information, such an aggregation can use the functions any and anyHeavy (which picks a frequently occurring value). You can, for example, put anyHeavy(platform) into the aggregate to know from which platform the user mostly triggers video events. If desired, you can use groupUniqArray(platform) and store an array of all the platforms from which the user triggered events. If that is not enough, you can create separate columns per platform and store, for example, the number of unique videos watched to the halfway point on a specific platform:

 uniqCombinedIf(cityHash64(video_owner_id, video_id), (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android 

With this approach we get a fairly wide aggregate in which each row is a unique user and each column holds information either about the user or about their interaction with the service.
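Put together, building one day of such a per-user aggregate might look like the sketch below. The table names video_events and video_agg_user, the country_id column, and the output column names are hypothetical. The target table is assumed to exist already with columns in the same order, since INSERT ... SELECT matches columns by position.

```sql
-- One row per (dt, user_id): user attributes are collapsed with
-- any / anyHeavy, interactions are "transposed" into counters.
INSERT INTO video_agg_user
SELECT
    dt,
    user_id,
    any(country_id)          AS user_country_id,   -- any value seen for the user
    anyHeavy(platform)       AS main_platform,     -- a frequently occurring platform
    groupUniqArray(platform) AS platforms,         -- all platforms the user used
    countIf(event = 'play')  AS plays,             -- number of play events
    uniqCombinedIf(
        cityHash64(video_owner_id, video_id),
        (platform = 'android') AND (event = '50p')
    )                        AS uniq_videos_50p_android
FROM video_events
WHERE dt = yesterday()
GROUP BY dt, user_id;
```

Each new metric becomes one more column in the SELECT list, which is how such aggregates grow wide while keeping one row per user per day.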

It turns out that to compute a service's DAU, it is enough to run this query over its aggregate:

    SELECT dt, count() AS DAU
    FROM agg
    GROUP BY dt

    Elapsed: 0.078 sec.

Or compute how many days users spent in the service over the week:

    SELECT days_in_service, count() AS uniques
    FROM
    (
        SELECT uniqUpTo(7)(dt) AS days_in_service
        FROM agg2
        WHERE dt > (yesterday() - 7)
        GROUP BY user_id
    )
    GROUP BY days_in_service
    ORDER BY days_in_service ASC

    7 rows in set. Elapsed: 2.922 sec.

We can speed this up with sampling, losing almost no accuracy:

    SELECT days_in_service, 10 * count() AS uniques
    FROM
    (
        SELECT uniqUpTo(7)(dt) AS days_in_service
        FROM agg2 SAMPLE 1 / 10
        WHERE dt > (yesterday() - 7)
        GROUP BY user_id
    )
    GROUP BY days_in_service
    ORDER BY days_in_service ASC

    7 rows in set. Elapsed: 0.454 sec.

It should be noted right away that sampling is not by percentage of events but by percentage of users, and as a result it becomes an incredibly powerful tool.

Or the same for 4 weeks with 1/100 sampling; the results are about 1% less accurate.

    SELECT days_in_service, 100 * count() AS uniques
    FROM
    (
        -- uniqUpTo(28), not uniqUpTo(7): the window is 28 days, and
        -- uniqUpTo(N) cannot report more than N + 1 distinct values
        SELECT uniqUpTo(28)(dt) AS days_in_service
        FROM agg2 SAMPLE 1 / 100
        WHERE dt > (yesterday() - 28)
        GROUP BY user_id
    )
    GROUP BY days_in_service
    ORDER BY days_in_service ASC

    28 rows in set. Elapsed: 0.287 sec.

Aggregation from the other side


When aggregating by (dt, user_id) we do not lose the user and lose little information about their interaction with the service, but we certainly lose metrics about a specific object of interaction. We do not have to lose them, though: let's build an aggregate by (dt, video_owner_id, video_id), following the same ideas. We keep as much information about the video as possible, lose little data about the video's interaction with users, and completely drop information about a specific user.

    SELECT starts
    FROM agg3
    WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...)

    1 rows in set. Elapsed: 0.030 sec

Or the top 10 videos by views yesterday:

    SELECT video_id, video_owner_id, watches
    FROM video_agg_video_d1
    WHERE dt = yesterday()
    ORDER BY watches DESC
    LIMIT 10

    10 rows in set. Elapsed: 0.035 sec.

As a result, we have a scheme of aggregates of the form:


Azkaban and TeamCity


A few words about the infrastructure. The building of aggregates starts at night, beginning with OPTIMIZE on each of the raw-data tables to trigger an unscheduled merge in ReplicatedReplacingMergeTree. The operation may take a long time, but it is necessary to remove duplicates, if any. It is worth noting that I have never come across duplicates so far, but there is no guarantee that they will not appear in the future.
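For a single day, that nightly step might look like the sketch below. The table name video_events_local and the date are hypothetical, and the key columns are taken from the ORDER BY shown earlier in the article.

```sql
-- Force a merge of one daily partition so that ReplacingMergeTree
-- collapses rows with identical sorting keys.
OPTIMIZE TABLE video_events_local PARTITION '2019-03-20' FINAL;

-- Optional sanity check: count sorting-key combinations that still
-- occur more than once in that partition.
SELECT count() AS duplicated_keys
FROM
(
    SELECT
        toStartOfHour(time) AS hour_start,
        cityHash64(user_id) AS user_hash,
        event_microsec,
        event_id
    FROM video_events_local
    WHERE dt = '2019-03-20'
    GROUP BY hour_start, user_hash, event_microsec, event_id
    HAVING count() > 1
);
```

Without FINAL, OPTIMIZE may leave several parts unmerged, and duplicates spread across different parts would survive.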

The next step is the creation of aggregates. These are bash scripts in which the following occurs:


This is not entirely optimal and can generate a lot of network traffic between hosts. However, when new shards are added everything keeps working out of the box and data locality for the aggregates is preserved, so we decided not to worry much about it.

We use Azkaban as our task scheduler. I would not call it a super-convenient tool, but it does its job well, including when slightly more complex pipelines are needed and one script has to wait for several others to finish.

The total time it takes to convert existing events into aggregates is 15 minutes.

Testing


Every morning we run automated tests that answer questions about the raw data and about the availability and quality of the aggregates, for example: “Check that yesterday's raw data or aggregates do not contain less data, by more than half a percent, than the same day a week ago.”

Technically, these are ordinary unit tests that use JUnit and the ClickHouse JDBC driver. All tests are run in TeamCity and take about 30 seconds in a single thread; on failure we get VK notifications from our wonderful TeamCity bot.
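The check quoted above could boil down to a query like the following, which a test would run through the JDBC driver and then assert on the ok column. This is a sketch: the table name agg is borrowed from the earlier DAU example, and the half-percent threshold is written as a factor of 0.995.

```sql
-- Compare yesterday's row count with the same weekday a week earlier.
-- ClickHouse allows the aliases rows_yesterday and rows_week_ago to be
-- reused inside the same SELECT list.
SELECT
    countIf(dt = yesterday())     AS rows_yesterday,
    countIf(dt = yesterday() - 7) AS rows_week_ago,
    rows_yesterday >= 0.995 * rows_week_ago AS ok
FROM agg
WHERE dt IN (yesterday(), yesterday() - 7);
```

Since the per-user aggregate holds one row per user per day, the same row count doubles as a check on unique users.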

Conclusion


Use only stable versions of ClickHouse, and your hair will be soft and silky. It is worth adding that ClickHouse does not slow down.

Source: https://habr.com/ru/post/445284/

