
Reactive extensions



This is the second publication from our internal Sync.NET conference. The first publication was devoted to multithreading in .NET.

Reactive extensions - the name sounds so cool that it suggests a connection with jet aircraft (in Russian, the same word means both "reactive" and "jet"). Of course, there is no connection, but this is a really great tool. "Reactive" comes from the word "react", meaning that the system responds to state changes. As software development evolved, the need arose for systems that can respond to multiple data sources, remain stable, and keep their modules loosely coupled.

As a rule, we write code in which we call methods and functions, get the result, and process it. Rx, in turn, lets you create events and the event handlers that respond to them. The system then consists of sequences of events that report state changes, and handlers that respond appropriately to them.

Rx is built on two basic abstractions that have been in the System namespace since .NET 4.0: System.IObserver<T> and System.IObservable<T>. As the names imply, this is an implementation of the Observer pattern. Here IObservable<T> acts as the subject, and IObserver<T> is the observer that can subscribe to changes. The .NET platform already has an observer implementation in the form of events, and a sequence of events can of course be built with them as well. The ways of working with Rx and with events are different, but more on that later.

IObserver<in T>


Provides a mechanism for receiving notifications. The interface declares three methods:

void OnNext(T value) - provides the next element in the sequence.

void OnError(Exception ex) - passes an exception to the observer so that it can be handled properly. After this message the sequence ends, and observers no longer need to watch for changes.

void OnCompleted() - reports that the sequence has ended; there will be no more messages and no need to wait for them.
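To make the three methods concrete, here is a minimal sketch of an observer written by hand. It uses only types from the base class library; `RecordingObserver` and `ObserverDemo` are hypothetical names invented for this illustration, and the notifications are pushed manually rather than by a real source.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();

    // Called for each element of the sequence.
    public void OnNext(int value) => Log.Add("next: " + value);

    // Called at most once if the sequence fails; nothing follows it.
    public void OnError(Exception error) => Log.Add("error: " + error.Message);

    // Called at most once when the sequence ends normally.
    public void OnCompleted() => Log.Add("completed");
}

public static class ObserverDemo
{
    // Pushes two values and a completion signal into the observer by hand.
    public static List<string> Run()
    {
        var observer = new RecordingObserver();
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnCompleted();
        return observer.Log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

In practice an observable source makes these calls, not the application code; the sketch only shows which method corresponds to which kind of notification.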

IObservable<out T>


Produces notifications and allows observers to subscribe. Declares one method:

IDisposable Subscribe(IObserver<T> observer) - takes an observer (IObserver<T>) as a parameter and subscribes it to messages. Notice that the method returns an IDisposable; calling Dispose on it later unsubscribes the observer.
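As a rough sketch of what Subscribe and the returned IDisposable might look like when written by hand, consider the following. `SimpleObservable<T>`, its `Publish` method, `ListObserver`, and `SubscribeDemo` are hypothetical names made up for this example; the sketch is not thread-safe and is not how Rx itself implements its sources.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-written source that keeps a list of its observers.
public sealed class SimpleObservable<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Pushes a value to every observer that is currently subscribed.
    public void Publish(T value)
    {
        foreach (var observer in observers.ToArray())
            observer.OnNext(value);
    }

    // Disposing this object removes the observer from the list.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() => observers.Remove(observer);
    }
}

// Hypothetical observer that simply collects the values it receives.
public sealed class ListObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public void OnNext(int value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SubscribeDemo
{
    public static List<int> Run()
    {
        var source = new SimpleObservable<int>();
        var observer = new ListObserver();

        IDisposable subscription = source.Subscribe(observer);
        source.Publish(1);      // delivered
        subscription.Dispose(); // unsubscribe
        source.Publish(2);      // not delivered
        return observer.Received;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

Returning an IDisposable means the caller needs no reference to the original handler in order to unsubscribe; holding the subscription object is enough.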

If we want to implement IObservable<T>, then in addition to the Subscribe method we also need logic that can send new messages, report errors, or signal the end of the sequence. In other words, the same object also has to do what the IObserver<T> interface describes. The Subject<T> type, which implements both interfaces, exists for exactly this purpose. To use it, you need to install an additional library from NuGet (Install-Package Rx-Main; newer versions are published as System.Reactive), which also provides further extensions and LINQ support.

    using System;
    using System.Reactive.Subjects;

    namespace Demo
    {
        class Program
        {
            static Subject<int> sub = new Subject<int>(); // Declare

            static void Main()
            {
                sub.Subscribe(Console.WriteLine); // Subscribe
                sub.OnNext(234);                  // Publish
            }
        }
    }


In this example a new sequence is created, namely a Subject<int> (you can also call it a sequence of ints). An observer then subscribes to it (here, each value of the sequence is simply written to the console), and finally a value is published, which the observer prints. With Subject<T>, a new observer receives only the elements published after it subscribes. There are several other implementations with different behavior:

ReplaySubject


    using System;
    using System.Reactive.Subjects;

    namespace Demo
    {
        class Program
        {
            static ReplaySubject<int> sub = new ReplaySubject<int>();

            static void Main()
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine); // 222 is replayed
                sub.OnNext(354);                  // 354
            }
        }
    }


ReplaySubject - delivers all elements of the sequence, regardless of when the observer subscribed.

BehaviorSubject


    using System;
    using System.Reactive.Subjects;

    namespace DemoData
    {
        class Program
        {
            // 666 is the initial value
            static BehaviorSubject<int> sub = new BehaviorSubject<int>(666);

            static void Main()
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine); // 222
            }
        }
    }


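BehaviorSubject - cannot be empty: it always holds a value (starting with the one passed to the constructor), but only the most recent one, which a new observer receives as soon as it subscribes.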

AsyncSubject


    using System;
    using System.Reactive.Subjects;

    namespace DemoData
    {
        class Program
        {
            static AsyncSubject<int> sub = new AsyncSubject<int>();

            static void Main(string[] args)
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine);
                sub.OnCompleted(); // Publish 222
            }
        }
    }


AsyncSubject - also returns only the last value, but, unlike the other implementations, the data is published only when OnCompleted is called.
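The differences between the four subject types are easiest to see when the same steps are run against each of them. The sketch below assumes the System.Reactive package is installed; `SubjectComparison` and `LateSubscriber` are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectComparison
{
    // Publishes 1 and 2, subscribes late, then publishes 3 and completes.
    // Returns the values the late subscriber actually received.
    public static List<int> LateSubscriber(ISubject<int> subject)
    {
        var seen = new List<int>();
        subject.OnNext(1);
        subject.OnNext(2);
        subject.Subscribe(seen.Add);
        subject.OnNext(3);
        subject.OnCompleted();
        return seen;
    }

    public static void Main()
    {
        Print("Subject", LateSubscriber(new Subject<int>()));
        Print("ReplaySubject", LateSubscriber(new ReplaySubject<int>()));
        Print("BehaviorSubject", LateSubscriber(new BehaviorSubject<int>(0)));
        Print("AsyncSubject", LateSubscriber(new AsyncSubject<int>()));
    }

    private static void Print(string name, List<int> values) =>
        Console.WriteLine(name + ": " + string.Join(", ", values));
}
```

The late subscriber sees only 3 from Subject, all of 1, 2, 3 from ReplaySubject, 2 and 3 from BehaviorSubject (the latest value at subscription time plus what follows), and only 3 from AsyncSubject, delivered at completion.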

Now let's compare this with events. The equivalent code looks like this:

    using System;

    namespace Demo
    {
        class Program
        {
            static event Action<int> Ev; // Declare

            static void Main(string[] args)
            {
                Ev += Console.WriteLine; // Subscribe
                Ev(234);                 // Publish
            }
        }
    }


Everything is very simple and the result is the same, but Rx has a number of advantages over events. For example, a subscription is an IDisposable, so unsubscribing takes a single call:


    var toDispose = sub.Subscribe(Console.WriteLine);
    toDispose.Dispose();



LINQ


Initially, LINQ made it possible to query static data sources. But as data volumes grow and approaches change, the tools have to adapt. Rx lets you query dynamic sequences.

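    using System;
    using System.Reactive.Concurrency; // Scheduler
    using System.Reactive.Linq;        // LINQ support for IObservable<T>

    namespace Demo
    {
        class Program
        {
            static void Main()
            {
                // Sequence of the numbers from 1 to 10
                var sequence = Observable.Range(1, 10, Scheduler.Default);

                // Query that keeps only the even numbers
                var query = from s in sequence
                            where s % 2 == 0
                            select s;

                sequence.Subscribe(Console.WriteLine); // prints 1,2,3,4,5,6,7,8,9,10
                query.Subscribe(Console.WriteLine);    // prints 2,4,6,8,10

                // Scheduler.Default runs the work on background threads, so the
                // two outputs may interleave; wait here so the process stays alive.
                Console.ReadLine();
            }
        }
    }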


In the example, a sequence is first created that provides int values from 1 to 10, then a LINQ expression is applied to it that selects only the multiples of 2. The result is two different sequences, to which different observers can subscribe. This is an extremely simple example, but Rx provides many more operators that offer tremendous flexibility.
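The same kind of query can also be written with method syntax and chained with further operators. The sketch below assumes the System.Reactive package; `QueryDemo` and `EvenTimesTen` are hypothetical names. It omits the scheduler argument, which in current System.Reactive versions should make the range run synchronously on the calling thread, so the results can be collected directly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class QueryDemo
{
    // Keeps the even numbers from 1..10 and multiplies each by ten.
    public static List<int> EvenTimesTen()
    {
        var results = new List<int>();

        Observable.Range(1, 10)
            .Where(n => n % 2 == 0)   // filter, as in the query expression
            .Select(n => n * 10)      // projection chained onto the filter
            .Subscribe(results.Add);  // collect whatever the query emits

        return results;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", EvenTimesTen()));
}
```

Each operator returns a new IObservable<T>, so the source sequence stays untouched and other observers can subscribe to it or to any intermediate step.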

Conclusions


Reactive Extensions let you build individual modules that monitor the system and respond to it. Each part of the system is independent, since it knows nothing about the other modules. Observers wait for changes in a sequence, and the sequence, in turn, does not care who is watching it. This is how loose coupling between modules is achieved. It makes sense to use Rx to handle UI events, domain events, changes in the environment, and changes in third-party services (RSS, Twitter, etc.). Rx can also convert ordinary events into IObservable<T>, which makes it easy to integrate into an existing system.
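As an illustration of converting an event into an IObservable<T>, here is a minimal sketch based on Observable.FromEvent from System.Reactive. The `EventBridge` class and its `ValueChanged` event are hypothetical stand-ins for a real UI or domain event, and the sketch assumes a console application with no synchronization context.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class EventBridge
{
    // Hypothetical event standing in for a UI or domain event.
    public static event Action<int> ValueChanged;

    public static List<int> Run()
    {
        var received = new List<int>();

        // Wrap the event: Rx attaches the handler on subscribe
        // and detaches it when the subscription is disposed.
        IObservable<int> values = Observable.FromEvent<int>(
            handler => ValueChanged += handler,
            handler => ValueChanged -= handler);

        using (values.Where(v => v > 10).Subscribe(received.Add))
        {
            ValueChanged?.Invoke(5);   // filtered out by Where
            ValueChanged?.Invoke(42);  // passes the filter
        }

        // The subscription has been disposed, so nobody is listening.
        ValueChanged?.Invoke(100);
        return received;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", Run()));
}
```

Once the event is wrapped, it can be filtered, transformed, and combined with the same LINQ operators as any other observable sequence.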

You should not use Rx to convert static sequences to IObservable<T>: it is a waste of resources and brings no benefit. Nor should you use it to implement queues, since those are a completely different approach. A huge advantage is that Rx supports LINQ, so there is little new syntax to learn.

Rx is a great tool for building reactive systems, but that does not mean you need to drop everything and start writing in this style. The main thing is to always use your head!

Source: https://habr.com/ru/post/261031/

