
Message Passing in F#. Using MailboxProcessor

This article continues a series of publications about the technologies we use to develop the HostTracker website availability monitoring service.
Today we will talk about ...

MailboxProcessor




MailboxProcessor is a class for creating an agent in F#. An agent consists of two parts: a message queue and a function that processes messages from that queue. The class provides the following interface:

Post: 'msg -> unit - sends a message to the processor queue asynchronously, without waiting for it to be processed;

PostAndReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) -> 'reply - sends a message that carries an AsyncReplyChannel and waits for the result. The calling thread blocks until the message handler calls AsyncReplyChannel.Reply: 'reply -> unit. msgBuilder is a lambda function that builds the message from the channel;

PostAndAsyncReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) -> Async<'reply> - similar to PostAndReply, but does not wait for the result. It returns an asynchronous computation whose result is the value passed to AsyncReplyChannel.Reply. The client decides how to wait for the result: by releasing the current thread back to the pool or by blocking it;

TryPostAndReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) * ?timeout: int -> 'reply option - similar to PostAndReply, but waits at most timeout milliseconds. If the result arrives within that time, the method returns Some(result). Otherwise, it returns None;

PostAndTryAsyncReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) * ?timeout: int -> Async<'reply option> - similar to TryPostAndReply, but returns an asynchronous computation;

Receive: unit -> Async<'msg>; TryReceive: timeout: int -> Async<'msg option> - asynchronous computations that return the next message from the queue in FIFO order (TryReceive waits up to timeout milliseconds for a message and returns Some(msg) if one arrives within that time, and None otherwise). They are used mainly on the message handler side and very rarely on the client side;

Scan: (scanner: 'msg -> Async<'T> option) -> Async<'T>; TryScan: (scanner: 'msg -> Async<'T> option) * ?timeout: int -> Async<'T option> - return asynchronous computations that look through the message queue for a message for which scanner returns Some computation. With Scan, the computation built by scanner is executed once such a message is found, and its result becomes the result of the computation that Scan returned. TryScan scans the queue for at most timeout milliseconds. If scanner builds a computation within that time, it is executed and its result, wrapped in Some, becomes the result of the computation created by TryScan. Otherwise, that computation returns None.

A MailboxProcessor is created from a lambda function that builds the asynchronous computation the agent will run. The function takes the processor itself as a parameter, so the computation can use the interface described above to work with the queue (Receive and Scan are used most often there; Post and PostAndReply very rarely). The queue is thus processed by an asynchronous computation. The client that created the processor uses the same interface to interact with the message queue, but in most scenarios it does not call Receive or Scan.
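As a minimal sketch of how these pieces fit together, the agent below reads messages with Receive, while the client uses Post, PostAndReply, PostAndAsyncReply and TryPostAndReply. The CounterMsg type and the counter agent are hypothetical names introduced only for illustration.

```fsharp
// Hypothetical message type for illustration; not part of the article's code.
type CounterMsg =
    | Add of int
    | GetTotal of AsyncReplyChannel<int>

// The lambda passed to Start receives the processor itself (inbox)
// and returns the asynchronous computation that drains the queue.
let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                | GetTotal rc ->
                    rc.Reply total
                    return! loop total
            }
        loop 0)

// Fire-and-forget: Post returns immediately.
counter.Post (Add 1)
counter.Post (Add 2)
counter.Post (Add 3)

// Blocking request/reply: the calling thread waits for rc.Reply.
let total = counter.PostAndReply(fun rc -> GetTotal rc)

// The same request as an Async; the caller decides how to wait for it.
let totalAsync = counter.PostAndAsyncReply(fun rc -> GetTotal rc)
let totalFromAsync = Async.RunSynchronously totalAsync

// Request with a timeout in milliseconds: None if no reply arrives in time.
let totalOrNone = counter.TryPostAndReply((fun rc -> GetTotal rc), 1000)

printfn "%d %d %A" total totalFromAsync totalOrNone
```

Because the queue is FIFO, the three Add messages are processed before the first GetTotal, so every request sees the same total.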

Using MailboxProcessor:

• Data caches in front of storage (SQL databases, ...). Regardless of the type of storage, an application often needs to fetch data by some criterion (usually a record identifier). Querying the storage on every such request can noticeably reduce system performance. A solution based on MailboxProcessor is shown in the following code:

    open System.Collections.Generic

    // Cache commands:
    // GetFromCache: fetch an item by id (the id type is generic);
    //   AsyncReplyChannel<Choice<'res, exn>> carries back the item or an exception
    // ClearCache: drop all cached items
    type cacheCommands<'id,'res> =
        | GetFromCache of 'id * AsyncReplyChannel<Choice<'res, exn>>
        | ClearCache

    // Cache built on MailboxProcessor
    // getById: function that loads an item from the underlying storage
    // cleanupCacheInterval: interval between cache cleanups, in milliseconds
    type Cache<'itemId,'item when 'itemId : comparison> (getById, cleanupCacheInterval) =
        // The MailboxProcessor owns the cache state
        let inbox = MailboxProcessor.Start(fun inbox ->
            // cache: Map from id to item
            let rec loop cache =
                async {
                    let! res = inbox.Receive() // wait for the next command
                    match res with
                    | GetFromCache (itemId:'itemId, rc) ->
                        let (item, cache') =
                            match Map.tryFind itemId cache with
                            | None -> // not cached yet: load it from the storage
                                try
                                    match getById itemId with
                                    | None -> (Choice2Of2(KeyNotFoundException() :> exn), cache)
                                    | Some (item:'item) -> (Choice1Of2(item), Map.add itemId item cache)
                                with exc -> (Choice2Of2(exc), cache)
                            | Some item -> // already cached
                                (Choice1Of2(item), cache)
                        rc.Reply item // send the result to the caller
                        return! loop cache' // continue with the updated cache
                    | ClearCache -> return! loop Map.empty // continue with an empty cache
                }
            loop Map.empty)
        // Posts ClearCache at a fixed interval
        let rec runCleanupCicle () =
            async {
                do! Async.Sleep cleanupCacheInterval
                inbox.Post ClearCache
                return! runCleanupCicle ()
            }
        do if cleanupCacheInterval > 0 then runCleanupCicle () |> Async.Start
        // Public interface
        member x.TryGet(itemId) =
            match inbox.PostAndReply(fun rc -> GetFromCache (itemId, rc)) with
            | Choice1Of2 item -> Some item
            | _ -> None
        member x.Get(itemId) =
            match inbox.PostAndReply(fun rc -> GetFromCache (itemId, rc)) with
            | Choice1Of2 item -> item
            | Choice2Of2 e -> raise e
        member x.Cleanup() = inbox.Post ClearCache
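The cache replies with Choice<'res, exn> rather than a bare value, so a failure in getById reaches the caller as data instead of terminating the agent's loop. A minimal sketch of just that pattern follows, using a hypothetical divider agent (DivMsg, divider and ask are illustrative names, not part of the article's code).

```fsharp
// Hypothetical message: divide a by b and reply with the result or the exception.
type DivMsg = Divide of int * int * AsyncReplyChannel<Choice<int, exn>>

let divider =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Divide (a, b, rc) ->
                    // Catch the exception inside the handler and send it back
                    // as data, so the loop keeps running after a failed request.
                    let reply =
                        try Choice1Of2 (a / b)
                        with e -> Choice2Of2 e
                    rc.Reply reply
                    return! loop ()
            }
        loop ())

let ask a b = divider.PostAndReply(fun rc -> Divide (a, b, rc))

let ok = ask 10 2          // Choice1Of2 with the quotient
let failed = ask 1 0       // Choice2Of2 carrying a DivideByZeroException
let stillAlive = ask 9 3   // the agent still answers after the failure
```

If the exception were allowed to escape the handler instead, the agent's computation would stop and later PostAndReply calls would wait indefinitely, which is why the cache above wraps the getById call in try ... with.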


Another problem arises with operations that modify data in the storage (insert, delete, update). Such operations are best grouped and executed in batches. To do this, you can implement a cache that accumulates data coming from different threads:

    // Commands:
    // Save: queue an item for saving
    // SaveToDB: flush the accumulated items to the database
    // UpdateSaveState: transform the accumulated items with a function
    type commandsSave<'itemId, 'item when 'itemId: comparison> =
        | Save of 'itemId * 'item
        | SaveToDB
        | UpdateSaveState of (Map<'itemId, 'item> -> Map<'itemId, 'item>)

    // saveToDB: function that writes a Map<'itemId, 'item> to the database
    // savingInterval: interval between flushes, in milliseconds
    type SavingCache<'itemId, 'item when 'itemId : comparison>(saveToDB, savingInterval) =
        let inbox = MailboxProcessor.Start(fun inbox ->
            // cache: items accumulated since the last flush
            let rec loop cache =
                async {
                    let! msg = inbox.Receive()
                    let cache' =
                        match msg with
                        | Save (key: 'itemId, value: 'item) -> Map.add key value cache // accumulate the item
                        | SaveToDB ->
                            saveToDB cache // flush the batch and start over
                            Map.empty
                        | UpdateSaveState updater -> updater cache // let updater rewrite the pending items
                    return! loop cache'
                }
            loop Map.empty)
        // Posts SaveToDB at a fixed interval
        let rec runCleanSaveCicle () =
            async {
                do! Async.Sleep savingInterval
                inbox.Post(SaveToDB)
                return! runCleanSaveCicle()
            }
        do if savingInterval > 0 then runCleanSaveCicle() |> Async.Start
        member x.Save(itemId, item) = inbox.Post(Save (itemId, item))
        member x.UpdateBy f = inbox.Post <| UpdateSaveState f


• MailboxProcessor is a state machine that has at least one state. When there is more than one state, each state defines the set of messages it can process, and processing a message may switch the machine to another state. The lambda function passed to the MailboxProcessor constructor defines a set of recursive asynchronous computations, one for each state of the machine. Each computation waits either for any message from the queue (Receive) or for a message from a specific subset (Scan). Once a message arrives, it is processed, after which the agent moves to the computation for another state, stays in the current state, or finishes. The following example is a proxy for working with a remote agent. It defines the states working, stoped and recovery. All messages are represented by the RemoteAgentProxyCommand type:

    open System

    // ProcessData: data to process plus a callback that receives the result
    // Reconfigure: reconfiguration command
    type private RemoteAgentProxyCommand<'data, 'result> =
        | ProcessData of 'data * ('result option -> unit)
        | Reconfigure of string
        | Suspend
        | Resume
        | CleanupExpiredData

    type RemoteAgentProxy<'data, 'result>(transport, ?cleanupInterval) =
        let cleanupInterval = defaultArg cleanupInterval 60000
        let processor = MailboxProcessor.Start(fun inbox ->
            let cleanup state now = … // removes expired entries (body elided in the source)
            let send state = async { … } // sends the accumulated data (body elided in the source)
            // State: normal operation
            let rec working (state: Map<_, _>) =
                async {
                    let! msgOpt = inbox.TryReceive(1000) // wait up to 1 second for a message
                    let now = DateTime.UtcNow
                    match msgOpt with
                    | None when state.Count = 0 -> // no message and nothing to send
                        return! working state
                    | None -> // no message, but there is pending data: try to send it
                        let nextStateHandler =
                            async {
                                try
                                    let! state' = send state
                                    return! working state' // sent: keep working
                                with e -> return! recovery(state, 10000) // failed: go to recovery
                            }
                        return! nextStateHandler
                    | Some CleanupExpiredData -> return! working (cleanup state now)
                    | Some (ProcessData (data, channelReply)) -> // store the data until the next send
                        let expiration = DateTime.UtcNow.AddMilliseconds(float cleanupInterval)
                        return! working (Map.add (Guid.NewGuid()) (expiration, data, channelReply) state)
                    | Some Suspend -> return! stoped (working state) // switch to stoped
                    | Some _ -> return! working state // ignore everything else
                }
            // State: suspended; only Resume is taken from the queue
            and stoped nextStateHandler =
                inbox.Scan(function
                    | Resume -> Some(nextStateHandler)
                    | _ -> None)
            // State: recovering after a failed send
            and recovery(state, timeToRecieve) =
                async {
                    let! nextTimeToRecieve =
                        if timeToRecieve <= 100 then
                            async {
                                try
                                    let! state' = send state
                                    return Choice1Of2 state'
                                with e -> return Choice2Of2 10000
                            }
                        else async.Return <| Choice2Of2 timeToRecieve
                    match nextTimeToRecieve with
                    | Choice1Of2 state -> return! working state
                    | Choice2Of2 time ->
                        let nextDateTime = DateTime.UtcNow.AddMilliseconds(float time)
                        let! msg = inbox.TryReceive(time) // wait for a message until the next retry is due
                        let now = DateTime.UtcNow
                        let nextTime = int (nextDateTime - now).TotalMilliseconds
                        match msg with
                        | Some (ProcessData (data, channelReply)) ->
                            channelReply None // reject new data while recovering
                            return! recovery(state, nextTime)
                        | Some CleanupExpiredData -> return! recovery (cleanup state now, nextTime)
                        | Some Suspend -> return! stoped (recovery(state, nextTime))
                        | None -> return! recovery(state, 0) // time is up: retry the send
                        | _ -> return! recovery(state, nextTime)
                }
            working Map.empty)
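The listing above elides the bodies of cleanup and send, so it cannot be run as is. A much smaller, self-contained sketch of the same idea (one recursive computation per state, Receive where every message is accepted, Scan where only a subset is) might look like this. GateMsg and gate are hypothetical names used only for illustration.

```fsharp
// Hypothetical two-state agent: a gate that is either closed or opened.
type GateMsg =
    | Open
    | Close
    | Item of string
    | Dump of AsyncReplyChannel<string list>

let gate =
    MailboxProcessor.Start(fun inbox ->
        // State "opened": every message is accepted (Receive).
        let rec opened items =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Item s -> return! opened (s :: items)
                | Dump rc ->
                    rc.Reply (List.rev items)
                    return! opened items
                | Close -> return! closed items
                | Open -> return! opened items
            }
        // State "closed": only Open is taken from the queue (Scan);
        // all other messages stay queued until the gate opens.
        and closed items =
            inbox.Scan(function
                | Open -> Some (opened items)
                | _ -> None)
        closed [])

// The items are posted while the gate is closed, so they wait in the queue.
gate.Post (Item "a")
gate.Post (Item "b")
gate.Post Open
let items = gate.PostAndReply(fun rc -> Dump rc)
printfn "%A" items
```

Messages skipped by Scan keep their relative order, so once the gate opens the two items are received in the order they were posted.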


• MailboxProcessor can be used to build a reusable asynchronous data channel. Its purpose is to pass data from one part of the application to another (possibly between different threads) without tightly coupling those parts. A channel is a tuple of two functions: one that sends data and one that waits for data without blocking the thread:

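    // Message type inferred from how the cases are used below;
    // the original listing does not show its definition.
    type ChannelMessage<'a> =
        | GetResult of AsyncReplyChannel<'a * bool>
        | IntermediateResult of 'a
        | Result of 'a

    let CreateAsyncChannel<'a> () = // 'a is the type of data passed through the channel
        // The MailboxProcessor holds the queued data and the pending request
        let inbox = new MailboxProcessor<_>(fun inbox ->
            // A reader is waiting on repl: deliver the next piece of data to it
            let rec waitResponse (repl:AsyncReplyChannel<'a*bool>) =
                inbox.Scan <| function
                    | GetResult repl -> // a newer request replaces the pending one
                        Some <| waitResponse repl
                    | IntermediateResult res -> // deliver the data; the channel stays open
                        repl.Reply (res, false)
                        Some <| waitRepl ()
                    | Result res -> // deliver the final data and stop
                        repl.Reply (res, true)
                        Some <| async.Return ()
            // No reader yet: skip data messages and wait for GetResult,
            // which carries the reply channel repl
            and waitRepl () =
                inbox.Scan <| function
                    | GetResult repl -> Some <| waitResponse repl
                    | _ -> None
            waitRepl ())
        inbox.Start()
        // resultWaiter: asynchronously waits for the next piece of data, up to timeout milliseconds
        let resultWaiter timeout =
            inbox.PostAndTryAsyncReply ((fun replChannel -> GetResult replChannel), timeout)
        // postResult: sends data into the channel;
        // closeChannel = true marks the data as final
        let postResult closeChannel =
            (if closeChannel then Result else IntermediateResult) >> inbox.Post
        (resultWaiter, postResult)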


Thus, based on MailboxProcessor, you can build a system of independent components, each of which owns its own resources and provides a thread-safe interface to others. The next article will discuss the features of building a state machine based on the TPL Dataflow library.

Source: https://habr.com/ru/post/236325/

