📜 ⬆️ ⬇️

The problem of using CompletableFuture in multiple threads and its solution

image Java 8 has a new CompletableFuture class that allows you to conveniently write asynchronous code.
When using the CompletableFuture from several threads, I was faced with its unobvious behavior, namely that callbacks on it can be executed at all in those threads, as expected. About this and how I managed to solve the problem - I will tell in this article.

I developed an asynchronous, non-blocking single-threaded client to a server that used thread safe data structures. The tests passed without problems, but the benchmarks sometimes fell with ConcurrentModificationException on the internal structures of the single-threaded client.

Asynchrony in the client was implemented using CompletableFuture , all operations within the client were performed in one thread (hereinafter singleThreadExecutor code).

Client code snippet with a get method that is available to users:
')
 //   private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>()); public CompletableFuture<String> get(String key) { CompletableFuture<String> future = new CompletableFuture<>(); //       singleThreadExecutor.execute(() -> { // future     pendingFutures.add(future); future.whenComplete((v, e) -> { // future       pendingFutures.remove(future); }); //            //     future.complete(data);    singleThreadExecutor }); return future; } 

It turned out that this can not be done.

Perhaps I would have known about this earlier if I had carefully read the javadoc for CompletableFuture .

View javadoc
For the completion of the action.

When using such an architecture, it is necessary that all callbacks from the CompletableFuture called on the same thread that CompletableFuture.complete makes.

For the above code, it seems to be the case. But benchmarks sometimes ended up with ConcurrentModificationException in code that pendingFutures over pendingFutures in the same client thread ( singleThreadExecutor ).

The fact is that the callback passed in the future.whenComplete (which calls pendingFutures.remove ) is sometimes performed in a completely different thread. Or rather, in the flow of the application that my client uses:

 Client client = new Client("127.0.0.1", 8080); CompletableFuture<String> result = client.get(key); result.thenAccept(data -> { System.out.println(data); }); 

Calling result.thenAccept in this application sometimes leads to calling other callbacks on the future that were added inside the client code itself.

Consider the problem with simple examples.


 Thread mainThread = Thread.currentThread(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); future.complete(null); 

This code always displays true , since the callback is executed on the same thread as the complete method.

But if the CompletableFuture has at least one call from another thread, the behavior may change:

 //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread) }); //  callback    future    executor.execute(() -> { future.thenRun(() -> { //nop }); }); // future future.complete(null); 

Such code can sometimes produce false .

The fact is that the thenRun call from the same future, but in the second thread, can trigger the callback in the first thenRun . In this case, the callback of the first thenRun will be called in the second thread.

This happens at the moment when future.complete(null) started to run, but did not have time to call callbacks, and thenRun volunteered in the second thread, which will execute all other callbacks on this future but already in its own flow.

The problems are solved simply:

 //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); //  callback    future    executor.execute(() -> { secondThreadFuture.thenRun(() -> { //nop }); }); // future future.complete(null); 

We simply added a secondThreadFuture, which depends on the outcome of the original future. And thenRun call on it in the second thread does not lead to the possible triggering of callbacks on the original future.

For guaranteed callbacks in user-defined threads, the CompletableFuture has async method implementations, for example, thenRunAsync , which need to be passed to the Executor. But async versions of methods may work slower than usual ones. Therefore, I did not want to use them once again.

Conclusion


The conclusion I made for myself: do not use one CompletableFuture object in several threads, if you need to be sure that all callbacks on it are executed in the specified stream. And if it is necessary to use several streams with one CompletableFuture, then it is enough to transfer to another stream not the original CompletableFuture , but a new one, which will depend on the original one. For example, like this:

 CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> { //nop }); 

Source: https://habr.com/ru/post/325730/


All Articles