/// Timer metric handle exposing two `Measure` overloads.
type IMetricsTimer =
    /// Overload taking only the measured amount.
    abstract Measure : Amount -> unit
    /// Overload taking the measured amount together with an item.
    abstract Measure : Amount * Item -> unit
/// Counter metric handle. `Decrement` and `Increment` each come in three
/// overloads: no argument, an amount, or an amount together with an item.
type IMetricsCounter =
    abstract Decrement : unit -> unit
    abstract Decrement : Amount -> unit
    abstract Decrement : Amount * Item -> unit
    abstract Increment : unit -> unit
    abstract Increment : Amount -> unit
    abstract Increment : Amount * Item -> unit
/// Data carried by a request to decrement a counter.
type DecrementCounterCommand =
    {
        CounterId : CounterId
        DecrementAmount : Amount
        Item : Item
    }

/// Data carried by a request to create a counter.
type CreateCounterCommand =
    {
        CounterId : CounterId
        Context : ContextName
        Name : MetricName
        MeasurementUnit : MeasurementUnit
        ReportItemPercentages : bool
        ReportSetItems : bool
        ResetOnReporting : bool
    }
/// All metric messages: four that update an existing metric and three that
/// create a new one. Each case wraps its own command record.
type MetricsMessage =
    // Updates
    | DecrementCounter of DecrementCounterCommand
    | IncrementCounter of IncrementCounterCommand
    | MarkMeter of MarkMeterCommand
    | MeasureTime of MeasureTimeCommand
    // Creation
    | CreateCounter of CreateCounterCommand
    | CreateMeter of CreateMeterCommand
    | CreateTimer of CreateTimerCommand
/// Builds an `IMetricsMeter` for `meterId` whose `Mark` overloads all end up
/// publishing a single `MarkMeter` message on `evtStream`.
let private createMeter (evtStream: EventStream) meterId =
    { new IMetricsMeter with
        // Amount only: forwarded with an empty item.
        // The annotation tells this overload apart from the item-only one.
        member this.Mark (amount: Amount) = this.Mark (amount, Item None)
        // Item only: forwarded with an amount of 1.
        member this.Mark (item: Item) = this.Mark (Amount 1L, item)
        // Full form: the only overload that actually publishes.
        member this.Mark (amount, item) =
            evtStream.Publish <| MarkMeter { MeterId = meterId; Amount = amount; Item = item } }
// C# sketch of the object-expression pattern used by the F# createMeter.
// NOTE(review): illustrative only; `{...}` is a placeholder and a nested
// `private class` inside a method is not valid C#.
private IMetricsMeter createMeter(EventStream evtStream, MeterId meterId)
{
    private class TempClass : IMetricsMeter
    {
        public void Mark(long amount) { Mark(amount, ""); }
        public void Mark(string item) { Mark(1, item); }
        public void Mark(long amount, string item)
        {
            evtStream.Publish(new MarkMeter {...}); //omitted
        }
    }
    return new TempClass();
}
// Publishes a CreateMeter command for a meter whose id is derived from `name`,
// then returns a meter handle bound to that same id.
member this.CreateMeter (name, measureUnit, rateUnit) =
    let command =
        {
            MeterId = MeterId (toId name)
            Context = context
            Name = name
            MeasurementUnit = measureUnit
            RateUnit = rateUnit
        }
    evtStream.Publish (CreateMeter command)
    createMeter evtStream command.MeterId
/// Extension on `IActorContext`: builds a metrics adapter over the owning
/// actor system's event stream for the given metrics context.
type IActorContext with
    member actorContext.GetMetricsProducer context =
        let eventStream = actorContext.System.EventStream
        createAdapter eventStream context
/// Web API controller that serves the current metrics snapshot.
type public MetricController(metrics: IMetrics) =
    inherit ApiController()

    /// GET "metrics": responds with `metrics.Snapshot.Get()` wrapped in an OK result.
    [<HttpGet>]
    [<Route("metrics")>]
    member this.GetMetrics() =
        let snapshot = metrics.Snapshot.Get()
        this.Ok(snapshot)
