
Parallelizing tasks with dependencies: an example in .NET

Hello, colleagues!

This week we sent Manning's book "Concurrency in .NET", an ambitiously complex work, off for translation.


The author kindly posted an excerpt from Chapter 13 on Medium, and we invite you to try it well ahead of the release.
Enjoy reading!

Suppose you need to write a tool that runs a series of asynchronous tasks, each with its own set of dependencies that affect the order of operations. Such problems can be solved with sequential, imperative execution, but if you want maximum performance, running the operations one after another will not do. Instead, you need to run the tasks in parallel. Many concurrency problems can be viewed as static collections of atomic operations with dependencies between their inputs and outputs. When an operation completes, its output is used as input for other, dependent operations. To optimize performance, tasks need to be scheduled according to their dependencies, and the algorithm must be tuned so that dependent tasks run as sequentially as necessary and as parallel as possible.

You want a reusable component that runs a series of tasks in parallel while making sure that every dependency that could affect the order of operations is respected. How do you build a programming model that provides basic parallelism over a collection of operations, running each one efficiently either in parallel or sequentially, depending on the dependencies between that operation and the others?

Solution: implement the dependency graph using the MailboxProcessor class from F#, and expose its methods as standard tasks (Task) so that they can be consumed from C#.

This solution is called a directed acyclic graph (DAG); it builds a graph by splitting operations into sequences of atomic tasks with well-defined dependencies. The acyclic nature of the graph matters here because it rules out deadlocks between tasks, provided the tasks really are fully atomic. When defining the graph, it is important to understand all the dependencies between tasks, especially implicit dependencies that can lead to deadlocks or race conditions. Below is a typical example of a graph data structure, which illustrates the constraints that arise when scheduling the interactions between operations in a given graph.

A graph is an extremely powerful data structure, and you can build powerful algorithms on top of it.



Fig. 1 A graph is a collection of vertices connected by edges. In this representation of a directed graph, node 1 depends on nodes 4 and 5, node 2 depends on node 5, node 3 depends on nodes 5 and 6, and so on.
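To make Fig. 1 concrete, the sketch below writes the graph as an F# map from each task ID to the IDs it depends on. The caption spells out only nodes 1 to 3; the edges for nodes 4 to 8 are taken from the C# registration code later in the article, which reproduces the figure. The `waves` helper is our own illustration, not code from the book: it puts a task with no dependencies on level 0 and any other task one level above its deepest dependency, so tasks on the same level never depend on each other and may run in parallel.

```fsharp
open System.Collections.Generic

// Fig. 1 as "task -> tasks it depends on" (nodes 4-8 taken from the C# AddTask calls).
let fig1 : Map<int, int list> =
    Map.ofList
        [ (1, [ 4; 5 ])
          (2, [ 5 ])
          (3, [ 6; 5 ])
          (4, [ 6 ])
          (5, [ 7; 8 ])
          (6, [ 7 ])
          (7, [])
          (8, []) ]

/// Groups tasks by level: no dependencies -> level 0,
/// otherwise one level above the deepest dependency.
/// Assumes the graph is acyclic and every dependency is registered.
let waves (graph : Map<int, int list>) : int list list =
    let memo = Dictionary<int, int>()
    let rec level node =
        match memo.TryGetValue(node) with
        | true, l -> l
        | _ ->
            let l =
                match graph.[node] with
                | [] -> 0
                | deps -> 1 + (deps |> List.map level |> List.max)
            memo.[node] <- l
            l
    graph
    |> Map.toList
    |> List.map (fun (node, _) -> (level node, node))
    |> List.groupBy fst
    |> List.sortBy fst
    |> List.map (fun (_, pairs) -> pairs |> List.map snd |> List.sort)

printfn "%A" (waves fig1)   // [[7; 8]; [5; 6]; [2; 3; 4]; [1]]
```

Tasks 7 and 8 can start immediately, 5 and 6 once their predecessors finish, and task 1 is necessarily last.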

The DAG structure can serve as a strategy for running tasks in parallel while respecting their dependency order, which improves performance. The structure of such a graph can be defined with the MailboxProcessor class from F#; this class keeps the internal state of the tasks registered for execution in the form of edge dependencies.
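If MailboxProcessor is new to you, the key idea is that the agent owns its state and processes one message at a time, so a plain mutable Dictionary inside the agent loop needs no locks as long as only the loop touches it. The sketch below is not part of ParallelTasksDAG: the `EdgeMsg` type and the `registry` agent are hypothetical, and simply record dependency edges and answer queries through `PostAndReply`.

```fsharp
open System.Collections.Generic

// Hypothetical messages for a tiny edge-registry agent (not part of ParallelTasksDAG).
type EdgeMsg =
    | AddEdge of int * int                         // (task, dependency)
    | GetDeps of int * AsyncReplyChannel<int list> // (task, reply channel)

let registry =
    MailboxProcessor<EdgeMsg>.Start(fun inbox ->
        // State lives inside the agent; messages are processed one at a time,
        // so this mutable Dictionary is only ever touched by the agent loop.
        let edges = Dictionary<int, int list>()
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | AddEdge (node, dep) ->
                let existing = if edges.ContainsKey node then edges.[node] else []
                edges.[node] <- dep :: existing
            | GetDeps (node, reply) ->
                let deps = if edges.ContainsKey node then edges.[node] else []
                reply.Reply(List.rev deps)   // report in registration order
            return! loop () }
        loop ())

registry.Post(AddEdge(1, 4))
registry.Post(AddEdge(1, 5))
// Messages are handled in FIFO order, so both edges are stored before this query runs.
let depsOf1 = registry.PostAndReply(fun ch -> GetDeps(1, ch))
printfn "%A" depsOf1   // [4; 5]
```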

Validating a directed acyclic graph

When working with any graph data structure, such as a DAG, you have to make sure the edges are registered correctly. For example, going back to Figure 1: what happens if node 2 is registered with a dependency on node 5, and node 5 does not exist? It can also happen that some edges depend on each other, which creates a directed cycle. In the presence of a directed cycle, it is critical to run some tasks in parallel; otherwise, some tasks may wait forever for others to complete, and a deadlock occurs.
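The first problem, an edge that points at an unregistered task, is easy to detect. A minimal sketch, assuming the graph is held as a map from each task ID to the IDs it depends on; `missingDependencies` is a hypothetical helper of ours, not the validation code from the book's online sources:

```fsharp
/// Returns every (task, dependency) pair whose dependency is not a registered task.
let missingDependencies (graph : Map<int, int list>) : (int * int) list =
    [ for KeyValue(node, deps) in graph do
        for dep in deps do
            if not (graph.ContainsKey dep) then
                yield (node, dep) ]

// Node 2 depends on node 5, but node 5 was never registered.
let broken = Map.ofList [ (2, [ 5 ]); (4, []) ]
printfn "%A" (missingDependencies broken)   // [(2, 5)]
```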

The problem is solved with topological sorting: ordering all the vertices of the graph so that every edge leads from a vertex with a lower number to a vertex with a higher number. So, if task A must complete before task B, task B must complete before task C, and task C in turn must complete before task A, a circular reference occurs, and the system notifies you of this error by throwing an exception. If a directed cycle exists, there is no valid execution order. This kind of check is called cycle detection in a directed graph. If a directed graph satisfies these rules, it is a directed acyclic graph, well suited for running several tasks in parallel when there are dependencies between them.
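The book's online code contains the actual validation. As an illustration only, here is Kahn's algorithm, one standard way to compute a topological order and detect a cycle in a single pass: tasks are released as their count of unfinished dependencies drops to zero, and any task never released is caught in (or blocked by) a cycle. It assumes the same map-of-dependencies representation and that every dependency is registered; `topologicalSort` and its exception message are our own.

```fsharp
open System.Collections.Generic

/// Kahn's algorithm over a map "task -> tasks it depends on".
/// Returns an order in which every task appears after all of its dependencies,
/// or throws if some tasks can never be released (a directed cycle).
let topologicalSort (graph : Map<int, int list>) : int list =
    let remaining = Dictionary<int, int>()                // unfinished dependencies per task
    let dependents = Dictionary<int, ResizeArray<int>>()  // dependency -> tasks waiting on it
    for KeyValue(node, deps) in graph do
        remaining.[node] <- List.length deps
        for dep in deps do
            if not (dependents.ContainsKey dep) then dependents.[dep] <- ResizeArray<int>()
            dependents.[dep].Add node
    // Start with the tasks that have no dependencies at all.
    let ready =
        Queue<int>(graph |> Map.toSeq |> Seq.filter (fun (_, deps) -> List.isEmpty deps) |> Seq.map fst)
    let order = ResizeArray<int>()
    while ready.Count > 0 do
        let node = ready.Dequeue()
        order.Add node
        match dependents.TryGetValue(node) with
        | true, waiting ->
            for w in waiting do
                remaining.[w] <- remaining.[w] - 1
                if remaining.[w] = 0 then ready.Enqueue w
        | _ -> ()
    if order.Count <> graph.Count then
        let blocked = [ for KeyValue(node, n) in remaining do if n > 0 then yield node ]
        failwithf "Cycle detected: tasks %A can never run" blocked
    List.ofSeq order

// Fig. 1: a valid order exists.
let fig1 =
    Map.ofList [ (1, [ 4; 5 ]); (2, [ 5 ]); (3, [ 6; 5 ]); (4, [ 6 ]); (5, [ 7; 8 ]); (6, [ 7 ]); (7, []); (8, []) ]
let order = topologicalSort fig1

// A before B, B before C, C before A: 2 depends on 1, 3 on 2, 1 on 3.
let cyclic = Map.ofList [ (1, [ 3 ]); (2, [ 1 ]); (3, [ 2 ]) ]
let cycleDetected =
    try topologicalSort cyclic |> ignore; false
    with _ -> true
```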

The full version of Listing 2, which contains the DAG validation code, is in the source code posted online.

In the following listing, the F# MailboxProcessor class is an ideal candidate for implementing a DAG that runs dependency-related operations concurrently. First, let's define the discriminated union used to manage the tasks and track their dependencies.

Listing 1 Message type and data structure for coordinating tasks according to their dependencies

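    namespace DAG   // matches the DAG.ParallelTasksDAG name used from C# below

    open System
    open System.Collections.Generic
    open System.Threading
    open System.Threading.Tasks

    type TaskMessage = // #A
        | AddTask of int * TaskInfo
        | QueueTask of TaskInfo
        | ExecuteTasks
    and TaskInfo = // #B
        { Context : ExecutionContext
          Edges : int array; Id : int; Task : Func<Task>
          EdgesLeft : int option; Start : DateTimeOffset option
          End : DateTimeOffset option }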


#A Messages sent to dagAgent, the agent underlying ParallelTasksDAG, which coordinates the execution of tasks

#B Wraps the details of each task to complete.

The TaskMessage type represents the message wrappers sent to dagAgent, the agent underlying ParallelTasksDAG. These messages are used to coordinate tasks and synchronize dependencies. The TaskInfo type holds and tracks the details of registered tasks during DAG execution, including their dependency edges. The execution context (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) is captured so that its information remains available during deferred execution, for example: the current user, any state associated with the logical thread of execution, code access security information, and so on. When the completion event is triggered, the task's start and end times are published.
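To see why capturing the ExecutionContext at registration time matters, here is a small sketch using `AsyncLocal<T>`, whose value flows with the ExecutionContext. A value set before `ExecutionContext.Capture()` is visible again inside `ExecutionContext.Run`, even though the ambient value changed in between, and the ambient value is restored once `Run` returns. The `currentUser` variable is purely illustrative.

```fsharp
open System.Threading

// An ambient value that flows with the ExecutionContext.
let currentUser = AsyncLocal<string>()

currentUser.Value <- "alice"
let captured = ExecutionContext.Capture()   // snapshot taken at "registration" time
currentUser.Value <- "bob"                  // ambient state changes afterwards

// Later, run a callback under a copy of the captured context.
let seen = ref ""
ExecutionContext.Run(captured.CreateCopy(), (fun _ -> seen.Value <- currentUser.Value), null)

printfn "inside Run: %s, after Run: %s" seen.Value currentUser.Value
// inside Run: alice, after Run: bob
```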

Listing 2 DAG agent in F# for parallelizing operations with dependencies

    type ParallelTasksDAG() =
        let onTaskCompleted = new Event<TaskInfo>() // #A

        // The excerpt calls this helper without showing its body; this version is a sketch.
        // It decrements EdgesLeft for each dependent task and returns the ones now ready to run.
        let rec getDependentOperation (deps : int list) (ops : Dictionary<int, TaskInfo>) (acc : TaskInfo list) =
            match deps with
            | [] -> acc
            | id :: rest ->
                let op = ops.[id]
                let left = (defaultArg op.EdgesLeft 0) - 1
                let updated = { op with EdgesLeft = Some left }
                ops.[id] <- updated
                if left = 0 then getDependentOperation rest ops (updated :: acc)
                else getDependentOperation rest ops acc

        // Start() is required; a MailboxProcessor created with "new" processes no messages.
        let dagAgent = MailboxProcessor<TaskMessage>.Start(fun inbox ->
            let rec loop (tasks : Dictionary<int, TaskInfo>) // #B
                         (edges : Dictionary<int, int list>) = async { // #B
                let! msg = inbox.Receive() // #C
                match msg with
                | ExecuteTasks -> // #D
                    let fromTo = new Dictionary<int, int list>()
                    let ops = new Dictionary<int, TaskInfo>() // #E
                    for KeyValue(key, value) in tasks do // #F
                        let operation = { value with EdgesLeft = Some(value.Edges.Length) }
                        for from in operation.Edges do
                            let exists, lstDependencies = fromTo.TryGetValue(from)
                            if not exists then fromTo.Add(from, [ operation.Id ])
                            else fromTo.[from] <- (operation.Id :: lstDependencies)
                        ops.Add(key, operation)
                    ops |> Seq.iter (fun kv -> // #F
                        match kv.Value.EdgesLeft with
                        | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value))
                        | _ -> ())
                    return! loop ops fromTo
                | QueueTask(op) -> // #G
                    Async.Start <| async { // #G
                        let start = DateTimeOffset.Now
                        let started : Task ref = ref null
                        match op.Context with // #H
                        | null -> started.Value <- op.Task.Invoke()
                        | ctx ->
                            ExecutionContext.Run(ctx.CreateCopy(), // #I
                                (fun _ -> started.Value <- op.Task.Invoke()), null)
                        do! Async.AwaitTask started.Value   // wait until the task has actually finished
                        let end' = DateTimeOffset.Now
                        onTaskCompleted.Trigger { op with Start = Some(start); End = Some(end') } // #L
                        // This continuation runs on a thread-pool thread, outside the agent loop,
                        // so the shared dictionaries are guarded with a lock.
                        let depOps =
                            lock edges (fun () ->
                                let exists, deps = edges.TryGetValue(op.Id)
                                if exists && deps.Length > 0 then
                                    edges.Remove(op.Id) |> ignore
                                    getDependentOperation deps tasks []
                                else [])
                        depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) }
                    return! loop tasks edges
                | AddTask(id, op) ->
                    tasks.Add(id, op) // #M
                    return! loop tasks edges }
            loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural))
                 (new Dictionary<int, int list>(HashIdentity.Structural)))

        [<CLIEvent>]
        member this.OnTaskCompleted = onTaskCompleted.Publish // #L
        member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N
        member this.AddTask(id, task, [<ParamArray>] edges : int array) =
            let data = { Context = ExecutionContext.Capture()
                         Edges = edges; Id = id; Task = task
                         EdgesLeft = None; Start = None; End = None }
            dagAgent.Post(AddTask(id, data)) // #O


#A onTaskCompleted, an instance of the F# Event class, used to notify subscribers when a task completes.

#B Internal agent state that tracks registered tasks and their dependencies. The collections are mutable because the state changes while ParallelTasksDAG runs; the agent handles one message at a time, so code inside the agent loop can mutate them safely.

#C Asynchronously waits for the next message

#D Message case that starts ParallelTasksDAG execution

#E Collection that maps a monotonically increasing index to the task to run

#F Enumerates the task list, analyzing dependencies with other tasks to build a topological structure that represents the order in which tasks run

#G Message case that queues a task, runs it, and, once it completes, removes it from the agent state as an active dependency

#H If the captured ExecutionContext is null, the task runs in the current context; otherwise, see #I

#I Runs the task in the captured ExecutionContext

#L Triggers and publishes the onTaskCompleted event to signal that the task has completed. The event carries the task's details.

#M Message case that adds a task to run according to its dependencies, if any

#N Starts the execution of registered tasks.

#O Add a task, its dependencies and the current ExecutionContext to execute the DAG.

The purpose of the AddTask function is to register a task with arbitrary dependency edges. It accepts a unique ID, the task to run, and a set of edges: the IDs of all other registered tasks that must complete before this task can run. An empty array means the task has no dependencies. The MailboxProcessor instance named dagAgent keeps the registered tasks in its "tasks" state, a dictionary (tasks : Dictionary<int, TaskInfo>) that maps each task ID to its details. The agent also stores the edge dependencies for each task ID (edges : Dictionary<int, int list>). When the agent is told to start execution, it checks that all edge dependencies are registered and that the graph contains no cycles. This verification phase is available in the full implementation of ParallelTasksDAG in the online code. Next is a C# example that references the F# library to launch (and consume) ParallelTasksDAG. The registered tasks reproduce the dependencies shown in Fig. 1.

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    // Helper: creates a task factory that logs its start and then waits `delay` milliseconds
    Func<int, int, Func<Task>> action = (id, delay) => async () =>
    {
        Console.WriteLine($"Starting operation {id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…");
        await Task.Delay(delay);
    };

    var dagAsync = new DAG.ParallelTasksDAG();
    // OnTaskCompleted is exposed as a .NET event ([<CLIEvent>]), so subscribe with +=
    dagAsync.OnTaskCompleted += (sender, op) =>
        Console.WriteLine($"Operation {op.Id} Completed in Thread Id {Thread.CurrentThread.ManagedThreadId}");

    dagAsync.AddTask(1, action(1, 600), 4, 5);
    dagAsync.AddTask(2, action(2, 200), 5);
    dagAsync.AddTask(3, action(3, 800), 6, 5);
    dagAsync.AddTask(4, action(4, 500), 6);
    dagAsync.AddTask(5, action(5, 450), 7, 8);
    dagAsync.AddTask(6, action(6, 100), 7);
    dagAsync.AddTask(7, action(7, 900));
    dagAsync.AddTask(8, action(8, 700));
    dagAsync.ExecuteTasks();

    // ExecuteTasks only posts a message to the agent, so keep the process alive while the tasks run
    Console.ReadLine();

The helper function action prints a message when a task starts, including the Id of the current thread to confirm that the work is multithreaded. The OnTaskCompleted event handler, in turn, prints the task ID and the current thread Id to the console whenever a task completes. Here is the output produced by calling the ExecuteTasks method.

    Starting operation 8 in Thread Id 23…
    Starting operation 7 in Thread Id 24…
    Operation 8 Completed in Thread Id 23
    Operation 7 Completed in Thread Id 24
    Starting operation 5 in Thread Id 23…
    Starting operation 6 in Thread Id 25…
    Operation 6 Completed in Thread Id 25
    Starting operation 4 in Thread Id 24…
    Operation 5 Completed in Thread Id 23
    Starting operation 2 in Thread Id 27…
    Starting operation 3 in Thread Id 30…
    Operation 4 Completed in Thread Id 24
    Starting operation 1 in Thread Id 28…
    Operation 2 Completed in Thread Id 27
    Operation 1 Completed in Thread Id 28
    Operation 3 Completed in Thread Id 30

As you can see, the tasks run in parallel on different threads (their thread IDs differ), and the dependency order is preserved.
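The claim that dependency order is preserved can be checked mechanically against the output above. A minimal sketch, assuming the log is reduced to start and completion events in printed order; the `LogEvent` type and the `violations` function are hypothetical helpers, and the dependencies are the ones registered in the C# example:

```fsharp
// One entry per log line: a task started or a task completed.
type LogEvent =
    | Started of int
    | Completed of int

/// Returns (task, dependency) pairs where the task started before the dependency completed.
let violations (deps : Map<int, int list>) (log : LogEvent list) : (int * int) list =
    let indexed = List.indexed log
    // Position in the log at which each task completed.
    let completedAt =
        indexed
        |> List.choose (fun (i, e) ->
            match e with
            | Completed t -> Some(t, i)
            | Started _ -> None)
        |> Map.ofList
    indexed
    |> List.collect (fun (i, e) ->
        match e with
        | Started t ->
            deps.[t]
            |> List.filter (fun d ->
                match Map.tryFind d completedAt with
                | Some j -> j > i   // dependency finished only after this task started
                | None -> true)     // dependency never finished
            |> List.map (fun d -> (t, d))
        | Completed _ -> [])

let fig1 =
    Map.ofList [ (1, [ 4; 5 ]); (2, [ 5 ]); (3, [ 6; 5 ]); (4, [ 6 ]); (5, [ 7; 8 ]); (6, [ 7 ]); (7, []); (8, []) ]

// The run shown above, as start/completion events in printed order.
let observed =
    [ Started 8; Started 7; Completed 8; Completed 7; Started 5; Started 6; Completed 6
      Started 4; Completed 5; Started 2; Started 3; Completed 4; Started 1; Completed 2
      Completed 1; Completed 3 ]

printfn "%A" (violations fig1 observed)   // []
```

An empty result means every task started only after all of its dependencies had completed.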

In essence, this is how tasks that have dependencies are parallelized. Read more in the book Concurrency in .NET.

Source: https://habr.com/ru/post/422571/

