📜 ⬆️ ⬇️

Introduction to reactive programming

Hello. In this article I will run a gallop across Europe, namely, I will tell you what they mean by reactive programming, introduce you to actors, reactive streams, and finally, using reactive streams, we will recognize mouse gestures, like in the old Opera and its spiritual heir - Vivaldi .

The goal is to introduce the basic concepts of reactive programming and show that everything is not as difficult and scary as it may seem at first glance.

image
A source

What is reactive programming?


To answer this question, we turn to the site . It has a beautiful picture, which shows 4 main criteria that must be met by reactive applications.
')
image

The application must be fast, fault tolerant and well scaled.
Looks like "we are for all the good against all the bad," right?

What is meant by these words:

  1. Responsiveness

    The application should give the user the result in half a second. This also includes the principle of fail fast - that is, when something goes wrong, it is better to return to the user an error message like “Sorry, there was a problem. Try later "than to make the sea wait for the weather. If the operation is long, we show the user a progress bar. If it’s very long, “your request will be completed on or about March 18, 2042. We will send you a notification by mail. ”
  2. Scalability is a way to ensure responsiveness under load. Imagine the life cycle of a relatively successful service:
    1. Starting - the request flow is small, the service is spinning on a virtual machine with one core.
    2. The request flow increases - virtual kernels have added kernels and requests are processed in several threads.
    3. Even more loading - we connect batching - requests to base and to the hard disk are grouped.
    4. Even more load - you need to raise more servers and ensure the work in the cluster.
      Ideally, the system should scale up or down depending on the load.
  3. fault tolerance

    We accept that we live in an imperfect world and anything happens. In case something goes wrong with our system, we need to provide error handling and ways to restore functionality.
  4. Finally, we are encouraged to do all this with a system based on messaging ( message-driven ).

Before continuing, I want to dwell on the difference between event-driven systems and message-driven systems.

Event-driven:


In contrast to the event-driven, in a message-driven system:


All this offers us

Actor model


Milestones of development:


What can actors do?


Actors are the same objects, but:


Consider an example.

image

Actor A wants to send a message to actor B. All he has is ActorRef (some address). Actor B can be anywhere.
Actor A sends letter B through the system (ActorSystem). The system puts the letter in the mailbox of actor B and the actor B. awakens it. Actor B takes the letter from the mailbox and does something.

Compared to calling other methods, it looks unnecessarily difficult, but the model of actors fits perfectly with the real world, if you imagine that actors are people who are trained to do something in response to certain stimuli.

Imagine a father and son:



The father sends SMS to the son “Clean the room” and continues to go about his business. Son reads SMSku and begins cleaning. Father plays poker in the meantime. The son finishes cleaning and sends SMS "Finish". It looks easy, right?

Now imagine that father and son are not actors, but ordinary objects that can pull methods from each other. The father pulls the son for the “clean the room” method and follows him on the heels, waiting until the son finishes cleaning and transfers control back to his father. Father cannot play poker at this time. In this context, the model of actors becomes more attractive.

Now move on to

Akka.NET


Everything that is written below is true for the original Akka for the JVM, but for me C # is closer than Java, so I’ll use the example of Akka.NET.

So what advantages does Akka have?



But the topic of scaling is very extensive and worthy of a separate publication. Therefore, I will tell you more about the feature that will be useful in all projects:

Error processing


Actors have a hierarchy - it can be represented as a tree. Each actor has a parent and can be "children."

image
Akka.NET documentation Copyright 2013-2018 Akka.NET project

For each actor, you can install the Supervision strategy - what to do if something went wrong with the “children”. For example, “nail down” an actor who has problems, and then create a new actor of the same type and charge him with the same work.

For example, I made an application on Akka.net CRUD, in which the layer of "business logic" is implemented on actors. The task of this project was to find out whether it is worth using actors in unscalable systems - whether they will make life better or add more pain.

How built-in error handling in Akka can help:

Gif


  1. everything is good, the application is working,
  2. something happened to the repository, and now it gives the result only 1 time out of 5,
  3. I set up Supervision strategy to "try 10 times in a second",
  4. the application is working again (albeit more slowly), and I have time to figure out what's wrong.

