Hi, Habr!
Today we return to one of the topics covered in our wonderful book "Reactive Design Patterns": Akka Streams and streaming data in general. In Roland Kuhn's book, chapters 10 and 15-17 are devoted to these topics.
Reactive Streams are a standard way of processing data asynchronously in a streaming fashion. They were included in Java 9 as the java.util.concurrent.Flow interfaces and are becoming a real lifesaver for building streaming components in various applications, a trend that will continue in the coming years. Note that Reactive Streams are "just" a standard: on their own they do nothing. In practice one of the concrete implementations of the standard is used, and today we will talk about Akka Streams, one of the leading implementations of Reactive Streams since their inception.
Context
A typical stream processing pipeline consists of several stages, each of which passes data to the next stage, that is, downstream. If we take two adjacent stages and treat the upstream one as the producer and the downstream one as the consumer, the producer can work either slower or faster than the consumer. When the producer is slower, everything is fine, but the situation gets complicated when the consumer cannot keep up. In that case the consumer may be flooded with data it is forced to process somehow.
The simplest way to handle excess data is to drop whatever cannot be processed; this is exactly what network equipment does, for example. But what if we do not want to discard anything at all? Then we can use backpressure.
The idea of backpressure is very important in the context of Reactive Streams and boils down to limiting the amount of data transferred between adjacent stages of the pipeline so that no stage gets overwhelmed. Since a key aspect of the reactive approach is to avoid blocking unless absolutely necessary, the backpressure implementation in a reactive stream must also be non-blocking.
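To see what non-blocking, demand-driven backpressure looks like at the level of the standard interfaces, here is a minimal sketch (not from the article and not using Akka) built on java.util.concurrent.Flow and the JDK's SubmissionPublisher; the subscriber signals demand one element at a time via Subscription.request:

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // signal demand for exactly one element
                }

                @Override
                public void onNext(Integer item) {
                    System.out.println("Consumed: " + item);
                    subscription.request(1); // ask for the next element only when ready
                }

                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("Done");
                }
            });
            for (int i = 0; i < 10; i++) {
                publisher.submit(i); // buffered by the publisher until the subscriber asks for it
            }
        }
        Thread.sleep(1000); // give the asynchronous consumer time to finish before the JVM exits
    }
}

The publisher never pushes more than the subscriber has requested, so the consumer is never flooded, and no thread is blocked while waiting for demand.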
How it's done
The Reactive Streams standard defines a set of interfaces, but no implementation as such. This means that adding a dependency on org.reactivestreams:reactive-streams by itself gets us nowhere: we still need a concrete implementation. There are many implementations of Reactive Streams; in this article we will use Akka Streams and its Java DSL. Other implementations include RxJava 2.x, Reactor and others.
Usage example
Suppose we have a directory in which we want to watch for new CSV files, read each file as a stream, perform some aggregation on the fly, and send the aggregated results to a web socket in real time. In addition, we want to define a threshold for the aggregated data which, when reached, triggers an email notification.
In our example the CSV lines will contain (id, value) pairs, and the id will change every two lines, for example:
370582,0.17870700247256666
370582,0.5262255382633264
441876,0.30998025265909457
441876,0.3141591265785087
722246,0.7334219632071504
722246,0.5310146239777006
We want to compute the average of the two values sharing an id and send it to the web socket only if it exceeds 0.9. In addition, we want to send an email notification after every fifth value that reaches the web socket. Finally, we want to read and display the data coming from the web socket in a trivial frontend written in JavaScript.
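For example, the two readings for id 370582 in the sample above average to (0.1787 + 0.5262) / 2 ≈ 0.352, which is below the 0.9 threshold, so that value would not reach the web socket.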
Architecture
We are going to use a number of tools from the Akka ecosystem (see Fig. 1). At the center of the whole system is, naturally, Akka Streams, which lets us process data in real time in a streaming fashion. To read CSV files we will use Alpakka, a collection of connectors for integrating Akka Streams with various technologies, protocols and libraries. Interestingly, since Akka Streams is an implementation of Reactive Streams, the whole Alpakka ecosystem is also available to any other RS implementation; this interoperability is exactly what the RS interfaces were designed to achieve. Finally, we will use Akka HTTP, which will expose the web socket endpoint. The nicest part here is that Akka HTTP integrates seamlessly with Akka Streams (in fact, it uses them under the hood), so exposing a stream as a web socket is easy.
Fig. 1. Architecture overview
If we compare this scheme with a classical Java EE architecture, it is clear that everything here is much simpler: no containers and no beans, just a plain standalone application. Moreover, the Java EE stack does not support the streaming approach at all.
Basics of Akka Streams
In Akka Streams a processing pipeline (a graph) consists of elements of three kinds: Source (a producer of data), Sink (a consumer of data) and Flow (a processing stage).
From these components we define our graph, which is essentially just a recipe for processing the data; no computation happens at that point. For the pipeline to actually do something, we need to materialize the graph, that is, turn it into a runnable form. This requires a so-called materializer, which optimizes the graph definition and ultimately runs it. The built-in ActorMaterializer is effectively the only option, so you are unlikely to use any other implementation.
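To make these notions concrete, here is a minimal, self-contained sketch (not part of the article's project; the class and variable names are made up for illustration) that defines a Source, a Flow and a Sink and then materializes the resulting graph:

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

public class HelloStreams {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("hello-streams");
        Materializer materializer = ActorMaterializer.create(system);

        Source<Integer, NotUsed> numbers = Source.range(1, 5);          // recipe: emit 1..5
        Flow<Integer, Integer, NotUsed> doubled =
                Flow.of(Integer.class).map(n -> n * 2);                 // recipe: double each element
        Sink<Integer, CompletionStage<Done>> print =
                Sink.foreach(System.out::println);                      // recipe: print each element

        // Up to this point nothing has run: the graph is only a blueprint.
        CompletionStage<Done> done = numbers.via(doubled).runWith(print, materializer);
        done.thenRun(system::terminate);                                // shut down once the stream completes
    }
}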
If you look closely at the type parameters of these components, you will notice that each of them, besides the respective input/output types, carries a mysterious Mat type. It refers to the so-called "materialized value", a value that is accessible from outside the graph (unlike the input/output types, which are only used for internal communication between the stages of the graph; see Fig. 2). If you prefer to ignore the materialized value (which happens quite often when we only care about passing data between the stages of the graph), there is a special type parameter for that: NotUsed. It is comparable to Java's Void, but semantically a bit richer: "we do not use this value" is more informative than Void. Note also that some APIs use the similar type Done to signal that a task has completed. Other Java libraries would probably use Void in both of these cases, but in Akka Streams the types try to carry as much useful semantics as possible.
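As a small illustration of a materialized value (again just a sketch, reusing the ActorSystem and materializer from the example above), Sink.fold materializes to a CompletionStage that carries the final result of the computation out of the graph:

// Sums the numbers 1..100; the CompletionStage<Integer> is the materialized value of the Sink.
CompletionStage<Integer> sum =
        Source.range(1, 100)
              .runWith(Sink.fold(0, (acc, n) -> acc + n), materializer);

sum.thenAccept(total -> System.out.println("Sum: " + total)); // prints 5050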
Fig. 2. Flow type parameters
Implementation
Now let's move on to the concrete implementation of our CSV processor. First we define the Akka Streams graph, and then, using Akka HTTP, we expose the stream as a web socket.
Components of the stream pipeline
At the entry point of our streaming pipeline we want to watch for new CSV files appearing in the directory we are interested in. We would like to use java.nio.file.WatchService for this, but since we are writing a streaming application, we want an event Source to work with rather than organizing everything through callbacks. Fortunately, such a Source is already available in Alpakka as the DirectoryChangesSource connector, part of the alpakka-file module, which uses WatchService under the hood:
private final Source<Pair<Path, DirectoryChange>, NotUsed> newFiles = DirectoryChangesSource.create(DATA_DIR, DATA_DIR_POLL_INTERVAL, 128);
This gives us a source of elements of type Pair<Path, DirectoryChange>. We are going to filter them so that only newly created CSV files are selected and passed downstream. For this data transformation, as for all the subsequent ones, we will use small building blocks called Flow, out of which the complete processing pipeline will later be assembled:
private final Flow<Pair<Path, DirectoryChange>, Path, NotUsed> csvPaths =
    Flow.<Pair<Path, DirectoryChange>>create()
        .filter(this::isCsvFileCreationEvent)
        .map(Pair::first);

private boolean isCsvFileCreationEvent(Pair<Path, DirectoryChange> p) {
    return p.first().toString().endsWith(".csv") && p.second().equals(DirectoryChange.Creation);
}
A Flow can be created, for example, with the generic create() method, which is useful when the input type is itself generic. Here the resulting flow emits, as a Path, every new CSV file that appears in DATA_DIR.
Now we are going to turn the Paths into the lines read from each file. To transform a source into another source we can use one of the flatMap* methods: in either case we create a Source from every incoming element and then combine the resulting sources into a single new one, by either concatenating or merging them. Here we go with flatMapConcat, because we want to preserve the order of the lines, so that lines with the same id stay next to each other. To turn a Path into a stream of bytes we use the built-in FileIO utility:
private final Flow<Path, ByteString, NotUsed> fileBytes = Flow.of(Path.class).flatMapConcat(FileIO::fromPath);
This time we use the of() method to create the new flow; it is convenient when the input type is not generic.
ByteString above is the representation of a byte sequence used in Akka Streams. Next we want to parse the byte stream as CSV, and for this we again use one of the Alpakka modules, this time alpakka-csv:
private final Flow<ByteString, Collection<ByteString>, NotUsed> csvFields = Flow.of(ByteString.class).via(CsvParsing.lineScanner());
Note the via combinator used here: it attaches an arbitrary Flow to the output of another graph stage (a Source or another Flow). The result is a stream of elements, each representing the fields of a single line of the CSV file. These can then be turned into our domain model:
class Reading {
    private final int id;
    private final double value;

    private Reading(int id, double value) {
        this.id = id;
        this.value = value;
    }

    double getValue() {
        return value;
    }

    @Override
    public String toString() {
        return String.format("Reading(%d, %f)", id, value);
    }

    static Reading create(Collection<ByteString> fields) {
        List<String> fieldList = fields.stream().map(ByteString::utf8String).collect(toList());
        int id = Integer.parseInt(fieldList.get(0));
        double value = Double.parseDouble(fieldList.get(1));
        return new Reading(id, value);
    }
}
For the transformation itself we use the map method, passing it a reference to Reading.create:
private final Flow<Collection<ByteString>, Reading, NotUsed> readings = Flow.<Collection<ByteString>>create().map(Reading::create);
At the next stage we have to group the readings in pairs, compute the average for each group, and pass it downstream only when it exceeds the threshold. Since we want the average to be computed asynchronously, we use the mapAsyncUnordered method, which runs an asynchronous operation with a given parallelism level:
private final Flow<Reading, Double, NotUsed> averageReadings =
    Flow.of(Reading.class)
        .grouped(2)
        .mapAsyncUnordered(10, readings ->
            CompletableFuture.supplyAsync(() -> readings.stream()
                .map(Reading::getValue)
                .collect(averagingDouble(v -> v)))
        )
        .filter(v -> v > AVERAGE_THRESHOLD);
Having defined the components above, we are ready to assemble them into a single pipeline (using the via combinator we already know). It is not difficult at all:
private final Source<Double, NotUsed> liveReadings =
    newFiles
        .via(csvPaths)
        .via(fileBytes)
        .via(csvFields)
        .via(readings)
        .via(averageReadings);
Note
When combining components as shown above, the compiler protects us from accidentally connecting two stages with incompatible data types.
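For instance (a hypothetical mistake, not in the article's code), skipping the csvFields stage would not even compile, because the types do not line up:

// Does not compile: fileBytes emits ByteString,
// but readings expects Collection<ByteString> as its input.
// newFiles.via(csvPaths).via(fileBytes).via(readings);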
Stream as a web socket
Now we use Akka HTTP to create a simple web server with two roles:
- expose the source of readings as a web socket,
- serve a trivial web page that connects to the web socket and displays the received data.
Creating a web server with Akka HTTP takes almost no effort: you just extend HttpApp and provide the required mappings using the route DSL:
class Server extends HttpApp {
    private final Source<Double, NotUsed> readings;

    Server(Source<Double, NotUsed> readings) {
        this.readings = readings;
    }

    @Override
    protected Route routes() {
        return route(
            path("data", () -> {
                Source<Message, NotUsed> messages = readings.map(String::valueOf).map(TextMessage::create);
                return handleWebSocketMessages(Flow.fromSinkAndSourceCoupled(Sink.ignore(), messages));
            }),
            get(() ->
                pathSingleSlash(() ->
                    getFromResource("index.html")
                )
            )
        );
    }
}
Two routes are defined here: /data, the web socket endpoint, and /, which serves the trivial frontend. It is already clear how easy it is to expose an Akka Streams Source as a web socket endpoint: we use handleWebSocketMessages, whose job is to upgrade the HTTP connection to a web socket connection and install the flow that will handle the incoming and outgoing data.
The web socket handler is modeled as a flow: its input is the messages received from the client, and its output is the messages sent to the client. In our case we want to ignore the incoming data, so we build a flow whose "incoming" side is wired to Sink.ignore(). The "outgoing" side of the web socket handler flow is simply connected to our source of average values. All that remains to be done with the double values representing the averages is to convert each of them to a TextMessage, the wrapper used in Akka HTTP for web socket data; this is easily done with the map method we already know.
To start the server, we just call the startServer method, specifying the host name and port:
Server server = new Server(csvProcessor.liveReadings);
server.startServer(config.getString("server.host"), config.getInt("server.port"));
Frontend
To receive the data from the web socket and display it, we use a very simple piece of JavaScript that just appends the values to a textarea. The code uses ES6 syntax and should run in any modern browser.
let ws = new WebSocket("ws://localhost:8080/data");
ws.onopen = () => log("WS connection opened");
ws.onclose = event => log("WS connection closed with code: " + event.code);
ws.onmessage = event => log("WS received: " + event.data);
The log function appends a message to the textarea, prefixing it with a timestamp.
Launch
To run and test the application, you need to:
- start the server (sbt run),
- open localhost:8080 in the browser (or the chosen host/port, if you changed the defaults),
- copy one or several files from src/main/resources/sample-data to the data directory in the project root (or wherever csv-processor.data-dir points, if you changed the configuration),
- watch the data appear in the server logs and in the browser.
Adding a mail trigger
The final touch to our application is a side channel in which we simulate email alerts sent after every fifth element that reaches the web socket. It should work "on the side" so as not to disturb the flow of the main elements.
To implement this behavior we use a more advanced feature of Akka Streams, the Graph DSL, and write our own graph stage in which the stream is split in two. The first branch simply passes the values on to the web socket, while the second keeps track of every fifth element and sends an email notification; see Fig. 3.
Fig. 3. Our custom graph stage for email notifications
We will use the built-in Broadcast stage, which sends its input to a declared set of outputs, and we will also write our own sink, the mailer:
private final Graph<FlowShape<Double, Double>, NotUsed> notifier = GraphDSL.create(builder -> {
    Sink<Double, NotUsed> mailerSink = Flow.of(Double.class)
        .grouped(EMAIL_THRESHOLD)
        .to(Sink.foreach(ds ->
            logger.info("Sending e-mail")
        ));

    UniformFanOutShape<Double, Double> broadcast = builder.add(Broadcast.create(2));
    SinkShape<Double> mailer = builder.add(mailerSink);

    builder.from(broadcast.out(1)).toInlet(mailer.in());

    return FlowShape.of(broadcast.in(), broadcast.out(0));
});
We start creating our own graph stage with the GraphDSL.create() method, which provides a Builder instance used for manipulating the structure of the graph.
Next we define our own sink, in which grouped combines the incoming elements into groups of a chosen size (here EMAIL_THRESHOLD, i.e. 5) and passes those groups downstream. For each such group we simulate a side effect: an email notification.
Having defined our sink, we use the builder instance to add it to the graph. We also add a Broadcast stage with two outputs.
Next we need to specify the connections between the elements of the graph: we want one of the outputs of the Broadcast stage to be connected to the mailer sink, and the other to become the output of our custom stage. The input of our custom stage is wired directly to the input of the Broadcast stage.
Note 1
The compiler cannot verify that all parts of the graph are properly connected. However, this is checked by the materializer at runtime, so there will be no dangling inputs or outputs.
Note 2
You may have noticed that all the stages described here are of type Graph<S, M>, where S is the shape that determines the number and types of inputs and outputs, and M is the materialized value (if any). Here we are dealing with a flow shape, that is, one input and one output.
As the last step, we attach notifier as an additional stage of the liveReadings pipeline, which now takes the following form:
private final Source<Double, NotUsed> liveReadings =
    newFiles
        .via(csvPaths)
        .via(fileBytes)
        .via(csvFields)
        .via(readings)
        .via(averageReadings)
        .via(notifier);
When you run the updated code, you will see mail notification entries appear in the log; a notification is sent every time another five values have passed through the web socket.
Summary
In this article we covered the general concepts of streaming data processing and learned how to build a lightweight data-processing pipeline with Akka Streams, as an alternative to the traditional Java EE approach.
We looked at how to use some of the processing stages built into Akka Streams and how to write a custom stage with the Graph DSL. We also saw how to use Alpakka to stream data from the file system, and how Akka HTTP lets us build a simple web server with a web socket endpoint that integrates seamlessly with Akka Streams.
A full working example with the code from this article is available on GitHub. It contains several additional log stages placed at various points; they help to get a better picture of what is happening inside the pipeline. I deliberately left them out of the article to keep it shorter.