MailboxProcessor class from the F# language; in this class, the internal state is preserved for tasks registered for execution in the form of edge dependencies. The MailboxProcessor class is an ideal candidate for implementing a DAG that runs operations with dependencies concurrently. First, let's define the discriminated union with which we will manage the tasks and resolve their dependencies.

    // Namespaces used by the F# listings in this section
    open System
    open System.Collections.Generic
    open System.Threading
    open System.Threading.Tasks

    type TaskMessage = // #A
        | AddTask of int * TaskInfo
        | QueueTask of TaskInfo
        | ExecuteTasks
    and TaskInfo = // #B
        { Context : System.Threading.ExecutionContext
          Edges : int array
          Id : int
          Task : Func<Task>
          EdgesLeft : int option
          Start : DateTimeOffset option
          End : DateTimeOffset option }
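As background for the claim that a MailboxProcessor preserves internal state, here is a minimal sketch of the general agent pattern. The CounterMessage type and the counter agent are hypothetical names invented for this illustration and are not part of ParallelTasksDAG; the state (a running total) is passed from one iteration of the loop to the next, and messages are processed one at a time.

```fsharp
// Hypothetical message type for this sketch only.
type CounterMessage =
    | Add of int
    | Get of AsyncReplyChannel<int>

// The agent keeps its state (the running total) as the argument of the
// recursive loop, so no locks are needed to protect it.
let counter =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                | Get reply ->
                    reply.Reply total
                    return! loop total
            }
        loop 0)

counter.Post(Add 2)
counter.Post(Add 3)
// Messages are handled in the order they were posted,
// so the reply reflects both additions.
let total = counter.PostAndReply(fun reply -> Get reply)
```

The DAG agent in this section follows the same shape, with dictionaries of tasks and edges taking the place of the integer total.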
#A (TaskMessage): the messages sent to the dagAgent agent of ParallelTasksDAG, which is responsible for coordinating the execution of tasks.

The TaskMessage type represents the wrappers for messages sent to the underlying agent of the ParallelTasksDAG type. These messages are used to coordinate tasks and synchronize dependencies. The TaskInfo
type contains and tracks the details of registered tasks during the execution of a DAG, including the dependency edges. The execution context (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) is captured so that its information is available during deferred execution, for example the current user, any state associated with the logical flow of execution, code access security information, and so on. When the completion event fires, the start and end times of the task are published.

    type ParallelTasksDAG() =
        let onTaskCompleted = new Event<TaskInfo>() // #A
        let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox ->
            let rec loop (tasks : Dictionary<int, TaskInfo>) // #B
                         (edges : Dictionary<int, int list>) = async { // #B
                let! msg = inbox.Receive() // #C
                match msg with
                | ExecuteTasks -> // #D
                    // Maps each task ID to the IDs of the tasks that depend on it
                    let fromTo = new Dictionary<int, int list>()
                    let ops = new Dictionary<int, TaskInfo>() // #E
                    for KeyValue(key, value) in tasks do // #F
                        let operation = { value with EdgesLeft = Some(value.Edges.Length) }
                        for from in operation.Edges do
                            let exists, lstDependencies = fromTo.TryGetValue(from)
                            if not exists then fromTo.Add(from, [ operation.Id ])
                            else fromTo.[from] <- (operation.Id :: lstDependencies)
                        ops.Add(key, operation)
                    // Queue every task that has no unmet dependencies
                    ops |> Seq.iter (fun kv -> // #F
                        match kv.Value.EdgesLeft with
                        | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value))
                        | _ -> ())
                    return! loop ops fromTo
                | QueueTask(op) -> // #G
                    Async.Start <| async { // #G
                        let start = DateTimeOffset.Now
                        match op.Context with // #H
                        | null -> do! op.Task.Invoke() |> Async.AwaitTask
                        | ctx ->
                            ExecutionContext.Run(ctx.CreateCopy(), // #I
                                (fun state ->
                                    let opCtx = (state :?> TaskInfo)
                                    // Block until the task finishes so the end time is meaningful
                                    opCtx.Task.Invoke().Wait()), box op)
                        let end' = DateTimeOffset.Now
                        onTaskCompleted.Trigger({ op with Start = Some(start); End = Some(end') }) // #L
                        let exists, deps = edges.TryGetValue(op.Id)
                        if exists && deps.Length > 0 then
                            // getDependentOperation is not shown in this listing
                            let depOps = getDependentOperation deps tasks []
                            edges.Remove(op.Id) |> ignore
                            depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp)))
                    }
                    return! loop tasks edges
                | AddTask(id, op) ->
                    tasks.Add(id, op) // #M
                    return! loop tasks edges
            }
            loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural))
                 (new Dictionary<int, int list>(HashIdentity.Structural)))

        // Start the agent so that posted messages are processed
        do dagAgent.Start()

        [<CLIEventAttribute>]
        member this.OnTaskCompleted = onTaskCompleted.Publish // #L

        member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N

        member this.AddTask(id, task, [<ParamArray>] edges : int array) =
            let data = { Context = ExecutionContext.Capture()
                         Edges = edges; Id = id; Task = task
                         EdgesLeft = None; Start = None; End = None }
            dagAgent.Post(AddTask(id, data)) // #O
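The ParallelTasksDAG listing calls getDependentOperation without showing it. The following is only a guess at what such a helper might look like, not the author's code: it assumes that the helper decrements the remaining-edge counter of each dependent task and returns the tasks whose counter has reached zero. The TaskInfo record here is a simplified stand-in that keeps only the fields the sketch needs.

```fsharp
open System.Collections.Generic

// Simplified stand-in for the article's TaskInfo (assumption: only the
// fields needed to track remaining dependencies are kept).
type TaskInfo =
    { Id : int
      Edges : int array
      EdgesLeft : int option }

// Hypothetical helper: for each dependent ID, decrement its remaining-edge
// counter in `tasks` and collect the tasks whose counter reaches zero.
let rec getDependentOperation (deps : int list) (tasks : Dictionary<int, TaskInfo>) (acc : TaskInfo list) : TaskInfo list =
    match deps with
    | [] -> acc
    | id :: rest ->
        let op = tasks.[id]
        // Fall back to the full edge count if the counter was never set.
        let remaining = (defaultArg op.EdgesLeft op.Edges.Length) - 1
        let updated = { op with EdgesLeft = Some remaining }
        tasks.[id] <- updated
        if remaining = 0 then getDependentOperation rest tasks (updated :: acc)
        else getDependentOperation rest tasks acc

// Usage: tasks 1 and 2 both depend on task 5, which has just completed.
let tasks = Dictionary<int, TaskInfo>()
tasks.[1] <- { Id = 1; Edges = [| 4; 5 |]; EdgesLeft = Some 2 }
tasks.[2] <- { Id = 2; Edges = [| 5 |]; EdgesLeft = Some 1 }
let ready = getDependentOperation [ 1; 2 ] tasks []
// Only task 2 is ready; task 1 still waits for task 4.
```

Under this assumption, each completed task releases exactly those dependents whose last outstanding edge it was, which is what lets the agent queue them with QueueTask.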
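The markers in the ParallelTasksDAG listing denote the following.
#A: onTaskCompleted is an instance of the Event class, used to notify when a task is completed.
#B: The internal state of the agent. The collections can be mutable because the ParallelTasksDAG state changes during execution, and because they inherit thread safety from living inside the agent.
#D: The message that starts the execution of ParallelTasksDAG.
#H: If the captured ExecutionContext is null, then we start the task in the current context; otherwise we go to #I.
#I: The task is run using the captured ExecutionContext.
#L: Triggers the onTaskCompleted event to give notification of the completion of the task. The event contains information about the task.
#O: Adds a task, its dependencies, and the current ExecutionContext to execute the DAG.

The purpose of the AddTask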
function is to register a task with arbitrary dependency edges. This function accepts a unique ID, the task to be executed, and a set of edges representing the IDs of all other registered tasks that must complete before this task can run. If the array is empty, the task has no dependencies. An instance of MailboxProcessor called dagAgent keeps the registered tasks in its current state, tasks, which is a dictionary (tasks : Dictionary<int, TaskInfo>) that maps the ID of each task to its details. The agent also stores the state of the edge dependencies by the ID of each task (edges : Dictionary<int, int list>). When the agent receives the notification to start execution, it first checks that all edge dependencies are registered and that there are no cycles in the graph. This verification phase is available in the full implementation of ParallelTasksDAG, provided in the online code. Next, I offer an example in C# that references the F# library to run ParallelTasksDAG
(and consume it). The registered tasks reflect the dependencies shown above in Fig. 1.

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    // Builds a task that prints its ID and thread, then waits for the given delay
    Func<int, int, Func<Task>> action = (id, delay) => async () =>
    {
        Console.WriteLine($"Starting operation {id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…");
        await Task.Delay(delay);
    };

    var dagAsync = new DAG.ParallelTasksDAG();
    dagAsync.OnTaskCompleted.Subscribe(op =>
        Console.WriteLine($"Operation {op.Id} Completed in Thread Id {Thread.CurrentThread.ManagedThreadId}"));

    // AddTask(id, task, IDs of the tasks that must complete first)
    dagAsync.AddTask(1, action(1, 600), 4, 5);
    dagAsync.AddTask(2, action(2, 200), 5);
    dagAsync.AddTask(3, action(3, 800), 6, 5);
    dagAsync.AddTask(4, action(4, 500), 6);
    dagAsync.AddTask(5, action(5, 450), 7, 8);
    dagAsync.AddTask(6, action(6, 100), 7);
    dagAsync.AddTask(7, action(7, 900));
    dagAsync.AddTask(8, action(8, 700));
    dagAsync.ExecuteTasks();
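The verification phase mentioned above (every edge refers to a registered task, and the graph has no cycles) is not part of the listings shown here. As a rough illustration only, the F# sketch below checks both conditions with Kahn's algorithm on the same dependency graph as the C# example. The validate function, its Map-based input, and the Result return type are assumptions made for this sketch, not the API of the full implementation.

```fsharp
open System.Collections.Generic

// Hypothetical validation helper, not the article's full implementation.
// `graph` maps each task ID to the IDs of the tasks it depends on.
// Returns Ok with one valid execution order, or Error with a description.
let validate (graph : Map<int, int list>) : Result<int list, string> =
    let missing =
        graph
        |> Map.toList
        |> List.collect snd
        |> List.filter (fun dep -> not (Map.containsKey dep graph))
        |> List.distinct
    if not (List.isEmpty missing) then
        Error (sprintf "unregistered dependencies: %A" missing)
    else
        // Kahn's algorithm: repeatedly take tasks with no unmet dependencies.
        let remaining = Dictionary<int, int>()
        for KeyValue(id, deps) in graph do
            remaining.[id] <- List.length (List.distinct deps)
        let ready = Queue<int>()
        for KeyValue(id, deps) in graph do
            if List.isEmpty deps then ready.Enqueue id
        let order = ResizeArray<int>()
        while ready.Count > 0 do
            let finished = ready.Dequeue()
            order.Add finished
            for KeyValue(id, deps) in graph do
                if List.contains finished deps then
                    remaining.[id] <- remaining.[id] - 1
                    if remaining.[id] = 0 then ready.Enqueue id
        // Tasks that were never released are part of a cycle.
        if order.Count = graph.Count then Ok (List.ofSeq order)
        else Error "the graph contains a cycle"

// The dependency graph registered in the C# example.
let graph =
    Map.ofList
        [ (1, [ 4; 5 ]); (2, [ 5 ]); (3, [ 6; 5 ]); (4, [ 6 ])
          (5, [ 7; 8 ]); (6, [ 7 ]); (7, []); (8, []) ]

let result = validate graph
let cyclic = validate (Map.ofList [ (1, [ 2 ]); (2, [ 1 ]) ])
```

For the example graph the result is an Ok value whose order places every task after the tasks it depends on, while the two-task cycle yields an Error.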
The action function prints the Id of the current thread to confirm the multithreading. The OnTaskCompleted event, in turn, is registered to issue a notification of the completion of each task, printing the task ID and the Id of the current thread to the console. Here is the output that we get when calling the ExecuteTasks method.

    Starting operation 8 in Thread Id 23…
    Starting operation 7 in Thread Id 24…
    Operation 8 Completed in Thread Id 23
    Operation 7 Completed in Thread Id 24
    Starting operation 5 in Thread Id 23…
    Starting operation 6 in Thread Id 25…
    Operation 6 Completed in Thread Id 25
    Starting operation 4 in Thread Id 24…
    Operation 5 Completed in Thread Id 23
    Starting operation 2 in Thread Id 27…
    Starting operation 3 in Thread Id 30…
    Operation 4 Completed in Thread Id 24
    Starting operation 1 in Thread Id 28…
    Operation 2 Completed in Thread Id 27
    Operation 1 Completed in Thread Id 28
    Operation 3 Completed in Thread Id 30
As the output shows, the tasks run in parallel (the thread IDs are different), and the order of dependencies is preserved.

Source: https://habr.com/ru/post/422571/