RxJava is a ReactiveX implementation for Java, a library for asynchronous processing of data streams. As its authors put it, it is the Observer pattern on steroids. There are many introductions to RxJava on the Internet, including on Habr. I want to give a few examples of real problems. They are not very complex, but perhaps someone will recognize their own tasks in them and find some food for thought.
The tasks:
1. A simple TCP client connection. There is a protocol over TCP/IP: you need to form a message, connect to the remote host if not connected yet, send the message and read the response. Plus error handling, timeout checks, and retrying the send on failure. There are no strict performance requirements, and the traffic is low.
2. There is a motor and a sensor. We need to perform a scan: drive the motor along a predetermined path, i.e. send the motor to a point, wait for it to arrive, take a sensor reading, display the point on a graph (on the GUI thread), move on to the next point, and so on.
3. The data obtained from the scan must be processed (a conditionally long computation) and written into a PDF report (a conditionally long I/O operation) together with an image of the graph and data entered by the user (GUI thread).
1. A simple TCP client connection
Suppose there is some messaging protocol. Messages may contain a header, a checksum, or anything else, and the server must respond to every message. In its simplest form, the solution might look something like this:
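```java
public String send(String command) {
    try {
        if (!isConnected()) {
            connect();
        }
        byte[] bytes = command.getBytes();
        bytes = addHeader(bytes);
        sendBytes(bytes);
        return readAnswer();
    } catch (IOException e) {
        // Unclear what to do here: propagate to the caller, or retry the send?
        // Rethrowing as an unchecked exception is only one option.
        throw new UncheckedIOException(e);
    }
}
```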
I won't go into implementation details, but briefly: connect() creates a java.net.Socket and connects to the server, sendBytes() writes to the socket's output stream, and readAnswer() reads from its input stream. Besides addHeader(), there may be methods that add a checksum, apply an encoding, and so on.
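To make those helpers concrete, here is a minimal sketch on top of java.net.Socket. It is not the author's code: the class name SimpleConnection, the static helpers writeFrame() and readFrame(), the framing (a 4-byte big-endian length header followed by the payload) and the 3-second connect timeout are all assumptions for illustration.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical connection class; the framing below is an assumed protocol, not the article's.
class SimpleConnection {
    private final String host;
    private final int port;
    private Socket socket;

    SimpleConnection(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Note: this cannot detect a connection that the server has already dropped.
    boolean isConnected() {
        return socket != null && socket.isConnected() && !socket.isClosed();
    }

    void connect() throws IOException {
        socket = new Socket();
        socket.connect(new InetSocketAddress(host, port), 3_000); // assumed 3 s connect timeout
    }

    void disconnect() {
        try {
            if (socket != null) socket.close();
        } catch (IOException ignored) {
            // nothing useful to do while tearing down
        }
        socket = null; // forces a fresh connect() next time
    }

    // Assumed header: 4-byte big-endian payload length.
    static byte[] addHeader(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    static void writeFrame(OutputStream out, byte[] framed) throws IOException {
        out.write(framed);
        out.flush();
    }

    static String readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt();   // blocks until the 4-byte header arrives
        byte[] payload = new byte[length];
        data.readFully(payload);       // blocks until the whole payload arrives
        return new String(payload, StandardCharsets.UTF_8);
    }

    void sendBytes(byte[] framed) throws IOException {
        writeFrame(socket.getOutputStream(), framed);
    }

    String readAnswer() throws IOException {
        return readFrame(socket.getInputStream());
    }
}
```

Both reads block until the whole frame has arrived, which is exactly the blocking behaviour the rest of this section deals with.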
The problems with this code are the blocking reads and writes and awkward error handling: it is not clear what to do with the exception, whether to propagate it upward or handle it here (retry the send recursively?). These are exactly the two problems RxJava solves. Let's rewrite it:
```java
public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .map(result -> readAnswer());
}
```
Usage:
```java
connection.send("echo 123")
        .subscribe(
                answer -> { /* handle the answer */ },
                throwable -> { /* handle the error */ }
        );
```
Overall, we got the same thing, only in the form of a monad, as a chain of operators, and with a few nuances.
First, the sendBytes() method now returns a boolean. RxJava works with data streams, and if an operator returns void instead of data, there is nothing left to pass down the stream. So you must either make the method return something (at least return true), or use doOnNext instead of map: this operator passes on exactly what it received.
Secondly, send() now returns an Observable rather than the String itself. So we need a separate response handler (or a lambda, as in the example), and the same goes for exceptions. Here, as they say, we have to start thinking asynchronously: instead of the result itself we get an object that will deliver the result at some point, and we must tell it what to do with that result when it arrives. But this code is still blocking, so for now this asynchronous thinking buys us nothing. You could wrap the String in a holder and pull the result out of the monad through a closure over that holder, but that is a dirty hack that violates the principles of functional programming.
Let's improve this code, starting with error handling. RxJava catches exceptions thrown inside operators and passes them to the subscriber. The second argument of subscribe() is the functional interface Action1, which is responsible for handling the exception. The functions passed to operators cannot throw checked exceptions, so if a method used to throw IOException or another checked exception, you now have to catch it yourself and do something with it, for example wrap it in a RuntimeException so that RxJava can propagate it further. But an Action1 handler is not much different from the usual try-catch approach. RxJava has dedicated error-handling operators: doOnError(), onErrorReturn(), onErrorResumeNext() and onExceptionResumeNext(). And then there is the plain retry(), which is exactly what we need here: on any connection error, simply repeat the send up to n times.
```java
public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .map(result -> readAnswer())
            .doOnError(throwable -> disconnect())
            .retry(MAX_RETRY_COUNT);
}
```
The exception handler passed to subscribe() is called only if all the attempts fail. For reliability, we also call disconnect() before each retry to close the socket and reset it to null. Otherwise isConnected(), called inside checkConnection(), may return a false positive, and all the repeated attempts will fail again. For example, if the server has closed the connection on a timeout, Socket.isConnected() on the client side will still return true: from the client's point of view the socket is connected, and everything is in order.
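For readers less familiar with retry(), its semantics can be sketched in plain Java without RxJava: one initial attempt plus up to n retries, a cleanup hook (the role disconnect() plays in doOnError()) after every failure, and the last error rethrown if every attempt fails. The Retry class and withRetry() method are hypothetical names, and only unchecked exceptions are retried, in line with wrapping checked ones as described above.

```java
import java.util.function.Supplier;

// Hypothetical helper mirroring doOnError(onError).retry(maxRetries) for a single blocking call.
final class Retry {
    private Retry() {}

    static <T> T withRetry(Supplier<T> action, Runnable onError, int maxRetries) {
        if (maxRetries < 0) throw new IllegalArgumentException("maxRetries must be >= 0");
        RuntimeException last = null;
        // attempt 0 is the original call; attempts 1..maxRetries are the retries
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                onError.run(); // e.g. disconnect(), so the next attempt reconnects from scratch
            }
        }
        throw last; // every attempt failed: the subscriber's error handler would see this one
    }
}
```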
You can also add a timeout in case the server hangs and the client blocks while writing to the socket:
```java
public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .map(result -> readAnswer())
            .doOnError(throwable -> disconnect())
            .retry(MAX_RETRY_COUNT);
}
```
The timeout operator emits an error if the Observable emits no item within the specified time, and we already know how to handle errors.
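What timeout() buys us can be approximated in plain java.util.concurrent: run the blocking call on another thread and stop waiting after a deadline. The helper name callWithTimeout() is an assumption. One caveat, also noted in the code: interrupting a thread does not unblock a read or write on a classic java.net.Socket, so the abandoned call is only released once the socket is closed, which is one more reason to call disconnect() on error.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical JDK-only analogue of timeout(): give up waiting for a blocking call after timeoutMs.
final class Timeouts {
    private Timeouts() {}

    static <T> T callWithTimeout(ExecutorService executor, Callable<T> blockingCall, long timeoutMs)
            throws TimeoutException, ExecutionException, InterruptedException {
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupting does NOT unblock a classic java.net.Socket read/write;
            // closing the socket (disconnect()) is what releases such a thread.
            future.cancel(true);
            throw e;
        }
    }
}
```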
Now the second problem: the operations are still blocking, so if we call send() from the GUI thread, the interface may freeze. We just need to tell RxJava to perform all these actions on another thread.
RxJava has the observeOn() and subscribeOn() operators for this. Many people struggle to understand how they differ; there are plenty of articles and Stack Overflow questions on the topic. Let's go over it once more and figure out together which one we need here. This is what the official documentation says:
SubscribeOn - specify the Scheduler on which an Observable will operate.
ObserveOn - specify the Scheduler on which an observer will observe this Observable.
An Observable is what supplies data. An observer is what receives the data and does something with it. We need everything to happen on another thread; more precisely, we need our Observable to emit its data on another thread from the very start. Once the data is emitted on another thread, all the observers will process it there too. That is exactly what subscribeOn() does by definition: it sets the scheduler for the Observable we created at the very beginning:
```java
public Observable<String> send(String command) {
    return Observable.just(command)
            .doOnNext(cmd -> checkConnection())
            .map(cmd -> cmd.getBytes())
            .map(bytes -> addHeader(bytes))
            .map(bytes -> sendBytes(bytes))
            .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .map(result -> readAnswer())
            .doOnError(throwable -> disconnect())
            .retry(MAX_RETRY_COUNT)
            .subscribeOn(Schedulers.io());
}
```
Now the operators run on a thread provided by the io scheduler. If you call send() several times in a row without waiting for a response, synchronization problems may arise. Properly, the functions passed to operators should be pure (free of side effects), but with a socket that is problematic: pure functions generally don't get along with I/O. You need to synchronize access to the socket or implement something like a connection pool, depending on the task.
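One simple way to synchronize access, sketched under the assumption that each command is a single write followed by a single read: hold a lock for the whole request/response exchange, so that two concurrent send() calls cannot interleave their bytes or pick up each other's answers. SerializedConnection is a hypothetical name, and the exchange function stands in for the sendBytes()/readAnswer() pair.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical wrapper: only one request/response exchange may use the socket at a time.
final class SerializedConnection {
    private final ReentrantLock lock = new ReentrantLock();
    private final Function<String, String> exchange; // stand-in for "write the command, then read its answer"

    SerializedConnection(Function<String, String> exchange) {
        this.exchange = exchange;
    }

    String send(String command) {
        lock.lock();
        try {
            // write + read happen as one unit, so answers cannot be mixed up between callers
            return exchange.apply(command);
        } finally {
            lock.unlock();
        }
    }
}
```

A lock keeps one socket busy with one request at a time; a connection pool would instead give each caller its own socket, at the cost of more connections to the server.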
Keep in mind that the subscriber (which is also an observer) will then process the response on another thread as well, and this is not always desirable. For example, if we want to display the answer in the graphical interface, we will most likely get an exception because we are not on the GUI thread. To avoid that, the handler has to be posted to the event queue of the GUI framework, and each framework does this differently. JavaFX has the Platform.runLater(runnable) method for this. You can call it directly in the response handler, or you can write your own scheduler:
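```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

import javafx.application.Platform;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;

public final class FxScheduler extends Scheduler {
    private static final FxScheduler m_instance = new FxScheduler();
    // One shared daemon timer for delayed actions: a new Timer per call would leak a thread each time
    private static final Timer m_delayTimer = new Timer("FxScheduler-delay", true);

    private FxScheduler() {}

    public static FxScheduler getInstance() {
        return m_instance;
    }

    @Override
    public Worker createWorker() {
        return new Worker() {
            private final CompositeSubscription m_subscription = new CompositeSubscription();

            @Override
            public Subscription schedule(Action0 action0) {
                final BooleanSubscription task = BooleanSubscription.create();
                m_subscription.add(task);
                Platform.runLater(() -> runIfActive(action0, task));
                return task; // cancels this one action, not the whole worker
            }

            @Override
            public Subscription schedule(Action0 action0, long delay, TimeUnit timeUnit) {
                final BooleanSubscription task = BooleanSubscription.create();
                m_subscription.add(task);
                m_delayTimer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        // the timer thread only hands the action over to the JavaFX thread
                        Platform.runLater(() -> runIfActive(action0, task));
                    }
                }, Math.max(0L, timeUnit.toMillis(delay)));
                return task;
            }

            // Skips actions cancelled before the JavaFX thread got to them
            private void runIfActive(Action0 action0, BooleanSubscription task) {
                try {
                    if (!task.isUnsubscribed() && !m_subscription.isUnsubscribed()) {
                        action0.call();
                    }
                } finally {
                    m_subscription.remove(task);
                }
            }

            @Override
            public void unsubscribe() {
                m_subscription.unsubscribe();
            }

            @Override
            public boolean isUnsubscribed() {
                return m_subscription.isUnsubscribed();
            }
        };
    }
}
```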
By the way, on Android, RxAndroid (an add-on for RxJava) provides AndroidSchedulers.mainThread(). With the JavaFX scheduler, sending a command looks like this:
```java
send("echo 123")
        .observeOn(FxScheduler.getInstance())
        .subscribe(
                answer -> { /* update the GUI with the answer */ },
                throwable -> { /* show the error */ }
        );
```
Here we use observeOn(): we need to tell RxJava that "everything downstream, including the subscriber, must run on this scheduler".
RxJava makes the operator pipeline easy to manage. Next to .map(bytes -> sendBytes(bytes)) you can add a checksum calculation and then run the bytes through an encoder. You can log the outgoing command at the beginning and the received response at the end. You get the idea.
2. Scanning with a motor and a sensor
There is a set of points: these may be the motor's rotation angles in degrees or the positions of a device driven by the motor; in general, some kind of actuator. There is also an external sensor from which values can be read. The motor must be driven through the set of points, the sensor value read at each point, and a curve built on the graph. The procedure is repeated n times (n curves on the graph). The motor does not move instantly, so we have to wait until it reaches each position.
So we have a set of points; for each one we need to do something (preferably on another thread), and the result is processed on the GUI thread (for example, by adding a point to a LineChart). This looks like a typical task for RxJava.
```java
public Observable<Point> startScan(List<Double> trajectory, int iterationCount) {
    return Observable.from(trajectory)
            .subscribeOn(Schedulers.io())
            .doOnNext(this::moveMotor)
            .doOnNext(this::blockUntilTargetReached)
            .map(this::createResultPoint)
            .repeat(iterationCount);
}
```
We use Schedulers.io() because controlling the motor and reading the sensor are still I/O operations. moveMotor() sends a command to the motor (for example, through the Connection written earlier).
blockUntilTargetReached() queries the motor for its position, compares it with the target, and puts the thread to sleep for a few milliseconds if the motor has not reached it yet. createResultPoint() queries the sensor for a value and returns a Point object holding a pair of numbers: the target position and the sensor value. repeat() works almost like retry(): it restarts the whole stream from the very beginning, but every time the stream completes, whereas retry() does so only after an error.
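blockUntilTargetReached() itself can be as simple as a polling loop. A sketch under assumptions: the position is read through a DoubleSupplier (standing in for a query to the motor), "reached" means within a given tolerance, and a timeout guards against a motor that never arrives. Because doOnNext() takes an Action1, which cannot throw checked exceptions, the InterruptedException from Thread.sleep() is wrapped in an unchecked one. All names and parameters are hypothetical.

```java
import java.util.function.DoubleSupplier;

// Hypothetical polling helper in the spirit of blockUntilTargetReached().
final class MotorWait {
    private MotorWait() {}

    static void blockUntilTargetReached(DoubleSupplier readPosition, double target, double tolerance,
                                        long pollMs, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (Math.abs(readPosition.getAsDouble() - target) > tolerance) {
            if (System.nanoTime() - deadline > 0) { // overflow-safe deadline check
                throw new IllegalStateException("motor did not reach " + target + " within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(pollMs); // give the motor time to move before asking again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                throw new IllegalStateException("wait interrupted", e);
            }
        }
    }
}
```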
The source Observable emits the points one by one. It emits the next point only after the previous one has passed through all the operators down to the subscriber (as long as no observeOn() sits in between, since that operator hands items over to another thread). This matches the functional approach with its lazy evaluation and stream processing; the Stream API and LINQ work the same way. Thanks to this, the scan visits the points in turn, rather than doing forEach(this::moveMotor), then forEach(this::blockUntilTargetReached), and so on.
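Since the comparison with the Stream API is made above, the same one-element-at-a-time order can be observed in plain java.util.stream. In this sketch peek() plays the role of doOnNext() and map() that of map(); the class and method names are made up for the demonstration, and the log strings stand in for the real motor and sensor calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo: records the order in which the three scan steps run for each point.
final class LazyOrderDemo {
    private LazyOrderDemo() {}

    static List<String> scanOrder(List<Double> trajectory) {
        List<String> log = new ArrayList<>();
        trajectory.stream()
                .peek(p -> log.add("move " + p))               // like doOnNext(this::moveMotor)
                .peek(p -> log.add("wait " + p))               // like doOnNext(this::blockUntilTargetReached)
                .map(p -> { log.add("read " + p); return p; }) // like map(this::createResultPoint)
                .collect(Collectors.toList());                 // terminal operation pulls the elements through
        return log;
    }
}
```

The log comes out as move/wait/read for the first point, then move/wait/read for the second, not all moves first.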
Usage:
```java
final List<Double> trajectory = ...;
final int n = ...;
startScan(trajectory, n)
        .observeOn(FxScheduler.getInstance())
        .subscribe(
                point -> processPoint(point),
                throwable -> processError(throwable),
                () -> processData()
        );
```
The catch is that the subscriber cannot tell which repetition a point came from. So instead of n curves, we get one curve n times longer. We need to track manually when a new scan begins: for example, count the points and start a new curve whenever the counter exceeds the number of points in the trajectory, or compare each incoming point with the first point of the trajectory.
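The counter approach could look like this minimal sketch. It assumes every repetition emits exactly trajectory.size() points, in order, and that accept() is called from onNext on the GUI thread (so no synchronization is needed). CurveSplitter is a hypothetical helper, not part of RxJava or JavaFX.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: groups the flat stream of scan points into one curve per repetition.
final class CurveSplitter<P> {
    private final int pointsPerCurve;
    private final List<List<P>> curves = new ArrayList<>();
    private int received = 0;

    CurveSplitter(int pointsPerCurve) {
        if (pointsPerCurve <= 0) throw new IllegalArgumentException("trajectory must not be empty");
        this.pointsPerCurve = pointsPerCurve;
    }

    /** Returns the index of the curve the point was added to. */
    int accept(P point) {
        if (received % pointsPerCurve == 0) {
            curves.add(new ArrayList<>()); // the counter crossed a trajectory boundary: start a new curve
        }
        received++;
        curves.get(curves.size() - 1).add(point);
        return curves.size() - 1;
    }

    List<List<P>> curves() {
        return curves;
    }
}
```

In the real handler, the returned index would select which LineChart series receives the point.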
subscribe() now has a third argument, the onCompleted handler, which is called when the Observable runs out of elements.
subscribe() returns an object implementing the Subscription interface. If you call its unsubscribe() method, the Observable no longer has a subscriber accepting data, so it simply stops emitting. This is the principle of lazy evaluation: if nobody needs the data, there is no need to deliver it. In the functional paradigm operators should not have side effects, so there is no point in the Observable executing them without a subscriber. unsubscribe() lets you implement scan cancellation. However, if the motor needs a command to stop moving, unsubscribe() will not send it; that has to be done separately.
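The split of responsibilities can be sketched in plain Java: cancelling stops further points from being processed, but the stop command to the motor has to be sent explicitly. The Motor interface and the CancellableScan class are hypothetical; in RxJava 1.x, doOnUnsubscribe() is one place where such a stop command could be hooked.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.DoubleConsumer;

// Hypothetical motor API; the real one would sit on top of the Connection.
interface Motor {
    void moveTo(double position);
    void stop();
}

// Hypothetical scan loop showing "stop emitting" and "stop the hardware" as two separate steps.
final class CancellableScan {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void cancel() {
        cancelled.set(true); // analogue of unsubscribe(): no further points are processed
    }

    void run(Motor motor, List<Double> trajectory, DoubleConsumer onPoint) {
        try {
            for (double target : trajectory) {
                if (cancelled.get()) {
                    return; // stop emitting, like an unsubscribed Observable
                }
                motor.moveTo(target);
                onPoint.accept(target);
            }
        } finally {
            if (cancelled.get()) {
                motor.stop(); // the explicit stop command that cancellation alone would not send
            }
        }
    }
}
```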
3. Data processing and the report
After the scan we have a lot of useful data; now we need to process it, calculate the required values and generate a PDF report.
The report should also include the values of some fields from the interface (for example, the user's full name) and an image of the resulting graphs. In JavaFX, such an image can be obtained with the snapshot() method that every node has. Since these are operations on JavaFX objects, they must run on the GUI thread, and for that we already have FxScheduler.
```java
class ReportMetaInfo {
    // package-private so the pipeline below can fill and read them
    String fileName;
    String name;
    WritableImage image;
}

final Observable<ReportMetaInfo> reportGuiData = Observable.just(m_reportInfoProvider)
        .subscribeOn(FxScheduler.getInstance())
        .map(provider -> {
            // runs on the JavaFX thread, so reading the form and taking the snapshot is safe
            final ReportMetaInfo info = new ReportMetaInfo();
            info.fileName = provider.getFileName();
            info.name = provider.getName();
            info.image = provider.getChartSnapshot();
            return info;
        });
```
m_reportInfoProvider is an implementation of the ReportInfoProvider interface, a layer between the model and the view. In essence, it calls the getters of the form's text fields, but the model does not care: it only sees the interface.
For calculations, RxJava provides Schedulers.computation():
```java
final Observable<ScanResult> reportComputationalData = Observable.just(scanData)
        .subscribeOn(Schedulers.computation())
        .map(data -> new ResultProcessor(data).calculateAll());
```
Now we want to combine the form data with the computed results and write everything into a PDF file, which is a heavy I/O operation. For this there are the zip() operator and Schedulers.io():
```java
class ReportData {
    ReportMetaInfo metaInfo;
    ScanResult result;

    ReportData(ReportMetaInfo metaInfo, ScanResult result) {
        this.metaInfo = metaInfo;
        this.result = result;
    }
}

Observable.zip(
        reportGuiData,
        reportComputationalData,
        (reportInfo, scanResult) -> new ReportData(reportInfo, scanResult))
        .observeOn(Schedulers.io())
        .map(reportData -> ReportGenerator.createPdf(
                reportData.metaInfo.fileName,
                reportData.metaInfo.name,
                reportData.metaInfo.image,
                reportData.result))
        .subscribe(
                isOk -> { /* the PDF has been written */ },
                throwable -> { /* report the error */ },
                () -> { /* done */ }
        );
```
zip() accepts up to nine separate Observables and combines their elements into tuples. You provide the combining function yourself, along with the resulting tuple type. As a result, collecting data from the interface (including the graph image) and processing the scan results run in parallel. Whether such actions should be parallelized depends on the specific task and data volumes; my example is somewhat simplified.
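For comparison, the "run two independent jobs on different threads, then combine both results" part of this example maps onto CompletableFuture.thenCombine() in the JDK. This is a swapped-in technique, not RxJava: the Zip2 class and the executors are assumptions, and unlike zip(), which pairs the n-th items of every source, it handles exactly one value per source, which is all the report example needs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical JDK-only analogue of zip() for two single-value sources on different executors.
final class Zip2 {
    private Zip2() {}

    static <A, B, R> R zip(Supplier<A> first, Executor firstExecutor,
                           Supplier<B> second, Executor secondExecutor,
                           BiFunction<A, B, R> combiner) {
        CompletableFuture<A> a = CompletableFuture.supplyAsync(first, firstExecutor);   // e.g. read the GUI form
        CompletableFuture<B> b = CompletableFuture.supplyAsync(second, secondExecutor); // e.g. process the scan data
        // Both suppliers are already running concurrently; the combiner runs once both have finished.
        return a.thenCombine(b, combiner).join();
    }
}
```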
Keep in mind that when there are multiple data streams, backpressure problems can occur. Backpressure arises when an Observable emits items faster than its Observer can consume them, for example because the threads involved run at different speeds. In short, one side sits idle while the other overflows its buffer. So you need to be careful.
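The "one side overflows its buffer" situation can be reduced to a few lines: a producer pushes items into a bounded buffer before the slow consumer has taken any. The class name and sizes are illustrative. Here offer() simply refuses items when the buffer is full; blocking the producer (put()) or dropping items are other common choices. RxJava 1.x offers operators such as onBackpressureBuffer() and onBackpressureDrop() for choosing a strategy, and an overflowing internal buffer typically surfaces as a MissingBackpressureException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo: how many items a fast producer loses when the consumer has not taken any yet.
final class OverflowDemo {
    private OverflowDemo() {}

    static int rejectedItems(int bufferCapacity, int produced) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferCapacity);
        int rejected = 0;
        for (int i = 0; i < produced; i++) {
            if (!buffer.offer(i)) { // offer() fails instead of blocking when the buffer is full
                rejected++;
            }
        }
        return rejected;
    }
}
```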
Conclusion
There are most likely other (and more efficient) solutions to these tasks; if anyone points them out to me, I will gladly take note and keep them in mind in my work. Using these tasks as examples, I tried to show some features of RxJava: error handling, the difference between subscribeOn() and observeOn(), custom schedulers and receiving results on the GUI thread, lazy evaluation and its use for controlling external devices, interrupting an Observable, and running several Observables in parallel and combining their results. So even if RxJava is not an ideal fit for these particular tasks, the principles discussed may be useful to others.