
I am Groot. We do our analytics on events



In the spring of this year, I learned that the HP Vertica database can run queries that match patterns of events. This feature, the so-called Event Pattern Matching, fit well with the task of analyzing user behavior in ivi.ru products. We decided to use it to sort out our payment funnels, find problem spots on devices, and dig deeper into traffic analysis. Our team really likes how analytics is implemented in Mixpanel and Localytics (it is built on events and their properties), so we borrowed many ideas from them.

What is going on?

Historically, like most other projects, we used Google Analytics for analytics. At some point, data sampling at our volumes reached unimaginable proportions: reports were built on less than 0.5% of the audience. This made it impossible to work with small segments: they were either not visible at all, or the error was catastrophic. In addition, GA could not take in the bulk of our internal data about the content, which ruled out in-depth analysis.

This led us to develop our own system. That is how Groot, the internal analytics system of ivi.ru, was born.
We started with a list of requirements that Groot had to meet:


Architecture


In addition to the HP Vertica columnar database, we decided to use Apache Kafka and Apache Storm, thus discovering the great and terrible world of Java.

Apache Kafka is a pub/sub system. Its main difference from typical pub/sub implementations is that a subscriber can start reading messages not only from the end, but also from the beginning or the middle. This means we do not have to worry about losing data while a subscriber is down.
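To make the idea of reading from an arbitrary position more concrete, here is a minimal in-memory sketch in plain Java. It does not use the Kafka API: `OffsetLog`, `append` and `readFrom` are hypothetical names invented for this illustration, and the only thing the sketch shows is that messages stay in an append-only log and each subscriber decides which offset to resume from.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy append-only log. NOT the Kafka API, only an illustration of offset-based reads. */
class OffsetLog {
    private final List<String> messages = new ArrayList<>();

    /** Appends a message and returns the offset it was stored at. */
    synchronized long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    /** Returns a read-only copy of every message stored at or after the given offset. */
    synchronized List<String> readFrom(long offset) {
        if (offset < 0 || offset > messages.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        List<String> tail = new ArrayList<>(messages.subList((int) offset, messages.size()));
        return Collections.unmodifiableList(tail);
    }

    public static void main(String[] args) {
        OffsetLog log = new OffsetLog();
        log.append("click");
        log.append("buy");
        long resumeAt = 1;       // the subscriber handled offset 0 and then went down
        log.append("search");    // published while the subscriber was offline
        System.out.println(log.readFrom(resumeAt)); // resumes from the middle
        System.out.println(log.readFrom(0));        // replays from the beginning
    }
}
```

Because the log keeps the messages regardless of who has read them, a subscriber that was down simply continues from its last known offset once it is back.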

Apache Storm is a distributed system for processing large volumes of data. One could talk about Storm for a long time. We liked its out-of-the-box integration with Kafka, the ability to scale the system horizontally, and its fairly high speed.

Top view

In general, the system works as follows:


First awkward steps



The first version worked very badly. More precisely, sending data to Kafka caused no problems at all (everything works out of the box). Apache Storm, however, took some tinkering, because we had to write our topology in Java, which nobody in our company knew.

A Storm topology consists of the following parts:


I implemented the simplest possible bolt: it received an event, parsed the JSON, and sent a batch to the database. The first tests revealed the following problems:


The first version of the table was very simple: it had the columns id, name, subsite_id, user_id, ivi_id, and ts. The tables in Vertica also proved difficult.

As you can see, we did not record any other data. Later, however, we decided to also record the browser, the operating system, the browser window size, and the Flash Player version. “Ha!”, we thought, and made the following table:

| id | event_id | name | int_value | string_value | double_value | datetime_value | added |


We made a second bolt, which takes the additional parameters from the JSON, checks their types, and writes everything into the new table.
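The type check in such a bolt could look roughly like the sketch below. It is an illustration under assumptions, not our actual code: the `PropertyTyping` class and the `columnFor` method are hypothetical names, the value is assumed to be already parsed from JSON into a Java object, and datetimes are assumed to arrive as ISO-8601 strings. Only the column names come from the table above.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

/** Hypothetical helper: picks the typed column of event_properties for one parameter value. */
class PropertyTyping {
    enum Column { INT_VALUE, DOUBLE_VALUE, DATETIME_VALUE, STRING_VALUE }

    /** Chooses a column by the runtime type of a value already parsed from JSON. */
    static Column columnFor(Object value) {
        if (value instanceof Integer || value instanceof Long) {
            return Column.INT_VALUE;
        }
        if (value instanceof Float || value instanceof Double) {
            return Column.DOUBLE_VALUE;
        }
        // Assumption: datetimes are sent as ISO-8601 strings such as 2014-07-28T10:15:30Z.
        if (value instanceof String && isIsoInstant((String) value)) {
            return Column.DATETIME_VALUE;
        }
        // Everything else, including null, falls back to the string column in this sketch.
        return Column.STRING_VALUE;
    }

    private static boolean isIsoInstant(String text) {
        try {
            Instant.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(columnFor("Chrome"));               // browser name
        System.out.println(columnFor(1280));                   // window width
        System.out.println(columnFor(11.2));                   // a fractional value
        System.out.println(columnFor("2014-07-28T10:15:30Z")); // a timestamp
    }
}
```

Each parameter then becomes one row in the new table, with exactly one of the typed columns filled in.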

Everything was fine: I was glad to have built it so neatly, and the analysts were happy that they could add any parameters to events and then build reports on them. At that time the main source of events was the ivi.ru site itself; the mobile applications were not sending anything yet. When they started sending events, we realized that things were very bad.

First, let's look at our query for a simple “clicked” -> “bought” funnel for the Chrome browser:

    WITH groupped_events AS (
        SELECT MIN(e.ts) AS added,
               MIN(e.user_id) AS user_id,
               e.name,
               MIN(CASE WHEN ep.name = 'browser' THEN string_value ELSE NULL END) AS browser
        FROM events.events AS e
        LEFT JOIN events.event_properties AS ep ON ep.event_id = e.id
        WHERE e.added >= '2014-07-28'
          AND e.added < '2014-07-29'
          AND e.subsite_id = '10'
        GROUP BY e.id, e.name
    )
    SELECT COUNT(q.match_id) AS count, name
    FROM (
        SELECT event_name() AS name, user_id, match_id() AS match_id
        FROM groupped_events AS e
        WHERE e.name IN ('click', 'buy')
        MATCH (
            PARTITION BY user_id ORDER BY e.added ASC
            DEFINE
                click AS e.name = 'click' AND e.browser = 'Chrome',
                buy AS e.name = 'buy'
            PATTERN P AS (click buy | click)
        )
    ) AS q
    GROUP BY q.match_id, q.name;

See the catch? We join a table (it now holds more than a billion records), group the result, and pull out the required value through CASE. Of course, once we had a lot of events, everything began to slow down. Queries ran for several minutes, which did not suit us. Analysts complained about queries that took half an hour, and the product specialists were ready to gang up on me.

Why?

Separately, I want to point out that HP Vertica is, after all, a columnar database. It stores a lot of data in columns very compactly and, for example, lets you add a new column on the fly without rewriting all the data. Vertica coped very poorly with our all-in-one table: it did not understand how to optimize this pile.

So we decided to move the main parameters into separate columns of the events table, choosing them from a list of the parameters most often used in queries. We did this twice: the first time we ended up with a table of 30 columns, the second time with 50. After all these manipulations, the average execution time across all queries dropped by a factor of 6-8.

After these changes, the previous query turned into a simple one:

    SELECT COUNT(q.match_id) AS count, name
    FROM (
        SELECT event_name() AS name, user_id, match_id() AS match_id
        FROM events.events AS e
        WHERE e.name IN ('click', 'buy')
        MATCH (
            PARTITION BY user_id ORDER BY e.added ASC
            DEFINE
                click AS e.name = 'click' AND e.browser = 'Chrome',
                buy AS e.name = 'buy'
            PATTERN P AS (click buy | click)
        )
    ) AS q
    GROUP BY q.match_id, q.name;
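For readers who do not have Vertica at hand, the pattern `(click buy | click)` can be pictured with a rough in-memory analogue in Java. This is only a sketch of the idea, assuming one user's event names are already sorted by time and already filtered by browser; `FunnelSketch` and `countFunnel` are hypothetical names, and Vertica's real matching semantics may differ in edge cases.

```java
import java.util.Arrays;
import java.util.List;

/** Rough in-memory analogue of PATTERN (click buy | click) for a single user. */
class FunnelSketch {
    /**
     * Takes one user's event names, sorted by time.
     * Returns {number of clicks, number of clicks immediately followed by a buy}.
     */
    static int[] countFunnel(List<String> eventNames) {
        int clicks = 0;
        int purchases = 0;
        for (int i = 0; i < eventNames.size(); i++) {
            if (!eventNames.get(i).equals("click")) {
                continue; // a match can only start with a click
            }
            clicks++;
            boolean nextIsBuy = i + 1 < eventNames.size() && eventNames.get(i + 1).equals("buy");
            if (nextIsBuy) {
                purchases++;
                i++; // consume the buy so that it belongs to one match only
            }
        }
        return new int[] {clicks, purchases};
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("click", "buy", "click", "click", "buy", "buy");
        int[] funnel = countFunnel(events);
        System.out.println("clicks=" + funnel[0] + ", purchases=" + funnel[1]);
    }
}
```

In the database the same walk happens per `user_id` partition, which is why the query needs no application-side loop.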

With that, our torment with the database ended. Everything has been running in this form for about 3 months, and we have no complaints about it.

We still kept the event_properties table so that we could develop applications faster instead of waiting for the structure of the main table to be updated.

Apache Storm

Having dealt with HP Vertica, we turned to Apache Storm: we needed to stabilize it, get rid of the separate Thread, and get ready for heavy loads.

There are at least two ways to do batch processing in Storm:

  1. A separate thread with a list that gets filled;
  2. The standard ability to receive a tick tuple.

At first we tried the first option and discarded it: the behavior was unstable, and requests were being sent almost for nothing. The second option showed us the beauty of Storm:

Using simple settings when creating the topology, we can specify how often we want to receive a tick tuple (every 10 seconds in our case). A tick tuple is an empty record that is sent to the main thread every 10 seconds. We can safely detect such a record and, depending on what arrived, either add the event to a queue or write the accumulated events to the database.

    // A tick tuple comes from Storm's system component on the system tick stream.
    private static boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }

    @Override
    public void execute(Tuple tuple) {
        if (isTickTuple(tuple)) {
            executeTickTuple(tuple); // timer fired: flush what has accumulated
        } else {
            executeTuple(tuple);     // regular event: buffer it
        }
    }

In executeTuple we save the event to a LinkedBlockingQueue, and in executeTickTuple we drain that LinkedBlockingQueue and insert its contents into the database.

We divided our topology into several bolts:
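The buffering logic itself can be sketched without any Storm dependency. The example below is a simplified illustration, not our production bolt: `EventBatcher`, `onEvent` and `onTick` are hypothetical stand-ins for executeTuple and executeTickTuple, and the database write is replaced by an arbitrary `Consumer` passed in by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Buffers events between ticks and hands them to a sink as one batch per tick. */
class EventBatcher {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<List<String>> sink; // stands in for the batch insert into the database

    EventBatcher(Consumer<List<String>> sink) {
        this.sink = sink;
    }

    /** Counterpart of executeTuple: only buffer the event, never touch the database. */
    void onEvent(String eventJson) {
        queue.add(eventJson);
    }

    /** Counterpart of executeTickTuple: drain the queue and write one batch. Returns the batch size. */
    int onTick() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (!batch.isEmpty()) {
            sink.accept(batch); // skip the write entirely when nothing arrived
        }
        return batch.size();
    }

    public static void main(String[] args) {
        EventBatcher batcher = new EventBatcher(batch -> System.out.println("writing " + batch));
        batcher.onEvent("{\"name\":\"click\"}");
        batcher.onEvent("{\"name\":\"buy\"}");
        System.out.println("flushed " + batcher.onTick() + " events");
        System.out.println("flushed " + batcher.onTick() + " events");
    }
}
```

The point of the design is that the database sees one insert per tick instead of one insert per event, and an idle period produces no writes at all.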


Now we can see which of the bolts is slowing things down and how much data passes through it:

[Figure: topology statistics from the Storm UI]

Afterword


In the next article I will try to explain how to administer and monitor all of this.

Source: https://habr.com/ru/post/236253/

