 Java 8 has a new
 Java 8 has a new CompletableFuture class that allows you to conveniently write asynchronous code.CompletableFuture from several threads, I was faced with its unobvious behavior, namely that callbacks on it can be executed at all in those threads, as expected. About this and how I managed to solve the problem - I will tell in this article.ConcurrentModificationException on the internal structures of the single-threaded client.CompletableFuture , all operations within the client were performed in one thread (hereinafter singleThreadExecutor code).get method that is available to users: //   private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>()); public CompletableFuture<String> get(String key) { CompletableFuture<String> future = new CompletableFuture<>(); //       singleThreadExecutor.execute(() -> { // future     pendingFutures.add(future); future.whenComplete((v, e) -> { // future       pendingFutures.remove(future); }); //            //     future.complete(data);    singleThreadExecutor }); return future; } CompletableFuture .CompletableFuture called on the same thread that CompletableFuture.complete makes.ConcurrentModificationException in code that pendingFutures over pendingFutures in the same client thread ( singleThreadExecutor ).future.whenComplete (which calls pendingFutures.remove ) is sometimes performed in a completely different thread. Or rather, in the flow of the application that my client uses: Client client = new Client("127.0.0.1", 8080); CompletableFuture<String> result = client.get(key); result.thenAccept(data -> { System.out.println(data); }); result.thenAccept in this application sometimes leads to calling other callbacks on the future that were added inside the client code itself. Thread mainThread = Thread.currentThread(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); future.complete(null); true , since the callback is executed on the same thread as the complete method.CompletableFuture has at least one call from another thread, the behavior may change: //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread) }); //  callback    future    executor.execute(() -> { future.thenRun(() -> { //nop }); }); // future future.complete(null); false .thenRun call from the same future, but in the second thread, can trigger the callback in the first thenRun . In this case, the callback of the first thenRun will be called in the second thread.future.complete(null) started to run, but did not have time to call callbacks, and thenRun volunteered in the second thread, which will execute all other callbacks on this future but already in its own flow. //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); //  callback    future    executor.execute(() -> { secondThreadFuture.thenRun(() -> { //nop }); }); // future future.complete(null); thenRun call on it in the second thread does not lead to the possible triggering of callbacks on the original future.CompletableFuture has async method implementations, for example, thenRunAsync , which need to be passed to the Executor. But async versions of methods may work slower than usual ones. Therefore, I did not want to use them once again.CompletableFuture object in several threads, if you need to be sure that all callbacks on it are executed in the specified stream. And if it is necessary to use several streams with one CompletableFuture, then it is enough to transfer to another stream not the original CompletableFuture , but a new one, which will depend on the original one. For example, like this: CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> { //nop }); Source: https://habr.com/ru/post/325730/
All Articles