/// Actor body that subscribes to `MetricsMessage` on the system event stream
/// and applies each received message to the metrics obtained from `metrics`.
/// Only the `DecrementCounter` and `CreateMeter` branches are shown; the
/// remaining ones are elided in this listing.
let createRecorder (metrics: IMetrics) (mailbox: Actor<_>) =
    let self = mailbox.Self
    // Metrics created so far, looked up by id.
    let counters = Dictionary<CounterId, ICounter>()
    let meters = Dictionary<MeterId, IMeter>()
    let timers = Dictionary<TimerId, ITimer * TimeUnit>()
    // ...
    let handle message =
        match message with
        | DecrementCounter evt ->
            match counters.TryGetValue evt.CounterId with
            | true, counter ->
                let (Amount amount) = evt.DecrementAmount
                match evt.Item with
                | Item None -> counter.Decrement (amount)
                | Item (Some itemName) -> counter.Decrement (itemName, amount)
            // Unknown counter id: the message is dropped.
            | false, _ -> ()
        | CreateMeter cmd ->
            match meters.TryGetValue cmd.MeterId with
            // Already registered: nothing to do.
            | true, _ -> ()
            | false, _ ->
                let (ContextName ctxName) = cmd.Context
                let (MetricName name) = cmd.Name
                let options =
                    new MeterOptions(
                        Context = ctxName,
                        MeasurementUnit = toUnit cmd.MeasurementUnit,
                        Name = name,
                        RateUnit = toTimeUnit cmd.RateUnit)
                let meter = metrics.Provider.Meter.Instance options
                meters.Add(cmd.MeterId, meter)
        // match
    subscribe typedefof<MetricsMessage> self mailbox.Context.System.EventStream |> ignore
    let rec loop() =
        actor {
            let! msg = mailbox.Receive()
            handle msg
            return! loop()
        }
    loop()
/// Host and port on which the metrics HTTP API listens.
type IMetricApiConfig =
    abstract member Host: string
    abstract member Port: int

/// Messages understood by the reader actor; the single case (re)starts the web API.
type ApiMessage = ReStartApiMessage

/// Actor body hosting the metrics Web API at `http://{Host}:{Port}`.
/// On `ReStartApiMessage` the running host (if any) is disposed and a new one
/// is started. The actor sends itself that message once at start-up.
let createReader (config: IMetricApiConfig) resolver (mailbox: Actor<_>) =
    // OWIN start-up callback: Web API with attribute routing and indented JSON.
    let startUp (app: IAppBuilder) =
        let httpConfig = new HttpConfiguration(DependencyResolver = resolver)
        httpConfig.Formatters.JsonFormatter.SerializerSettings.Converters.Add(new MetricDataConverter())
        httpConfig.Formatters.JsonFormatter.Indent <- true
        httpConfig.MapHttpAttributeRoutes()
        httpConfig.EnsureInitialized()
        app.UseWebApi(httpConfig) |> ignore
    let uri = sprintf "http://%s:%d" config.Host config.Port
    // Placeholder so the first restart has something to dispose.
    let noApi = { new IDisposable with member this.Dispose() = () }
    // A ref cell rather than `let mutable`: closures may not capture mutable
    // locals, and `handleMsg` and the deferred callback both need this value.
    let api = ref noApi
    let handleMsg (ReStartApiMessage) =
        api.Value.Dispose()
        api.Value <- WebApp.Start(uri, startUp)
    // Read the cell when the callback runs, so the host that is current at
    // that time is disposed instead of the initial placeholder.
    mailbox.Defer (fun () -> api.Value.Dispose())
    mailbox.Self <! ReStartApiMessage
    let rec loop() =
        actor {
            let! msg = mailbox.Receive()
            handleMsg msg
            return! loop()
        }
    loop()
/// One piece of information attached to a log entry.
type Fragment =
    /// Name of the operation.
    | OperationName of string
    /// Duration of the operation.
    | OperationDuration of TimeSpan
    /// Total duration.
    | TotalDuration of TimeSpan
    /// Timestamp at which something was received.
    | ReceivedOn of DateTimeOffset
    /// Type of the message.
    | MessageType of Type
    /// Exception associated with the entry.
    | Exception of exn
/// Contract for building a log entry around one operation.
type ILogBuilder =
    /// Called when the operation begins.
    abstract member OnOperationBegin : unit -> unit
    /// Called when the operation has completed.
    abstract member OnOperationCompleted : unit -> unit
    /// Sets the log level.
    abstract member Set : LogLevel -> unit
    /// Stores a fragment.
    abstract member Set : Fragment -> unit
    /// Reports a failure with the given exception.
    abstract member Fail : exn -> unit
    /// NOTE(review): spelling "Supress" is part of the public contract; presumably disables output.
    abstract member Supress : unit -> unit
    /// Looks up a stored fragment; `None` when absent.
    abstract member TryGet : Fragment -> Fragment option