Then there is a temptation to say: “Well, I will write such error handling myself, why should any actors be fenced?”. Fair remark, but only if the points of failure are few.

And some code. This is how the initialization of the system of actors in the IoC container looks like:

public Container() { system = ActorSystem.Create("MySystem"); var echo = system.ActorOf<EchoActor>("Echo"); //stop initialization if something is wrong with actor system var alive = echo.Ask<bool>(true, TimeSpan.FromMilliseconds(100)).Result; container = new WindsorContainer(); //search for dependencies //register controllers //register ActorSystem propsResolver = new WindsorDependencyResolver(container, (ActorSystem)system); system.AddDependencyResolver(propsResolver); actorSystemWrapper = new ActorSystemWrapper(system, propsResolver); container.Register(Component.For<IActorRefFactory>().Instance(actorSystemWrapper)); container.Register(Component.For<IDependencyResolver>().Instance(propsResolver)); } 

EchoActor is the simplest actor that returns a value to the sender:

  public class EchoActor : ReceiveActor { public EchoActor() { Receive<bool>(flag => { Sender.Tell(flag); }); } } 

To associate actors with a “normal” code, use the Ask command:

  public async Task<ActionResult> Index() { ViewBag.Type = typeof(Model); var res = await CrudActorRef.Ask<IEnumerable<Model>>(DataMessage.GetAll<Model>(), maxDelay); return View(res); } 

Total


Pokhimich with actors, I can say:


Part 2: Reactive Streams


And now let's move on to a more popular and useful topic - jet streams. If you can never meet actors in the course of work, then Rx streams will be useful both in the frontend and in the backend. Their implementation is in almost all modern programming languages. I will give examples on RxJs, because nowadays even backend programmers sometimes have to do something in JavaScript.


Rx-streams are for all popular programming languages

“ Introduction to Reactive Programming you've been missing ” by Andre Staltz , under license CC BY-NC 4.0

To explain what a jet stream is, I'll start with Pull and Push collections.
Single return valueMultiple return values
Pull
Synchronized
Interactive
TIEnumerable <T>
Push
Asynchronous
Reactive
Task <T>IObservable <T>

Pull collections are what we are all used to in programming. The most striking example is the array.

 const arr = [1,2,3,4,5]; 

It already has data, it will not change this data, but it can give it on request.

 arr.forEach(console.log); 

Also, before you do something with the data, you can somehow handle them.

 arr.map(i => i+1).map(I => “my number is ”+i).forEach(console.log); 

And now let's imagine that initially there is no data in the collection, but it will definitely inform you that they have appeared (Push). And at the same time, we still can apply the necessary transformations to this collection.

For example:

 source.map(i => i+1).map(I => “my number is ”+i).forEach(console.log); 

When a value appears in the source, for example, 1, console.log will display “my number is 1”.

How it works:

A new entity appears - Subject (or Observable):

 const observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); }); 

This is a push-collection that will send notifications about changes in its state.

In this case, numbers 1, 2 and 3 will appear in it immediately, 4 seconds later, and then the collection will “end”. This is such a special type of event.

The second entity is the Observer. He can subscribe to Subject events and do something with the received data. For example:

 observable.subscribe(x => console.log(x)); observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), }); observable .map(x => 'This is ' + x) .subscribe(x => console.log(x)); 

Here you can see that one Subject can have many subscribers.

It looks easy, but it is not entirely clear why this is needed. I will give 2 more definitions that need to be known when working with jet streams, and then I will show in practice how they work and in what situations their full potential is revealed.

Cold observables



What this means: let's say the company (Subject) decided to arrange the distribution of gifts. Each employee (Observer) comes to work and receives his copy of the gift. No one is left out.

Hot observables



Example: in the morning, hot patties are brought to the company for employees. When they are brought, all the larks fly to the smell and dismantle the pies for breakfast. And the owls, who came later, no longer get the pies.

What situations to use jet streams?


When there is a data stream distributed in time. For example, user input. Or logs from any service. In one of the projects, I saw a samopinny logger who collected events in N seconds and then recorded the whole pack at a time. The battery code occupied the page. If Rx streams were used, this would be much simpler:

image
“ RxJs Reference / Observable , documentation licensed under CC BY 4.0 .
(there are many examples and pictures explaining what various operations with jet streams do)

 source.bufferTime(2000).subsribe(doThings); 

