📜 ⬆️ ⬇️

Synchronization of processes during task parallelization using the Caché Event API

Today, the presence of multi-core, multiprocessor and multi-node systems is already the norm when processing large amounts of data.
How can you use all these computing powers? The answer is obvious - parallelizing the task.
But then another question arises: how to synchronize the subtasks themselves?

Immediately it should be noted that the JOB command in the Caché version for Windows of the DBMS does not generate a stream, but a process. Therefore, it would be more correct to speak not about a multi-thread, but about a multi-process application.
It also follows that for Caché, the number of cores in a processor is more important than the presence of Hyper-Threading technology , which should be considered when choosing iron.

Parallelization Stages: Map and Reduce


First, we briefly consider the stages of paralleling tasks on the example of biometric identification.

Suppose there is a database with biometric information, such as photographs.
And you, having a photo of a person, want to identify him by this database (search “one-to-many”).
')
First you need to decide what, where and how we will “parallelize”.
This may depend on many factors: the number of cores, the processors on a single node, the number of nodes in the grid system ( ECP ), the distribution of the data on the nodes themselves, etc.
In other words, at this stage ( Map ), we must determine the strategy by which our task will be parallel. Indeed, one task can be distributed into many smaller tasks, which in turn can also be parallelized, and so on.

In the next step ( Reduce ), we must collect data from our subtasks, aggregate them and give the final result.

Applied to our example, the Map strategy may vary significantly.
For example, on the number of people in the original photo.

One person

If there is only one person in the photo, then each process can be assigned to identify it within its part of the data, which can be spread on the nodes as well as be common to all nodes.

Few faces

If the photograph is captured by several people at once, then each process can be assigned to identify any one person by all the data at once.

At the Reduce stage, having received a list of similar persons and the coefficient of "similarity", we can only sort it out and produce the top of the most similar ones.

Caché Event API


At the Reduce stage, along with obtaining results from each of the subtasks, we should be able to determine which of them have already been completed and which are not, which is what the % SYSTEM.Event class will help us with .
The documentation describes the event queue processing mechanism in some detail, so it does not make sense to dwell in detail.

I will note only two main methods:

  1. Wait / WaitMsg - waiting for a resource to wake up with / without receiving a message
  2. Signal - sending a signal to wake up a resource with message transmission

Sample application



So, create the following program:

main () {

; delete the temporary data from the previous time
kill ^ tmp

; we start three subtasks, they are processes
job job (1, apple , 5)
job job (2, pear , 6)
job job (3, "plum" , 7)

; display the result on the screen
zwrite ^ tmp

}

job (a, b, c)

hang 1 ; we imitate vigorous activity with a delay of 1 sec.

set ^ tmp ( a ) = b _ "-" _ ( c * 2) // form the result

Run our program from the terminal:

TEST>do ^main

TEST>

As a result, we do not see any result, because the running processes live their own lives (they are executed asynchronously) and we did not wait until their completion from the main process.

Let's try to fix this by inserting a delay, as shown below:

main () {

; ...
job job (3, "plum" , 7)

hang 1

; display the result on the screen
; ...

}

Run again:

TEST>do ^main
^tmp(1)="-10"
^tmp(2)="-12"
^tmp(3)="-14"

TEST>

Now the result is obtained.
But this is implemented extremely inefficient and inflexible, because we do not know in advance how long subtasks will be performed.
You can use data availability checks or timeout locks. But this, too, is not optimal.

In this situation, we are saved by the built-in " Event Queueing " mechanism.

Let's rewrite our application, additionally assigning each process its own priority.

main () {

; create three processes with your priority
job job (3, -7, apple , 5)
job job (2, 0, pear , 6)
job job (1, 8, "plum" , 7)

; awaiting a wake-up call and
; display the result on the screen
write $ list ( $ system .Event . WaitMsg (), 2) ,!
write $ list ( $ system .Event . WaitMsg (), 2) ,!
write $ list ( $ system .Event . WaitMsg (), 2) ,!

}

job (x, delta, a, b)

; change the priority of the current process to delta
do $ system .Util . SetPrio ( delta )

hang x ; we imitate vigorous activity with a delay of x sec.

// send wake-up signal to parent process
// along with the result
do $ system .Event . Signal ( $ zparent , a _ "-" _ ( b * 2))

Output of the result:

TEST>do ^main
-14
-12
-10

TEST>

The same code, but in the form of a class
Class test.task
{

ClassMethod Test ()
{

; we start asynchronously three processes with the priority
job .. SubTask (3, -7, apple , 5)
job .. SubTask (2, 0, pear , 6)
job .. SubTask (1, 8, "plum" , 7)

; awaiting a wake-up call and
; display the result on the screen
write $ list ( $ system .Event . WaitMsg (), 2) ,!
write $ list ( $ system .Event . WaitMsg (), 2) ,!
write $ list ( $ system .Event . WaitMsg (), 2) ,!
}

ClassMethod SubTask (
x ,
delta ,
a ,
b )
{
; change the priority of the current process to delta
do $ system .Util . SetPrio ( delta )

hang x ; we imitate vigorous activity with a delay of x sec.

// send wake-up signal to parent process
// along with the result
do $ system .Event . Signal ( $ zparent , a _ "-" _ ( b * 2))
}

}

To run a class method in a terminal, call
do ## class ( test.task ). Test ()


Some useful links can be found in the class reference :
  1. class % SYSTEM.CPU - provides information about processors
  2. class % SYSTEM.Util - contains various useful methods.
    For example: NumberOfCPUs , SetBatch , SetPrio
  3. JobServers parameter - controls the size of the process pool

Source: https://habr.com/ru/post/167867/


All Articles