System.IObserver<T> and System.IObservable<T>. As the names imply, they implement the Observer pattern: IObservable<T> acts as the subject, and IObserver<T> is the observer that subscribes to changes. The .NET platform already has an observer implementation in the form of events. As already mentioned, Rx lets you create sequences of events, and the same can of course be done with plain events. Rx and events are used differently, but more on that later.

IObserver<in T>
void OnNext(T value) - provides the next element of the sequence.
void OnError(Exception ex) - passes an exception so that it can be handled properly. After this message the sequence is considered finished, and observers no longer need to watch for changes.
void OnCompleted() - reports that the sequence has ended and that no further messages will arrive.

IObservable<out T>
IDisposable Subscribe(IObserver<T> observer) - accepts an observer (IObserver<T>) as a parameter and subscribes it to messages. Note that the method returns an IDisposable, whose Dispose method can be called later to unsubscribe the observer.

If you implement IObservable<T> yourself, then in addition to the Subscribe method you also need logic that can send new messages, errors, or the end-of-sequence notification. In effect you need to implement the IObserver<T> interface as well; the Subject<T> type serves this purpose. To use it, install an additional library with NuGet (Install-Package Rx-Main), which also provides extra extension methods and LINQ support.

    using System;
    using System.Reactive.Subjects;

    namespace Demo
    {
        class Program
        {
            static Subject<int> sub = new Subject<int>(); // Declare

            static void Main()
            {
                sub.Subscribe(Console.WriteLine); // Subscribe
                sub.OnNext(234);                  // Publish
            }
        }
    }
This example creates a Subject<int> (which you can think of as a sequence of ints). An observer then subscribes to it (here, each value of the sequence is simply written to the console), and a value is published, which the observer prints. Each new observer starts receiving the elements of the sequence from the moment it subscribes. There are several other implementations with different behavior:

ReplaySubject

    using System;
    using System.Reactive.Subjects;

    namespace Demo
    {
        class Program
        {
            static ReplaySubject<int> sub = new ReplaySubject<int>();

            static void Main()
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine); // 222 is replayed to the late subscriber
                sub.OnNext(354);                  // 354
            }
        }
    }
ReplaySubject delivers all elements of the sequence, regardless of when the observer subscribed.

BehaviorSubject

    using System;
    using System.Reactive.Subjects;

    namespace DemoData
    {
        class Program
        {
            static BehaviorSubject<int> sub = new BehaviorSubject<int>(666);

            static void Main()
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine); // 222
            }
        }
    }
BehaviorSubject can never be empty: it always holds exactly one element, the most recent one (initially the value passed to the constructor).

AsyncSubject

    using System;
    using System.Reactive.Subjects;

    namespace DemoData
    {
        class Program
        {
            static AsyncSubject<int> sub = new AsyncSubject<int>();

            static void Main(string[] args)
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine);
                sub.OnCompleted(); // Publish 222
            }
        }
    }
AsyncSubject also delivers only the last value, but unlike the other implementations it publishes that value only when OnCompleted is called.

For comparison, the same declare, subscribe, and publish steps written with a plain event:

    using System;

    namespace Demo
    {
        class Program
        {
            static event Action<int> Ev; // Declare

            static void Main(string[] args)
            {
                Ev += Console.WriteLine; // Subscribe
                Ev(234);                 // Publish
            }
        }
    }
IObservable<T> implementations are ordinary classes, so you can add whatever behavior you need to them. The methods declared by IObserver<T> give you more precise control over the sequence. The Subscribe method returns an IDisposable, so unsubscribing is just a call to Dispose():

    var toDispose = sub.Subscribe(Console.WriteLine);
    toDispose.Dispose();
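To make the points above concrete, here is a minimal sketch of a hand-written observable that uses only the interfaces from the base class library, with no Rx package. The names NumberSource, RecordingObserver, Publish, and Complete are hypothetical and chosen only for illustration; this is not how Subject<T> is implemented, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

namespace Demo
{
    // Hypothetical minimal source: hands each value to every current observer.
    class NumberSource : IObservable<int>
    {
        private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

        public IDisposable Subscribe(IObserver<int> observer)
        {
            observers.Add(observer);
            // The returned token removes this observer when disposed.
            return new Unsubscriber(observers, observer);
        }

        public void Publish(int value)
        {
            // Iterate over a copy so an observer may unsubscribe inside OnNext.
            foreach (var observer in observers.ToArray())
                observer.OnNext(value);
        }

        public void Complete()
        {
            foreach (var observer in observers.ToArray())
                observer.OnCompleted();
            observers.Clear(); // the sequence has ended; nothing more is sent
        }

        private sealed class Unsubscriber : IDisposable
        {
            private readonly List<IObserver<int>> observers;
            private readonly IObserver<int> observer;

            public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
            {
                this.observers = observers;
                this.observer = observer;
            }

            public void Dispose() => observers.Remove(observer);
        }
    }

    // Hypothetical observer that records everything it receives.
    class RecordingObserver : IObserver<int>
    {
        public List<string> Log { get; } = new List<string>();

        public void OnNext(int value) => Log.Add("next " + value);
        public void OnError(Exception error) => Log.Add("error " + error.Message);
        public void OnCompleted() => Log.Add("completed");
    }

    class Program
    {
        static void Main()
        {
            var source = new NumberSource();
            var first = new RecordingObserver();
            var second = new RecordingObserver();

            IDisposable firstSubscription = source.Subscribe(first);
            source.Subscribe(second);

            source.Publish(1);           // both observers receive 1
            firstSubscription.Dispose(); // first stops listening
            source.Publish(2);           // only second receives 2
            source.Complete();           // only second is told the sequence ended

            Console.WriteLine(string.Join(", ", first.Log));  // next 1
            Console.WriteLine(string.Join(", ", second.Log)); // next 1, next 2, completed
        }
    }
}
```

Because the subscription token is an ordinary object, it can be stored and disposed wherever is convenient. With events, removing an inline lambda handler requires keeping a reference to the same delegate instance.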
LINQ

LINQ lets you query static data sources. But as data volumes grow and approaches change, you need to adapt. Rx lets you query dynamic sequences.

    using System;
    using System.Reactive.Concurrency; // Scheduler
    using System.Reactive.Linq;        // LINQ operators for observables

    namespace Demo
    {
        class Program
        {
            static void Main()
            {
                var sequence = Observable.Range(1, 10, Scheduler.Default); // values 1 through 10
                var query = from s in sequence
                            where s % 2 == 0
                            select s; // even values only

                sequence.Subscribe(Console.WriteLine); // (1,2,3,4,5,6,7,8,9,10)
                query.Subscribe(Console.WriteLine);    // (2,4,6,8,10)

                // Scheduler.Default runs the subscriptions in the background, so the two
                // outputs may interleave; keep the process alive until they finish.
                Console.ReadLine();
            }
        }
    }
IObservable, allowing integration into the system.
IObservable, it will be a waste of resources and will not bring any benefit. Also, do not implement queues, since these are completely different approaches. A big advantage is that Rx supports LINQ, so if you already know LINQ the query syntax is familiar.

Source: https://habr.com/ru/post/261031/