
In the spring of this year, I learned about the ability of the HP Vertica database to create queries with matching event patterns. The so-called Events Pattern Matching went well under the task of analyzing user behavior in ivi.ru products. We decided to try to deal with the payment funnels, search for problem areas on the devices, dive deeper into the traffic analysis. Our team really likes the way analytics is implemented in Mixpanel and Localytics (it is based on events and their properties), so many ideas were borrowed from them.
What is going on?
Historically, for analytics, we, like most other projects, have used Google Analytics. At some point, data sampling on our volumes reached unimaginable scales - the samples were based on less than 0.5% of the audience. This made it impossible to work with small samples - they were either not visible at all, or the error was catastrophic. Plus, in GA it was impossible to throw a bunch of internal data on the content, which made it impossible to analyze in depth.
This fact gave rise to the development of its own system. This is how Groot was born - ivi.ru internal analytics.
')
We started with a list of requirements that Groot had to meet:
- No sampling, all data must be stored raw;
- Cross Platform Time. Since in addition to the site we have very popular applications for mobile platforms and Smart TV, the system should be able to collect data even from the iron if it is connected to the Internet and our application is on it;
- The ability to quickly scale;
- Lack of SPOF;
- Easy setup and deployment.
Architecture
In addition to HP Vertica column base, we decided to use Apache Kafka and Apache Storm, thus discovering the great and terrible world of Java.
Apache Kafka - pub / sub system. The main difference from the usual pub / sub implementations is that the subscriber can start reading messages not from the end, but from the beginning or middle. This solution allows you not to worry about data loss when the subscriber is not working.
Apache Storm is a distributed system for computing large amounts of data. In general, on the Storm can talk for a long time. We liked the integration with kafka out of the box, the ability to scale the system horizontally, and a fairly fast operation speed.
Top view
In general, the system works as follows:
- The client sends a request with JSON information about the event;
- the web server to flask asynchronously sends a stack of events to kafka;
- storm constantly picks up new messages from kafka;
- in storm, the topology of parsit, parses the event and builds a batch request in vertica and saves the database to the database.
First awkward steps

The first version worked very badly. More precisely, there was no problem sending data to kafka at all (everything works out of the box). And we had to tinker with apache storm, since we had to write our topology in java, which nobody in our company knows.
The topology in storm consists of the following parts:
- spout - tap from which constantly (or not) data arrives . In our case, this is the standard KafkaSpout;
- bolt is the actual data handler. In the "bolts" all the magic of working with data occurs;
- tuple is a standard data structure. A tuple can store anything from a prime number to an object.
I implemented the simplest bolt that received the event, parsed json and sent a pack to the base. The first tests revealed the following problems:
- Vertica locks the table while recording;
- It is very difficult to track problem areas in the topology;
- Thread with the insert into the database could send 1 record, then 100 at once. There was no understanding why this is happening;
The first version was very simple: there are columns with id, name, subsite_id, user_id, ivi_id, ts. At the same time there were difficulties with the tables in Vertica also proved difficult.
As you can see, we did not record any more data. Then, however, they decided to record another browser, operating system, browser window size, version of the flash player. “Ha!”, We thought and made the following table:
| id | event_id | name | int_value | string_value | double_value | datetime_value | added |
They made a second bolt, which takes additional parameters from JSON, checks the type and writes it all into a new table.
Everything was fine, I was glad that it was so cool to realize, analysts were happy that you can add any parameters to the events and then build reports on them. At that time, the main source of events was the ivi.ru site itself, mobile applications have not yet sent anything. When they began to send, we realized that everything is very bad.
First let's look at our request for a simple “clicked” funnel -> “bought” for the Chrome browser:
WITH groupped_events AS ( SELECT MIN(e.ts) as added, MIN(e.user_id) as user_id, e.name, MIN(CASE WHEN ep.name = 'browser' THEN string_value ELSE NULL END) as browser from events.events as e LEFT JOIN events.event_properties as ep ON ep.event_id = e.id WHERE e.added >= '2014-07-28' and e.added < '2014-07-29' and e.subsite_id = '10' GROUP BY e.id, e.name ) SELECT COUNT(q.match_id) as count, name FROM ( SELECT event_name() as name, user_id, match_id() as match_id FROM groupped_events as e WHERE e.name IN ('click', 'buy') MATCH ( PARTITION BY user_id ORDER BY e.added ASC DEFINE click as e.name = 'click' and e.browser = 'Chrome', buy as e.name = 'buy' PATTERN P as (click buy | click) ) ) as q GROUP BY q.match_id, q.name;
See the catch? We join the label (now there are more than a billion records), group it and pull the required value through CASE. Of course, when we had a lot of events, it all began to slow down. Requests worked for several minutes, which did not suit us. Analysts complained about requests in half an hour, product specialists wanted to make me dark.
Why?
Separately, I want to clarify the fact that, after all, HP Vertica is a column database. It stores a lot of data in columns in a very compact way and, for example, allows you to add a new column on the fly without spilling over all the data. With our own all-in-one sign, the vertical was coping very poorly - she did not understand how to optimize this pile.
Then it was decided to drag the main parameters into the events table in separate columns, and form a list of parameters that are often used in queries. We have done this procedure 2 times. The first time we had a table with 30 columns, the second time, already with 50. After all these manipulations, the average execution time for all queries decreased by 6-8 times.
After all the manipulations, the previous request turned into a simple one:
SELECT COUNT(q.match_id) as count, name FROM ( SELECT event_name() as name, user_id, match_id() as match_id FROM events.events as e WHERE e.name IN ('click', 'buy') MATCH ( PARTITION BY user_id ORDER BY e.added ASC DEFINE click as e.name = 'click' and e.browser = 'Chrome', buy as e.name = 'buy' PATTERN P as (click buy | click) ) ) as q GROUP BY q.match_id, q.name;
At this, the torment with the base we stopped, in this form, everything lives for about 3 months and we have no pritenzy to it.
We still left the event_properties table so that we could develop applications faster, rather than wait for the update of the structure of the main table.
Apache storm
Having dealt with HP Vertica, we began to deal with Apache Storm: it was necessary to stabilize the work, remove a separate Thread and be ready for heavy loads.
There are at least two ways of batch processing in a storm:
- Separate thread with a populated list;
- Using the standard ability to take a tickTuple;
At first we tried the first option and discarded it - the behavior was unstable, the requests went to almost idle. The second option showed us the beauty of Storm:
Using simple settings when creating a topology, we can specify when we want to get a tickTuple (we have 10 seconds). TickTuple is an empty entry that is sent to the main thread every 10 seconds. We can safely track such a record, add everything to the database in a queue or record.
private static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); } @Override public void execute(Tuple tuple) { if( isTickTuple(tuple) ) { executeTickTuple(tuple); } else { executeTuple(tuple); } }
In
executeTuple
we save the event to the
LinkedBlockingQueue
, and, accordingly, we
LinkedBlockingQueue
in
executeTickTuple
in turn and insert it into the database.
We divided our topology into several
Bolt
:
- KafkaRecieverBolt - gets data from KafkaSpout, parses JSON and sends it to PropertiesParserBolt;
- PropertiesParserBolt - Parsing non-standard parameters, sends them to EventPropertiesBatchBolt, sends the entire event further to EventsBatchBolt
- EventsBatchBolt - saves data to the main table;
- EventPropertiesBatchBolt - saves data to the dop parameters table
Now we can see which of the “bolts” is slowing down and how much data is being chased through it:
Statistics of the topology from the Storm UIAfterword
In the next article I will try to tell how to administer and monitor all this.