Why reconstruct the class if its source code is open?
Mainly because the code under the hood is highly optimized and hard to read, so studying it teaches little.
Instead, we will recreate the semantics of the operations from their specifications and write functionally equivalent code that is understandable and readable, though perhaps not the most economical in memory and processor time.
Let's start with a relatively simple method:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

Type Parameters:
    U - the function's return type
Parameters:
    supplier - a function returning the value to be used to complete the returned CompletableFuture
    executor - the executor to use for asynchronous execution
Returns:
    the new CompletableFuture
Read the specification carefully:
Returns a new CompletableFuture
That is, an object of type CompletableFuture or its subclass is created and returned as the result.
that is asynchronously completed by a task running in the given executor
In addition, a task is created that is executed on the Executor.
As we know, Executor accepts only objects of type Runnable.
Runnable is an interface, and the first object can easily implement it, so we combine the two functions in one object.
completed ... with the value obtained by calling the given Supplier.
This Runnable should call the given Supplier and complete the created CompletableFuture with the resulting value.
Supplier is a function without parameters, so coding all this is very simple:
class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable {
    Supplier<U> supplier;

    public CompletableFutureForSupplyAsync(Supplier<U> supplier) {
        this.supplier = supplier;
    }

    public void run() {
        try {
            U result = supplier.get();
            super.complete(result);
        } catch (Throwable e) {
            super.completeExceptionally(e);
        }
    }
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
    CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier);
    executor.execute(task);
    return task;
}
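A quick way to check the reconstruction is to run the same Supplier through it and through the JDK method. The sketch below does that; the names SupplyAsyncSketch, SupplyTask and sameResultAsJdk are made up for this illustration, and the two-thread pool is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class SupplyAsyncSketch {
    // Same idea as in the article: the future is also the Runnable that completes it.
    static class SupplyTask<U> extends CompletableFuture<U> implements Runnable {
        private final Supplier<U> supplier;

        SupplyTask(Supplier<U> supplier) {
            this.supplier = supplier;
        }

        @Override
        public void run() {
            try {
                complete(supplier.get());
            } catch (Throwable e) {
                completeExceptionally(e);
            }
        }
    }

    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
        SupplyTask<U> task = new SupplyTask<>(supplier);
        executor.execute(task);
        return task;
    }

    // Hypothetical check: both versions should deliver the supplier's value.
    static boolean sameResultAsJdk() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Supplier<Integer> supplier = () -> 6 * 7;
            int ours = supplyAsync(supplier, pool).join();
            int jdk = CompletableFuture.supplyAsync(supplier, pool).join();
            return ours == 42 && jdk == 42;
        } finally {
            pool.shutdown();
        }
    }
}
```

The check only compares results; it says nothing about how the JDK implements the method internally.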
The following example is somewhat more complicated:
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion.

Specified by:
    thenApplyAsync in interface CompletionStage<T>
Type Parameters:
    U - the function's return type
Parameters:
    fn - the function to use to compute the value of the returned CompletionStage
    executor - the executor to use for asynchronous execution
Returns:
    the new CompletionStage
Returns a new CompletionStage that... is executed using the supplied Executor
Here we are told directly to make the object being created a Runnable.
... with this stage's result as the argument to the supplied function.
This is more interesting. The function passed to us has a parameter, and its value is the value that completes the current CompletionStage. At the time thenApplyAsync is called, this value may not be known yet, so we cannot start the task on the Executor immediately. Instead, we have to arrange with the current CompletionStage that, when it completes, it passes its value to the task. Among the many CompletionStage methods there is one exactly suited to this purpose, whenComplete:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)

Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.
That is, it is enough for the newly created task object to implement the BiConsumer interface in order to accept the argument:
class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U>
        implements Runnable, BiConsumer<T,Throwable> {
    Function<? super T,? extends U> fn;
    Executor executor;
    T arg;
    Throwable throwable;

    public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
        this.fn = fn;
        this.executor = executor;
    }

    @Override // implementation of BiConsumer interface
    public void accept(T argument, Throwable throwable) {
        if (throwable != null) {
            this.throwable = throwable;
        } else {
            this.arg = argument;
        }
        // the argument (or the error) has arrived, so the task can be started
        executor.execute(this);
    }

    @Override
    public void run() {
        if (throwable == null) {
            try {
                U result = fn.apply(arg);
                super.complete(result);
            } catch (Throwable e) {
                super.completeExceptionally(e);
            }
        } else {
            super.completeExceptionally(throwable);
        }
    }
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
    CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor);
    this.whenComplete(task);
    return task;
}
This example is very important for understanding the nature of asynchronous programming, so let us list its main steps once more:
1) an asynchronous procedure is created:
CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor);
2) it is not yet ready for execution, so we ask the supplier of the missing argument to pass it to us in the future by calling the method we provide:
this.whenComplete(task);
3) in this method we not only save the received argument but also submit the task for execution (see the accept() method).
4) executing the task comes down to executing the function given to us and saving the result.
This result can in turn be requested by other procedures using the whenComplete() method, applied to our newly constructed object, so we can build a chain of asynchronous procedures of arbitrary length. But such a chain will be executed strictly sequentially, without any parallelism.
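The sequential nature of such a chain can be observed with the standard JDK methods. The following is a small sketch, not part of the original examples: the class name SequentialChainSketch and the log strings are made up, and the four-thread pool is chosen only to show that spare threads do not make a linear chain parallel.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SequentialChainSketch {
    // Builds a three-stage chain and records the order in which the stages ran.
    static List<String> runChain() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Integer> last = CompletableFuture
                .supplyAsync(() -> { log.add("supply"); return 1; }, pool)
                .thenApplyAsync(x -> { log.add("plus one"); return x + 1; }, pool)
                .thenApplyAsync(x -> { log.add("times ten"); return x * 10; }, pool);
            // join() is used here only to collect the result for the demonstration.
            log.add("result=" + last.join());
            return log;
        } finally {
            pool.shutdown();
        }
    }
}
```

Each stage needs the value of the previous one, so the stages run in the order supply, plus one, times ten, regardless of how many threads the pool has.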
But how do we express a more complex computation graph containing parallel branches?
For this, the thenCombineAsync method is used.
In the previous example we ran an asynchronous procedure with one argument; in this one it has two.
The two arguments can then be computed in parallel.
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

Description copied from interface: CompletionStage
Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.
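To see that the two argument providers really can run at the same time, here is a sketch that uses the JDK's own thenCombineAsync. The class ParallelBranchesSketch, the helper branch and the five-second timeout are assumptions of this illustration: each branch waits on a latch until the other branch has started, which can only succeed if both are running concurrently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelBranchesSketch {
    // Returns its value only after both branches have reported that they started.
    static int branch(int value, CountDownLatch bothStarted) {
        bothStarted.countDown();
        try {
            if (!bothStarted.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("branches did not overlap");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value;
    }

    static int combine() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch bothStarted = new CountDownLatch(2);
        try {
            CompletableFuture<Integer> left =
                CompletableFuture.supplyAsync(() -> branch(40, bothStarted), pool);
            CompletableFuture<Integer> right =
                CompletableFuture.supplyAsync(() -> branch(2, bothStarted), pool);
            // The combining function runs only after both arguments are available.
            return left.thenCombineAsync(right, (a, b) -> a + b, pool).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

The latch is there only to make the overlap observable; ordinary code would simply let the two suppliers run.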
Here everything is the same as in the previous example with thenApplyAsync, but the function parameter now takes two arguments, and a parameter CompletionStage<? extends U> other is added, which is the asynchronous provider of the second argument.
How do we handle the second argument?
Well, first, instead of one variable T arg we declare two: T arg1; U arg2; and second, instead of a single public method void accept(T argument, Throwable throwable) we declare two, accept1 and accept2, each of which works with its own argument.
At the same time, the object under construction no longer implements the BiConsumer<T,Throwable> interface, so we can no longer write the key statement that connects the nodes of the asynchronous computation graph:
this.whenComplete(task);
Fortunately, a functional interface object can be expressed as a method reference, without wrapping it in a separate class:
this.whenComplete(task::accept1);
other.whenComplete(task::accept2);
That is, the current object this supplies the first argument, and the object other supplies the second.
Here the method code has to be changed so that the methods do not start the task as soon as their own argument arrives, but first check whether the second one has arrived as well:
public synchronized void accept1(T argument, Throwable throwable) {
    if (throwable != null) {
        this.throwable = throwable;
        executor.execute(this);
    } else {
        this.arg1 = argument;
        if (arg2 != null) {
            executor.execute(this);
        }
    }
}
The accept2 method is described similarly.
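For reference, here is one way the whole two-argument task might look. This is a sketch under stated assumptions rather than the article's code: the class name CombineTaskSketch, the static helper combine and the demo method are hypothetical, boolean arrival flags are used instead of null checks so that a null argument is accepted, and a started flag is added so that the task is submitted at most once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

class CombineTaskSketch<T, U, V> extends CompletableFuture<V> implements Runnable {
    private final BiFunction<? super T, ? super U, ? extends V> fn;
    private final Executor executor;
    private T arg1;
    private U arg2;
    private boolean has1, has2;  // arrival flags, so that null is a legal argument
    private boolean started;     // assumption: guard against submitting the task twice
    private Throwable throwable;

    CombineTaskSketch(BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
        this.fn = fn;
        this.executor = executor;
    }

    public synchronized void accept1(T argument, Throwable error) {
        if (error == null) { arg1 = argument; has1 = true; }
        startIfReady(error);
    }

    public synchronized void accept2(U argument, Throwable error) {
        if (error == null) { arg2 = argument; has2 = true; }
        startIfReady(error);
    }

    // Called with the lock held: start once, on the first error
    // or when both arguments are present.
    private void startIfReady(Throwable error) {
        if (started) return;
        if (error != null) {
            throwable = error;
        } else if (!(has1 && has2)) {
            return;
        }
        started = true;
        executor.execute(this);
    }

    @Override
    public void run() {
        if (throwable != null) {
            completeExceptionally(throwable);
            return;
        }
        try {
            complete(fn.apply(arg1, arg2));
        } catch (Throwable e) {
            completeExceptionally(e);
        }
    }

    // Hypothetical stand-in for thenCombineAsync, written as a static helper.
    static <T, U, V> CompletableFuture<V> combine(
            CompletableFuture<T> first, CompletableFuture<U> second,
            BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
        CombineTaskSketch<T, U, V> task = new CombineTaskSketch<>(fn, executor);
        first.whenComplete(task::accept1);
        second.whenComplete(task::accept2);
        return task;
    }

    static String demo() {
        CompletableFuture<Integer> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        // Runnable::run is a direct executor: the task runs on the completing thread.
        CompletableFuture<String> result =
            combine(first, second, (a, b) -> a + ":" + b, Runnable::run);
        first.complete(1);
        second.complete(null);  // a null argument still counts as arrived
        return result.join();
    }
}
```

The direct executor in demo keeps the example deterministic; with a real thread pool the wiring is the same.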
Note that comparing with null is not the best way to check whether an argument has arrived; it may be better to add a boolean flag for each argument.
In this way you can build asynchronous procedures with more than two arguments, but a thought comes up right away: could we make a separate class for parameters, so that instead of writing an accept method for each parameter we simply create the parameters dynamically?
Parameter<Integer> arg1 = new Parameter<>();
Parameter<Float> arg2 = new Parameter<>();
...
future1.whenComplete(arg1);
future2.whenComplete(arg2);
Yes, such a class can be created, but more on that next time.
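Until then, the following is only a guess at what such a class could look like, not the design the author has in mind. All names here (AsyncTaskSketch, its inner class Parameter, ConcatTask, ParameterDemo) are hypothetical. The sketch assumes that all parameters of a task are created before any of them is connected to a future, because the task starts as soon as the count of pending parameters drops to zero.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

// Hypothetical base class: a task that starts once all of its parameters have arrived.
abstract class AsyncTaskSketch<R> extends CompletableFuture<R> implements Runnable {
    private final Executor executor;
    private int pending = 0;       // parameters created but not yet supplied
    private boolean started = false;
    private Throwable failure;

    AsyncTaskSketch(Executor executor) {
        this.executor = executor;
    }

    // Hypothetical parameter: stores one argument and reports its arrival to the task.
    class Parameter<T> implements BiConsumer<T, Throwable> {
        private T value;

        Parameter() {
            synchronized (AsyncTaskSketch.this) { pending++; }
        }

        @Override
        public void accept(T argument, Throwable error) {
            synchronized (AsyncTaskSketch.this) {
                if (started) return;
                if (error != null) {
                    failure = error;           // the first error starts the task at once
                } else {
                    value = argument;
                    if (--pending > 0) return; // other parameters are still missing
                }
                started = true;
            }
            executor.execute(AsyncTaskSketch.this);
        }

        T get() {
            return value;
        }
    }

    protected abstract R compute() throws Exception;

    @Override
    public void run() {
        if (failure != null) {
            completeExceptionally(failure);
            return;
        }
        try {
            complete(compute());
        } catch (Throwable e) {
            completeExceptionally(e);
        }
    }
}

// Example task with two parameters declared as fields.
class ConcatTask extends AsyncTaskSketch<String> {
    final Parameter<Integer> arg1 = new Parameter<>();
    final Parameter<Float> arg2 = new Parameter<>();

    ConcatTask(Executor executor) {
        super(executor);
    }

    @Override
    protected String compute() {
        return arg1.get() + " and " + arg2.get();
    }
}

class ParameterDemo {
    static String demo() {
        CompletableFuture<Integer> future1 = new CompletableFuture<>();
        CompletableFuture<Float> future2 = new CompletableFuture<>();
        ConcatTask task = new ConcatTask(Runnable::run); // direct executor for the demo
        future1.whenComplete(task.arg1);
        future2.whenComplete(task.arg2);
        future2.complete(2.5f);   // arrival order does not matter
        future1.complete(1);
        return task.join();
    }
}
```

With this shape, adding a third argument means declaring one more Parameter field rather than writing another accept method.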
A brief summary of the above:
The means of communication for threads and for asynchronous procedures are fundamentally different.
Threads communicate through semaphores, blocking queues, and other similar objects, which block the receiving thread if the information has not yet arrived but the thread is already trying to extract it with a pull-based operation.
Asynchronous procedures, as recipients, simply do not start executing until all the information they need is ready.
They wait passively until the information providers themselves deliver it through push-based operations.
Because of this, they do not hold a stack while waiting and therefore take up much less memory than threads.
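The difference between the two styles can be shown side by side. This is a minimal sketch with made-up names (PullVsPushSketch, pull, push): the first method blocks a thread in take() until a value arrives, while the second only registers a callback that the provider invokes when it completes the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

class PullVsPushSketch {
    // Pull: the receiving thread calls take() and is blocked until a value arrives.
    static String pull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> queue.add("hello"));
        producer.start();
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Push: the receiver is only a registered callback; no thread waits for the value.
    static List<String> push() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> source = new CompletableFuture<>();
        source.whenComplete((value, error) -> events.add("received " + value));
        events.add("registered, nobody is blocked");
        source.complete("hello"); // the provider pushes the value to the callback
        return events;
    }
}
```

In push() everything happens on the calling thread, so the callback runs inside the call to complete.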
The CompletableFuture methods set up exactly this kind of push-based connection, and in principle one can do without these methods by creating the objects explicitly, as shown in the examples above.
But for that you need classes similar to those described in these examples.
For some reason, the creators of java.util.concurrent chose not to give users access to these classes and hid them deep inside the CompletableFuture code.
Those who want a clear picture of the asynchronous network being built can reconstruct these classes by continuing the examples given. The source code of the examples is available on Github.
Source: https://habr.com/ru/post/418547/