📜 ⬆️ ⬇️

How it works in the java world. Thread pool

The basic principle of programming is: do not reinvent the wheel. But sometimes, to understand what is happening and how to use the tool incorrectly, we need to do it. Today we invent the pattern of multi-threaded execution of tasks.


Imagine that which causes a large CPU load:


public class Counter { public Double count(double a) { for (int i = 0; i < 1000000; i++) { a = a + Math.tan(a); } return a; } } 

We want to process a number of such tasks as quickly as possible, try *:


 public class SingleThreadClient { public static void main(String[] args) { Counter counter = new Counter(); long start = System.nanoTime(); double value = 0; for (int i = 0; i < 400; i++) { value += counter.count(i); } System.out.println(format("Executed by %ds, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); } } 

On my wheelbarrow with 4 physical cores, use the top -pid {pid} processor resources:


image


Execution time 104 sec.


As you noticed, the load of one processor on one java-process with one executable thread is 100%, but the total processor load in user space is only 2.5%, and we have a lot of unused system resources.


Let's try using more by adding more worker threads:


 public class MultithreadClient { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(8); Counter counter = new Counter(); long start = System.nanoTime(); List<Future<Double>> futures = new ArrayList<>(); for (int i = 0; i < 400; i++) { final int j = i; futures.add( CompletableFuture.supplyAsync( () -> counter.count(j), threadPool )); } double value = 0; for (Future<Double> future : futures) { value += future.get(); } System.out.println(format("Executed by %ds, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); threadPool.shutdown(); } } 

Busy resources:


image


ThreadPoolExecutor


To speed up, we used ThreadPool - in java, its role is played by ThreadPoolExecutor, which can be implemented directly or from one of the methods in the Utilities class. If we look inside the ThreadPoolExecutor, we can find the queue:


 private final BlockingQueue<Runnable> workQueue; 

in which tasks are collected if more threads are running than the size of the initial pool. If fewer threads of the initial pool size are running, the pool will try to start a new thread:


 public void execute(Runnable command) { ... if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; ... if (isRunning(c) && workQueue.offer(command)) { ... addWorker(null, false); ... } } 

Each addWorker launches a new thread with a Runnable task that polls workQueue for new tasks and executes them.


 final void runWorker(Worker w) { ... try { while (task != null || (task = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)) != null) { ... task.run(); ... } 

ThreadPoolExecutor has a very clear javadoc, so it makes no sense to rephrase it. Instead, let's try to make our own:


 public class ThreadPool implements Executor { private final Queue<Runnable> workQueue = new ConcurrentLinkedQueue<>(); private volatile boolean isRunning = true; public ThreadPool(int nThreads) { for (int i = 0; i < nThreads; i++) { new Thread(new TaskWorker()).start(); } } @Override public void execute(Runnable command) { if (isRunning) { workQueue.offer(command); } } public void shutdown() { isRunning = false; } private final class TaskWorker implements Runnable { @Override public void run() { while (isRunning) { Runnable nextTask = workQueue.poll(); if (nextTask != null) { nextTask.run(); } } } } } 

Now let's do the same task as above with our pool.
Change the line in MultithreadClient:


 // ExecutorService threadPool = Executors.newFixedThreadPool (8); ThreadPool threadPool = new ThreadPool (8); 

The execution time is almost the same - 15 seconds.


Thread pool size


Let's try to further increase the number of running threads in the pool - up to 100.


 ThreadPool threadPool = new ThreadPool(100); 

We can see that the run time has increased to 28 seconds - why did this happen?


There are several independent reasons why performance might fall, for example, due to constant switching of the processor context, when it pauses work on one task and has to switch to another, switching includes state saving and state recovery. While the processor is busy switching states, it does not do any useful work on any task.


The number of process context switches can be seen by looking at the csw parameter when displaying the top command.


On 8 threads:
image


On 100 threads:
image


How to choose a pool size?


The size depends on the type of tasks performed. Of course, the size of the thread pool rarely needs to be hardcoded, rather, it must be customizable and the optimal size is derived from monitoring the throughput of the tasks performed.


Assuming that the threads do not block each other, there are no I / O wait cycles, and the task processing time is the same, the optimal thread pool = Runtime.getRuntime (). AvailableProcessors () + 1.


If the threads mostly expect I / O, then the optimal pool size should be increased by the ratio between the process wait time and the computation time. For example. We have a process that spends 50% of the time in iowait, then the pool size can be 2 * Runtime.getRuntime (). AvailableProcessors () + 1.


Other types of pools


  1. A pool of threads with a memory limit that blocks sending a task when there are too many tasks in the queue MemoryAwareThreadPoolExecutor


  2. A thread pool that registers a JMX component to control and adjust the size of the pool at runtime.
    JMXEnabledThreadPoolExecutor

Source code can be found here .


[*] - the test is not accurate, for more accurate tests use: http://openjdk.java.net/projects/code-tools/jmh/


')

Source: https://habr.com/ru/post/326146/


All Articles