Welcome to this third part of my Flutter architecture series.
This time we will make a slight immersion in the magical realm of reactive expansions (Rx). I will focus on the most used Rx functions and explain their application. If you have not read the previous post, now is the time for this, before moving on.
RxDart is an implementation of the Rx concept for the Dart language, for which we should say thanks to Frank Pepermans and Brian Egan . If you previously used Rx in other languages, you will surely notice a difference in the naming of a number of functions, but this is unlikely to cause you any difficulty.
The code for testing is here .
So far, we have used streams as a way to transfer data from one place to another in our application, but they can do much more. Let's take a look at some of the features that Rx adds to Streams.
As mentioned earlier , Observables are Rx varieties of high-capacity streams. There are several interesting ways to create them:
Any Stream can be converted to Observable by passing it to the constructor:
var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print);
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print);
This way will be constructed Observable, displaying values with a certain period. So you can replace the timer.
Sometimes the API expects Stream / Observable where you just have a value. For such cases, Observable has a factory.
var justObservable = Observable<int>.just(42); justObservable.listen(print); // : 42
Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print);
Creating an Observable
from a Future will wait for the completion of the Future and return the value of its result, or null
if no value is returned. Another way to create a stream from Future is toStream()
for any Future.
You may be wondering what is the point of converting a Future into an Observable / Stream instead of just waiting for it. Rest assured, this will become clear when we examine the available functions for manipulating data while they are "in the flow".
Subjects
are a replacement for StreamController
in RxDart, and this is how they are implemented somewhere in the bowels of the library.
But their behavior is slightly different from basic StreamControllers:
listen()
directly on the Subject, without referring to the Stream propertyPublishSubjects
behave like StreamControllers
, except for the possibility of many listeners:
var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); // subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); // await Future.delayed(Duration(seconds: 5)); // subject.close;
Run this code and you will get:
Item1 ITEM2 Item2 ITEM3 Item3
It is clear that the second listener, who was late for the party (we will call them late subscribers), missed the first item. To avoid this, you can use the BehaviourSubject
With the BehaviourSubject
each new subscriber will first receive the last accepted value:
var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3");
At the exit
Item1 ITEM2 ITEM3 Item2 Item3
You can see that Item1
lost for the second subscriber, but it gets Item2
. You may be surprised that the second subscriber receives Item3
before the first subscriber receives Item2
. This is because the sequence of servicing subscribers is not guaranteed, although all subscribers receive data in the correct order. BehaviourSubject
caches only the last item received for late subscribers. If you need to cache more items, you can use ReplaySubject . In most cases, this is not necessary.
The true power of Rx is that it allows data to be processed during transmission over the stream. Each of the Rx methods returns a new stream with the resulting data (as in the illustration), which means you can link them together in one processing pipeline, and this makes Rx an extremely powerful tool.
If there is any operation Stream, which I most of all do not want to miss, then this is map()
. What map()
does is that it takes each element of data to be transferred and applies a function to it, and then puts the result in the result stream. A simple example:
var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3");
Result:
ITEM1 ITEM2 ITEM3
But map
not obliged to return the same data type that it receives as input. The following example will accept integers instead of strings. Additionally, we will associate two transformations:
var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3);
or something like this:
class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a));
One of the most useful uses of .map
is when you get data in a format from some REST API or from a database and want it to be converted into your own objects:
class User { final String name; final String adress; final String phoneNumber; final int age; // - // factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // - var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; // json-, API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); // dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); });
I note, not only Streams, but also any Iterable offers a map
function, which you can use for conversions in lists.
If you are only interested in certain values found in the stream, you can use the .where()
function instead of using the if
in your listener, this is more expressive and easier to read:
var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3
This is one of the little Rx pearls! Imagine that you have a search field that makes a call to the REST API if its text is modified. Making an API call for each keystroke is expensive. Thus, you would like to make a call only if the user pauses for a moment. This is exactly what the debounce()
function is used for, which swallows all incoming events if there is no pause after them.
var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); // await Future.delayed(Duration(milliseconds: 700)); // : 'ABC'
Therefore, if you convert the TextField.onChanged
handler to Observable
, you will get an elegant solution.
If your source Stream emits an array of objects, and you want to process each object yourself, you can use an .expand
that does exactly that:
You will see the application of this method below in the FireStore example.
If you have several different threads, but you want to process their objects together, you can use .mergeWith
(in other implementations of Rx, just merge
), which takes an array of threads and returns one combined stream.
.mergeWith
does not guarantee compliance with any order in the streams when they are combined. Data is emitted in the order of entry.
For example, if you have two components that report errors through a stream, and you want them to appear together in a dialog, you can do this as follows (pseudocode):
@override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); }
or if you want a combined display of messages from several social networks, it might look like this (pseudo-code):
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]);
zipWith
also combines one stream with another. But, unlike .mergeWith
, it does not send data as soon as it receives an element from one of its source streams. He waits until items from both source streams arrive, and then combines them using the provided zipper
function:
The zipWith
signature looks scary, but for now we’ll look at it:
// R : Stream/Observable // S : Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s))
A very simplified example:
new Observable.just(1) // .just() Observable, .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); // 3
More practical application - if you need to wait for two asynchronous functions that return Future
, and you want to process the data as soon as both results are returned. In this slightly contrived example, we present two REST APIs: one returns the User
, the other the Product
as JSON strings, and we want to wait for both calls before returning the Invoice
object.
class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } // HTTP , Product, JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } // HTTP , User, JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); // await Future.delayed(Duration(seconds: 5)); }); }
Looking at the output, you can see how it is done asynchronously.
Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99
combineLatest
also combines the values of streams, but a little differently than merge
and zip
. It listens to more streams and gives the combined value whenever a new value comes from one of the streams. Interestingly, it generates not only the modified value, but also the last obtained values of all other source streams. Look closely at this animation:
Before combineLates
its first value, all source streams must receive at least one element as input.
Unlike the methods used previously, combineLatest
is static. In addition, since Dart does not allow operator overload, there are versions of combLastest
depending on the number of source threads: combineLatest2 ... combineLatest9
Good use combineLatest
, for example, if you have two Observable<bool>
, which signal that some parts of your application are busy, and you want to display the "Busy" spinner if one of them is busy. It might look like this (pseudocode):
class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; }
In your UI, you can use isBusy
with StreamBuilder
to display Spinner
if the value obtained is true.
combineLatest
very suitable feature in combination with FireStore snapshots streams.
Imagine that you want to create an application that displays a news feed along with a weather forecast. Ticker messages and weather data are stored in two different FireStore collections. Both are updated independently. You want to display data updates with StreamBuilder. With combineLatest
is easy:
class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } }
In your UI, it would look something like this: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).
In the scenario described above, it may happen that isBusyOne and isBusyTwo return the same value, which will update the user interface with the same data. To prevent this, we can use .distinct()
. It ensures that the data is transmitted by stream only if the value of the new element differs from the last one. So we would change the code to:
Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
and it also demonstrates that we can combine our functions into different chains at will.
In addition to map()
there is also a function asyncMap
, which allows you to use an asynchronous function as a map function. Let's imagine a slightly different setting for our FireStore example. Now the required WeatherForecast depends on the location of NewsMessage and should be updated only when a new NewsMessage is received :
Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); }
The observable returned by getDependendMessages will generate a new CombinedMessage each time the newsCollection changes.
Looking at the elegant Rx call chains, it seems almost impossible to debug an expression like this:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
But keep in mind that =>
is only a short form for an anonymous function. Using Convert to block body , you will get:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); });
And now we can set a breakpoint or add print statements at every stage of our “pipeline”.
If you want to take advantage of Rx to make your code more reliable, always keep in mind that Rx is the conversion of data as it moves "along a conveyor belt." Therefore, never call functions that change any variables / states outside the processing pipeline until you reach the .listen function.
Instead of doing this:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen();
do this:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); });
Charge map()
- data conversion in the stream, and nothing else! If the transferred display function does something else, it will be viewed as a side effect, producing potential errors that are difficult to detect when reading the code.
To avoid memory leaks, always call cancel()
for subscriptions, dispose()
for StreamControllers, close()
for Subjects as soon as you no longer need them.
Congratulations if you stayed with me until this moment. Now, not only can you use Rx to make your life easier, but also prepare for the next posts, in which we delve into the details of RxVMS .
Source: https://habr.com/ru/post/451292/
All Articles