/// `ILogBuilder` implementation that writes through the given `ILoggingAdapter`.
/// The interface body is not shown in this listing.
type LogBuilder(logger: ILoggingAdapter) =
    // Stored fragments, keyed by a System.Type.
    let logFragments = new Dictionary<System.Type, Fragment>()
    // Stopwatch owned by this builder instance.
    let stopwatch = new Stopwatch()
    // Current level; starts at debug.
    let mutable logLevel = LogLevel.DebugLevel
    interface ILogBuilder with //
// Stores `fragment` under its runtime type, replacing any earlier fragment
// stored under the same type. The annotation is required: `GetType()` cannot
// be resolved on a parameter whose type has not been inferred yet.
let set (fragment: Fragment) = logFragments.[fragment.GetType()] <- fragment

// Starts timing the operation.
member x.OnOperationBegin() = stopwatch.Start()

// Raises the level to error and records the exception as a fragment.
member this.Fail e =
    logLevel <- LogLevel.ErrorLevel
    set <| Exception e

// Stops timing, records the durations and, unless suppressed, writes the
// message at the current level.
member this.OnOperationCompleted() =
    stopwatch.Stop()
    set <| OperationDuration stopwatch.Elapsed
    // Total duration is only recorded when a ReceivedOn fragment exists.
    match tryGet <| ReceivedOn DateTimeOffset.MinValue with
    | Some (ReceivedOn date) -> set <| TotalDuration (DateTimeOffset.UtcNow - date)
    | _ -> ()
    match status with
    | Active ->
        match logLevel with
        | LogLevel.DebugLevel -> logger.Debug(message())
        | LogLevel.InfoLevel -> logger.Info(message())
        | LogLevel.WarningLevel -> logger.Warning(message())
        | LogLevel.ErrorLevel -> logger.Error(message())
        | x -> failwithf "Log level %s is not supported" (string x)
    | Supressed -> ()
/// Expression tree of an actor message handler taking (mailbox, message, log builder).
type Expr<'T,'TLog when 'TLog :> ILogBuilder> =
    Expression<System.Action<Actor<'T>, 'T, 'TLog>>

