📜 ⬆️ ⬇️

Introduction to reactive programming in Spring

Hi, Habr!

This week we are expecting a new book on Spring 5 from typography:


Among the interesting features of Spring 5, reactive programming deserves special mention, the implementation of which in this framework is briefly described by Matt Raible's proposed article. In the aforementioned book, reactive patterns are discussed in Chapter 11.
')
Matt was co-authored by Josh Long, author of another excellent book about Java and Spring, " Java in the Cloud, " published last summer.

Reactive programming is your way to creating systems that are resistant to high loads. Processing huge traffic is no longer a problem, since the server is non-blocking, and client processes do not have to wait for responses. The client can not directly observe how the program runs on the server, and synchronize with it. When an API finds it difficult to process requests, it must still give reasonable responses. Should not refuse and discard messages in an uncontrolled manner. Must report to the parent components that it is working under load so that they can partially release it from this load. This technique is called “backpressure” (backpressure), it is an important aspect of reactive programming.

This article we co-wrote with Josh Long . Josh is a Java champion, Spring Developer Advocate and generally a world guy who works at Pivotal. I've been working with Spring for a long time, but it was Josh who showed me the Spring Boot, it was at the Devoxx conference in Belgium. Since then we have become close friends, we are interested in Java and write cool applications.

Reactive programming or I / O, I / O, we go to work ...

Reactive programming is an approach to software development that actively uses asynchronous I / O. Asynchronous I / O is a small idea, fraught with big changes in programming. The idea itself is simple: to remedy the situation with the inefficient allocation of resources, freeing up those resources that would have been wasted without our intervention, waiting for the completion of I / O. Asynchronous I / O inverts the usual approach to handling I / O: the client is released and may be engaged in other tasks, waiting for new notifications.

Consider what is common between synchronous and asynchronous I / O, and what are the differences between them.

Let's write a simple program that reads data from a source (specifically, this is a java.io.File reference). Let's start with an implementation that uses the good old java.io.InputStream :

Example 1. Synchronous reading of data from a file

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } } 

  1. We provide the file for reading using the usual java.io.File
  2. We pull the results from the source one line at a time ...
  3. I wrote this code to accept Consumer<BytesPayloadgt; called when new data arrives

Simple enough, what do you say? Run this code - and you will see in the log output (to the left of each line), indicating that all actions occur in a single stream.
Here we pull the bytes from our data taken from the source (in this case we are talking about a subclass of java.io.FileInputStream , inherited from java.io.InputStream ). What is wrong with this example? In this case, we use the InputStream, pointing to the data located in our file system. If the file is there and the hard disk is functioning, then this code will work as expected.

But, what will happen if we read data not from File , but from a network socket, and, moreover, we use another InputStream implementation? There is nothing to worry about! Of course, there will be nothing to worry about, if the speed of the network is infinitely great. And if the network channel between this and another node will never fail. If these conditions are met, then the code will work perfectly.

And what will happen if the network starts to slow down or fall? In this case, I mean that we will have to increase the time until the return operation in.read(…) . In fact, she may never return! This is a problem if we try to do something else with the stream from which we read data. Of course, you can always create another stream and read data through it. Up to a certain point, this can be managed, but in the end we will reach the limit at which simply adding flows for further scaling will not be enough. We will not have true competition over the number of cores that are on our machine. Dead end! In this case, we can increase the processing of input / output (here we mean reading) only through additional threads, and here we will sooner or later reach the limit.

In this example, the main piece of work falls on reading - almost nothing happens on other fronts. We are addicted to I / O. Consider how an asynchronous solution helps us partially overcome the monopolization of our flows.

Example 2. Asynchronous data read from file

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } } 

  1. This time we adapt java.io.File , making it Java NIO java.nio.file.Path
  2. When creating the Channel , we, in particular, specify the java.util.concurrent.ExecutorService service, which will be used to call the CompletionHandler handler when the necessary data for this appears
  3. We start reading by passing the reference to CompletionHandler<Integer, ByteBuffer> (this)
  4. In the callback, we read the bytes from the ByteBuffer to the capacity of byte[]
  5. Just like in the Synchronous example, byte[] data is passed to the consumer.

