📜 ⬆️ ⬇️

Basics of reactive programming using RxJS. Part 3. Higher Order Observables



In this article, we will look at how it is possible to process another in one thread, what it is for, and how the Higher Order Observables operators (further HOO) will help us with this.

When working with threads, a situation often arises when it is necessary to transfer the results of another to one stream as a value. For example, we want to execute an ajax request and process its response in the current thread, or run several parallel requests, implement pooling. I think many people are used to solving such problems using such a mechanism as promise. But is it possible to solve them using RxJS? Of course, everything is much easier than you think!

A series of articles “Basics of reactive programming using RxJS”:
')

Note : in order to understand the theoretical part of the article, you do not need to read the previous articles, you only need to know what observable, operators and pipes are. In the practical part, we will refine the example from the second article , which you can find by reference .

Problem


Let's imagine the following task: we need to know every second whether the server is available. How can we solve it?

First, create a stream using the timer method:

timer(0, 1000).subscribe({ next: console.log }); 

The timer method is very similar in operation to interval . But in contrast, it allows you to specify a timeout for starting a stream, which is transmitted by the first parameter. The second parameter specifies the interval at which the new value will be generated. If the second parameter is not specified, the timer will generate only one value and terminate the stream.

Since we don’t have a server with you, I suggest simply writing a function that emulates the request to the server:

 const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) } 

What does this method do? It returns a stream created using the timer method, which emits a value after one second and ends. Since the timer method only generates a number, we use the mapTo operator to replace it with the string “success”.

Here is the stream that is created by the makeRequest method:



Now we have a choice: call the makeRequest method inside the stream or assign this duty to the observer?

The first approach is more preferable, since in this case we will be able to use the full potential of RxJS with its operators and we will relieve our observer from unnecessary duties. Let's use the timer method to execute queries by interval:

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log }); 

When we run this code, we will see that in console.log we receive not a message with the text “success”, but an object of the Observable type:



The answer is quite expected, because in the map we return the stream. To make a stream, you need to subscribe to it. Well, let's see how not to do it :

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); }); 

The problem of the example above is that we receive a subscription in a subscription. What if we want to do more than one query in a chain? Or what if at some point we need to unsubscribe from the stream inside? In this case, our code will more and more resemble “noodles”. To solve this problem in RxJS there are special operators, which are called HOO.

HOO


HOO is a special type of operator that accepts streams as values. One of these operators is the mergeAll method.

When a stream arrives at the mergeAll input, it subscribes to it. The stream to which the operator subscribed is called internal. The stream from which the operator receives other streams as values ​​is called external.

When the internal stream generates a value, mergeAll pushes that value into the external stream. Thus, we get rid of the need to manually subscribe. If we unsubscribe from the external stream, then mergeAll will automatically unsubscribe from the internal one.

Let's see how we can rewrite our example with mergeAll:

 timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log }); 

In the example above, the external stream is created by the timer operator. And the streams that are created in the map operator are internal. Each created thread falls into the mergeAll operator.



The map + mergeAll combination is used very often, so the mergeMap method exists in RxJS:

 timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log }); 

When an external thread generates a value, the mergeMap operator calls the callback function passed to it, which generates a new thread. Then mergeMap subscribes to the generated stream.



The special feature of the operator's mergeAll / mergeMap is that if another stream goes down to it, it also subscribes to it. Thus, we can get values ​​from several internal ones into the external flow at once. Let's see the following example:

  timer(0, 1000) 

This is how the external stream will look like without the mergeMap operator:



And this is how it is with mergeMap:

 timer(0, 1000).pipe( mergeMap(() => interval(1000)) ) 



Every second we create a new internal stream and mergeMap subscribes to it. Thus, we simultaneously have a lot of internal threads, the values ​​of which fall into the external:





Note : Be careful with mergeMap, each new internal thread will work until you unsubscribe from the external. In the example above, the number of internal threads grows every second, in the end, there can be so many flows that the computer cannot cope with the load.

concatAll / concatMap


The mergeMap method is great when you don’t care about the order in which internal threads are executed, but what if you have a need for it? Suppose we want the next request to the server to be executed only after receiving a response from the previous one?

For such purposes, the HOO statement concatAll / concatMap is suitable. This operator, subscribing to an internal stream, waits until it completes, and only then subscribes to the next one.

