📜 ⬆️ ⬇️

RxDart: magical stream transformations

Welcome to this third part of my Flutter architecture series.



This time we will make a slight immersion in the magical realm of reactive expansions (Rx). I will focus on the most used Rx functions and explain their application. If you have not read the previous post, now is the time for this, before moving on.


RxDart is an implementation of the Rx concept for the Dart language, for which we should say thanks to Frank Pepermans and Brian Egan . If you previously used Rx in other languages, you will surely notice a difference in the naming of a number of functions, but this is unlikely to cause you any difficulty.


The code for testing is here .


So far, we have used streams as a way to transfer data from one place to another in our application, but they can do much more. Let's take a look at some of the features that Rx adds to Streams.


Creating Observables


As mentioned earlier , Observables are Rx varieties of high-capacity streams. There are several interesting ways to create them:


Out of stream


Any Stream can be converted to Observable by passing it to the constructor:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

Recurring events


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

This way will be constructed Observable, displaying values ​​with a certain period. So you can replace the timer.


From single value


Sometimes the API expects Stream / Observable where you just have a value. For such cases, Observable has a factory.


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

From the future


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

Creating an Observable from a Future will wait for the completion of the Future and return the value of its result, or null if no value is returned. Another way to create a stream from Future is toStream() for any Future.


You may be wondering what is the point of converting a Future into an Observable / Stream instead of just waiting for it. Rest assured, this will become clear when we examine the available functions for manipulating data while they are "in the flow".


Subjects


Subjects are a replacement for StreamController in RxDart, and this is how they are implemented somewhere in the bowels of the library.


But their behavior is slightly different from basic StreamControllers:



PublishSubjects


PublishSubjects behave like StreamControllers , except for the possibility of many listeners:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

Run this code and you will get:


 Item1 ITEM2 Item2 ITEM3 Item3 

It is clear that the second listener, who was late for the party (we will call them late subscribers), missed the first item. To avoid this, you can use the BehaviourSubject


BehaviourSubject


With the BehaviourSubject each new subscriber will first receive the last accepted value:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

At the exit


 Item1 ITEM2 ITEM3 Item2 Item3 

You can see that Item1 lost for the second subscriber, but it gets Item2 . You may be surprised that the second subscriber receives Item3 before the first subscriber receives Item2 . This is because the sequence of servicing subscribers is not guaranteed, although all subscribers receive data in the correct order. BehaviourSubject caches only the last item received for late subscribers. If you need to cache more items, you can use ReplaySubject . In most cases, this is not necessary.


Manipulate data on the fly



The true power of Rx is that it allows data to be processed during transmission over the stream. Each of the Rx methods returns a new stream with the resulting data (as in the illustration), which means you can link them together in one processing pipeline, and this makes Rx an extremely powerful tool.


Map


If there is any operation Stream, which I most of all do not want to miss, then this is map() . What map() does is that it takes each element of data to be transferred and applies a function to it, and then puts the result in the result stream. A simple example:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

Result:


 ITEM1 ITEM2 ITEM3 

But map not obliged to return the same data type that it receives as input. The following example will accept integers instead of strings. Additionally, we will associate two transformations:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

or something like this:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

One of the most useful uses of .map is when you get data in a format from some REST API or from a database and want it to be converted into your own objects:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

I note, not only Streams, but also any Iterable offers a map function, which you can use for conversions in lists.


Where


If you are only interested in certain values ​​found in the stream, you can use the .where() function instead of using the if in your listener, this is more expressive and easier to read:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

Debounce


This is one of the little Rx pearls! Imagine that you have a search field that makes a call to the REST API if its text is modified. Making an API call for each keystroke is expensive. Thus, you would like to make a call only if the user pauses for a moment. This is exactly what the debounce() function is used for, which swallows all incoming events if there is no pause after them.


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

Therefore, if you convert the TextField.onChanged handler to Observable , you will get an elegant solution.


Expand


If your source Stream emits an array of objects, and you want to process each object yourself, you can use an .expand that does exactly that:


image