Immediately make a reservation: this code turned goraaaazdo harder! There is such a bunch of things going on here that your head is spinning at once, however, let me note ... this code reads data from the Java NIO Channel , and then processes this data in a separate thread responsible for callbacks. Thus, the stream in which the reading has begun is not monopolized. We return almost instantly after the call .read(..) , and when, finally, we have the data at our disposal, the callback is made - in a different thread. If there is a delay between .read() calls, you can move on to other matters by performing them in our stream. The duration of an asynchronous read operation, from the first byte to the last, is at best no more than that of a synchronous read operation. Normally, an asynchronous operation is not significantly longer. However, by going to such additional difficulties, we can more effectively handle our flows. Do more work, multiplex I / O in a pool with a finite number of threads.

I work for a cloud computing company. We would like you to get all the new application instances to solve problems with horizontal scaling! Of course, here I am a little crafty. Asynchronous I / O complicates the situation a bit, but I hope this example illustrates why reactive code is so useful: it allows you to handle more requests and do more work on existing hardware if performance is highly dependent on I / O. If the performance depends on the use of the processor (for example, talking about operations on Fibonacci numbers, mining bitcoins or cryptography), then reactive programming will not give us anything.

Currently, most of us do not use Channel or InputStream implementations for everyday work! Problems have to reflect at the level of higher-level abstractions. It's about things like arrays or, rather, about the java.util.Collection hierarchy. The java.util.Collection collection is very well displayed on the InputStream: both entities assume that you can operate with all the data at once, and almost instantly. It is expected that you will be able to complete reading from most of the InputStreams earlier, not later. Collection types become a bit awkward when moving to larger amounts of data. What to do if you are dealing with something potentially infinite (unlimited) - for example, with web sockets or server events? What if there is a delay between posts?

We need a better way to describe this kind of data. We are talking about asynchronous events, such that will occur in the end. It may seem that Future<T> or CompletableFuture<T> are well suited for such a purpose, but they describe only one thing at a time that happens in the end. In fact, Java does not provide a suitable metaphor for describing this kind of data. Both Iterator and the Stream types from Java 8 can be unbound, however, both are pull-oriented; you yourself are requesting the next entry, and not the type should send a callback to your code. It is assumed that, if push-based processing was supported in this case, allowing much more to be achieved at the thread level, the API would also provide threading and scheduling control. Iterator implementations say nothing about threading, and all Java 8 threads share the same fork-join pool.

If Iterator and Stream really supported push processing, we would face another problem that really escalates in the context of I / O: we will need some kind of backward penetration mechanism! Since the data consumer is processed asynchronously, we have no idea when the data will be in the pipeline and in what quantity. We do not know how much data will need to be processed on the next callback: one byte or one terabyte!

By pulling data from an InputStream , you read as much information as you are willing to process, and no more. In the previous examples, we read the data into a byte[] buffer of a fixed and known length. In an asynchronous context, we need some way to tell the provider how much data we are ready to process.
Yes, sir. There is definitely something missing here.

Finding the missing metaphor

In this case, we are looking for a metaphor that would beautifully reflect the essence of asynchronous I / O, support such a data transfer mechanism and allow controlling the flow of execution in distributed systems. In reactive programming, the ability of a client to signal what load he is able to handle is called “reverse flow”.

Now there are a number of good projects - Vert.x, Akka Streams and RxJava - supporting reactive programming. The Spring team is also leading a project called Reactor . Between these different standards there is a fairly wide general field, de facto allocated to the standard of the Reactive Streams initiative . In the Reactive Streams initiative, four types are defined:

Publisher<T> ; produces values ​​that may come eventually. Publisher<T> ; produces values ​​of type T for Subscriber<T> .

Example 3. Reactive threads: Publisher<T> interface .

 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); } 

