📜 ⬆️ ⬇️

Monitoring of actors in Akka.Net, but on F #

At once I will tell, there is no hub for F # on the Habré, therefore I write in C #.

For those who are not familiar with F #, but familiar with C #, I recommend the latest article from Microsoft.
It will help you experience less WTF moments while reading, because My article is not a syntax tutorial.

Task context


There is a service written in Akka.NET, it dumps a bunch of information into different text logs. The operations department stirs these logs, fries them with regexps to find out about the number of errors (business and not so much), the number of incoming messages in the service, and the number of outgoing ones. Further, this information is poured into ElasticDB, InfluxDB and is shown in Grafana and Kibana in different sections and aggregations.
')
It sounds difficult, and parsing the text logs of the service that generates several dozen GB of text garbage per day is a thankless task. Therefore, there was a problem - the service should be able to raise endpoints, which can be pulled and get all the information about it at once.

We will solve the problem as follows:

  1. Write a domain model for metrics
  2. Zapaim domain model metrics on the implementation of App.Metrics and raise apishchku
  3. Let's make a structured domain logger, which we put on Akka internal logger
  4. Make a wrapper for functional actors that will hide work with metrics and logger
  5. Putting it all together and running

Domain Model for Metrics


There are 6 types of views in App.Metrics:


In the first iteration, we have enough counters, timers and ... meters :)
In the beginning we will describe the types and interfaces (I will not give you everything, you can look in the repository, the link is at the end).

We also agree that all our messages for metrics will come to a special actor (we will define it later) via the EventStream (the message bus in Akka.Net itself).

For example, a timer that should be able to measure a certain amount of time for an object:

type IMetricsTimer = abstract member Measure : Amount -> unit abstract member Measure : Amount * Item -> unit 

Or a counter that should be able to increase / decrease both with and without quantity indication:

  type IMetricsCounter = abstract member Decrement : unit -> unit abstract member Decrement : Amount -> unit abstract member Decrement : Amount * Item -> unit abstract member Increment : unit -> unit abstract member Increment : Amount -> unit abstract member Increment : Amount * Item -> unit 

And a couple of examples of commands for the bus:

  type DecrementCounterCommand = { CounterId : CounterId DecrementAmount : Amount Item : Item } type CreateCounterCommand = { CounterId : CounterId Context : ContextName Name : MetricName MeasurementUnit : MeasurementUnit ReportItemPercentages : bool ReportSetItems : bool ResetOnReporting : bool } 

The most important thing is to determine the possible messages that can go along the bus, and to which our metrics-actor will react. To do this, we use Discriminated Union:

  type MetricsMessage = | DecrementCounter of DecrementCounterCommand | IncrementCounter of IncrementCounterCommand | MarkMeter of MarkMeterCommand | MeasureTime of MeasureTimeCommand | CreateCounter of CreateCounterCommand | CreateMeter of CreateMeterCommand | CreateTimer of CreateTimerCommand 

Now we need to implement the interfaces and finish the first paragraph. We will implement them in a functional style, i.e. through functions.

