
Introducing reactive streams for Java developers

Hi, Habr!

Today we will return to one of the topics covered in our wonderful book "Reactive Design Patterns". We will talk about Akka Streams and streaming data in general - in Roland Kuhn's book these topics are covered in chapters 10 and 15-17.

Reactive streams are a standard way of processing data asynchronously in a streaming fashion. They were included in Java 9 as the java.util.concurrent.Flow interfaces and are becoming a real lifesaver for building streaming components in all kinds of applications - a trend that will only continue in the coming years. It should be noted that Reactive Streams is "just" a standard, and by itself it is not usable: in practice you pick a concrete implementation, and today we will talk about Akka Streams - one of the leading implementations of reactive streams since their inception.

Context
A typical stream processing pipeline consists of several steps, each of which passes data to the next step, that is, downstream. If you take two adjacent steps and treat the upstream one as the producer and the downstream one as the consumer, it turns out that the producer can work either slower or faster than the consumer. When the producer is slower, everything is fine, but things get complicated when the consumer cannot keep up. In that case the consumer can be flooded with data that it somehow has to handle.

The simplest way to handle excess data is to simply drop everything that cannot be processed. This is exactly what is done, for example, in network equipment. But what if we do not want to discard anything at all? Then we can use back pressure (backpressure).
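In Akka Streams terms, this "drop whatever does not fit" policy can be expressed with an explicit bounded buffer and an OverflowStrategy. A minimal sketch (the buffer size and element type here are arbitrary, chosen purely for illustration):

 // A bounded buffer that silently drops the newest element when it is full,
 // instead of propagating back pressure upstream
 Flow<Double, Double, NotUsed> lossy =
         Flow.of(Double.class).buffer(1000, OverflowStrategy.dropNew());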

The idea of back pressure is central to Reactive Streams and boils down to limiting the amount of data transferred between adjacent links of the pipeline, so that no link overflows. Since the most important aspect of the reactive approach is to avoid blocking unless absolutely necessary, the implementation of back pressure in a reactive stream must also be non-blocking.
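To get a feel for what non-blocking back pressure looks like at the level of the standard Java 9 interfaces, here is a minimal sketch of a java.util.concurrent.Flow.Subscriber that requests one element at a time; the publisher may never emit more than has been requested, so the subscriber can never be flooded (the class name and the println-based "processing" are purely illustrative):

 import java.util.concurrent.Flow;

 class OneAtATimeSubscriber implements Flow.Subscriber<String> {

     private Flow.Subscription subscription;

     @Override
     public void onSubscribe(Flow.Subscription subscription) {
         this.subscription = subscription;
         subscription.request(1);   // initial demand: exactly one element
     }

     @Override
     public void onNext(String item) {
         System.out.println("Processed: " + item);
         subscription.request(1);   // only now ask for the next element
     }

     @Override
     public void onError(Throwable throwable) {
         throwable.printStackTrace();
     }

     @Override
     public void onComplete() {
         System.out.println("Stream completed");
     }
 }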

How it's done

The Reactive Streams standard defines a number of interfaces, but no implementation as such. This means that by simply adding a dependency on org.reactivestreams:reactive-streams we get nowhere - we still need a concrete implementation. There are many implementations of Reactive Streams; in this article we will use Akka Streams and its Java DSL. Other implementations include RxJava 2.x, Reactor and others.

Usage example

Suppose we have a directory in which we want to watch for new CSV files; each new file is then streamed, some aggregation is performed on the fly, and the aggregated results are pushed to a web socket in real time. In addition, we want to define a threshold for the accumulated aggregated data, upon reaching which an email notification is triggered.

In our example, the CSV lines will contain (id, value) pairs, with the id changing every two lines, for example:

370582,0.17870700247256666
370582,0.5262255382633264
441876,0.30998025265909457
441876,0.3141591265785087
722246,0.7334219632071504
722246,0.5310146239777006


We want to compute the average of the two values with a common id and send it to the web socket only if it exceeds 0.9. For example, for id 370582 the average is (0.17870700247256666 + 0.5262255382633264) / 2 ≈ 0.35, which is below the threshold, so nothing would be sent. Moreover, we want to send an email notification after every fifth value that arrives at the web socket. Finally, we want to read and display the data received from the web socket through a trivial frontend written in JavaScript.

Architecture

We are going to use a number of tools from the Akka ecosystem (see Figure 1). At the center of the whole system is, naturally, Akka Streams, which lets us process data in real time in a streaming fashion. To read the CSV files we will use Alpakka - a collection of connectors for integrating Akka Streams with various technologies, protocols and libraries. Interestingly, since Akka Streams is a Reactive Streams implementation, the entire Alpakka ecosystem is also available to any other RS implementation - this interoperability is precisely what the RS interfaces are designed to achieve. Finally, we will use Akka HTTP, which will expose the web socket endpoint. The nicest thing here is that Akka HTTP integrates seamlessly with Akka Streams (which, in fact, it uses under the hood), so exposing a stream as a web socket is easy.



Fig. 1. Architecture Overview

If we compare this scheme with a classical Java EE architecture, it is probably noticeable that everything here is much simpler: no containers and beans, just a simple standalone application. Moreover, the Java EE stack does not support the streaming approach at all.

Basics of Akka Streams

In Akka Streams, a processing pipeline (graph) consists of elements of three types: Source (the source), Sink (the sink) and Flow (the processing steps).

Based on these components, we define our graph, which is essentially just a recipe for processing the data - no computation happens there. For the pipeline to actually work, we need to materialize the graph, that is, turn it into a runnable form. To do this we need a so-called materializer, which optimizes the graph definition and ultimately runs it. The built-in ActorMaterializer is, however, virtually the only game in town, so you are unlikely to use any other implementation.
If you look closely at the type parameters of the components, you will notice that each component, besides its input and output types, has a mysterious Mat type. It refers to the so-called "materialized value" - a value that is accessible from outside the graph (as opposed to the input and output types, which are available only for internal communication between the steps of the graph - see Fig. 2). If you prefer to ignore the materialized value (which often happens when we are only interested in passing data between the steps of the graph), there is a special type parameter for that: NotUsed. It can be compared to Java's Void, but it is semantically a bit richer: "we do not use this value" is more informative than Void. Also note that some APIs use the similar type Done to signal that a task has completed. Other Java libraries would probably use Void in both of these cases, but Akka Streams tries to give all its types as much useful semantics as possible.



Fig. 2. Description of Flow Type Parameters
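As a minimal sketch of these ideas (the actor system name and the numbers are arbitrary): the Source/Flow/Sink composition below is just a description, and only runWith together with a materializer actually starts the processing; the materialized value we get back is the CompletionStage<Done> produced by Sink.foreach:

 import akka.Done;
 import akka.NotUsed;
 import akka.actor.ActorSystem;
 import akka.stream.ActorMaterializer;
 import akka.stream.Materializer;
 import akka.stream.javadsl.Sink;
 import akka.stream.javadsl.Source;

 import java.util.concurrent.CompletionStage;

 public class MaterializationDemo {

     public static void main(String[] args) {
         ActorSystem system = ActorSystem.create("demo");
         Materializer materializer = ActorMaterializer.create(system);

         // Just a recipe so far: nothing is running yet
         Source<Integer, NotUsed> numbers = Source.range(1, 10);

         // Materialization turns the recipe into a running stream; the materialized
         // value of Sink.foreach completes when the stream terminates
         CompletionStage<Done> done =
                 numbers.map(n -> n * 2)
                        .runWith(Sink.foreach(System.out::println), materializer);

         done.thenRun(system::terminate);
     }
 }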

Implementation

Now let's move on to the concrete implementation of our CSV processor. First we define the Akka Streams graph, and then, using Akka HTTP, we connect the stream to a web socket.

Components of the streaming pipeline

At the entry point of our streaming pipeline, we want to watch for new CSV files appearing in the directory of interest. We would like to use java.nio.file.WatchService for this, but since we are building a streaming application, we need an event Source to work with rather than organizing everything through callbacks. Fortunately, such a Source is already available in Alpakka as the DirectoryChangesSource connector, part of the alpakka-file module, which uses WatchService under the hood:

 private final Source<Pair<Path, DirectoryChange>, NotUsed> newFiles = DirectoryChangesSource.create(DATA_DIR, DATA_DIR_POLL_INTERVAL, 128); 

So we get a source that emits elements of type Pair<Path, DirectoryChange>. We are going to filter them so that only newly created CSV files are selected and passed downstream. For this data transformation, as for all subsequent ones, we will use small building blocks called Flow, from which the complete processing pipeline will then be assembled:

 private final Flow<Pair<Path, DirectoryChange>, Path, NotUsed> csvPaths =
         Flow.<Pair<Path, DirectoryChange>>create()
             .filter(this::isCsvFileCreationEvent)
             .map(Pair::first);

 private boolean isCsvFileCreationEvent(Pair<Path, DirectoryChange> p) {
     return p.first().toString().endsWith(".csv")
         && p.second().equals(DirectoryChange.Creation);
 }

A Flow can be created, for example, with the generic create() method - it is useful when the input type itself is generic. Here, the resulting flow will emit (as a Path) every new CSV file appearing in DATA_DIR.

Now we are going to turn the Paths into the lines read from each file. To turn every element into a Source and flatten the result, you can use one of the flatMap* methods: flatMapConcat or flatMapMerge. In both cases we create a Source from each incoming element and then combine the resulting sources into a single new one, either by concatenating or by merging them. Here we will go with flatMapConcat, since we want to preserve the order of the lines, so that rows with the same id stay next to each other. To turn a Path into a stream of bytes we use the built-in FileIO utility:

 private final Flow<Path, ByteString, NotUsed> fileBytes = Flow.of(Path.class).flatMapConcat(FileIO::fromPath); 

This time we use the of() method to create the new flow - it is convenient when the input type is not generic.
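For comparison, if the order of the lines did not matter, flatMapMerge could have been used instead: it reads several files concurrently and merges their elements in whatever order they arrive. A quick sketch (the field name and the breadth of 4 are arbitrary):

 // Reads up to 4 files at a time; row order across files is not preserved
 private final Flow<Path, ByteString, NotUsed> fileBytesUnordered =
         Flow.of(Path.class).flatMapMerge(4, FileIO::fromPath);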

The ByteString mentioned above is the byte sequence representation used throughout Akka Streams. Here we want to parse the byte stream as CSV, and for this we again turn to one of the Alpakka modules, this time alpakka-csv:

 private final Flow<ByteString, Collection<ByteString>, NotUsed> csvFields = Flow.of(ByteString.class).via(CsvParsing.lineScanner()); 

Note the via combinator used here: it attaches an arbitrary Flow to the output of another step of the graph (a Source or another Flow). The result is a stream of elements, each of which corresponds to the fields of a single line of the CSV file. These can then be turned into our domain model:

 class Reading {

     private final int id;
     private final double value;

     private Reading(int id, double value) {
         this.id = id;
         this.value = value;
     }

     double getValue() {
         return value;
     }

     @Override
     public String toString() {
         return String.format("Reading(%d, %f)", id, value);
     }

     static Reading create(Collection<ByteString> fields) {
         List<String> fieldList = fields.stream().map(ByteString::utf8String).collect(toList());
         int id = Integer.parseInt(fieldList.get(0));
         double value = Double.parseDouble(fieldList.get(1));
         return new Reading(id, value);
     }
 }

For the transformation itself we use the map method and pass a reference to the Reading.create method:

 private final Flow<Collection<ByteString>, Reading, NotUsed> readings = Flow.<Collection<ByteString>>create().map(Reading::create); 

At the next stage we group the readings in pairs, compute the average value for each group, and only pass it on when a certain threshold is exceeded. Since we want the average to be computed asynchronously, we use the mapAsyncUnordered method, which performs an asynchronous operation with a given level of parallelism:

 private final Flow<Reading, Double, NotUsed> averageReadings =
         Flow.of(Reading.class)
             .grouped(2)
             .mapAsyncUnordered(10, readings ->
                 CompletableFuture.supplyAsync(() -> readings.stream()
                     .map(Reading::getValue)
                     .collect(averagingDouble(v -> v))))
             .filter(v -> v > AVERAGE_THRESHOLD);

Having defined the components above, we are ready to combine them into a complete pipeline (with the help of the already familiar via combinator). It is not hard at all:

 private final Source<Double, NotUsed> liveReadings =
         newFiles
             .via(csvPaths)
             .via(fileBytes)
             .via(csvFields)
             .via(readings)
             .via(averageReadings);

Note

When combining components as shown above, the compiler protects us from accidentally connecting two blocks with incompatible data types.

Stream as a web socket

Now we use Akka HTTP to create a simple web server that will play two roles:

- provide the web socket endpoint streaming the average values;
- serve a simple static frontend page.

Creating a web server with Akka HTTP costs almost nothing: you just inherit from HttpApp and define the required mappings with the routing DSL:

 class Server extends HttpApp {

     private final Source<Double, NotUsed> readings;

     Server(Source<Double, NotUsed> readings) {
         this.readings = readings;
     }

     @Override
     protected Route routes() {
         return route(
             path("data", () -> {
                 Source<Message, NotUsed> messages =
                     readings.map(String::valueOf).map(TextMessage::create);
                 return handleWebSocketMessages(
                     Flow.fromSinkAndSourceCoupled(Sink.ignore(), messages));
             }),
             get(() ->
                 pathSingleSlash(() ->
                     getFromResource("index.html")
                 )
             )
         );
     }
 }

Two routes are defined here: /data, the web socket endpoint, and /, which serves the trivial frontend. It is already clear how easy it is to expose an Akka Streams Source as a web socket endpoint: we take handleWebSocketMessages, whose job is to upgrade the HTTP connection to a web socket connection and install a flow that handles the incoming and outgoing data.

A web socket is modeled as a flow: incoming messages arrive from the client, and outgoing messages are sent to it. In our case we want to ignore the incoming data, so we build a flow whose "incoming" side is wired to Sink.ignore(). The "outgoing" side of the web socket handler flow is simply connected to our source of average values. All that remains to be done with the doubles representing the averages is to convert each of them to a TextMessage - the wrapper Akka uses for web socket data. This is done with the map method we already know.

To start the server, you just need to call the startServer method, specifying the host name and port:

 Server server = new Server(csvProcessor.liveReadings);
 server.startServer(config.getString("server.host"), config.getInt("server.port"));

Frontend

To read data from the web socket and display it, we use a very simple piece of JavaScript that just appends the values to a textarea. The code uses ES6 syntax and should run in any modern browser.

 let ws = new WebSocket("ws://localhost:8080/data");

 ws.onopen = () => log("WS connection opened");
 ws.onclose = event => log("WS connection closed with code: " + event.code);
 ws.onmessage = event => log("WS received: " + event.data);

The log function appends a message to the textarea, together with a timestamp.

Launch

To run and test the application, you need to:

- start the application (the server listens on the host and port taken from the configuration, by default http://localhost:8080/);
- open the frontend page in a browser;
- copy a CSV file in the format shown above into the watched data directory and observe new values appearing on the page.

Add a mail trigger

The final touch in our application is a side channel in which we simulate email alerts, sent after every fifth element delivered to the web socket. It should work "on the side", without disturbing the delivery of the main elements.

To implement this behaviour we turn to a more advanced feature of Akka Streams - the Graph DSL - and write our own graph stage in which the stream splits into two branches. The first simply passes the values on to the web socket, while the second keeps track of when the next 5 elements have accumulated and sends an email notification - see Fig. 3.



Fig. 3. Custom graph stage for email notifications

We will use the built-in Broadcast stage, which copies its input to a set of declared outputs. We will also write our own sink, Mailer:

 private final Graph<FlowShape<Double, Double>, NotUsed> notifier =
         GraphDSL.create(builder -> {
             Sink<Double, NotUsed> mailerSink =
                 Flow.of(Double.class)
                     .grouped(EMAIL_THRESHOLD)
                     .to(Sink.foreach(ds -> logger.info("Sending e-mail")));

             UniformFanOutShape<Double, Double> broadcast = builder.add(Broadcast.create(2));
             SinkShape<Double> mailer = builder.add(mailerSink);

             builder.from(broadcast.out(1)).toInlet(mailer.in());

             return FlowShape.of(broadcast.in(), broadcast.out(0));
         });

We start building our custom graph stage with the GraphDSL.create() method, which provides a graph Builder instance used to manipulate the graph structure.

Next we define our own sink, where grouped is used to combine incoming elements into groups of a given size (EMAIL_THRESHOLD, i.e. 5 in our case), after which the groups are passed downstream. For each such group we simulate a side effect: an email notification.

Having defined our own sink, we use the builder instance to add it to the graph. We also add a Broadcast stage with two outputs.

Next we need to wire the graph elements together: we connect one of the outputs of the Broadcast stage to the mailer sink, and make the other one the output of our custom stage. The input of our stage is simply the input of the Broadcast stage.

Note 1
The compiler cannot verify that all parts of the graph are correctly connected. This is, however, checked by the materializer at run time, so there will be no dangling inputs or outputs.

Note 2
You may notice that all the stages we have written are of the form Graph<S, M>, where S is the shape that determines the number and types of the inputs and outputs, and M is the materialized value (if any). Here we are dealing with a FlowShape, that is, one input and one output.
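Incidentally, via accepts any Graph with a FlowShape, which is why liveReadings can use the notifier directly below; if you ever need it as an ordinary Flow value, it can also be wrapped explicitly. A tiny sketch (the variable name is illustrative):

 // A Graph with a FlowShape can be wrapped into a regular Flow
 Flow<Double, Double, NotUsed> notifierFlow = Flow.fromGraph(notifier);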

In the last step we plug the notifier in as an additional stage of the liveReadings pipeline, which now takes the following form:

 private final Source<Double, NotUsed> liveReadings =
         newFiles
             .via(csvPaths)
             .via(fileBytes)
             .via(csvFields)
             .via(readings)
             .via(averageReadings)
             .via(notifier);

When you run the updated code, you will see mail notification entries appearing in the log. A notification is produced each time another five values have passed through the web socket.

Summary

In this article we covered the general concepts of stream data processing and learned how to build a lightweight data processing pipeline with Akka Streams - an alternative to the traditional Java EE approach.

We looked at how to use some of the processing stages built into Akka Streams and how to write our own stage with the Graph DSL. We also saw how Alpakka lets us stream data from the file system and how Akka HTTP lets us create a simple web server with a web socket endpoint seamlessly integrated with Akka Streams.

A full working example with the code from this article is available on GitHub. It contains several additional logging stages placed at various points in the pipeline - they help to get a clearer picture of what is happening inside. I deliberately omitted them from the article to keep it shorter.

Source: https://habr.com/ru/post/353496/

