
Calculating users' relative importance weights from their actions (Tarantool + Lua)

There is a system with many users, and each user can perform actions toward the others. A weight is calculated from these actions. For each user we must be able to get a list of the other users of the system, sorted in descending order of weight. The weights of an inactive user must not change.



In my previous article I described the basic concepts and tools needed to start working with Tarantool. In this article I will focus on the use of stored procedures in Tarantool, using one game-related task as an example.


For simplicity, let's imagine that a system is a game, and a user is a player.

For example, you have an online game in which players can view each other's achievements, attack each other, send messages, transfer virtual money, trade, and so on. For each player you need to be able to sort the other players. The weight used for sorting should depend not only on activity as such, but also on the kind of activity: viewing a profile increases the weight only slightly, while a trade operation increases it much more.
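As a sketch, the action-to-weight mapping can be as simple as a lookup table (a Python illustration; the action names and values here are my assumptions, not taken from the system described below):

```python
# Hypothetical action weights: cheap actions add little, trades add a lot.
ACTION_SCORE = {"view_profile": 1, "message": 5, "trade": 20}

def weight_gain(action: str) -> int:
    """Weight added to a relation for a single action (0 if unknown)."""
    return ACTION_SCORE.get(action, 0)

print(weight_gain("trade"))  # 20
```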

A fairly standard statistical task in our difficult programmer's everyday life. In this article I will show how it can be implemented with Tarantool + Lua. In my previous article I described how Tarantool is built and configured; here I will go into more detail not only about the schema, but also about Lua stored procedures and how to call them from Perl.

What we need to solve this problem:

A table whose fields store the identifier of the player for whom the selection is made, the identifier of the player who appears in the selection, and the weight. In other words, this is a graph represented as an array of edges with weights. But since the weight must not only grow but also decrease from day to day, we also have to store the date of the last update of the player's weight. Of course, we could avoid storing the date, but then every day we would have to walk over all the records and recalculate all the weights.

Let's simplify the task a bit: since we are not given a formula for the weight decay, let's say the weight decreases evenly to zero over 30 days, provided no actions are performed toward this player while actions are performed toward others. So we will also need the date of the last action toward the player, and, even better, a precomputed coefficient calculated once at the moment of an active action. The weight is increased depending on the action performed toward the participant. However, the task states that the weights must not change if the player was not active at all; to implement this condition we also need to store the date of the user's last activity.
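The decay rule just described, with the coefficient computed once at the moment of an action, can be sketched as follows (a Python illustration of the arithmetic only; the names are mine, not from the Lua code below):

```python
SCORE_DAYS = 30  # days for a weight to decay evenly to zero

def daily_koef(score: float) -> float:
    """Decay per day, computed once at the moment of an active action."""
    return score / SCORE_DAYS

def decayed(score: float, koef: float, days_idle: float) -> float:
    """Weight after days_idle days without actions toward this player."""
    return max(score - koef * days_idle, 0.0)

k = daily_koef(90.0)         # 3.0 per day
print(decayed(90.0, k, 10))  # 60.0
print(decayed(90.0, k, 45))  # 0.0  (never goes below zero)
```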
The participants' weight chart should look something like this:



One notable point on the chart is 05.02: from this moment on, actions are performed only toward the player whose weight curve is shown in orange. And from around 17.02 no actions are performed toward any user, so the weights stay at the same level.

In total we get two spaces. The first has 5 fields:
  1. player 1 (whose list it is)
  2. player 2 (who appears in the list)
  3. weight
  4. last update date
  5. weight-decay coefficient per day
The second has 2 fields:
  1. player 1
  2. date of player 1's last activity

Now let's analyze what indexes we need:
Since we select one player's relations to all the others, we need a non-unique index on the first field. We also want the list sorted by weight, so we need a composite index on fields 1 and 3. And of course we need a unique index on fields 1 and 2 for a point lookup of one player relative to another; it will be the primary one. The second space is simpler: it only needs one unique index on the first field.
Tarantool configuration:

slab_alloc_arena = 1
pid_file = "box.pid"
logger = "cat - >> Tarantool.log"
primary_port = 33013
secondary_port = 33014
admin_port = 33015
rows_per_wal = 5000000

# space 0: player relations; space 1: last activity dates
space[0].enabled = 1
space[0].index[0].type = "HASH"
space[0].index[0].unique = 1
space[0].index[0].key_field[0].fieldno = 0
space[0].index[0].key_field[0].type = "NUM"
space[0].index[0].key_field[1].fieldno = 1
space[0].index[0].key_field[1].type = "NUM"
space[0].index[1].type = "TREE"
space[0].index[1].unique = 0
space[0].index[1].key_field[0].fieldno = 0
space[0].index[1].key_field[0].type = "NUM"
space[0].index[2].type = "TREE"
space[0].index[2].unique = 0
space[0].index[2].key_field[0].fieldno = 0
space[0].index[2].key_field[0].type = "NUM"
space[0].index[2].key_field[1].fieldno = 2
space[0].index[2].key_field[1].type = "NUM"
space[1].enabled = 1
space[1].index[0].type = "HASH"
space[1].index[0].unique = 1
space[1].index[0].key_field[0].fieldno = 0
space[1].index[0].key_field[0].type = "NUM"


Tarantool is configured; now we can initialize the storage:

$ tarantool_box --init-storage
tarantool/src/box/tarantool_box: space 0 successfully configured
tarantool/src/box/tarantool_box: space 1 successfully configured
tarantool/src/box/tarantool_box: creating `./00000000000000000001.snap.inprogress'
tarantool/src/box/tarantool_box: saving snapshot `./00000000000000000001.snap'
tarantool/src/box/tarantool_box: done


Storage is prepared, you can write Lua procedures.
We need two procedures:
  1. user action registration (increase_score)
  2. get sorted user list (get_top)


We implement them in the file box_popular_user.lua and put the configuration variables into box_config_popular_user.lua.
Then, to plug this module in, a single line in init.lua is enough: dofile("box_popular_user.lua")

So, now let's look at increase_score() in more detail.

function increase_score(user_id, friend_id, action_type)
    -- validate the arguments
    local id = tonumber(user_id)
    local fid = tonumber(friend_id)
    if not id or not fid or not map_type_score[action_type] then
        return false
    end
    -- current date truncated to midnight
    local dt = os.date('*t')
    local cd = box.time{year = dt.year; month = dt.month; day = dt.day}
    -- fetch (or create) the record with the user's last activity date
    local last_update = box.select('1', '0', id)
    if not last_update then
        last_update = box.insert('1', id, cd)
    end
    -- number of whole days since the user's last activity
    local difft = math.floor((cd - box.unpack('i', last_update[1])) / 24 / 60 / 60)
    -- if the day has changed, the stored dates have to be shifted
    if difft ~= 0 then
        if difft > 1 then
            -- more than one day has passed: shift the dates of all of this
            -- user's relations in a separate fiber so we can respond faster,
            -- remembering the fiber so parallel jobs are not started
            table.insert(updates_in_fibers, id, box.fiber.wrap(function()
                _move_last_update(id, cd, difft)
            end))
            difft = difft - 1
        else
            difft = 0
        end
        box.update('1', id, '=p', 1, cd)
    end
    -- fetch the relation record for this pair of players
    local tup = box.select('0', '0', id, fid)
    -- the score and the coefficient are recalculated in _get_score_koef
    if not tup then
        -- no relation yet: create it with the initial score and coefficient
        local s_k = _get_score_koef(nil, nil, map_type_score[action_type], nil, cd)
        tup = box.insert('0', id, fid, s_k[1], cd, s_k[2])
    else
        -- an existing relation: recalculate the score, the update date and
        -- the decay coefficient, taking the elapsed days into account
        local s_k = _get_score_koef(box.unpack('i', tup[3]) + difft,
            box.unpack('i', tup[2]), map_type_score[action_type],
            box.unpack('i', tup[4]), cd)
        tup = box.update('0', {id; fid}, '=p=p=p', 2, s_k[1], 3, cd, 4, s_k[2])
    end
    return tup
end


We may hit a player who is very "rich" in relations with players on his list who have not logged in for a long time, so we update the last-recalculation dates of his relations in a separate fiber in order to respond faster. And, of course, we protect ourselves from parallel jobs for the same user, and we do not recalculate weights until all the dates have been updated. Here is the date-update function:

function _move_last_update(id, cd, diff)
    weights = box.space[0]
    for tup in weights.index[1]:iterator(box.index.EQ, id) do
        -- shift the last-update date of every relation that is behind
        if box.unpack('i', tup[3]) < cd then
            weights:update({box.unpack('i', tup[1]); box.unpack('i', tup[2])},
                '=p', 3, box.unpack('i', tup[3]) + diff)
        end
    end
    -- all dates are consistent again: drop the id from the in-progress table
    table.remove(updates_in_fibers, id)
end


Here update yields control to other fibers, so other clients keep receiving responses while the dates are being brought up to date.
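The guard against parallel recalculation can be sketched as a simple in-progress map (a Python sketch of the idea only, not the fiber code itself; the function names are mine):

```python
# While a background update is registered for a user, readers must serve the
# stored values as-is instead of recalculating them.
updates_in_fibers = {}

def begin_update(user_id, job):
    updates_in_fibers[user_id] = job       # mark the update as in progress

def end_update(user_id):
    updates_in_fibers.pop(user_id, None)   # dates are consistent again

def may_recalculate(user_id):
    return user_id not in updates_in_fibers

begin_update(42, "job")
print(may_recalculate(42))  # False
end_update(42)
print(may_recalculate(42))  # True
```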

Now let's consider the function that recalculates the weight and its decay coefficient.

function _get_score_koef(last_update, last_score, add_score, koef, current_date)
    local score = 0
    if not last_score then
        -- no previous score: the new score is just the score of the action
        score = add_score
    else
        if current_date == last_update then
            -- already recalculated today: just add the action's score
            score = last_score + add_score
        else
            -- apply the decay accumulated since the last update
            local diff = (current_date - last_update) / 24 / 60 / 60 * koef
            if diff > last_score then
                -- the score must not go below zero
                diff = last_score
            end
            score = last_score + add_score - diff
        end
    end
    if add_score then
        -- recalculate the daily decay coefficient from the new score
        koef = score / box_pu_default['score_day']
    end
    return {score, koef}
end


To improve performance we change only the records that require it. We act the same way when selecting records: if a record needs to be updated, we update it; if not, we don't. The logic that decides whether a record needs updating is the same as in the weight-change function. But let's complicate our life a little and also make it possible to select only the specified users, sorted in descending order.
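The read-path decision — leave fresh records alone, decay stale ones, drop those that reached zero — can be condensed into one function (a Python sketch with dates as plain day numbers; the names are mine):

```python
SCORE_DAYS = 30  # a weight decays evenly to zero over this many days

def refresh_record(score, koef, last_update, today):
    """Return (score, koef, date) after lazy recalculation,
    or None if the record decayed away and should be deleted."""
    if last_update == today:
        return score, koef, last_update           # fresh: leave untouched
    score = max(score - koef * (today - last_update), 0)
    if score == 0:
        return None                               # decayed to zero: delete
    return score, score / SCORE_DAYS, today       # refresh the coefficient too

print(refresh_record(60, 2, 10, 20))  # (40, 1.333..., 20)
print(refresh_record(60, 2, 10, 40))  # None
```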

Now consider the user-selection function, get_top():
function get_top(user_id, count_users, ids)
    -- validate the arguments; the default list size is taken from the config
    -- (see "box_config_popular_user.lua")
    local id = tonumber(user_id)
    if not id then return false end
    local cu
    if not count_users then
        cu = box_pu_default['count_users']
    else
        cu = tonumber(count_users)
    end
    local id_users = {}
    local count = 0
    -- if a list of ids is passed, only those users may appear in the result
    if ids then
        for id in string.gmatch(ids, "%d+") do
            id_users[tonumber(id)] = 1
            count = count + 1
        end
        if count < cu then cu = count end
    end
    local ret
    local last_update = box.select('1', '0', id)
    -- at most 2 passes: if stale records had to be recalculated (and thus
    -- reordered), repeat the selection once more
    for iter = 1, 2 do
        local need_update = {}
        ret = {}
        for v in box.space[0].index[2]:iterator(box.index.LE, id) do
            -- stop at the end of this user's records or when enough are collected
            if not v or #ret == cu or box.unpack('i', v[0]) ~= id then
                break
            end
            -- skip users that are not in the requested list
            if not ids or id_users[box.unpack('i', v[1])] == 1 then
                -- a record is stale if its date differs from the user's last
                -- activity date and no background fiber is updating this user
                if not updates_in_fibers[id] and box.unpack('i', v[3]) ~= box.unpack('i', last_update[1]) then
                    table.insert(need_update, v)
                else
                    -- the record is up to date, take it into the result
                    table.insert(ret, v)
                end
            end
        end
        local need_another_req = 0
        for i = 1, #need_update do
            local v = need_update[i]
            local s_k = _get_score_koef(box.unpack('i', v[3]), box.unpack('i', v[2]), 0,
                box.unpack('i', v[4]), box.unpack('i', last_update[1]))
            -- the score decayed to zero: the record is no longer needed
            if s_k[1] == 0 then
                box.delete('0', {v[0], v[1]})
            else
                -- refresh the score, the date and the coefficient; the order
                -- may have changed, so another pass will be needed
                box.update('0', {v[0]; v[1]}, '=p=p=p', 2, s_k[1], 3,
                    box.unpack('i', last_update[1]), 4, s_k[2])
                need_another_req = 1
            end
        end
        if need_another_req == 0 then break end
    end
    return unpack(ret)
end


In the configuration file box_config_popular_user.lua we put the following values:

-- scores for the action types accepted by increase_score; only one action
-- type is defined here, but more can be added
map_type_score = {user_page = 100}
-- defaults: the list size for get_top and the number of days over which
-- a score decays to zero
box_pu_default = {count_users = 10; score_day = 30}


The first line of the box_popular_user.lua file includes the config file:
 dofile("box_config_popular_user.lua") 


So, to install this module you need to put these two files into the working directory, set the desired configuration values, add the include line to init.lua, restart Tarantool, and use it. And, of course, don't forget to configure the spaces.

Now let's look at this system from the usage side.

Let's write the simplest test in Perl, which synchronously feeds data into Tarantool in a single thread based on a log.

The log contains lines with the name of the function to call in Tarantool and the set of parameters to pass to it. It is a synthetic log file, but it is needed so that our script is not burdened with extra work: we are putting Tarantool itself to the test.
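For reference, a line of such a log could be generated like this (a Python sketch; the exact log format is an assumption based on the description above — a function name followed by space-separated arguments):

```python
import random

def log_line(n_users=1000):
    """One synthetic log line: a call to increase_score for a random pair."""
    a, b = random.sample(range(1, n_users + 1), 2)  # two distinct player ids
    return f"increase_score {a} {b}"

print(log_line())  # e.g. "increase_score 583 17"
```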

To work with Tarantool we need the DR::Tarantool module; it can be installed from CPAN or through your favorite package manager (yum, apt, ...).

use strict;
use DR::Tarantool ':constant', 'tarantool';

my $client = tarantool
    host => 'localhost',
    port => 33013,
    spaces => {
        0 => {
            name => 'user_score',
            default_type => 'NUM',
            fields => [qw(id fid score last_update koef)],
            indexes => {
                0 => { name => 'idx_id_fid',   fields => [ 'id', 'fid' ] },
                1 => { name => 'idx_id',       fields => [ 'id' ] },
                2 => { name => 'idx_id_score', fields => [ 'id', 'score' ] },
            }
        },
        1 => {
            name => 'user_last_update',
            default_type => 'NUM',
            fields => [qw(id last_update)],
            indexes => {
                0 => { name => 'idx_id', fields => [ 'id' ] },
            }
        },
    };

while (<>) {
    my $line = $_;
    my @params = split " ", $line;
    $client->call_lua($params[0], [@params[1 .. $#params],
        ($params[0] eq 'increase_score' ? ('user_page') : ())]);
}




This small script, most of which is the description of the spaces, can already talk to Tarantool. In fact, the part that sent the data to Graphite was cut out of it, since we are more interested in the Tarantool part. In a single thread I got a total of about 3500 requests per second.
However, Tarantool consumed only about 17% of a CPU. Let's try running the script in 4 threads.



Each of the scripts managed to feed Tarantool approximately 1050 log records per second, and all four together about 4200 records per second, with Tarantool consuming about 38% of a CPU. Not much of a gain, although our scripts sending the data to Tarantool were not CPU-bound either.

While investigating what the problem was and where our system was stalling, I found that DR::Tarantool uses AnyEvent even in synchronous mode and, moreover, creates a new AE::io watcher for every incoming request, which in turn calls the select system call and, in my opinion, introduces some delays. I decided not to dig deeper and simply wrote another test program using Client::Tarantool. All I had to change was the use line and the client initialization.

use Client::Tarantool;
my $client = Client::Tarantool->new...


And so we start and look at the graph:



At first the new script ran in a single thread and was pegged at 100% CPU; then it was launched in 3 threads, each of which also hit 100% CPU, and, very pleasingly, the number of requests to Tarantool grew! Then we launched 4 threads, and this time Tarantool itself hit 100% CPU. In total it came to about 18-20K requests per second. For verification a fifth process was launched as well, and, as expected, the number of requests per thread dropped and evened out, with the total staying at the 18-20K level.

This synchronous client does not use AnyEvent; it uses plain syswrite and sysread, which to some extent confirms the theory about the delays caused by the "extra" AE::io watchers.

And now, finally, so as not to be entirely unfair to AnyEvent, let's write an asynchronous test based on DR::Tarantool::AsyncClient.

#!/usr/bin/perl
use strict;
use DR::Tarantool::AsyncClient 'tarantool';
use AnyEvent;
use AnyEvent::Handle;

my $cv = condvar AnyEvent;
my $counts = {};
my $hdl;
my $w;
my $client;

# called when a request completes; stop when none are left in flight
my $done_sub = sub {
    $counts->{ae}--;
    $cv->send unless $counts->{ae};
};

# parse a log line and fire the request to Tarantool
my $read_line_sub = sub {
    my $line = shift;
    my @params = split " ", $line;
    $counts->{ae}++;
    $counts->{$params[0]}++;
    $client->call_lua($params[0],
        [@params[1 .. $#params], ($params[0] eq 'increase_score' ? ('user_page') : ())],
        $done_sub);
};

my $limit_concur_req = 12000;    # maximum number of requests in flight

DR::Tarantool::AsyncClient->connect(
    host => '127.0.0.1',
    port => 33013,
    spaces => {
        0 => {
            name => 'user_score',
            default_type => 'NUM',
            fields => [qw(id fid score last_update koef)],
            indexes => {
                0 => { name => 'idx_id_fid',   fields => [ 'id', 'fid' ] },
                1 => { name => 'idx_id',       fields => [ 'id' ] },
                2 => { name => 'idx_id_score', fields => [ 'id', 'score' ] },
            }
        },
        1 => {
            name => 'user_last_update',
            default_type => 'NUM',
            fields => [qw(id last_update)],
            indexes => {
                0 => { name => 'idx_id', fields => [ 'id' ] },
            }
        },
    },
    sub {
        ($client) = @_;
        $counts->{ae}++;
        $hdl = new AnyEvent::Handle
            fh => \*STDIN,
            on_error => sub {
                # stop on a read error
                my ($hdl, $fatal, $msg) = @_;
                AE::log error => $msg;
                $hdl->destroy;
                $cv->send;
            },
            on_eof => $done_sub;
        my @start_request;
        # read the next log line and send a request for it
        @start_request = (line => sub {
            my ($hdl, $line) = @_;
            $line =~ s/[\r\n]//g;
            $read_line_sub->($line) if $line;
            # keep reading while we are below the concurrency limit
            $hdl->push_read(@start_request) if $counts->{ae} < $limit_concur_req;
        });
        $w = AnyEvent->timer(after => 0, interval => 1, cb => sub {
            # raise the limit if the pipeline is almost drained
            $limit_concur_req += 1000 if $counts->{ae} < 1000;
            $hdl->push_read(@start_request) if $counts->{ae} < $limit_concur_req;
        });
    }
);
$cv->recv;


The script turned out more sprawling and perhaps not very readable, but that largely depends on how used you are to reading event-driven code.
We run 4 such scripts in parallel on the same machine and on the same data, to keep the comparison between the tests fair.



And we get a sevenfold increase over the first synchronous test, and almost double over the second: approximately 30,000 requests per second. At the same time, the Perl scripts sending the data to Tarantool run almost at the 100% CPU limit.

Now let's create the worst-case conditions for our algorithm: none of the players log in for several days, and then they all come back at once. This shouldn't happen in normal life, so the test is synthetic, but it shows the lower bound of performance. To give a sense of scale: at the moment the test was launched there were 14,169,698 records in the first space, and the largest number of relations per player was 5000.



In total we get about 20,000 requests per second on average, although the spread between the workers has grown: some of them hit players with many relations, others hit players with fewer.

So we have a single box that solves the problem at 30,000 requests per second (20,000 in the pessimistic case), which should satisfy the needs of many projects.

Source: https://habr.com/ru/post/194324/