The Subscriber type subscribes to Publisher<T> , receiving notifications of any new values ​​of type T through its onNext(T) method. If any errors occur, its onError(Throwable) method is onError(Throwable) . When processing is completed normally, the subscriber’s onComplete method is called.

Example 4. Reactive threads: Subscriber<T> interface.

 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } 

When Subscriber first connects to Publisher , it gets the Subscription in the Subscriber#onSubscribe . Subscription subscriptions are perhaps the most important part of the entire specification; it is precisely it that provides the reverse flow. The Subscriber subscriber uses the Subscription#request method to request additional data or the Subscription#cancel method to stop processing.

Example 5. Reactive threads: the Subscription<T> interface .

 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); } 

The specification of reactive streams provides another useful, albeit obvious, type: Processor<A,B> is just an interface that inherits both Subscriber<A> and Publisher<B> .

Example 6. Reactive threads: Processor<T> interface .

 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { } 

The specification is not positioned as a prescription for implementations; in fact, its goal is to define types to support interoperability. The obvious benefit of types associated with reactive threads is that there is still a place for them in the Java 9 release, moreover, semantically, they “one to one” correspond to interfaces from the java.util.concurrent.Flow class, for example: java.util.concurrent.Flow.Publisher .

Meet Reactor

Reactive stream types alone are not enough; higher order implementations are needed to support operations such as filtering and transformation. As such, the Reactor project is convenient; it builds on the Reactive Streams specification and provides two Publisher<T> specializations.

The first, Flux<T> , is Publisher , producing zero or more values. The second, Mono<T> is a Publisher<T> that produces zero or one value. Both of them publish values ​​and can handle them accordingly, however, their capabilities are much broader than the Reactive Streams specification. Both provide operators, allow processing of value streams. Reactor types are well packaged — the output of one of them can serve as input for the other, and if the type needs to work with other data streams, they rely on instances of Publisher<T> .

Both Mono<T> and Flux<T> implement Publisher<T> ; recommend that your methods accept instances of Publisher<T> , but return Flux<T> or Mono<T> ; this will help the client to distinguish exactly which data he receives.

Suppose you were given Publisher<T> and asked to display the user interface for this Publisher<T> . In this case, is it necessary to display a detail page for one record, so how can you get a CompletableFuture<T> ? Or to display an overview page with a list or a grid, where all records are displayed page by page? It is hard to say.

In turn, Flux<T> and Mono<T> very specific. You know that you need to display an overview page if Flux<T> received, and a page with details for one (or none) records when you get Mono<T> .

Reactor is an open source project launched by Pivotal; Now he has become very popular. Facebook uses it in its jet engine to call remote procedures , also used in Rsocket , under the direction of RxJava creator Ben Christensen. Salesforce uses it in its gRPC reactive implementation . Reactor implements Reactive Streams types, so it can interact with other technologies that support these types, for example, with RxJava 2 from Netflix, Akka Streams from Lightbend and with the Vert.x project from the Eclipse Foundation. David Cairnok, head of RxJava 2, also actively collaborated with Pivotal in developing Reactor, making the project even better. Plus, of course, it is present in one form or another in the Spring Framework, starting with the Spring Framework 4.0.

Reactive programming with Spring WebFlux

For all its usefulness, Reactor is just the basis. Our applications must communicate with data sources. Must support authentication and authorization. Spring provides all this. If Reactor gives us the missing metaphor, then Spring helps us all speak a common language.

Spring Framework 5.0 was released in September 2017. It builds on Reactor and Reactive Streams specifications. It has a new reactive execution environment and a component model called Spring WebFlux .

Spring WebFlux does not depend on the Servlet API and does not require them to work. It comes with adapters that allow you to use it on top of the Servlet engine, if required, but this is not necessary. It also provides a completely new Netty-based runtime environment called Spring WebFlux. The Spring Framework 5, which works with Java 8 and Java EE 7 and higher, now serves as the basis for most of the Spring ecosystem, including Spring Data Kay, Spring Security 5, Spring Boot 2, and Spring Cloud Finchley.

Source: https://habr.com/ru/post/435972/


All Articles