Both implementations of our Reader accept a file (a java.io.File reference). Let's start with an implementation that uses the good old java.io.InputStream:

package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.function.Consumer;

@Log4j2
class Synchronous implements Reader {

    @Override
    public void read(File file, Consumer<BytesPayload> consumer) throws IOException {
        try (FileInputStream in = new FileInputStream(file)) { // 1
            byte[] data = new byte[FileCopyUtils.BUFFER_SIZE];
            int res;
            while ((res = in.read(data, 0, data.length)) != -1) { // 2
                consumer.accept(BytesPayload.from(data, res)); // 3
            }
        }
    }
}
This implementation takes a java.io.File and a Consumer<BytesPayload> that is called whenever new data arrives; the data is read through a java.io.FileInputStream (whose read method is inherited from java.io.InputStream). What is wrong with this example? Here we use an InputStream pointing at data in our file system. If the file is there and the hard disk is working, this code will work as expected. But what if we read the data not from a File but from a network socket, using a different InputStream implementation? There is nothing to worry about! Well, nothing to worry about if the network is infinitely fast, and if the channel between this node and the other one never fails. If those conditions hold, the code will work perfectly. But what happens when the network slows down or breaks? In that case, the call that takes longer to return is in.read(…)
— in fact, it may never return! This is a problem if we try to do anything else with the thread on which we are reading. Of course, we can always spin up another thread and read through it. Up to a point this can be managed, but eventually we reach a limit where simply adding threads no longer scales. We have no true concurrency beyond the number of cores on our machine. Dead end! In this setup, we can scale input/output processing (reading, in this case) only by adding threads, and sooner or later we will hit that limit.

Here is the same reader implemented with Java NIO's asynchronous channels:

package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@Log4j2
class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> {

    private int bytesRead;
    private long position;
    private AsynchronousFileChannel fileChannel;
    private Consumer<BytesPayload> consumer;
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public void read(File file, Consumer<BytesPayload> c) throws IOException {
        this.consumer = c;
        Path path = file.toPath(); // 1
        this.fileChannel = AsynchronousFileChannel.open(path,
            Collections.singleton(StandardOpenOption.READ), this.executorService); // 2
        ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE);
        this.fileChannel.read(buffer, position, buffer, this); // 3
        while (this.bytesRead > 0) {
            this.position = this.position + this.bytesRead;
            this.fileChannel.read(buffer, this.position, buffer, this);
        }
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) { // 4
        this.bytesRead = result;
        if (this.bytesRead < 0)
            return;
        buffer.flip();
        byte[] data = new byte[buffer.limit()];
        buffer.get(data); // 5
        consumer.accept(BytesPayload.from(data, data.length));
        buffer.clear();
        this.position = this.position + this.bytesRead;
        this.fileChannel.read(buffer, this.position, buffer, this);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        log.error(exc);
    }
}
1 — this time we take the java.io.File and adapt it into a Java NIO java.nio.file.Path;
2 — when opening the Channel we specify, among other things, the java.util.concurrent.ExecutorService that will be used to invoke the CompletionHandler when data becomes available;
3 — we start reading, passing in a reference to the CompletionHandler<Integer, ByteBuffer> (this);
4 — in the callback, we copy the bytes out of the ByteBuffer into a byte[] array;
5 — just as in the Synchronous example, the byte[] data is passed to the consumer.

This code reads data from a Java NIO Channel and then processes that data in a separate thread responsible for callbacks. The thread on which the read was started is therefore not monopolized: we return almost immediately after the call to .read(..), and when the data is finally at our disposal, the callback is invoked on a different thread. If there is latency between .read() calls, we can move on to other work, doing it on our own thread. The duration of an asynchronous read operation, from the first byte to the last, is at best no shorter than that of a synchronous read; normally it is not significantly longer. But in exchange for this extra complexity we can use our threads far more efficiently: do more work, multiplexing I/O across a pool with a finite number of threads.
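To make the difference concrete, here is a minimal, hypothetical driver for the two readers above. BytesPayload's API is not shown in this excerpt, so the consumer below only acknowledges each chunk, and the file path is made up:

package com.example.io;

import java.io.File;
import java.util.function.Consumer;

class ReaderDemo {

    public static void main(String[] args) throws Exception {
        File file = new File("/tmp/data.txt"); // hypothetical path, just for the demo
        // BytesPayload's API isn't shown above, so we merely acknowledge each chunk
        Consumer<BytesPayload> consumer = payload -> System.out.println("chunk received");

        new Synchronous().read(file, consumer);  // blocks until the whole file has been read
        new Asynchronous().read(file, consumer); // returns almost immediately; callbacks arrive on pool threads

        Thread.sleep(1_000); // crude wait so the asynchronous callbacks have a chance to run
    }
}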
Hardly anyone, however, wants to work with Channel or InputStream implementations for everyday work! The problem needs to be addressed at the level of higher-level abstractions — things like arrays or, rather, the java.util.Collection hierarchy. A java.util.Collection maps very well onto an InputStream: both assume that you can operate on all the data at once, almost instantly. You expect to finish reading from most InputStreams sooner rather than later. Collection types become a bit awkward when we move to larger amounts of data. What do you do if you are dealing with something potentially infinite (unbounded) — say, web sockets or server-sent events? What if there is latency between records?
A Future<T> or a CompletableFuture<T> is well suited to that, but each describes only a single thing that eventually happens. In fact, Java does not provide a suitable metaphor for describing this kind of data. Both the Iterator and the Java 8 Stream types can be unbounded, but both are pull-oriented: you yourself ask for the next record, rather than having the type push a callback at your code. If push-based processing were supported here — allowing much more to be achieved at the thread level — the API would also have to provide control over threading and scheduling. Iterator implementations say nothing about threading, and all Java 8 parallel streams share the same fork-join pool.
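To see what "pull-oriented" means here, consider this small sketch: the consumer decides when to ask for the next element, on its own thread and at its own pace — nothing pushes data at it.

import java.util.Arrays;
import java.util.Iterator;

class PullDemo {

    public static void main(String[] args) {
        Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
        // Pull model: *we* request each element; the call blocks until one is available.
        // Nothing tells us on which thread work happens, and nothing calls us back.
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}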
And even if Iterator and Stream did support push processing, we would face another problem, one that really escalates in the context of I/O: we would need some kind of backpressure mechanism! Because the consumer processes data asynchronously, we have no idea when the data will arrive in the pipeline, or in what quantity. We do not know how much data the next callback will have to handle: one byte or one terabyte! When you pull data from an InputStream, you read as much as you are prepared to process and no more. In the previous examples we read data into a byte[] buffer of a fixed, known size. In an asynchronous context, we need some way to tell the producer how much data we are ready to process.
The Reactive Streams specification defines four types that address this. A Publisher<T> produces values that may eventually arrive: values of type T, delivered to a Subscriber<T>.

The Publisher<T> interface:

package org.reactivestreams;

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}
A Subscriber subscribes to a Publisher<T>, receiving notifications of any new values of type T through its onNext(T) method. If an error occurs, its onError(Throwable) method is called. When processing completes normally, the subscriber's onComplete method is called.

The Subscriber<T> interface:

package org.reactivestreams;

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
When a Subscriber first connects to a Publisher, it receives a Subscription via Subscriber#onSubscribe. The Subscription is perhaps the most important part of the entire specification: it is what provides backpressure. The subscriber uses the Subscription#request method to ask for more data, or the Subscription#cancel method to stop processing.

The Subscription interface:

package org.reactivestreams;

public interface Subscription {
    public void request(long n);
    public void cancel();
}
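As a sketch of how these three types interact, here is a hand-rolled Subscriber that requests one element at a time — the request(1) calls are what backpressure looks like in practice. It uses Reactor's Flux (introduced below) only as a convenient, ready-made Publisher<T>:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

class OneAtATime {

    public static void main(String[] args) {
        Flux.range(1, 5).subscribe(new Subscriber<Integer>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                s.request(1); // ask for the first element only
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("got " + value);
                this.subscription.request(1); // ready for one more
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("done");
            }
        });
    }
}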
A Processor<A, B> is simply an interface that is both a Subscriber<A> and a Publisher<B>.

The Processor<T, R> interface:

package org.reactivestreams;

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
These types eventually made it into Java 9, as semantically equivalent interfaces nested in the java.util.concurrent.Flow class — for example, java.util.concurrent.Flow.Publisher.

Project Reactor provides two Publisher<T> specializations. The first, Flux<T>, is a Publisher that produces zero or more values. The second, Mono<T>, is a Publisher<T> that produces zero or one value. Both publish values and can handle them accordingly, but their capabilities go well beyond the Reactive Streams specification: both provide operators for processing streams of values. Reactor types compose nicely — the output of one can serve as the input to another — and when a type needs to work with other streams of data, it relies on Publisher<T> instances.
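A small taste of those operators; the values here are invented for illustration:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReactorDemo {

    public static void main(String[] args) {
        // Flux: zero or more values, transformed through a pipeline of operators
        Flux.just("spring", "reactor")
            .map(String::toUpperCase)
            .filter(s -> s.startsWith("S"))
            .subscribe(System.out::println); // prints SPRING

        // Mono: at most one value
        Mono.just("one value")
            .map(String::length)
            .subscribe(len -> System.out.println("length: " + len)); // prints length: 9
    }
}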
Since both Mono<T> and Flux<T> implement Publisher<T>, we recommend that your methods accept Publisher<T> instances but return a Flux<T> or a Mono<T>; this helps the client distinguish exactly what kind of data it is getting.
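For example, a hypothetical service following this recommendation might look like this (all the names are invented for illustration):

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class Customer {
    String id, name;
}

// Accept the general Publisher<T>; return the more descriptive Flux<T> or Mono<T>.
interface CustomerService {

    Mono<Customer> findById(String id);                   // zero or one result

    Flux<Customer> findAll();                             // zero or more results

    Flux<Customer> saveAll(Publisher<Customer> customers); // accepts any Publisher a caller has
}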
Suppose you are given a Publisher<T> and asked to render a user interface for it. Should you display a detail page for a single record, as you would when given a CompletableFuture<T>? Or an overview page with a list or grid displaying all the records, page by page? It is hard to say. Flux<T> and Mono<T>, by contrast, are very specific. You know to display an overview page when you receive a Flux<T>, and a detail page for one (or zero) records when you receive a Mono<T>.

Source: https://habr.com/ru/post/435972/