You will see the application of this method below in the FireStore example.


Merge


If you have several different threads, but you want to process their objects together, you can use .mergeWith (in other implementations of Rx, just merge ), which takes an array of threads and returns one combined stream.


image


.mergeWith does not guarantee compliance with any order in the streams when they are combined. Data is emitted in the order of entry.


For example, if you have two components that report errors through a stream, and you want them to appear together in a dialog, you can do this as follows (pseudocode):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

or if you want a combined display of messages from several social networks, it might look like this (pseudo-code):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

Zipwith


zipWith also combines one stream with another. But, unlike .mergeWith , it does not send data as soon as it receives an element from one of its source streams. He waits until items from both source streams arrive, and then combines them using the provided zipper function:


image


The zipWith signature looks scary, but for now we’ll look at it:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

A very simplified example:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

More practical application - if you need to wait for two asynchronous functions that return Future , and you want to process the data as soon as both results are returned. In this slightly contrived example, we present two REST APIs: one returns the User , the other the Product as JSON strings, and we want to wait for both calls before returning the Invoice object.


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

Looking at the output, you can see how it is done asynchronously.


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

CombineLatest


combineLatest also combines the values ​​of streams, but a little differently than merge and zip . It listens to more streams and gives the combined value whenever a new value comes from one of the streams. Interestingly, it generates not only the modified value, but also the last obtained values ​​of all other source streams. Look closely at this animation:


image


Before combineLates its first value, all source streams must receive at least one element as input.


Unlike the methods used previously, combineLatest is static. In addition, since Dart does not allow operator overload, there are versions of combLastest depending on the number of source threads: combineLatest2 ... combineLatest9


Good use combineLatest , for example, if you have two Observable<bool> , which signal that some parts of your application are busy, and you want to display the "Busy" spinner if one of them is busy. It might look like this (pseudocode):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

In your UI, you can use isBusy with StreamBuilder to display Spinner if the value obtained is true.


combineLatest very suitable feature in combination with FireStore snapshots streams.


Imagine that you want to create an application that displays a news feed along with a weather forecast. Ticker messages and weather data are stored in two different FireStore collections. Both are updated independently. You want to display data updates with StreamBuilder. With combineLatest is easy:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

In your UI, it would look something like this: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


Distinct


In the scenario described above, it may happen that isBusyOne and isBusyTwo return the same value, which will update the user interface with the same data. To prevent this, we can use .distinct() . It ensures that the data is transmitted by stream only if the value of the new element differs from the last one. So we would change the code to:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

and it also demonstrates that we can combine our functions into different chains at will.


Asyncmap


In addition to map() there is also a function asyncMap , which allows you to use an asynchronous function as a map function. Let's imagine a slightly different setting for our FireStore example. Now the required WeatherForecast depends on the location of NewsMessage and should be updated only when a new NewsMessage is received :


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

The observable returned by getDependendMessages will generate a new CombinedMessage each time the newsCollection changes.


Debugging Observables


Looking at the elegant Rx call chains, it seems almost impossible to debug an expression like this:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

But keep in mind that => is only a short form for an anonymous function. Using Convert to block body , you will get:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

And now we can set a breakpoint or add print statements at every stage of our “pipeline”.


Avoid side effects


If you want to take advantage of Rx to make your code more reliable, always keep in mind that Rx is the conversion of data as it moves "along a conveyor belt." Therefore, never call functions that change any variables / states outside the processing pipeline until you reach the .listen function.
Instead of doing this:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

do this:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

Charge map() - data conversion in the stream, and nothing else! If the transferred display function does something else, it will be viewed as a side effect, producing potential errors that are difficult to detect when reading the code.


Some thoughts on the release of resources


To avoid memory leaks, always call cancel() for subscriptions, dispose() for StreamControllers, close() for Subjects as soon as you no longer need them.


Conclusion


Congratulations if you stayed with me until this moment. Now, not only can you use Rx to make your life easier, but also prepare for the next posts, in which we delve into the details of RxVMS .


')

Source: https://habr.com/ru/post/451292/


All Articles