
//  : // GetFromCache: id -   - generic // AsyncReplyChannel<Choice<'res, exn>> -      // ClearCache -   type cacheCommands<'id,'res> = | GetFromCache of 'id * AsyncReplyChannel<Choice<'res, exn>> | ClearCache //  Cache  MailboxProcessor // getById -          // cleanupCacheInterval -    type Cache<'itemId,'item when 'itemId : comparison > (getById, cleanupCacheInterval) = // MailboxProcessor     let inbox = MailboxProcessor.Start(fun inbox -> //      // cache -   - Map -  id   let rec loop cache = async { let! res = inbox.Receive() //   // MailboxProcessor      match res with | GetFromCache (itemId:'itemId, rc) -> let (item, cache')= match Map.tryFind itemId cache with | None -> //   try match getById itemId with | None -> (Choice2Of2(KeyNotFoundException() :> exn), cache) | Some (item:'item) -> (Choice1Of2(item), Map.add itemId item cache) with exc -> (Choice2Of2(exc), cache) | Some item -> //   (Choice1Of2(item), cache) rc.Reply item //  return! loop cache' //     | ClearCache -> return! loop Map.empty //   } loop Map.empty) //      let rec runCleanupCicle () = async { do! Async.Sleep cleanupCacheInterval //   inbox.Post ClearCache return! runCleanupCicle () } do if cleanupCacheInterval > 0 then runCleanupCicle () |> Async.Start with //  member x.TryGet(itemId) = match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with | Choice1Of2 item -> Some item | _ -> None member x.Get(itemId) = match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with | Choice1Of2 item -> item | Choice2Of2 e -> raise e member x.Cleanup() = inbox.Post ClearCache  //  : // Save:     // SaveToDB:     // UpdateSaveState:         type commandsSave<'itemId, 'item when 'itemId: comparison> = | Save of 'itemId * 'item | SaveToDB | UpdateSaveState of (Map<'itemId, 'item> -> Map<'itemId, 'item>) // saveToDB -       Map<'itemId, 'item> // savingInterval -     type SavingCache<'itemId, 'item when 'itemId : comparison>(saveToDB, savingInterval) = let inbox = MailboxProcessor.Start( fun inbox -> //    let rec loop cache = async{ let! msg = inbox.Receive() let cache' = match msg with | Save (key: 'itemId, value: 'item) -> Map.add key value cache //    | SaveToDB -> saveToDB cache Map.empty | UpdateSaveState updater -> updater cache //     updater ( Save -  ) return! loop cache' } loop Map.empty ) //    let rec runCleanSaveCicle () = async{ do! Async.Sleep savingInterval inbox.Post(SaveToDB) return! runCleanSaveCicle() } do if savingInterval > 0 then runCleanSaveCicle() |> Async.Start with member x.Save(itemId, item) = inbox.Post( Save (itemId, item) ) member x.UpdateBy f = inbox.Post <| UpdateSaveState f  // ProcessData -   // Reconfigure - // type private RemoteAgentProxyCommand<'data, 'result> = | ProcessData of 'data * ('result option -> unit) | Reconfigure of string | Suspend | Resume | CleanupExpiredData type RemoteAgentProxy<'data, 'result>(transport, ?cleanupInterval) = let cleanupInterval = defaultArg cleanupInterval 60000 let processor = MailboxProcessor.Start(fun inbox -> let cleanup state now = …//  let send state = async { … } //    //   let rec working (state: Map<_, _>) = async { let! msgOpt = inbox.TryReceive(1000) //    1  let now = DateTime.UtcNow match msgOpt with | None when state.Count = 0 -> //c       return! working state //     | None -> //        let nextStateHandler = //    async { try let! state' = send state //     return! working state' //    -     with e -> return! recovery(state, 10000) //   -     } return! nextStateHandler | Some CleanupExpiredData -> return! working (cleanup state now) //  | Some (ProcessData (data, channelReply)) -> //     let expiration = DateTime.UtcNow.AddMilliseconds(float cleanupInterval) return! working (Map.add (Guid.NewGuid()) (expiration, data, channelReply) state) | Some Suspend -> //   return! stoped (working state) //   stoped | Some _ -> //   return! working state } and stoped nextStateHandler = inbox.Scan(function | Resume -> Some(nextStateHandler) | _ -> None) //  Resume and recovery(state, timeToRecieve) = async { //  let! nextTimeToRecieve = if timeToRecieve <= 100 then async { try let! state' = send state return Choice1Of2 state' with e -> return Choice2Of2 10000 } else async.Return <| Choice2Of2 timeToRecieve match nextTimeToRecieve with | Choice1Of2 state -> return! working state | Choice2Of2 time -> let nextDateTime = DateTime.UtcNow.AddMilliseconds(float time) let! msg = inbox.TryReceive(timeToRecieve) let now = DateTime.UtcNow let nextTime = int (nextDateTime - now).TotalMilliseconds match msg with | Some (ProcessData (data, channelReply)) -> channelReply None return! recovery(state, nextTime) | Some CleanupExpiredData -> return! recovery (cleanup state now, nextTime) | Some Suspend -> return! stoped (recovery(state, nextTime)) | None -> return! recovery(state, 0) | _ -> return! recovery(state, nextTime) } working Map.empty)  let CreateAsyncChannel<'a> () = //'a -       //MailboxProcessor     let inbox = new MailboxProcessor<_>( fun inbox -> //     AsyncReplyChannel   let rec waitResponse (repl:AsyncReplyChannel<'a*bool>) = inbox.Scan <| function | GetResult repl -> //   AsyncReplyChannel    -   Some <| waitResponse repl | IntermediateResult res -> //    repl.Reply (res, false) Some <| waitRepl () | Result res -> //    repl.Reply (res, true) Some <| async.Return () and waitRepl () = //   -     GetResult //         //repl -     inbox.Scan <| function | GetResult repl -> Some <| waitResponse repl | _ -> None waitRepl () ) inbox.Start() //    resultWaiter -   let resultWaiter timeout = inbox.PostAndTryAsyncReply ((fun replChannel -> GetResult replChannel), timeout) //    postResult -    //     MailboxProcessor let postResult closeChannel = if closeChannel then Result else IntermediateResult >> inbox.Post (resultWaiter, postResult) Source: https://habr.com/ru/post/236325/
All Articles