I offer the readers of Habrakhabr the translation of the publication “ExecutorService - 10 tips and tricks” .
The ExecutorService abstraction was introduced back in Java 5. The year was 2004 ... For a moment, now Java 5 and 6 are no longer supported and Java 7 is preparing to add to the list. And many Java programmers still do not fully understand how ExecutorService works. There are many sources at your disposal, but now I would like to tell you about little-known subtleties and practices for working with it.
1. Name thread pools
I can not mention this. When dumping or during debugging, you can see that the standard flow naming scheme is as follows:
pool-N-thread-M , where N denotes the sequential number of the pool (every time you create a new pool, the global counter N is incremented) thread number in the pool. For example,
pool-2-thread-3 means the third thread in the second pool of the JVM life cycle. See:
Executors.defaultThreadFactory () . Not very informative, is it? JDK makes it a little difficult to properly name threads, because The naming strategy is hidden inside
ThreadFactory . Fortunately, Google Guava has a built-in class for this:
import com.google.common.util.concurrent.ThreadFactoryBuilder; final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("-%d") .setDaemon(true) .build(); final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
By default,
non-daemon thread pools are created, decide for yourself which ones are more appropriate.
')
2. Change names depending on context.
I learned about this trick from the article
“Supercharged jstack: How to Debug Your Servers at 100mph” . Once we know about the names of the streams, we can change them in runtime whenever we want! This makes sense since the stream dump contains the names of classes and methods without parameters and local variables. By including some important information in the stream name, we can easily trace which messages / posts / requests, etc. inhibit the system or cause deadlock.
private void process(String messageId) { executorService.submit(() -> { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("-" + messageId); try {
Inside the
try-finally block, the current thread is called
Processing-current-message ID , which can be useful when tracking the message flow in the system.
3. Explicit and secure completion
A queue of tasks lies between the client threads and the thread pool. When the application finishes, you have to worry about two things: what will happen with the tasks waiting in the queue, and how the already running ones will behave (more on that later). Surprisingly, many developers do not close the thread pool properly. There are two ways: either allow me to work all the tasks in the queue (
shutdown () ), or delete them (
shutdownNow () ) - depending on the specific case. For example, if we have queued a set of tasks and want to return control as soon as all of them are executed, use
shutdown () :
private void sendAllEmails(List<String> emails) throws InterruptedException { emails.forEach(email -> executorService.submit(() -> sendEmail(email))); executorService.shutdown(); final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES); log.debug(" ? {}", done); }
In this example, we send a batch of letters, each as a separate task for the thread pool. After placing these tasks in the queue, we close the pool so that it can no longer accept new tasks. Next, we wait a maximum of one minute until all tasks are completed. However, if some tasks have not yet been completed,
awaitTermination () will simply return
false . In addition, the remaining tasks will continue. I know hipsters are ready to go for:
emails.parallelStream().forEach(this::sendEmail);
Call me old fashioned, but I like to control the number of parallel threads. An alternative to gradual
shutdown () is
shutdownNow () :
final List<Runnable> rejected = executorService.shutdownNow(); log.debug(" : {}", rejected.size());
This time, all queued tasks are discarded and returned. Already running tasks are allowed to continue.
4. Handle thread interruption with care.
A less well-known feature of the
Future interface is the ability to cancel. The following is one of my previous articles:
InterruptedException and interrupting threads explained .
Since the
InterruptedException exception is explicitly checked, no one, most likely, did not even think about how many errors it has suppressed over all these years. And since it must be processed, many do it wrongly or thoughtlessly. Let's look at a simple example of a stream that periodically does some sort of cleaning, and in the intervals it sleeps most of the time.
class Cleaner implements Runnable { Cleaner() { final Thread cleanerThread = new Thread(this, ""); cleanerThread.start(); } @Override public void run() { while(true) { cleanUp(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
This code is terrible anyway!
- Running a stream from a constructor is often a bad idea. For example, some frameworks, such as Spring, like to create dynamic subclasses to support interception methods. Ultimately, we get two threads running from two instances.
- The InterruptedException exception is swallowed, not handled properly.
- This class starts a new thread in each instance. Instead, it should use the ScheduledThreadPoolExecutor , which will produce the same threads for many objects, which is more reliable and efficient.
- In addition, using the ScheduledThreadPoolExecutor, we can avoid writing sleep / work cycles and go to work really on schedule.
- Last but not least. There is no way to get rid of this thread, even if no one else refers to the Cleaner instance.
All of the problems listed are important, but suppressing
InterruptedException is the greatest sin. Before we understand why, let's think about why this exception is needed and how we can use its advantages to gracefully interrupt threads. Many blocking operations in the JDK oblige how to handle
InterruptedException , for example:
- Object.wait ()
- Thread.sleep ()
- Process.waitFor ()
- A number of blocking methods in java.util.concurrent. * , Such as ExecutorService.awaitTermination (), Future.get (), BlockingQueue.take (), Semaphore.acquire () Condition.await () and many, many others
- SwingUtilities.invokeAndWait ()
Note that blocking I / O does not throw an InterruptedException (which is unfortunate). If all these classes declare
InterruptedException , you may be surprised when these exceptions are thrown:
- When a thread is blocked on any method that declares an InterruptedException , and you call Thread.interrupt () on that thread, the blocking method will most likely immediately throw an InterruptedException .
- If you have queued a task ( ExecutorService.submit () ) and called Future.cancel (true) while it is still running, the thread pool will try to interrupt the thread executing this task, effectively completing it.
Knowing what
InterruptedException really is, we can handle it correctly. If someone tries to interrupt our thread, and we discovered this, by handling
InterruptedException , it would be wise to allow it to terminate immediately, for example:
class Cleaner implements Runnable, AutoCloseable { private final Thread cleanerThread; Cleaner() { cleanerThread = new Thread(this, "Cleaner"); cleanerThread.start(); } @Override public void run() { try { while (true) { cleanUp(); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException ignored) { log.debug("Interrupted, closing"); } }
Notice that the
try-finally block in this example surrounds a
while loop . Thus, if
sleep () throws an
InterruptedException , we will interrupt this cycle. You may argue that we should log the
InterruptedException exception stack. It depends on situation. In this case, the interruption of the flow is the expected behavior, not a fall. In general, at your discretion. In most cases, the thread will stop during sleep () and we will quickly end the
run () method at the same time. If you are very careful, you will probably ask - and what will happen if the flow is interrupted during the
cleanUp () cleanup ? Often you will come up with a decision to manually set a flag, like this:
private volatile boolean stop = false; @Override public void run() { while (!stop) { cleanUp(); TimeUnit.SECONDS.sleep(1); } } @Override public void close() { stop = true; }
Remember that the stop flag (it must be volatile!) Will not interrupt blocking operations, we must wait until the sleep () method completes. On the other hand, this explicit flag gives us better control, because we can monitor it at any time. It turns out that thread interruption works the same way. If someone interrupted the stream while he was performing non-blocking calculations (for example,
cleanUp () ), such calculations would not be interrupted immediately. However, the thread has already been marked as interrupted, so any next blocking operation such as
sleep () will immediately stop and throw an
InterruptedException , so we will not lose this signal.
We can also take advantage of this fact if we implement a non-blocking stream, which still wants to take advantage of the thread interruption mechanism. Instead of relying on
InterruptedException , we should just check
Thread.isInterrupted () periodically
: public void run() { while (Thread.currentThread().isInterrupted()) { someHeavyComputations(); } }
As you can see, if someone interrupts our stream, we will cancel the calculations as soon as the previous iteration of
someHeavyComputations ()
allows . If it runs for a very long time or forever, we will never reach the interrupt flag. It is noteworthy that this flag is not one-time. We can call
Thread.interrupted () instead of
isInterrupted (), which will reset the value of the flag and we can continue. Sometimes you may want to ignore the interrupt flag and continue execution. In this case,
interrupted () may come in handy.
If you are an old school programmer, you probably remember the
Thread.stop () method, which was outdated 10 years ago. In Java 8, there were plans for its “de-implementation”, but in 1.8u5 it is still with us. However, do not use it and refactor any code in which it occurs using
Thread.interrupt () .
You may sometimes want to completely ignore InterruptedException. In this case, look at the
Uninterruptibles class from Guava. It contains many methods such as
sleepUninterruptibly () or
awaitUninterruptibly (CountDownLatch). Just be careful with them. They do not declare
InterruptedException , but they also completely eliminate the interruption flow, which is rather unusual.
So, now you have an understanding of why some methods throw
InterruptedException :
- Thrown InterruptedException must be adequately handled in most cases.
- Suppressing InterruptedException is often a bad idea.
- If the thread was interrupted during non-blocking calculations. Use isInterrupted ().
5. Watch the queue length and determine the boundary.
Poor-sized thread pools can lead to poor performance, instability, and memory leaks. If you specify too few threads, the queue will grow, consuming a lot of memory. On the other hand, too many threads will slow down the entire system due to frequent context switches, which will lead to the same symptoms. It is important to maintain the queue depth and determine its boundaries. An overloaded pool can simply temporarily abandon new tasks.
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100); executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
The above code is equivalent to
Executors.newFixedThreadPool (n) , but instead of using the default unlimited
LinkedBlockingQueue , we use an
ArrayBlockingQueue with a fixed capacity of 100. This means that if 100 tasks have already been recruited, the next task will be rejected with the exception of
RejectedExecutionException . In addition, since the queue is now accessible from the outside, we can periodically inquire about its size in order to write to the log, send to JMX, etc.
6. Remember exception handling
What is the result of the following code?
executorService.submit(() -> { System.out.println(1 / 0); });
I was puzzled by how many times he did not print anything. No sign of
java.lang.ArithmeticException: / by zero , nothing. The thread pool just swallowed the exception, as if it had never been thrown away. If it were a thread created from scratch, without a wrapper in the form of a pool, the
UncaughtExceptionHandler could work. But with the thread pool, you need to be more careful. If you send a
Runnable (without any result, as above), you must put the entire method body inside a
try-catch . If you are queuing Callable, make sure that you always retrieve the result using the get () blocker to rethrow the exception:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
It is noteworthy that even the
Spring framework made this mistake in
@Async , see:
SPR-8995 and
SPR-12090 .
7. Keep track of waiting times in the queue.
Monitoring of the working queue depth is one-way. When solving problems with a single transaction / task, it makes sense to see how much time has passed between the statement of the problem and the beginning of its execution. This time should ideally tend to zero (when there is an idle thread in the pool), but it will increase as tasks are queued. In addition, if the pool does not have a fixed number of threads, launching a new task may require the birth of a new thread, which will also take some time. To clearly measure this indicator, wrap the original
ExecutorService into something similar:
public class WaitTimeMonitoringExecutorService implements ExecutorService { private final ExecutorService target; public WaitTimeMonitoringExecutorService(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { final long startTime = System.currentTimeMillis(); return target.submit(() -> { final long queueDuration = System.currentTimeMillis() - startTime; log.debug(" {} {} ", task, queueDuration); return task.call(); } ); } @Override public <T> Future<T> submit(Runnable task, T result) { return submit(() -> { task.run(); return result; }); } @Override public Future<?> submit(Runnable task) { return submit(new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }); }
This is not a complete implementation, but the essence is clear. At the moment when we set the task to the thread pool, we immediately noted the time. Then they stopped the stopwatch as soon as the task was retrieved and sent for execution. Do not be deceived by the proximity
startTime and
queueDuration in the source code. In fact, these two strings are executed in different threads, in milliseconds or even in seconds from each other.
8. Keep track of client stack
Nowadays, increased attention is paid to reactive programming:
Reactive manifesto ,
reactive streams ,
RxJava (already 1.0!),
Clojure agents ,
scala.rx ... It all looks great, but the structure is no longer your friend, it is by and large useless. Consider, for example, the following exception that occurs during the execution of a job in a thread pool:
java.lang.NullPointerException: null
at com.nurkiewicz.MyTask.call (Main.java:76) ~ [classes /: na]
at com.nurkiewicz.MyTask.call (Main.java:72) ~ [classes /: na]
at java.util.concurrent.FutureTask.run (FutureTask.java:266) ~ [na: 1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) ~ [na: 1.8.0]
at java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) ~ [na: 1.8.0]
at java.lang.Thread.run (Thread.java:744) ~ [na: 1.8.0]
We can easily notice that
MyTask threw NPE on line 76. But we have no idea who approved this task, since the stack only applies to
Thread and
ThreadPoolExecutor . Technically, we can simply navigate through the code in the hope of finding only one area where the
MyTask is being queued. But without separate streams (not to mention event-oriented, reactive, etc. programming), we always see the whole picture at once. What if we could save the client's codetect stack (of what the task initiates) and show it, for example, when an error occurs? The idea is not new, for example,
Hazelcast distributes exceptions from the owner node to the client code. Below is a simple example of how to do this:
public class ExecutorServiceWithClientTrace implements ExecutorService { protected final ExecutorService target; public ExecutorServiceWithClientTrace(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) { return () -> { try { return task.call(); } catch (Exception e) { log.error(" {} {}:", e, clientThreadName, clientStack); throw e; } }; } private Exception clientTrace() { return new Exception(" "); } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return tasks.stream().map(this::submit).collect(toList()); }
This time, in case of failure, we retrieve the complete spectrum and the name of the thread where the task was queued. Much more valuable information than the standard exception discussed earlier:
Exception java.lang.NullPointerException in the task from the main stream:
java.lang.Exception: Client Framework
at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace (ExecutorServiceWithClientTrace.java:43) ~ [classes /: na]
at com.nurkiewicz.ExecutorServiceWithClientTrace.submit (ExecutorServiceWithClientTrace.java:28) ~ [classes /: na]
at com.nurkiewicz.Main.main (Main.java:31) ~ [classes /: na]
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) ~ [na: 1.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) ~ [na: 1.8.0]
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) ~ [na: 1.8.0]
at java.lang.reflect.Method.invoke (Method.java:483) ~ [na: 1.8.0]
at com.intellij.rt.execution.application.AppMain.main (AppMain.java:134) ~ [idea_rt.jar: na]
9. Prefer CompletableFuture
In Java 8, the more powerful
CompletableFuture class was introduced. Please use it where possible.
ExecutorService has not been extended to support this abstraction, so you should take care of this yourself. Instead:
final Future<BigDecimal> future = executorService.submit(this::calculate);
Use:
final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);
CompletableFuture extends
Future , so everything works as before. But the more advanced users of your API will truly appreciate the advanced functionality provided by CompletableFuture.
10. Synchronous queues
SynchronousQueue is an interesting variation of
BlockingQueue , which is actually not exactly a queue. It is not even a data structure per se. Best of all, it can be defined as a queue with zero capacity.
This is what
JavaDoc says:
Each added operation must wait for the corresponding delete operation on another thread, and vice versa. A synchronous queue has no internal capacity, even a single one. You cannot look into the synchronous queue, because the element is presented only when you try to delete it; you cannot insert an element (using any method) until another thread deletes it: you cannot bypass the queue because there is nothing to bypass.
Synchronous queues are similar to the “rendezvous channels” used in CSP and Ada.
How does all this relate to thread pools? Let's try to use the
SynchronousQueue with the
ThreadPoolExecutor :
BlockingQueue<Runnable> queue = new SynchronousQueue<>(); ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);
We created a pool of threads with two threads and a
SynchronousQueue before this. In fact, the
SynchronousQueue is a queue with a capacity of 0, so such
ExecutorService will only accept new tasks if an idle thread is available. If all threads are busy, the new task will be immediately rejected and will never wait in the queue. This mode can be useful for immediate background processing, if possible.
That's all, I hope you discovered at least one interesting feature!