// : // GetFromCache: id - - generic // AsyncReplyChannel<Choice<'res, exn>> - // ClearCache - type cacheCommands<'id,'res> = | GetFromCache of 'id * AsyncReplyChannel<Choice<'res, exn>> | ClearCache // Cache MailboxProcessor // getById - // cleanupCacheInterval - type Cache<'itemId,'item when 'itemId : comparison > (getById, cleanupCacheInterval) = // MailboxProcessor let inbox = MailboxProcessor.Start(fun inbox -> // // cache - - Map - id let rec loop cache = async { let! res = inbox.Receive() // // MailboxProcessor match res with | GetFromCache (itemId:'itemId, rc) -> let (item, cache')= match Map.tryFind itemId cache with | None -> // try match getById itemId with | None -> (Choice2Of2(KeyNotFoundException() :> exn), cache) | Some (item:'item) -> (Choice1Of2(item), Map.add itemId item cache) with exc -> (Choice2Of2(exc), cache) | Some item -> // (Choice1Of2(item), cache) rc.Reply item // return! loop cache' // | ClearCache -> return! loop Map.empty // } loop Map.empty) // let rec runCleanupCicle () = async { do! Async.Sleep cleanupCacheInterval // inbox.Post ClearCache return! runCleanupCicle () } do if cleanupCacheInterval > 0 then runCleanupCicle () |> Async.Start with // member x.TryGet(itemId) = match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with | Choice1Of2 item -> Some item | _ -> None member x.Get(itemId) = match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with | Choice1Of2 item -> item | Choice2Of2 e -> raise e member x.Cleanup() = inbox.Post ClearCache
// : // Save: // SaveToDB: // UpdateSaveState: type commandsSave<'itemId, 'item when 'itemId: comparison> = | Save of 'itemId * 'item | SaveToDB | UpdateSaveState of (Map<'itemId, 'item> -> Map<'itemId, 'item>) // saveToDB - Map<'itemId, 'item> // savingInterval - type SavingCache<'itemId, 'item when 'itemId : comparison>(saveToDB, savingInterval) = let inbox = MailboxProcessor.Start( fun inbox -> // let rec loop cache = async{ let! msg = inbox.Receive() let cache' = match msg with | Save (key: 'itemId, value: 'item) -> Map.add key value cache // | SaveToDB -> saveToDB cache Map.empty | UpdateSaveState updater -> updater cache // updater ( Save - ) return! loop cache' } loop Map.empty ) // let rec runCleanSaveCicle () = async{ do! Async.Sleep savingInterval inbox.Post(SaveToDB) return! runCleanSaveCicle() } do if savingInterval > 0 then runCleanSaveCicle() |> Async.Start with member x.Save(itemId, item) = inbox.Post( Save (itemId, item) ) member x.UpdateBy f = inbox.Post <| UpdateSaveState f
// ProcessData - // Reconfigure - // type private RemoteAgentProxyCommand<'data, 'result> = | ProcessData of 'data * ('result option -> unit) | Reconfigure of string | Suspend | Resume | CleanupExpiredData type RemoteAgentProxy<'data, 'result>(transport, ?cleanupInterval) = let cleanupInterval = defaultArg cleanupInterval 60000 let processor = MailboxProcessor.Start(fun inbox -> let cleanup state now = …// let send state = async { … } // // let rec working (state: Map<_, _>) = async { let! msgOpt = inbox.TryReceive(1000) // 1 let now = DateTime.UtcNow match msgOpt with | None when state.Count = 0 -> //c return! working state // | None -> // let nextStateHandler = // async { try let! state' = send state // return! working state' // - with e -> return! recovery(state, 10000) // - } return! nextStateHandler | Some CleanupExpiredData -> return! working (cleanup state now) // | Some (ProcessData (data, channelReply)) -> // let expiration = DateTime.UtcNow.AddMilliseconds(float cleanupInterval) return! working (Map.add (Guid.NewGuid()) (expiration, data, channelReply) state) | Some Suspend -> // return! stoped (working state) // stoped | Some _ -> // return! working state } and stoped nextStateHandler = inbox.Scan(function | Resume -> Some(nextStateHandler) | _ -> None) // Resume and recovery(state, timeToRecieve) = async { // let! nextTimeToRecieve = if timeToRecieve <= 100 then async { try let! state' = send state return Choice1Of2 state' with e -> return Choice2Of2 10000 } else async.Return <| Choice2Of2 timeToRecieve match nextTimeToRecieve with | Choice1Of2 state -> return! working state | Choice2Of2 time -> let nextDateTime = DateTime.UtcNow.AddMilliseconds(float time) let! msg = inbox.TryReceive(timeToRecieve) let now = DateTime.UtcNow let nextTime = int (nextDateTime - now).TotalMilliseconds match msg with | Some (ProcessData (data, channelReply)) -> channelReply None return! recovery(state, nextTime) | Some CleanupExpiredData -> return! recovery (cleanup state now, nextTime) | Some Suspend -> return! stoped (recovery(state, nextTime)) | None -> return! recovery(state, 0) | _ -> return! recovery(state, nextTime) } working Map.empty)
let CreateAsyncChannel<'a> () = //'a - //MailboxProcessor let inbox = new MailboxProcessor<_>( fun inbox -> // AsyncReplyChannel let rec waitResponse (repl:AsyncReplyChannel<'a*bool>) = inbox.Scan <| function | GetResult repl -> // AsyncReplyChannel - Some <| waitResponse repl | IntermediateResult res -> // repl.Reply (res, false) Some <| waitRepl () | Result res -> // repl.Reply (res, true) Some <| async.Return () and waitRepl () = // - GetResult // //repl - inbox.Scan <| function | GetResult repl -> Some <| waitResponse repl | _ -> None waitRepl () ) inbox.Start() // resultWaiter - let resultWaiter timeout = inbox.PostAndTryAsyncReply ((fun replChannel -> GetResult replChannel), timeout) // postResult - // MailboxProcessor let postResult closeChannel = if closeChannel then Result else IntermediateResult >> inbox.Post (resultWaiter, postResult)
Source: https://habr.com/ru/post/236325/
All Articles