
System.IObserver and System.IObservable. As the names imply, these interfaces implement the Observer pattern. Here IObservable acts as the subject, and IObserver is the observer that subscribes to changes. The .NET platform already has an observer implementation in the form of events. As already mentioned, Rx lets you create a sequence of events, and the same can of course be done with plain events. The ways of working with Rx and with events differ, but more on that later.

IObserver<in T>

void OnNext(T value) - provides the next element in the sequence.
void OnError(Exception ex) - passes an Exception so that it can be handled properly. After this message the sequence is considered finished, and observers no longer need to watch for changes.
void OnCompleted() - reports that the sequence has ended; there will be no more messages, so there is no need to wait for them.

IObservable<out T>

IDisposable Subscribe(IObserver<T> observer) - accepts an observer (IObserver<T>) as a parameter and subscribes it to messages. Note that the method returns IDisposable, on which you can later call Dispose to unsubscribe the observer and release it.

If we implement IObservable ourselves, then in addition to the Subscribe method we also need logic that can send new messages, report errors, or signal the end of the sequence. It turns out that you also need to implement the IObserver interface; for such purposes you can use the Subject type. To use it, you need to install an additional library with NuGet (Install-Package Rx-Main; newer releases are published as System.Reactive), which also provides extra extension methods and LINQ support.
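To make the interface contracts above concrete, here is a minimal sketch of a hand-written type that implements both IObservable<T> and IObserver<T> using only the base class library. SimpleSubject and ConsoleObserver are hypothetical names introduced for illustration. This is not how the Subject type in Rx is implemented: the sketch ignores thread safety and, as a simplification, does not notify observers that subscribe after the sequence has ended.

```csharp
using System;
using System.Collections.Generic;

namespace Demo
{
    // Hypothetical hand-written subject: values pushed in through IObserver<T>
    // are forwarded to every observer registered through IObservable<T>.
    class SimpleSubject<T> : IObservable<T>, IObserver<T>
    {
        private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
        private bool stopped; // becomes true after OnError or OnCompleted

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            if (!stopped) observers.Add(observer);
            // The returned object removes the observer when disposed.
            return new Unsubscriber(observers, observer);
        }

        public void OnNext(T value)
        {
            if (stopped) return;
            // Iterate over a copy so observers may unsubscribe while being notified.
            foreach (var o in observers.ToArray()) o.OnNext(value);
        }

        public void OnError(Exception error)
        {
            if (stopped) return;
            stopped = true; // an error ends the sequence
            foreach (var o in observers.ToArray()) o.OnError(error);
            observers.Clear();
        }

        public void OnCompleted()
        {
            if (stopped) return;
            stopped = true; // no more messages after completion
            foreach (var o in observers.ToArray()) o.OnCompleted();
            observers.Clear();
        }

        private sealed class Unsubscriber : IDisposable
        {
            private readonly List<IObserver<T>> observers;
            private readonly IObserver<T> observer;

            public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
            {
                this.observers = observers;
                this.observer = observer;
            }

            public void Dispose() => observers.Remove(observer);
        }
    }

    // Hypothetical observer that writes every notification to the console.
    class ConsoleObserver : IObserver<int>
    {
        public void OnNext(int value) => Console.WriteLine("Next: " + value);
        public void OnError(Exception error) => Console.WriteLine("Error: " + error.Message);
        public void OnCompleted() => Console.WriteLine("Completed");
    }

    class Program
    {
        static void Main()
        {
            var subject = new SimpleSubject<int>();
            IDisposable subscription = subject.Subscribe(new ConsoleObserver());
            subject.OnNext(234);    // delivered to the observer
            subscription.Dispose(); // unsubscribe
            subject.OnNext(235);    // not delivered: nobody is subscribed any more
            subject.OnCompleted();
        }
    }
}
```

Writing this by hand for every source quickly becomes repetitive, which is the motivation for the ready-made Subject type that Rx provides.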
    using System;
    using System.Reactive.Subjects;

    namespace Demo
    {
        class Program
        {
            static Subject<int> sub = new Subject<int>(); // Declare

            static void Main()
            {
                sub.Subscribe(Console.WriteLine); // Subscribe
                sub.OnNext(234); // Publish
            }
        }
    }

The code creates a new Subject<int> (you can also call it a sequence of ints), then an observer subscribes to it (here each value of the sequence is simply written to the console), and then a value is published, which the observer writes to the console. Each new observer starts receiving the elements of the sequence from the moment it subscribes. There are a few other implementations with different behavior:

ReplaySubject

    using System;
    using System.Reactive.Subjects;

    namespace Demo
    {
        class Program
        {
            static ReplaySubject<int> sub = new ReplaySubject<int>();

            static void Main()
            {
                sub.OnNext(222); // published before anyone has subscribed
                sub.Subscribe(Console.WriteLine); // 222 is replayed to the new observer
                sub.OnNext(354);
            }
        }
    }

ReplaySubject - delivers all elements of the sequence, regardless of when the observer subscribed.

BehaviorSubject

    using System;
    using System.Reactive.Subjects;

    namespace DemoData
    {
        class Program
        {
            static BehaviorSubject<int> sub = new BehaviorSubject<int>(666);

            static void Main()
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine); // 222
            }
        }
    }

BehaviorSubject - cannot be empty; it always holds an element, but only the most recent one.

AsyncSubject

    using System;
    using System.Reactive.Subjects;

    namespace DemoData
    {
        class Program
        {
            static AsyncSubject<int> sub = new AsyncSubject<int>();

            static void Main(string[] args)
            {
                sub.OnNext(222);
                sub.Subscribe(Console.WriteLine);
                sub.OnCompleted(); // Publish 222
            }
        }
    }

AsyncSubject - also returns only the last value, but, unlike the other implementations, the data is published only when OnCompleted is called.

The same example using an event:

    using System;

    namespace Demo
    {
        class Program
        {
            static event Action<int> Ev; // Declare

            static void Main(string[] args)
            {
                Ev += Console.WriteLine; // Subscribe
                Ev(234); // Publish
            }
        }
    }

IObservable implementations are classes, in which you can do whatever you want.
The methods that IObserver declares give you more precise control over the sequence.
The Subscribe method returns IDisposable, so you can simply call Dispose() to unsubscribe.

    var toDispose = sub.Subscribe(Console.WriteLine);
    toDispose.Dispose();

LINQ

LINQ made it possible to query static data sources. But as data volumes grow and approaches change, you need to adapt. Rx lets you query dynamic sequences.

    using System;
    using System.Reactive.Concurrency; // Scheduler
    using System.Reactive.Linq; // LINQ operators for IObservable

    namespace Demo
    {
        class Program
        {
            static void Main()
            {
                var sequence = Observable.Range(1, 10, Scheduler.Default); // numbers from 1 to 10
                var query = from s in sequence where s % 2 == 0 select s; // keep only the even numbers
                sequence.Subscribe(Console.WriteLine); // receives 1,2,3,4,5,6,7,8,9,10
                query.Subscribe(Console.WriteLine); // receives 2,4,6,8,10
                Console.ReadLine(); // Scheduler.Default works in the background, so keep the process alive
            }
        }
    }

[...] IObservable, allowing integration into the system.
[...] IObservable, it will be a waste of resources and will bring no benefit. Also, do not use Rx to implement queues, as these are completely different approaches. A huge advantage is that Rx supports LINQ, so you do not need to learn anything new.

Source: https://habr.com/ru/post/261031/