CompletableFuture is a class that allows you to conveniently write asynchronous code. While using a CompletableFuture from several threads, I ran into non-obvious behavior: callbacks on it may be executed in threads other than the ones you expect. In this article I describe the problem and how I managed to solve it.

The problem showed up as a ConcurrentModificationException on the internal structures of a single-threaded client. The client returns its results as CompletableFuture, and all operations within the client are performed in one thread (singleThreadExecutor in the code below).

Here is the get method that is available to users:

    // Futures that have not completed yet; accessed only from the client thread
    private final Set<CompletableFuture<?>> pendingFutures =
            Collections.newSetFromMap(new IdentityHashMap<>());

    public CompletableFuture<String> get(String key) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Do all the work in the client thread
        singleThreadExecutor.execute(() -> {
            // Remember the future while it is pending
            pendingFutures.add(future);
            future.whenComplete((v, e) -> {
                // Forget the future once it has completed
                pendingFutures.remove(future);
            });
            // ...
            // Later, future.complete(data) is called on the singleThreadExecutor thread
        });
        return future;
    }
The cause turned out to be a wrong assumption about CompletableFuture. I expected callbacks on a CompletableFuture to be called on the same thread that calls CompletableFuture.complete.

The ConcurrentModificationException was thrown in code that iterates over pendingFutures in the same client thread (singleThreadExecutor).

It turned out that the callback passed to future.whenComplete (the one that calls pendingFutures.remove) is sometimes executed in a completely different thread. More precisely, it runs in a thread of the application that uses my client:

    Client client = new Client("127.0.0.1", 8080);
    CompletableFuture<String> result = client.get(key);
    result.thenAccept(data -> {
        System.out.println(data);
    });
Calling result.thenAccept in this application sometimes ends up running other callbacks on the future, namely the ones that were added inside the client code itself.

A simple example:

    Thread mainThread = Thread.currentThread();
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.thenRun(() -> {
        System.out.println(Thread.currentThread() == mainThread);
    });
    future.complete(null);
This code prints true, since the callback is executed on the same thread that calls the complete method.

But if the CompletableFuture receives at least one call from another thread, the behavior may change:

    // First (main) thread
    Thread mainThread = Thread.currentThread();
    // Second thread
    Executor executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.thenRun(() -> {
        System.out.println(Thread.currentThread() == mainThread);
    });
    // Add another callback on the same future from the second thread
    executor.execute(() -> {
        future.thenRun(() -> {
            // nop
        });
    });
    // Complete the future in the first thread
    future.complete(null);
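A related case can be shown deterministically. The sketch below is not taken from the client: it completes the future first and only then registers a callback from the executor thread, so the callback runs on the thread that called thenRun rather than on the one that called complete. The class and method names are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class CompletedFutureDemo {
    /** Returns the thread that ran a callback registered after completion. */
    static Thread callbackThreadAfterCompletion(ExecutorService executor) throws Exception {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Completed on the calling thread; no callbacks are registered yet.
        future.complete(null);

        AtomicReference<Thread> callbackThread = new AtomicReference<>();
        // Register the callback from the executor thread and wait for that task.
        executor.submit(() -> {
            future.thenRun(() -> callbackThread.set(Thread.currentThread()));
        }).get();
        return callbackThread.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Thread mainThread = Thread.currentThread();
            Thread ranOn = callbackThreadAfterCompletion(executor);
            System.out.println(ranOn == mainThread); // prints false
        } finally {
            executor.shutdown();
        }
    }
}
```

So the thread that completes a future is not the only one that can end up running code attached to it; any thread that calls a non-async callback method on the future is a candidate.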
The two-thread example can sometimes print false.

A thenRun call on the same future, made in the second thread, can trigger the callback of the first thenRun. In that case the callback of the first thenRun is called in the second thread.

This happens when future.complete(null) has started to run but has not yet had time to call the callbacks, and thenRun is called in the second thread at that moment. The second thread then executes all the other callbacks on this future, but in its own thread.

The fix is to have the second thread call thenRun on a dependent future instead:

    // First (main) thread
    Thread mainThread = Thread.currentThread();
    // Second thread
    Executor executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = new CompletableFuture<>();
    CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> {
        System.out.println(Thread.currentThread() == mainThread);
    });
    // Add a callback from the second thread, but on the dependent future
    executor.execute(() -> {
        secondThreadFuture.thenRun(() -> {
            // nop
        });
    });
    // Complete the future in the first thread
    future.complete(null);
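Because the effect depends on timing, a single run proves little. The following sketch is a hypothetical harness (all names are mine) that repeats both variants many times and counts how often the first callback ran on a thread other than the completing one. The count for the original variant varies from run to run and may well be zero on a given machine; for the dependent-future variant it should stay at zero.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class CallbackThreadCheck {
    /**
     * Repeats the two-thread scenario and counts how often the first callback
     * ran on a thread other than the one calling complete().
     * With useDependent = true the second thread attaches its callback
     * to the dependent future instead of the original one.
     */
    static int countOffThread(boolean useDependent, int iterations) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger offThread = new AtomicInteger();
        try {
            Thread completer = Thread.currentThread();
            for (int i = 0; i < iterations; i++) {
                CompletableFuture<Void> future = new CompletableFuture<>();
                CompletableFuture<Void> dependent = future.thenRun(() -> {
                    if (Thread.currentThread() != completer) {
                        offThread.incrementAndGet();
                    }
                });
                CompletableFuture<Void> target = useDependent ? dependent : future;
                // The second thread registers a no-op callback, racing with complete().
                Future<?> task = executor.submit(() -> {
                    target.thenRun(() -> { });
                });
                future.complete(null);
                task.get(); // wait so that iterations do not overlap
            }
        } finally {
            executor.shutdown();
        }
        return offThread.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("original future:  " + countOffThread(false, 100_000));
        System.out.println("dependent future: " + countOffThread(true, 100_000));
    }
}
```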
In the fixed version, secondThreadFuture depends on the original future, and a thenRun call on it in the second thread cannot trigger callbacks on the original future.

CompletableFuture also has async variants of its methods, for example thenRunAsync, which can be given an Executor to run the callback on. But the async versions may work slower than the regular ones, so I did not want to use them unless necessary.

The conclusion: do not use the same CompletableFuture object in several threads if you need to be sure that all callbacks on it are executed in a specific thread. And if several threads do have to work with one CompletableFuture, it is enough to hand the other thread not the original CompletableFuture but a new one that depends on it. For example, like this:

    CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> {
        // nop
    });
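Applied to the get method from the beginning of the article, the same idea might look like the sketch below. This is an assumption about how such a client could be arranged, not the actual client code: SketchClient, the thread name client-thread, the cleanupThreadNames set and the immediately completed fake response are all invented for the example. The one detail that matters is that the dependent future is created before the task is handed to the client thread, while the original future has no callbacks and is not completed yet.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical minimal client; only the shape of get() follows the article. */
class SketchClient {
    private final ExecutorService singleThreadExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "client-thread"));
    // Accessed only from the client thread, so it is not synchronized.
    private final Set<CompletableFuture<?>> pendingFutures =
            Collections.newSetFromMap(new IdentityHashMap<>());
    // Names of the threads that ran the internal cleanup callback (demo only).
    final Set<String> cleanupThreadNames = ConcurrentHashMap.newKeySet();

    public CompletableFuture<String> get(String key) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Create the dependent future first: no callbacks are registered on
        // `future` yet and it is not completed, so this call runs nothing.
        CompletableFuture<String> userFuture = future.whenComplete((v, e) -> { });
        singleThreadExecutor.execute(() -> {
            pendingFutures.add(future);
            future.whenComplete((v, e) -> {
                pendingFutures.remove(future);
                cleanupThreadNames.add(Thread.currentThread().getName());
            });
            // Stand-in for the real request: complete at once with fake data.
            future.complete("value-for-" + key);
        });
        // The caller never sees the original future, only the dependent one.
        return userFuture;
    }

    public void close() throws InterruptedException {
        singleThreadExecutor.shutdown();
        singleThreadExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        SketchClient client = new SketchClient();
        for (int i = 0; i < 1000; i++) {
            client.get("key" + i).thenAccept(data -> { });
        }
        client.close();
        System.out.println(client.cleanupThreadNames);
    }
}
```

Callbacks that the application adds to the returned future can still run either on the application thread or on the client thread, but they can no longer pull the client's internal callbacks onto an application thread.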
Source: https://habr.com/ru/post/325730/