Example of creating a meter:

  let private createMeter (evtStream: EventStream) meterId = { new IMetricsMeter with member this.Mark amount = this.Mark (amount, Item None) member this.Mark item = this.Mark (Amount 1L, item) member this.Mark (amount, item) = evtStream.Publish <| MarkMeter { MeterId = meterId; Amount = amount; Item = item } 

For people from the world of C # I give an analogue:

  private IMetricsMeter createMeter(EventStream evtStream, MeterId meterId) { private class TempClass : IMetricsMeter { public void Mark(long amount) { Mark(amount, ""); } public void Mark(string item) { Mark(1, item); } public void Mark(long amount, string item) { evtStream.Publish(new MarkMeter {...});//omitted } } return new TempClass(); } 

Do not be confused that the analog is not compiled, this is normal, because the private class in the body of the method is confused by the compiler. But in F # you can return an anonymous class through the interface.

The main thing you need to pay attention to is that we throw a message on the bus that we need to move the meter, which is determined via MeterId.

We do the same with the IMetricsAdapter, but since I will give many methods for one:

  member this.CreateMeter (name, measureUnit, rateUnit) = let cmd = { MeterId = MeterId (toId name) Context = context Name = name MeasurementUnit = measureUnit RateUnit = rateUnit } evtStream.Publish <| CreateMeter cmd createMeter evtStream cmd.MeterId 

When we request a timer, we send a creation message to the bus, and the caller is returned the result of the createMeter method with the arguments evtStream and cmd.MeterId.
The result of it, as seen above - IMetricsMeter.

After that, create an extension for ActorSystem so that you can call our IMetricsAdapter from anywhere:

  type IActorContext with member x.GetMetricsProducer context = createAdapter x.System.EventStream context 

Actors for metrics and apishechka


We need two actors:


Immediately think ApiController, it is trivial:

  type public MetricController(metrics: IMetrics) = inherit ApiController() [<HttpGet>] [<Route("metrics")>] member __.GetMetrics() = __.Ok(metrics.Snapshot.Get()) 

Next, we declare the function of an actor that will read all MetricsMessage from the EventStream and do something with them. Introduce the IMetrics dependency to the function through arguments, inside we create caches for all metrics through ordinary Dictionary.

Why not ConcurrentDictionary, you ask? And because the actor processes messages in turn. In order to catch the race condition inside the actor, you need to purposefully shoot yourself in the foot.

  let createRecorder (metrics: IMetrics) (mailbox: Actor<_>) = let self = mailbox.Self let counters = new Dictionary<CounterId, ICounter>() let meters = new Dictionary<MeterId, IMeter>() let timers = new Dictionary<TimerId, ITimer * TimeUnit>() //    ... let handle = function | DecrementCounter evt -> match counters.TryGetValue evt.CounterId with | (false, _) -> () | (true, c) -> let (Amount am) = evt.DecrementAmount match evt.Item with | Item (Some i) -> c.Decrement (i, am) | Item None -> c.Decrement (am) | CreateMeter cmd -> match meters.TryGetValue cmd.MeterId with | (false, _) -> let (ContextName ctxName) = cmd.Context let (MetricName name) = cmd.Name let options = new MeterOptions( Context = ctxName, MeasurementUnit = toUnit cmd.MeasurementUnit, Name = name, RateUnit = toTimeUnit cmd.RateUnit) let m = metrics.Provider.Meter.Instance options meters.Add(cmd.MeterId, m) | _ -> () //    match  subscribe typedefof<MetricsMessage> self mailbox.Context.System.EventStream |> ignore let rec loop() = actor { let! msg = mailbox.Receive() handle msg return! loop() } loop() 

The short meaning is that they declared the internal state in the form of dictionaries of different metrics, declared the message processing function of MetricsMessage, subscribed to MetricsMessage and returned the recursive message processing function from the mailbox.

Messages for working with metrics are processed as follows:

  1. We look exactly what message (through the pattern matching)
  2. We are looking for a metric with this Id in the corresponding dictionary (for this there is a wonderful pattern after a couple (bool, obj), which returns TryGetValue in F #
  3. If this is a request to create a metric and there is none
  4. If this is a request for the use of a metric and it is - use

We also need an actor that raises the Owin host with the controller above.
To do this, we write a function that accepts a dependency in the form of a config and an IDependencyResolver. In order not to fail at the start, the actor sends a message to itself, which initiates a possible Dispose () of the old API and the creation of a new one. And again, because the actor is synchronous inside itself, we can use mutable state.

  type IMetricApiConfig = abstract member Host: string abstract member Port: int type ApiMessage = ReStartApiMessage let createReader (config: IMetricApiConfig) resolver (mailbox: Actor<_>) = let startUp (app: IAppBuilder) = let httpConfig = new HttpConfiguration(DependencyResolver = resolver) httpConfig.Formatters.JsonFormatter.SerializerSettings.Converters.Add(new MetricDataConverter()) httpConfig.Formatters.JsonFormatter.Indent <- true httpConfig.MapHttpAttributeRoutes() httpConfig.EnsureInitialized() app.UseWebApi(httpConfig) |> ignore let uri = sprintf "http://%s:%d" config.Host config.Port let mutable api = {new IDisposable with member this.Dispose() = ()} let handleMsg (ReStartApiMessage) = api.Dispose() api <- WebApp.Start(uri, startUp) mailbox.Defer api.Dispose mailbox.Self <! ReStartApiMessage let rec loop() = actor { let! msg = mailbox.Receive() handleMsg msg return! loop() } loop() 

We also throw the api.Dispose method into deferred tasks during the final stop of the actor using mailbox.Defer. And for the initial state of the variable api, we use a stub through an object expression that constructs an empty IDisposable object.

We make a structured logger


The meaning of the task is to make a wrapper for the logger from Akka.Net (it is presented through the ILoggingAdapter interface), which can be used to measure the operation time and typed information transfer (not just strings, but distinct business cases).

All logger typing is enclosed in one union.

 type Fragment = | OperationName of string | OperationDuration of TimeSpan | TotalDuration of TimeSpan | ReceivedOn of DateTimeOffset | MessageType of Type | Exception of exn 

And the logger itself will work on this interface:

 type ILogBuilder = abstract OnOperationBegin: unit -> unit abstract OnOperationCompleted: unit -> unit abstract Set: LogLevel -> unit abstract Set: Fragment -> unit abstract Fail: exn -> unit abstract Supress: unit -> unit abstract TryGet: Fragment -> Fragment option 

We will create it through the usual class:

 type LogBuilder(logger: ILoggingAdapter) = let logFragments = new Dictionary<System.Type, Fragment>() let stopwatch = new Stopwatch() let mutable logLevel = LogLevel.DebugLevel interface ILogBuilder with //  

Maybe you ask why the usual Dictionary? As mentioned above, this LogBuilder is intended for use inside the actor when processing a single operation. It makes no sense to use a competitive data structure.

I will give an example of methods for implementing the interface:

  let set fragment = logFragments.[fragment.GetType()] <- fragment member x.OnOperationBegin() = stopwatch.Start() member this.Fail e = logLevel <- LogLevel.ErrorLevel set <| Exception e member this.OnOperationCompleted() = stopwatch.Stop() set <| OperationDuration stopwatch.Elapsed match tryGet <| ReceivedOn DateTimeOffset.MinValue with | Some (ReceivedOn date) -> set <| TotalDuration (DateTimeOffset.UtcNow - date) | _ -> () match status with | Active -> match (logLevel) with | LogLevel.DebugLevel -> logger.Debug(message()) | LogLevel.InfoLevel -> logger.Info(message()) | LogLevel.WarningLevel -> logger.Warning(message()) | LogLevel.ErrorLevel -> logger.Error(message()) | x -> failwith(sprintf "Log level %s is not supported" <| string x) | Supressed -> () 

The most interesting is the logic of the work OnOperationCompleted ():


Create a wrapper for functional actors.


The most magical part that allows us to write simple message processing functions without a boilerplate, which carries total logging.

What do we want to achieve? First we want to secure:


Linq.Expressions will help us to do all of the above. How to do this through QuotationExpressions from F # I do not know, because I did not find a simple way to compile them. I would be glad if someone offers options.

And so, for starters, let's declare a couple of auxiliary types and one method:

 type Expr<'T,'TLog when 'TLog :> ILogBuilder> = Expression<System.Action<Actor<'T>, 'T, 'TLog>> type Wrap = static member Handler(e: Expression<System.Action<Actor<'T>, 'T, #ILogBuilder>>) = e let toExprName (expr: Expr<_,_>) = match expr.Body with | :? MethodCallExpression as methodCall -> methodCall.Method.Name | x -> x.ToString() 

Expr is an expression that contains the Action from the mailbox (in case you need to have children, stop yourself or children and in general), the message being processed and the logger (if you need to do some special actions with it).

Wrap.Handler (Expr) - let us write to it ordinary F # expressions of the form “fun mb msg log -> ()”, and get Linq.Expressions at the output.

toExprName is a method that receives a method name if the expression is a method call (MethodCallExpression) or just tries to cast our expression to a string.
For an expression like “fun mb msg log -> handleMsg msg” - toExprName returns “handleMsg”.

Now we will write a wrapper for creating functional actors. The beginning of the announcement looks like this:

 let loggerActor<'TMsg> (handler: Expr<'TMsg,_>) (mailbox: Actor<'TMsg>) = let exprName = handler |> toExprName let metrics = mailbox.Context.GetMetricsProducer (ContextName exprName) let logger = mailbox.Log.Value 

At the entrance, we will submit only the handler, because mailbox then Akka itself (partial application).

Using the extension we wrote to the ActorSystem, we get an instance of the IMetricsAdapter to the value metrics. We also get the Akka logger in the logger value.

Then we will create all the necessary metrics for this actor and use them right away:

  let errorMeter = metrics.CreateMeter (MetricName "Error Rate", Errors) let instanceCounter = metrics.CreateCounter (MetricName "Instances Counter", Items) let messagesMeter = metrics.CreateMeter (MetricName "Message Processing Rate", Items) let operationsTimer = metrics.CreateTimer (MetricName "Operation Durations", Requests, MilliSeconds, MilliSeconds) instanceCounter.Increment() mailbox.Defer instanceCounter.Decrement 

As you can see, we increase the value of instanceCounter and lay down the decrease of this counter at the stop of the actor.

We will need a couple more methods that will fill in the known parameters to the logger and pull the required metrics.

In this piece of code, we throw the name of the operation into the logger, call the completion of its logging, throw the operation time into the timer metric, and type the message into the message messaging metric:

  let completeOperation (msgType: Type) (logger: #ILogBuilder) = logger.Set (OperationName exprName) logger.OnOperationCompleted() match logger.TryGet(OperationDuration TimeSpan.Zero) with | Some(OperationDuration dur) -> operationsTimer.Measure(Amount (int64 dur.TotalMilliseconds), Item (Some exprName)) | _ -> () messagesMeter.Mark(Item (Some msgType.Name)) 

The following method will help us in handling exceptions inside the actor:

  let registerExn (msgType: Type) e (logger: #ILogBuilder) = errorMeter.Mark(Item (Some msgType.Name)) logger.Fail e 

It remains a bit to make it work. Let's tie everything together through a wrapper over the handler:

  let wrapHandler handler mb (logBuilder: unit -> #ILogBuilder) = let innherHandler mb msg = let logger = logBuilder() let msgType = msg.GetType() logger.Set (MessageType msgType) try try logger.OnOperationBegin() handler mb msg logger with | e -> registerExn msgType e logger; reraise() finally completeOperation msgType logger innherHandler mb 

wrapHandler has a complex signature. In C #, it would look like this:

 Func<TMsg, TResult> wrapHandler<Tmsg, TResult, TLogBuilder, TMailbox>( Func<TMailbox, TMsg, TLogBuilder, TResult> handler, TMailbox mb, Func<TLogBuilder> logBuilder) where TLogBuilder: ILogBuilder 

At the same time on all other types there are no restrictions .

According to the meaning, wrapHandler should give a function at the output that receives TMsg and returns TResults. The procedure for this function will be as follows:


To convert the Expression to Action and submit to each action of the actor a new instance of the logger, we will make one more auxiliary function:

  let wrapExpr (expr: Expr<_,_>) mailbox logger = let action = expr.Compile() wrapHandler (fun mb msg log -> action.Invoke(mailbox, msg, log)) mailbox (fun () -> new LogBuilder(logger)) 

In it, we just get our Expression, compile it and submit it to wrapHandler above, along with the mailbox and the function to get the new LogBuilder ().

The signature of this method is also not easy. On C #, it would look like this:

 Action<TMsg> wrapExpr<TMsg>( Expr<TMsg, LogBuilder> expr, Actor<TMsg> mb, ILoggingAdapterlogger) 

There are still no restrictions on TMsg.

It remains only to create a recursive function :)
  let rec loop() = actor { let! msg = mailbox.Receive() wrapExpr handler mailbox akkaLogger msg return! loop() } loop() 

This expression “wrapExpr handler mailbox akkaLogger”, as can be seen from the explanation above, returns an Action, i.e. a method in which you can submit any type of input and get a unit (void in c #).

Having added “msg” at the end of the expression, we throw the msg argument into this function and perform our action on the received message.

At this point, we ended up coding our problem and move on to examples!

How to start all this?


For this all to work it is not necessary to write a lot of code
You can generally write only message handlers without knowing that we need mailboxes, loggers, or error handling.

A simple case might look like this:

 type ActorMessages = | Wait of int | Stop let waitProcess = function | Wait d -> Async.Sleep d |> Async.RunSynchronously | Stop -> () 


And to wrap this function in loggerActor and get all the buns for which we tried so hard, you can write this:

 let spawnWaitWorker() = loggerActor <| Wrap.Handler(fun mb msg log -> waitProcess msg) let waitWorker = spawn system "worker-wait" <| spawnWaitWorker() waitWorker <! Wait 1000 //    ~1000 waitWorker <! Wait 500 

If you have complex logic and need access to the mailbox and logger:

 let failOrStopProcess (mailbox: Actor<_>) msg (log: ILogBuilder) = try match msg with | Wait d -> failwith "can't wait!" | Stop -> mailbox.Context.Stop mailbox.Self with | e -> log.Fail e let spawnFailOrStopWorker() = loggerActor <| Wrap.Handler(fun mb msg log -> failOrStopProcess mb msg log) let failOrStopWorker = spawn system "worker-vocal" <| spawnFailOrStopWorker() failOrStopWorker <! Wait 1000 //   "can't wait!" failOrStopWorker <! Wait 500 //   "can't wait!" failOrStopWorker <! Stop failOrStopWorker <! Wait 500 //     DeadLetters 

EntryPoint program itself, the creation of ActorSystem, raising metrics and actors can be viewed under the spoiler, there is nothing remarkable there.

Program.fs
 open Akka.FSharp open SimpleInjector open App.Metrics; open Microsoft.Extensions.DependencyInjection open SimpleInjector.Integration.WebApi open System.Reflection open System open Metrics.MetricActors open ExampleActors let createSystem = let configStr = System.IO.File.ReadAllText("system.json") System.create "system-for-metrics" (Configuration.parse(configStr)) let createMetricActors system container = let dependencyResolver = new SimpleInjectorWebApiDependencyResolver(container) let apiConfig = { new IMetricApiConfig with member x.Host = "localhost" member x.Port = 10001 } let metricsReaderSpawner = createReader apiConfig dependencyResolver let metricsReader = spawn system "metrics-reader" metricsReaderSpawner let metricsRecorderSpawner = createRecorder (container.GetInstance<IMetrics>()) let metricsRecorder = spawn system "metrics-recorder" metricsRecorderSpawner () type Container with member x.AddMetrics() = let serviceCollection = new ServiceCollection() let entryAssemblyName = Assembly.GetEntryAssembly().GetName() let metricsHostBuilder = serviceCollection.AddMetrics(entryAssemblyName) serviceCollection.AddLogging() |> ignore let provider = serviceCollection.BuildServiceProvider() x.Register(fun () -> provider.GetRequiredService<IMetrics>()) [<EntryPoint>] let main argv = let container = new Container() let system = createSystem container.RegisterSingleton system container.AddMetrics() container.Verify() createMetricActors system container let waitWorker1 = spawn system "worker-wait1" <| spawnWaitWorker() let waitWorker2 = spawn system "worker-wait2" <| spawnWaitWorker() let waitWorker3 = spawn system "worker-wait3" <| spawnWaitWorker() let waitWorker4 = spawn system "worker-wait4" <| spawnWaitWorker() let failWorker = spawn system "worker-fail" <| spawnFailWorker() let waitOrStopWorker = spawn system "worker-silent" <| spawnWaitOrStopWorker() let failOrStopWorker = spawn system "worker-vocal" <| spawnFailOrStopWorker() waitWorker1 <! Wait 1000 waitWorker2 <! Wait 500 waitWorker3 <! Wait 5000 waitWorker4 <! Wait 8000 failWorker <! Wait 5000 waitOrStopWorker <! Wait 1000 waitOrStopWorker <! Wait 500 waitOrStopWorker <! Stop waitOrStopWorker <! Wait 500 failOrStopWorker <! Wait 1000 failOrStopWorker <! Wait 500 failOrStopWorker <! Stop failOrStopWorker <! Wait 500 Console.ReadKey() |> ignore 0 


The most important thing is metrics!

If during the work go to the link localhost: 10001 / metrics, we will see a fairly large json, which will have a lot of information. I will give a piece for the waitProcess function:

Hidden text
 { "Context": "waitProcess", "Counters": [ { "Name": "Instances Counter", "Unit": "items", "Count": 4 } ], "Meters": [ { "Name": "Message Processing Rate", "Unit": "items", "Count": 4, "FifteenMinuteRate": 35.668327519112893, "FiveMinuteRate": 35.01484385742755, "Items": [ { "Count": 4, "FifteenMinuteRate": 0.0, "FiveMinuteRate": 0.0, "Item": "Wait", "MeanRate": 13.082620551464204, "OneMinuteRate": 0.0, "Percent": 100.0 } ], "MeanRate": 13.082613248856632, "OneMinuteRate": 31.356094372926623, "RateUnit": "min" } ], "Timers": [ { "Name": "Operation Durations", "Unit": "req", "ActiveSessions": 0, "Count": 4, "DurationUnit": "ms", "Histogram": { "LastUserValue": "waitProcess", "LastValue": 8001.0, "Max": 8001.0, "MaxUserValue": "waitProcess", "Mean": 3927.1639786164278, "Median": 5021.0, "Min": 1078.0, "MinUserValue": "waitProcess", "Percentile75": 8001.0, "Percentile95": 8001.0, "Percentile98": 8001.0, "Percentile99": 8001.0, "Percentile999": 8001.0, "SampleSize": 4, "StdDev": 2932.0567172627871, "Sum": 15190.0 }, "Rate": { "FifteenMinuteRate": 0.00059447212531854826, "FiveMinuteRate": 0.00058358073095712587, "MeanRate": 0.00021824579927905906, "OneMinuteRate": 0.00052260157288211038 } } ] } 


From it you can learn that:


In the console will be about the following .

Conclusion


There are a lot of code in this article and, most likely, there is little explanation (I’ll answer in comments if something is not clear), but this is because the article is intended to show the solution of several routine tasks from a real project.

Maybe someone will come in handy, especially since this code was originally written for actors in C #, so if you want, you can transfer all this (I will give you a hint, you can make your version of Receive () with the same inside).

I recommend to study F # to those who are engaged in modeling of complex domain models, since its type system is much richer, the absence of null and design in types allows you to make the model resistant to programmer errors.

An example repository is here .

Thanks for attention!

Source: https://habr.com/ru/post/334296/


All Articles