/// Helper whose only purpose is to give a lambda an `Expression<...>` target
/// type; the argument is returned unchanged.
type Wrap =
    static member Handler(e: Expression<System.Action<Actor<'T>, 'T, #ILogBuilder>>) = e

/// Name used to identify a handler expression: the called method's name when
/// the body is a method call, otherwise the body's string form.
let toExprName (expr: Expr<_,_>) =
    match expr.Body with
    | :? MethodCallExpression as call -> call.Method.Name
    | body -> body.ToString()
/// Actor body that runs `handler` for every message with logging and metrics.
/// The metrics context is named after the handler expression.
let loggerActor<'TMsg> (handler: Expr<'TMsg,_>) (mailbox: Actor<'TMsg>) =
    let exprName = handler |> toExprName
    let metrics = mailbox.Context.GetMetricsProducer (ContextName exprName)
    // Named `akkaLogger` to match the actor loop, which passes `akkaLogger`
    // to `wrapExpr`; the helpers use `logger` for their ILogBuilder parameter.
    let akkaLogger = mailbox.Log.Value
// Metric handles created in this actor's metrics context.
let errorMeter =
    metrics.CreateMeter (MetricName "Error Rate", Errors)
let instanceCounter =
    metrics.CreateCounter (MetricName "Instances Counter", Items)
let messagesMeter =
    metrics.CreateMeter (MetricName "Message Processing Rate", Items)
let operationsTimer =
    metrics.CreateTimer (MetricName "Operation Durations", Requests, MilliSeconds, MilliSeconds)

// Count this instance now; the matching decrement is handed to mailbox.Defer
// (presumably run when the actor stops).
instanceCounter.Increment()
mailbox.Defer (fun () -> instanceCounter.Decrement())
// Finishes one handled message: names the operation, completes the log
// builder, feeds the recorded duration (when present) to the timer and marks
// the processing-rate meter with the message type name.
let completeOperation (msgType: Type) (logger: #ILogBuilder) =
    logger.Set (OperationName exprName)
    logger.OnOperationCompleted()
    let recordedDuration = logger.TryGet(OperationDuration TimeSpan.Zero)
    match recordedDuration with
    | Some(OperationDuration elapsed) ->
        let elapsedMs = int64 elapsed.TotalMilliseconds
        operationsTimer.Measure(Amount elapsedMs, Item (Some exprName))
    | _ -> ()
    messagesMeter.Mark(Item (Some msgType.Name))
// Records a failed message: marks the error meter with the message type name,
// then reports the exception to the log builder.
let registerExn (msgType: Type) e (logger: #ILogBuilder) =
    let failedItem = Item (Some msgType.Name)
    errorMeter.Mark(failedItem)
    logger.Fail e
// Wraps `handler` so that every call gets a fresh log builder, has its
// message type recorded, and is always completed via `completeOperation`.
// An exception from the handler is registered via `registerExn` and then
// re-raised. Returns a function from message to the handler's result.
let wrapHandler handler mb (logBuilder: unit -> #ILogBuilder) =
    let wrappedHandler mb msg =
        let logger = logBuilder()
        // `box` is needed because the type of `msg` is not inferred yet at
        // this point, so `msg.GetType()` cannot be resolved directly.
        let msgType = (box msg).GetType()
        logger.Set (MessageType msgType)
        // F# has no combined try/with/finally, hence the nesting.
        try
            try
                logger.OnOperationBegin()
                handler mb msg logger
            with
            | e ->
                registerExn msgType e logger
                reraise()
        finally
            completeOperation msgType logger
    wrappedHandler mb
// C# rendering of wrapHandler's signature (illustration only, no body).
// The first type parameter is spelled TMsg to match its uses below.
Func<TMsg, TResult> wrapHandler<TMsg, TResult, TLogBuilder, TMailbox>(
    Func<TMailbox, TMsg, TLogBuilder, TResult> handler,
    TMailbox mb,
    Func<TLogBuilder> logBuilder)
    where TLogBuilder: ILogBuilder
// Compiles the handler expression and wraps it with `wrapHandler`, supplying
// a new `LogBuilder` over `logger` for each call.
let wrapExpr (expr: Expr<_,_>) mailbox logger =
    let compiled = expr.Compile()
    wrapHandler
        (fun _ msg log -> compiled.Invoke(mailbox, msg, log))
        mailbox
        (fun () -> new LogBuilder(logger))
// C# rendering of wrapExpr's signature (illustration only, no body).
Action<TMsg> wrapExpr<TMsg>(
    Expr<TMsg, LogBuilder> expr,
    Actor<TMsg> mb,
    ILoggingAdapter logger)
// Build the wrapped handler once. `wrapExpr` calls `expr.Compile()`, so
// applying it inside the loop would recompile the expression per message.
let handle = wrapExpr handler mailbox akkaLogger

// Receive loop: run the wrapped handler on every incoming message.
let rec loop() =
    actor {
        let! msg = mailbox.Receive()
        handle msg
        return! loop()
    }
loop()
/// Messages understood by the example workers.
type ActorMessages =
    | Wait of int
    | Stop

/// Blocks for the number of milliseconds carried by `Wait`; does nothing for `Stop`.
let waitProcess message =
    match message with
    | Stop -> ()
    | Wait delayMs -> Async.RunSynchronously (Async.Sleep delayMs)
/// Actor body for a worker that handles every message with `waitProcess`.
let spawnWaitWorker() =
    loggerActor (Wrap.Handler(fun mb msg log -> waitProcess msg))

let waitWorker = spawn system "worker-wait" (spawnWaitWorker())

waitWorker <! Wait 1000 // ~1000
waitWorker <! Wait 500
/// Handler that fails on `Wait` and stops the actor on `Stop`. Any exception
/// is caught here and reported through `log.Fail` instead of propagating.
let failOrStopProcess (mailbox: Actor<_>) msg (log: ILogBuilder) =
    try
        match msg with
        | Stop -> mailbox.Context.Stop mailbox.Self
        | Wait _ -> failwith "can't wait!"
    with
    | e -> log.Fail e

/// Actor body for a worker that handles every message with `failOrStopProcess`.
let spawnFailOrStopWorker() =
    loggerActor (Wrap.Handler(fun mb msg log -> failOrStopProcess mb msg log))

let failOrStopWorker = spawn system "worker-vocal" (spawnFailOrStopWorker())

failOrStopWorker <! Wait 1000 // "can't wait!"
failOrStopWorker <! Wait 500 // "can't wait!"
failOrStopWorker <! Stop
failOrStopWorker <! Wait 500 // DeadLetters
open Akka.FSharp
open SimpleInjector
open App.Metrics;
open Microsoft.Extensions.DependencyInjection
open SimpleInjector.Integration.WebApi
open System.Reflection
open System
open Metrics.MetricActors
open ExampleActors

/// Actor system named "system-for-metrics", configured from "system.json".
let createSystem =
    let configuration = Configuration.parse (System.IO.File.ReadAllText("system.json"))
    System.create "system-for-metrics" configuration

/// Spawns the "metrics-reader" actor (HTTP API on localhost:10001) and the
/// "metrics-recorder" actor (using the container's `IMetrics`).
let createMetricActors system container =
    let dependencyResolver = new SimpleInjectorWebApiDependencyResolver(container)
    let apiConfig =
        { new IMetricApiConfig with
            member x.Host = "localhost"
            member x.Port = 10001 }
    spawn system "metrics-reader" (createReader apiConfig dependencyResolver) |> ignore
    let recorderBody = createRecorder (container.GetInstance<IMetrics>())
    spawn system "metrics-recorder" recorderBody |> ignore
    ()

/// Registers `IMetrics` in the SimpleInjector container, resolved from a
/// service provider built with `AddMetrics` and `AddLogging`.
type Container with
    member container.AddMetrics() =
        let services = new ServiceCollection()
        let entryAssemblyName = Assembly.GetEntryAssembly().GetName()
        services.AddMetrics(entryAssemblyName) |> ignore
        services.AddLogging() |> ignore
        let provider = services.BuildServiceProvider()
        container.Register(fun () -> provider.GetRequiredService<IMetrics>())

/// Entry point: wires the container, starts the metric actors and the example
/// workers, sends them their messages, then waits for a key press.
[<EntryPoint>]
let main argv =
    let container = new Container()
    let system = createSystem
    container.RegisterSingleton system
    container.AddMetrics()
    container.Verify()
    createMetricActors system container

    // Example workers.
    let waitWorker1 = spawn system "worker-wait1" (spawnWaitWorker())
    let waitWorker2 = spawn system "worker-wait2" (spawnWaitWorker())
    let waitWorker3 = spawn system "worker-wait3" (spawnWaitWorker())
    let waitWorker4 = spawn system "worker-wait4" (spawnWaitWorker())
    let failWorker = spawn system "worker-fail" (spawnFailWorker())
    let waitOrStopWorker = spawn system "worker-silent" (spawnWaitOrStopWorker())
    let failOrStopWorker = spawn system "worker-vocal" (spawnFailOrStopWorker())

    // One message for each plain worker.
    waitWorker1 <! Wait 1000
    waitWorker2 <! Wait 500
    waitWorker3 <! Wait 5000
    waitWorker4 <! Wait 8000
    failWorker <! Wait 5000

    // Both stoppable workers receive the same sequence, one worker after the other.
    let stopScript = [ Wait 1000; Wait 500; Stop; Wait 500 ]
    stopScript |> List.iter (fun message -> waitOrStopWorker <! message)
    stopScript |> List.iter (fun message -> failOrStopWorker <! message)

    Console.ReadKey() |> ignore
    0
{ "Context": "waitProcess", "Counters": [ { "Name": "Instances Counter", "Unit": "items", "Count": 4 } ], "Meters": [ { "Name": "Message Processing Rate", "Unit": "items", "Count": 4, "FifteenMinuteRate": 35.668327519112893, "FiveMinuteRate": 35.01484385742755, "Items": [ { "Count": 4, "FifteenMinuteRate": 0.0, "FiveMinuteRate": 0.0, "Item": "Wait", "MeanRate": 13.082620551464204, "OneMinuteRate": 0.0, "Percent": 100.0 } ], "MeanRate": 13.082613248856632, "OneMinuteRate": 31.356094372926623, "RateUnit": "min" } ], "Timers": [ { "Name": "Operation Durations", "Unit": "req", "ActiveSessions": 0, "Count": 4, "DurationUnit": "ms", "Histogram": { "LastUserValue": "waitProcess", "LastValue": 8001.0, "Max": 8001.0, "MaxUserValue": "waitProcess", "Mean": 3927.1639786164278, "Median": 5021.0, "Min": 1078.0, "MinUserValue": "waitProcess", "Percentile75": 8001.0, "Percentile95": 8001.0, "Percentile98": 8001.0, "Percentile99": 8001.0, "Percentile999": 8001.0, "SampleSize": 4, "StdDev": 2932.0567172627871, "Sum": 15190.0 }, "Rate": { "FifteenMinuteRate": 0.00059447212531854826, "FiveMinuteRate": 0.00058358073095712587, "MeanRate": 0.00021824579927905906, "OneMinuteRate": 0.00052260157288211038 } } ] }
Source: https://habr.com/ru/post/334296/
All Articles