And finally, an example of use.

Recognize mouse gestures using Rx streams


In the old Opera or its spiritual heir - Vivaldi - it was browser control using mouse gestures.

Gif - mouse gestures in Vivaldi


That is, you need to recognize the movement of the mouse up / down, right / left and their combinations. This can be written without Rx streams, but the code will be complex and difficult to maintain.

And this is what it looks like with Rx streams:


I will start from the end - I will ask what data and in what format I will look for in the original sequence:

 //gestures to look for const gestures = Rx.Observable.from([ { name: "Left", sequence: Rx.Observable.from([{ x: -1, y: 0 }]) }, { name: "Right", sequence: Rx.Observable.from([{ x: 1, y: 0 }]) }, { name: "Up", sequence: Rx.Observable.from([{ x: 0, y: -1 }]) }, { name: "Down", sequence: Rx.Observable.from([{ x: 0, y: 1 }]) }, { name: "Down+Up", sequence: Rx.Observable.from([{ x: 0, y: 1 }, { x: 0, y: -1 }]) }, { name: "Up+Right", sequence: Rx.Observable.from([{ x: 0, y: -1 }, { x: 1, y: 0 }]) } ]); 

These are single vectors and their combinations.

Next, you need to convert mouse events to Rx streams. All Rx libraries have built-in tools for turning standard events into Observables.

 const mouseMoves = Rx.Observable.fromEvent(canvas, 'mousemove'), mouseDowns = Rx.Observable.fromEvent(canvas, 'mousedown'), mouseUps = Rx.Observable.fromEvent(canvas, 'mouseup'); 

Next, I group the mouse coordinates by 2 and find their difference, getting the mouse offset.

 const mouseDiffs = mouseMoves .map(getOffset) .pairwise() .map(pair => { return { x: pair[1].x-pair[0].x, y: pair[1].y-pair[0].y } }); 

And I group these movements using the 'mousedown' and 'mouseup' events.

 const mouseGestures = mouseDiffs .bufferToggle(mouseDowns, x => mouseUps) .map(concat); 

The concat function cuts out too short movements and groups movements that roughly match in direction.

 function concat(values) {//summarize move in same direction return values.reduce((a, v) => { if (!a.length) { a.push(v); } else { const last = a[a.length - 1]; const lastAngle = Math.atan2(last.x, last.y); const angle = Math.atan2(vx, vy); const angleDiff = normalizeAngle(angle - lastAngle); const dist = Math.hypot(vx, vy); if (dist < 1) return a;//move is too short – ignore //moving in same direction => adding vectors if (Math.abs(angleDiff) <= maxAngleDiff) { last.x += vx; last.y += vy; } else { a.push(v); } } return a; }, []); } 

If the movement along the X or Y axis is too short, it is reset. And then only the sign remains from the received offset coordinates. Thus, we obtain the unit vectors that we were looking for.

 const normalizedMouseGestures = mouseGestures.map(arr => arr.map(v => { const dist = Math.hypot(vx, vy);//length of vector vx = Math.abs(vx) > minMove && Math.abs(vx) * treshold > dist ? vx : 0; vy = Math.abs(vy) > minMove && Math.abs(vy) * treshold > dist ? vy : 0; return v; }) ).map(arr => arr .map(v => { return { x: Math.sign(vx), y: Math.sign(vy) }; }) .filter(v => Math.hypot(vx, vy) > 0) ); 

Result:

 gestures.map(gesture => normalizedMouseGestures.mergeMap( moves => Rx.Observable.from(moves) .sequenceEqual(gesture.sequence, comparer) ).filter(x => x).mapTo(gesture.name) ).mergeAll().subscribe(gestureName => actions[gestureName]()); 

With the help of sequenceEqual, you can compare the received movements with the original ones and, if there is a match, perform a specific action.

Gif


→ Play with gestures here

Please note that, besides gesture recognition, there is also a drawing of both the initial and normalized mouse movements on the HTML canvas. The readability of the code does not suffer from this.

From which follows another advantage - the functionality written with the help of Rx streams can be easily supplemented and expanded.

Total



Source: https://habr.com/ru/post/432004/


All Articles