
Grokking RxJava, Part Two: Operators

In the first part, we looked at the basic building blocks of RxJava and met the map() operator. I can understand if you still feel no urge to drop everything and start using this framework, since so far we have seen only the tip of the iceberg. That is about to change: most of RxJava's power is hidden in its operators, and I have prepared an example that lets you study some of them.

Task


Suppose I have a method like this:

    // Returns a list of urls found by a text search
    Observable<List<String>> query(String text);

I want to write a system for searching and displaying text. Based on what we learned in the previous lesson, we can write something like this:

    query("Hello, world!")
        .subscribe(urls -> {
            for (String url : urls) {
                System.out.println(url);
            }
        });

This solution is unsatisfying because we lose the ability to transform the data stream. If we want to modify each url, we have to do it all in the Subscriber, which leaves all our map() tricks unused.
We could write a map() that takes the whole list of urls and returns a list of modified urls, but then every map() call would contain a for-each loop. Not very nice either.

Glimmer of hope


There is a method, Observable.from(), that takes a collection and emits its elements one at a time:

    Observable.from("url1", "url2", "url3")
        .subscribe(url -> System.out.println(url));

That looks like what we need, so let's try it on our task:

    query("Hello, world!")
        .subscribe(urls -> {
            Observable.from(urls)
                .subscribe(url -> System.out.println(url));
        });

We got rid of the loop, but the result looks like a complete mess: instead of a loop, we now have subscriptions nested inside each other. The problem is not only that the code looks confusing and will therefore probably be hard to modify; it also conflicts with some features of RxJava that I haven't mentioned yet 1. Hmm.

There is a better way


Hold your breath for the savior: flatMap().
Observable.flatMap() takes the items emitted by one Observable and returns the items emitted by another Observable, substituting one Observable for the other. It is a plot twist of sorts: you thought you were getting one stream of data, and you actually got another. Here is how flatMap() solves our problem:

    query("Hello, world!")
        .flatMap(new Func1<List<String>, Observable<String>>() {
            @Override
            public Observable<String> call(List<String> urls) {
                return Observable.from(urls);
            }
        })
        .subscribe(url -> System.out.println(url));

I showed the full version to make it easier to see what is happening, but rewritten with lambdas the code looks great:

    query("Hello, world!")
        .flatMap(urls -> Observable.from(urls))
        .subscribe(url -> System.out.println(url));

Quite a strange thing, if you think about it. Why does flatMap() return another Observable? The key point is that the new Observable is what our Subscriber ends up seeing. It does not receive a List<String>; it receives a stream of individual String objects, just as it would from Observable.from().
By the way, this was the hardest part for me, but once I finally understood it, most of how RxJava works clicked into place.
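If the substitution is hard to picture, the same flattening idea exists in java.util.stream, so it can be sketched without RxJava on the classpath. This is an analogy only, not RxJava code: a Stream is pull-based and synchronous, and the query() below is a hypothetical stand-in that returns a fixed list rather than the article's real method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapSketch {
    // Hypothetical stand-in for query(): a stream with ONE element, the whole list of urls.
    static Stream<List<String>> query(String text) {
        return Stream.of(Arrays.asList("url1", "url2", "url3"));
    }

    // flatMap() swaps each list for a stream of its elements, so everything
    // downstream sees individual Strings instead of a List<String>.
    static List<String> individualUrls(String text) {
        return query(text)
                .flatMap(urls -> urls.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        individualUrls("Hello, world!").forEach(System.out::println);
    }
}
```

The consumer at the end of the chain never touches a list, which is the same effect the Subscriber sees after flatMap() in the RxJava version.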

And you can do something even cooler


I'll emphasize this again because it is important: flatMap() can return any Observable we want.
For example, I have a second method:

    // Returns the title of the site, or null if the request ends in a 404
    Observable<String> getTitle(String URL);

Instead of printing urls, I now want to print the title of each site I managed to reach. There are two problems: my method accepts only one url at a time, and it does not return a String, it returns an Observable that emits a String.
Both problems can be solved with flatMap(): first we go from the list of urls to a stream of individual urls, and then we call getTitle() inside flatMap() before passing the final result to the Subscriber:

    query("Hello, world!")
        .flatMap(urls -> Observable.from(urls))
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String url) {
                return getTitle(url);
            }
        })
        .subscribe(title -> System.out.println(title));

And once again we simplify everything with lambdas:

    query("Hello, world!")
        .flatMap(urls -> Observable.from(urls))
        .flatMap(url -> getTitle(url))
        .subscribe(title -> System.out.println(title));

Great, huh? We are composing several independent methods that each return an Observable.
Notice how I combined two API calls in a single chain. The same can be done for any number of API calls. You probably know how hard it can be to coordinate several API calls to get the result you need: you make a request, receive the result in a callback, then make a new request from inside that callback... Brr. Here we sidestepped that callback hell entirely and put the same logic into one short reactive call 2.
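For comparison, the same "no nested callbacks" shape can be sketched with the JDK's CompletableFuture, whose thenCompose() plays roughly the role that flatMap() plays here, but for a single value. This is an analogy and not RxJava: firstUrl() and this getTitle() are hypothetical stand-ins that complete immediately with made-up values.

```java
import java.util.concurrent.CompletableFuture;

final class ChainSketch {
    // Hypothetical first "API call": resolves a search text to one url.
    static CompletableFuture<String> firstUrl(String text) {
        return CompletableFuture.completedFuture("http://example.com/search?q=" + text);
    }

    // Hypothetical second "API call": resolves a url to a page title.
    static CompletableFuture<String> getTitle(String url) {
        return CompletableFuture.completedFuture("Title of " + url);
    }

    // thenCompose() feeds the result of the first call into the second
    // without nesting one callback inside the other.
    static CompletableFuture<String> firstTitle(String text) {
        return firstUrl(text).thenCompose(ChainSketch::getTitle);
    }

    public static void main(String[] args) {
        System.out.println(firstTitle("rx").join());
    }
}
```

Each extra call would add one more line to the chain rather than one more level of nesting.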

Abundance of operators


So far we have only looked at two operators, but there are actually many more of them in RxJava. How else can we improve our code?
getTitle() returns null if we get a 404 error. We do not want to print "null", so we can filter out the values we do not need:

    query("Hello, world!")
        .flatMap(urls -> Observable.from(urls))
        .flatMap(url -> getTitle(url))
        .filter(title -> title != null)
        .subscribe(title -> System.out.println(title));

filter() emits the same item it received, but only if the item passes the test.
Now suppose we want to show at most 5 results:

    query("Hello, world!")
        .flatMap(urls -> Observable.from(urls))
        .flatMap(url -> getTitle(url))
        .filter(title -> title != null)
        .take(5)
        .subscribe(title -> System.out.println(title));

take() emits no more than the specified number of items (if there turn out to be fewer than 5 items in our case, take() simply finishes early).
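The "fewer than five" case can be checked with a small sketch in which limit() serves as the closest java.util.stream counterpart of take(). It is a stand-in for the RxJava operators rather than RxJava itself, and firstTitles() is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class FilterTakeSketch {
    // Drops nulls (like filter(title -> title != null)), then keeps at most
    // `max` items (limit() here, take() in RxJava).
    static List<String> firstTitles(List<String> titles, int max) {
        return titles.stream()
                .filter(Objects::nonNull)
                .limit(max)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Only two non-null titles exist, so asking for 5 yields 2 without any error.
        System.out.println(firstTitles(Arrays.asList("A", null, "B"), 5));
        // More titles than requested: the result is cut off at 2.
        System.out.println(firstTitles(Arrays.asList("A", "B", "C"), 2));
    }
}
```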
You know what, let's also save each title we receive to disk:

    query("Hello, world!")
        .flatMap(urls -> Observable.from(urls))
        .flatMap(url -> getTitle(url))
        .filter(title -> title != null)
        .take(5)
        .doOnNext(title -> saveTitle(title))
        .subscribe(title -> System.out.println(title));

doOnNext() lets us add an extra action that runs each time a new item is emitted; in this case, that action saves the title.
Look at how easy it is to manipulate the stream of data. You can keep adding ingredients to your recipe without ending up with an inedible mess.
RxJava ships with a huge number of operators. The full list may look intimidating, but it is worth skimming at least to get an idea of what is available. It will take some time to internalize the operators, but once you do, you will have real power at your fingertips.
By the way, you can also write your own operators! That topic is beyond the scope of this series, but put it this way: if you can think up an operator, you can almost certainly implement it 3.

So, what is next?


OK, I get it: you are a skeptic, and once again I have failed to convince you. Why should you care about all these operators?

Idea #3: Operators let you do whatever you want with the data stream.


The only limit is yourself.
You can build fairly complex data-manipulation logic from nothing but chains of simple operators. This is functional reactive programming. The more you use it, the more it changes your idea of what program code should look like.
Also, think about how easy our data was to consume once it had been transformed. By the end of our example we were making two API calls, processing the data, and saving it to disk. But our final Subscriber knows nothing about that; it still just works with a plain Observable<String>. Encapsulation makes code simpler!
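To make the encapsulation point concrete, here is a sketch in java.util.stream terms, with peek() standing in for doOnNext() and limit() for take(). It is not RxJava code, and the titles() method, its parameters, and the in-memory "disk" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class TitlePipelineSketch {
    // All the plumbing (flattening, null filtering, the cap of 5, saving)
    // lives inside this method; callers only ever see a Stream<String>.
    static Stream<String> titles(List<List<String>> pages, Consumer<String> saveTitle) {
        return pages.stream()
                .flatMap(List::stream)
                .filter(Objects::nonNull)
                .limit(5)
                .peek(saveTitle);
    }

    public static void main(String[] args) {
        List<String> saved = new ArrayList<>(); // stands in for the disk
        List<List<String>> pages = Arrays.asList(
                Arrays.asList("A", null, "B"),
                Arrays.asList("C"));
        // The consumer just prints strings and is unaware of the steps above.
        titles(pages, saved::add).forEach(System.out::println);
    }
}
```

Changing what happens inside titles() would not require touching the code that consumes its result.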
In the third part, we will go through other cool features of RxJava, which are associated with data manipulation to a lesser extent: error handling and concurrency.

Go to the third part.


1 For example, error handling, multithreading, and unsubscribing in RxJava work poorly, if at all, with code like this. I will address these topics in the third part.
2 At this point you may have thought of the other side of callback hell: error handling. I will discuss that in part three.
3 If you want to write your own operators, you should look here. Some details of their implementation, however, will be quite hard to understand before reading the third part of the series.

Source: https://habr.com/ru/post/265583/

