
In the
previous article, we looked at what streams are and what they eat with. In the new part, we will learn what methods RxJS provides for creating streams, what are operators, pipes and how to work with them.
RxJS has a rich
API . The documentation describes more than a hundred methods. In order to get acquainted with them a bit, we will write a simple application and in practice we will see what the reactive code looks like. You will see that the same tasks that previously seemed routine and required writing a large amount of code have an elegant solution if you look at them through the prism of reactivity. But before we move on to practice, consider how the flows can be represented graphically, and get acquainted with convenient methods for creating and processing them.
')
Graphical presentation of streams
To clearly demonstrate how a stream behaves, I will use the notation adopted in the reactive approach. Recall our example from the previous article:
const observable = new Observable((observer) => { observer.next(1); observer.next(2); observer.complete(); });
Here is what its graphical representation will look like:

The stream is usually depicted as a straight line. If a stream emits a value, it is displayed on the line as a circle. The forward bar in the display is the signal to end the stream. To display the error, use the symbol “×”.
const observable = new Observable((observer) => { observer.error(); });

One line thread
In my practice, I rarely encountered the need to create my own Observable instances directly. Most of the methods for creating threads already exist in RxJS. To create a stream that emits values ​​of 1 and 2, you just need to use the method of:
const observable = of(1, 2);
The of method accepts any number of arguments as input and returns a finished Observable instance. After the subscription, it will emit the values ​​received and end:

If you want to represent an array as a stream, you can use the from method. The from method expects any iterable object (array, string, etc.) or promise as an argument, and projects this object onto the stream. Here is what the stream from the string will look like:
const observable = from('abc');

And this is how you can wrap a promise into a thread:
const promise = new Promise((resolve, reject) => { resolve(1); }); const observable = from(promise);
Note: Often streams are compared to promise. In fact, they have only one thing in common - the
push strategy for propagating changes. Otherwise, it is a completely different entity. Promise cannot provide multiple values. It can only resolve or reject, i.e. have only two states. A stream can transmit multiple values, and can be reused.
Do you remember the example at intervals from the
first article ? This thread is a timer that counts the time in seconds since the subscription.
const timer = new Observable(observer => { let counter = 0; const intervalId = setInterval(() => { observer.next(counter++); }, 1000); return () => { clearInterval(intervalId); } });
Here's how to do the same thing in one line:
const timer = interval(1000);

And finally, the method that allows you to create a stream of events DOM elements:
const observable = fromEvent(domElementRef, 'keyup');
As a value, this stream will receive and emit “keyup” event objects.
Payps & Operators
Pipe is an Observable class method added in RxJS in version 5.5. Thanks to him, we can build chains of operators for sequential processing of values ​​obtained in the stream. Pipe is a unidirectional channel that interconnects operators. The operators themselves are normal functions described in RxJS that process values ​​from the stream.
For example, they can convert a value and pass it further to a stream, or they can play the role of filters and not skip any values ​​if they do not meet the specified condition.
Look at the operators in the case. Multiply each value from the stream by 2 using the map operator:
of(1,2,3).pipe( map(value => value * 2) ).subscribe({ next: console.log });
Here's what the stream looks like before the map operator is applied:

After the map operator:

Let's use the filter operator. This operator works just like the filter function in the Array class. The first argument is a method that takes a function that describes a condition. If the value from the stream satisfies the condition, then it is passed on:
of(1, 2, 3).pipe(
And this is how the whole scheme of our stream will look like:

After filter:

After map:
Note: pipe! == subscribe. The pipe method declares the behavior of the stream, but does not perform the subscription. Until you call the subscribe method, your thread will not start working.
We write application
Now that we have figured out what the pipes and operators are, we can begin to practice. Our application will perform one simple task: display a list of open github repositories by the owner’s nickname entered.
There will be few requirements:
- Do not execute a request to the API, if the input line contains less than 3 characters;
- In order not to execute the request for each character entered by the user, you should set the delay (debounce) to 700 milliseconds before accessing the API;
To search for repositories, we use the
github API . I recommend the examples themselves run on
stackblitz . In the same place I laid out the finished implementation. Links are presented at the end of the article.
Let's start with html markup. We describe the input and ul elements:
<input type="text"> <ul></ul>
Then in the js or ts file we get links to the current elements using the browser API:
const input = document.querySelector('input'); const ul = document.querySelector('ul');
We also need a method that will execute the request to the github API. Below is the getUsersRepsFromAPI function code, which accepts the user's nickname as input and executes an ajax request using fetch. It then returns a promise, converting a successful response to json in passing:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return fetch(url) .then(response => { if(response.ok) { return response.json(); } throw new Error(''); }); }
Next we will write a method that will display the list of repository names:
const recordRepsToList = (reps) => { for (let i = 0; i < reps.length; i++) {
Preparations are complete. It is time to look at RxJS in action. We need to listen to the keyup event of our input. First of all, we need to understand that in the reactive approach we work with threads. Fortunately, a similar option is already provided in RxJS. Recall the fromEvent method I mentioned above. We use it:
const keyUp = fromEvent(input, 'keyup'); keyUp.subscribe({ next: console.log });
Now our event is presented as a stream. If we see what is displayed in the console, we will see an object of type KeyboardEvent. But we need the value entered by the user. This is where the pipe method and the map operator come in handy:
fromEvent(input, 'keyup').pipe( map(event => event.target.value) ).subscribe({ next: console.log });
Let us turn to the implementation of the requirements. To begin with, we will execute the query when the entered value contains more than two characters. To do this, use the filter operator:
fromEvent(input, 'keyup').pipe( map(event => event.target.value), filter(value => value.length > 2) )
Dealt with the first requirement. We proceed to the second. We need to implement debounce. RxJS has a debounceTime statement. This operator takes as the first argument the number of milliseconds during which the value will be held before it passes on. In this case, each new value will reset the timer. Thus, at the output we get the last value, after entering which 700 milliseconds passed.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(value => value.length > 2) )
Here is what our stream might look like without debounceTime:

And here it will look like the same stream passed through this operator:

With debounceTime, we will be less likely to access the API, due to which we will save traffic and unload the server.
For additional optimization, I suggest using another operator, distinctUntilChanged. This method will save us from duplicates. It is best to show his work with an example:
from('aaabccc').pipe( distinctUntilChanged() )
Without distinctUntilChanged:

With distinctUntilChanged:

Add this statement immediately after the debounceTime statement. Thus, we will not access the API if the new value for some reason coincides with the previous one. A similar situation may occur when the user enters new characters, and then erases them again. Since we have implemented a delay, only the last value will fall into the stream, the answer to which we already have.
We go to the server
Already, we can describe the logic of the request and processing the response. While we can only work with promise. Therefore, we describe another map statement that will call the getUsersRepsFromAPI method. In the observer, we describe the logic of processing our promise:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), map(value => getUsersRepsFromAPI(value)) ).subscribe({ next: promise => promise.then(reps => recordRepsToList(reps)) });
At the moment we have implemented everything we wanted. But our example has one big drawback: there is no error handling. Our observer receives only promise and has no idea that something could go wrong.
Of course, we can hang a catch on a promise in the next method, but because of this, our code will increasingly resemble “callback hell”. If suddenly we need to perform another request, the complexity of the code will increase.
Note: using promise in code with RxJS is considered antipattern. Promise has many drawbacks compared to observable. It cannot be undone and cannot be reused. If you are faced with a choice, then choose observable. The same goes for the Observable class toPromise method. This method was implemented for compatibility with libraries that cannot work with threads.
We can use the from method to project a promise onto a stream, but this method is fraught with additional calls to the subscribe method, and will also lead to code sprawl and complexity.
To solve this problem, use the mergeMap operator:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Now we don’t need to write promise processing logic. The from method made a stream from a promise, and the mergeMap operator processed it. If the promise succeeds, the next method will be called, and our observer will receive the finished object. If an error occurs, the error method will be called, and our observer will display an error in the console.
The mergeMap operator is slightly different from the operators we worked with earlier; it belongs to the so-called
Higher Order Observables , which I will discuss in the next article. But, looking ahead, I will say that the mergeMap method itself subscribes to the stream.
Error processing
If our thread receives an error, it will end. And if we try to interact with the application after the error, then we will not get any reaction, since our thread has ended.
Here the catchError statement will help us. catchError is called only when an error occurs in the stream. It allows you to intercept it, process it and return to the stream the usual value, which will not lead to its completion.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))), catchError(err => of([]) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
We catch the error in catchError and return a stream with an empty array instead. Now, when an error occurs, we will clear the list of repositories. But then the stream will end again.
The thing is that catchError replaces our original thread with a new one. And then our observer listens only to him. When the of thread emits an empty array, the complete method will be called.
In order not to replace our original stream, we call the catchError operator on the from stream inside the mergeMap operator.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([]) ) }) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Thus, our original stream will not notice anything. Instead of an error, it will receive an empty array.
Conclusion
We finally got to practice and saw what the pipes and operators are for. We looked at how to reduce the code using the rich API provided by RxJS. Of course, our application is not finished, in the next part we will analyze how it is possible to process another one in one stream and how to cancel our http request in order to save more traffic and resources of our application. And so that you can see the difference, I laid out an example without using RxJS, you can see it
here . At
this link you will find the full code of the current application. I used the
RxJS visualizer to generate the schemas.
I hope this article has helped you better understand how RxJS works. I wish you success in learning!