If during the execution of one thread a new one goes to it, then it is placed in the queue until the previous one is completed.

 // ,  1     const firstInnerObservable = timer(1000).pipe( mapTo(1) ); // ,  2     const secondInnerObservable = timer(500).pipe( mapTo(2) ); of( firstInnerObservable, secondInnerObservable ).pipe( concatAll() ).subscribe({ next: console.log }); 

In the example above, we create two threads using the timer method. For clarity, I used the mapTo operator to display different values. The first stream will generate 1, the second - 2. The external stream is created using the of method, which accepts two above-declared observables as input.

The concatAll operator first receives firstInnerObservable, subscribes to it and waits for it to complete, and only after the completion of the first will it subscribe to secondInnerObservable. Here is what the external stream will look like:



If we replace concatAll with mergeAll, then the stream will look like this:

 of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log }); 



switchAll / switchMap


This operator differs from the previous ones in that when it receives a new stream, it immediately unsubscribes from the previous one and subscribes to a new one.

Take the example above and replace concatAll with switchAll, and see how the external thread behaves:

 of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log }); 



Only the value from the second internal stream has fallen into the external stream. All because switchMap unsubscribed from the first when I received the second thread.

When is it necessary? For example, when implementing a data search. If the response from the server has not yet arrived, and we have already sent a new request, then it makes no sense for us to wait for the previous one.

exhaust / exhaustMap


exhaust - the exact opposite of the switchAll operator, while its behavior is similar to concatAll. This method, subscribing to the stream, waits for it to end. If a new stream comes down to it, then it is simply discarded.

 of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log }); 



In the example above, we did not get a deuce, because at that moment the operator was waiting for the completion of the first stream, and simply dropped the second one.

I think many have a question when this behavior may be needed? A good example is an authorization form. It makes no sense to send multiple requests to the server until the current one is executed.

We are finalizing the application


We recall the example from the second article . In it, we implemented a search on GitHub and used the mergeMap operator to send requests to the server. Now we know the features of this operator, is it really suitable in our case?

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Let's assume that the GitHub server will be very overloaded, then processing our response will take a lot of time. What can go wrong in this case?

Suppose the user entered some data, did not wait for an answer, and entered new ones. In this case, we will send the second request to the server. However, no one guarantees that the answer to the first request will come earlier.

Since the mergeMap operator doesn’t care in what order to process internal threads, in the case when the first request is executed later than the second, we will load the actual data. Therefore, I suggest replacing the mergeMap method with switchMap:

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Now, if the user enters new data, switchMap will unsubscribe from the previous stream and subscribe to a new one.

It should be noted that our http request will continue to hang until the server gives an answer to it. But, since we have unsubscribed from the internal flow, the answer will not fall into the external flow.

Note : if you work with Angular and use HttpClient to work with http, you can not worry about the cancellation of the request itself. HttpClient can do this for you when unsubscribing.

Cancel http


In fetch api, you can cancel the http request using the AbortController . This functionality when combined with the switchMap operator will allow saving user traffic.

Let's rewrite our example a little. And create a method that will wrap the fetch call in observable:

 const createCancellableRequest = (url) => { //      const controller = new AbortController(); const signal = controller.signal; return new Observable(observer => { fetch(url, { signal }) .then(response => { if (response.ok) { return response.json(); } throw new Error(''); }) //     .then(result => observer.next(result)) //   .then(() => observer.complete()) //   ,     .catch(error => observer.error(error)); // ,    return () => { //   controller.abort(); }; }); }; 

We will also change the getUsersRepsFromApi method:

 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); } 

Now the method returns not promise, but observable. Therefore, we remove the from wrapper in switchMap:

 switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) ) 

Note : in RxJS version 6.5, the fromFetch operator was added , which itself calls the abort method under the hood, so you no longer need to write your “bicycle”.

That's all! All sample code can be found here .

Conclusion


Today we looked at what HOO and some very useful operators in this category are. Of course, these were not all of them. For more detailed and detailed information I recommend to visit the documentation on RxJS.

In the next article I plan to consider the difference between Hot and Cold observables.

Finally: do not use a subscription in a subscription, because there is a HOO!

Source: https://habr.com/ru/post/450050/


All Articles