We continue the series of useful material that we share with you. This time it is Java again.
If you are already familiar with the Stream API and have used it, you know that it is a convenient way to process data. Built-in operations such as map, filter, sorted and others let you transform the incoming data and obtain a result. Before streams, the developer had to describe the processing imperatively: write a for loop over the elements, then compare, analyze and sort as needed. The Stream API lets you state declaratively what you want to get without describing how to get it. This is somewhat similar to SQL when working with databases.
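To make the contrast concrete, here is a minimal sketch of the same processing written both ways. The class name, the method names and the sample data are ours and purely illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class ImperativeVsStream {
    // Imperative version: explicit loop, explicit filtering, explicit sort.
    static List<String> imperative(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (name.length() > 3) {
                result.add(name.toUpperCase());
            }
        }
        Collections.sort(result);
        return result;
    }

    // Declarative version: the same steps expressed as a stream pipeline.
    static List<String> declarative(List<String> names) {
        return names.stream()
                .filter(name -> name.length() > 3)
                .map(String::toUpperCase)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Kate", "Bob", "Alexander", "Eve", "Maria");
        System.out.println(imperative(names));  // [ALEXANDER, KATE, MARIA]
        System.out.println(declarative(names)); // [ALEXANDER, KATE, MARIA]
    }
}
```

Both methods return the same list; the stream version only names the steps and leaves the iteration to the library.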

Streams made Java code more compact and readable. Another goal of the Stream API was to give the developer an easy way to parallelize tasks and gain performance on multi-core machines, while avoiding the complexity inherent in multi-threaded programming. This goal was achieved: the Stream API has the methods BaseStream::parallel and Collection.parallelStream(), which return a parallel stream.
That is, if we had the code:
Collection.stream().operation()
it is easy to parallelize it by changing a single call:
Collection.parallelStream().operation()
or, in general, for an arbitrary stream:
Source.stream().parallel().operation()
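As a small illustration of the switch, the sketch below computes the same sum sequentially and in parallel. The class and method names are hypothetical; the only difference between the two runs is the call that creates the stream.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SequentialVsParallel {
    // Sums the squares of the numbers; the flag selects stream() or parallelStream().
    static long sumOfSquares(List<Integer> numbers, boolean parallel) {
        return (parallel ? numbers.parallelStream() : numbers.stream())
                .mapToLong(Integer::longValue)
                .map(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 1000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(sumOfSquares(numbers, false)); // 333833500
        System.out.println(sumOfSquares(numbers, true));  // 333833500
    }
}
```

The result is identical because the operation is stateless; whether the parallel version is actually faster depends on the factors discussed in the rest of the article.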
As with any simple API, parallelStream() hides a complex mechanism for parallelizing operations. The developer has to face the fact that a parallel stream may not improve performance and may even make it worse, so it is important to understand what happens behind the parallelStream() call. There is an article by Doug Lea about when parallel streams have a positive effect. Pay attention to the following factors:
F is the operation that is applied to each element of the stream. It must be independent, that is, it must not affect other elements besides the current one and must not depend on other elements (a stateless, non-interfering function).
S is the data source (collection); it must be efficiently splittable. For example, ArrayList is an efficiently splittable source: it is easy to compute indices and ranges that can be processed in parallel. HashMap is also handled efficiently. BlockingQueue, LinkedList and most IO sources are poor candidates for parallel processing.
Estimate the benefit of parallel processing. On modern machines, it makes sense to parallelize tasks whose execution time exceeds 100 microseconds.
Thus, before using this tool, you need to understand how well your task fits within these constraints.
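The requirement on F can be illustrated with a minimal sketch. The class and method names below are ours, not from the article by Doug Lea. The first method mutates shared state from the lambda, so in a parallel run updates can be lost; the second expresses the same computation without shared state.

```java
import java.util.stream.IntStream;

class StatelessVsStateful {
    // Unsafe: the lambda writes to shared state without synchronization,
    // so a parallel run may lose updates and return less than expected.
    static int countEvensWithSharedState(int n) {
        int[] counter = {0};
        IntStream.range(0, n)
                .parallel()
                .filter(i -> i % 2 == 0)
                .forEach(i -> counter[0]++); // data race, result is not reliable
        return counter[0];
    }

    // Safe: no shared state, the stream itself combines the partial results.
    static long countEvensStateless(int n) {
        return IntStream.range(0, n)
                .parallel()
                .filter(i -> i % 2 == 0)
                .count();
    }

    public static void main(String[] args) {
        System.out.println("shared state: " + countEvensWithSharedState(100_000));
        System.out.println("stateless:    " + countEvensStateless(100_000));
    }
}
```

The stateless version always returns 50000 for n = 100000. The shared-state version may return the same number on one run and a smaller one on another, which is exactly why such functions are a poor fit for parallel streams.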
While experimenting with parallel(), we stumbled upon another interesting point related to the current implementation. parallel() tries to execute your code in several threads, so it is worth asking who creates these threads and how they are managed.
Let's try to run this code:
void parallel() {
    int result = IntStream.range(0, 3)
            .parallel()
            .peek(it -> System.out.printf("Thread [%s] peek: %d\n", Thread.currentThread().getName(), it))
            .sum();
    System.out.println("sum: " + result);
}
Thread [ForkJoinPool.commonPool-worker-1] peek: 0
Thread [main] peek: 1
Thread [ForkJoinPool.commonPool-worker-0] peek: 2
sum: 3
This is already interesting: by default, a parallel stream uses ForkJoinPool.commonPool. This pool is created statically, that is, on the first access to ForkJoinPool; it does not respond to shutdown() / shutdownNow() and lives until System::exit is called. Tasks that do not specify a particular pool are executed in commonPool.
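The fact that the common pool ignores shutdown requests is easy to check. The sketch below is ours (the class and method names are hypothetical): it calls both shutdown methods and then verifies that the pool is still usable.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolShutdown {
    // Returns true if the common pool keeps working after shutdown attempts.
    static boolean commonPoolSurvivesShutdown() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown();    // has no effect on the common pool
        common.shutdownNow(); // has no effect on the common pool either
        int sum = IntStream.range(0, 100).parallel().sum();
        return !common.isShutdown() && sum == 4950;
    }

    public static void main(String[] args) {
        System.out.println("still alive: " + commonPoolSurvivesShutdown());
    }
}
```

This assumes no security manager forbids the calls. A pool created with new ForkJoinPool(...) behaves differently and does shut down.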
Let's figure out the size of commonPool by looking at the source code of jdk1.8.0_111. For readability, some calls unrelated to parallelism have been removed.
ForkJoinPool::makeCommonPool
private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    try {
From the same class constant:
static final int MAX_CAP = 0x7fff;
We are interested in parallelism, which determines the number of workers in the pool. By default, the pool size is Runtime.getRuntime().availableProcessors() - 1, that is, one less than the number of available cores. When you create a custom FJPool, you can set the desired parallelism level through the constructor. For commonPool, you can set the level through a JVM parameter:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=n
The value of this property is capped at 32767 (0x7fff).
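To see which value is in effect on a particular machine, you can ask the JVM directly. This is a minimal sketch with hypothetical class and method names; ForkJoinPool.getCommonPoolParallelism() and Runtime.availableProcessors() are standard JDK calls.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Number of cores the JVM reports as available.
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool, as configured at JVM startup.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + cores());
        System.out.println("Common pool parallelism: " + commonParallelism());
    }
}
```

Running it once without parameters and once with -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 shows the effect of the setting.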
This can be useful if you do not want to give all cores to ForkJoinPool tasks. For example, if your application normally uses 4 of the 8 CPUs, it makes sense to give the remaining 4 cores to FJ.
The question arises why the number of workers is one less than the number of cores. The answer can be found in the comments in ForkJoinPool.java:
When external threads submit to the common pool, they can perform subtask processing (see externalHelpComplete and related methods) upon joins. This caller-helps policy makes it sensible to set common pool parallelism level to one (or more) less than the total number of available cores, or even zero for pure caller-runs
That is, when a thread submits a task to the common pool, the pool can use the calling thread (caller thread) as a worker. That is why we saw main in the program output! The mystery is solved: ForkJoinPool tries to load the calling thread with its tasks as well. In the code above this is main, but if we call the code from another thread, we will see that it works for an arbitrary thread:
Thread t = new Thread(() -> {
    parallel();
}, "MyThread");
t.start();
t.join();
Thread [ForkJoinPool.commonPool-worker-1] peek: 0
Thread [MyThread] peek: 1
Thread [ForkJoinPool.commonPool-worker-0] peek: 2
sum: 3
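The same observation can be made without reading logs. The sketch below (the class and method names are ours) collects the names of all threads that processed at least one element. A thread-safe set is used on purpose, since recording the name is a side effect of the lambda.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class CallerHelps {
    // Runs a parallel stream from a thread with the given name and returns
    // the names of all threads that processed at least one element.
    static Set<String> threadsUsed(String callerName) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Thread caller = new Thread(() -> IntStream.range(0, 1_000)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName())), callerName);
        caller.start();
        try {
            caller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the caller thread", e);
        }
        return names;
    }

    public static void main(String[] args) {
        // The set contains "MyThread" plus any common pool workers that joined in.
        System.out.println(threadsUsed("MyThread"));
    }
}
```

How many commonPool workers appear next to the caller depends on the machine and on the current load of the pool, so the exact set differs from run to run.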
Now we know a little more about the internals of ForkJoinPool and parallel streams. It turns out that the number of workers for a parallel stream is limited, and these are general-purpose workers, that is, they can be used by any other tasks running on commonPool. Let's see what this means for us in development.
Consider the following code. For clarity, we run it with -Djava.util.concurrent.ForkJoinPool.common.parallelism=2, that is, 2 workers and the calling thread are available to FJPool.commonPool.
final long ms = System.currentTimeMillis();
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println("Parallelism: " + commonPool.getParallelism());
IntStream.range(0, commonPool.getParallelism() + 1).forEach((it) -> commonPool.submit(() -> {
    try {
        System.out.printf("[%d sec] [%s]: #%d start()\n",
                TimeUnit.SECONDS.convert(System.currentTimeMillis() - ms, TimeUnit.MILLISECONDS),
                Thread.currentThread().getName(), it);
        TimeUnit.SECONDS.sleep(5);
    } catch (Exception e) {e.printStackTrace();}
    System.out.printf("[%d sec] [%s]: #%d finish()\n",
            TimeUnit.SECONDS.convert(System.currentTimeMillis() - ms, TimeUnit.MILLISECONDS),
            Thread.currentThread().getName(), it);
}));
int result = IntStream.range(0, 3)
        .parallel()
        .peek(it -> System.out.printf("Thread [%s] peek: %d\n", Thread.currentThread().getName(), it))
        .sum();
System.out.println("sum: " + result);
commonPool.awaitTermination(100, TimeUnit.SECONDS);
Parallelism: 2
[0 sec] [ForkJoinPool.commonPool-worker-1]:
Here is what happens in the code: we try to occupy the pool completely by submitting parallelism + 1 tasks to it (3 in this case). After that, we start the parallel processing of the stream from the first example. The logs show that the parallel stream is executed in a single thread, since all the resources of the pool are exhausted. If you do not know about this behavior, it will be hard to understand why the processing time of a request that goes through BaseStream::parallel has grown in your program.
What should you do if you want to be sure that your code is actually parallelized? There is a solution: run parallel() on a custom pool. To do this, we slightly modify the code from the example above and run the data processing code as a Runnable on a custom FJPool:
ForkJoinPool custom = new ForkJoinPool(2);
custom.submit(() -> {
    int result = IntStream.range(0, 3)
            .parallel()
            .peek(it -> System.out.printf("Thread [%s] peek: %d\n", Thread.currentThread().getName(), it))
            .sum();
    System.out.println("sum: " + result);
});
Parallelism: 2
[0 sec] [ForkJoinPool.commonPool-worker-1]:
Ok, now we have achieved our goal and are confident that our calculations are under control and no one can influence them from the outside.
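A slightly fuller variant of the same idea is sketched below; it returns the result to the caller and shuts the custom pool down afterwards. The class and method names are ours. Keep in mind that a parallel stream started from inside a ForkJoinPool task runs in that pool as a consequence of the current implementation, not of a documented guarantee, so treat this as a sketch under that assumption.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolStream {
    // Runs the stream inside a dedicated pool, records the worker names
    // into threadNames and returns the sum to the caller.
    static int sumInCustomPool(int parallelism, Set<String> threadNames) {
        ForkJoinPool custom = new ForkJoinPool(parallelism);
        try {
            Callable<Integer> task = () -> IntStream.range(0, 3)
                    .parallel()
                    .peek(it -> threadNames.add(Thread.currentThread().getName()))
                    .sum();
            return custom.submit(task).get(); // wait for the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("stream processing failed", e.getCause());
        } finally {
            custom.shutdown(); // unlike commonPool, a custom pool must be released
        }
    }

    public static void main(String[] args) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        System.out.println("sum: " + sumInCustomPool(2, threadNames));
        System.out.println("threads: " + threadNames);
    }
}
```

Waiting on get() also makes sure the caller does not move on before the calculation has finished, and shutdown() in the finally block prevents the pool's worker threads from leaking.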
Before you apply any tool, even the simplest one, you need to find out its features and limitations. Parallel streams have many such features, and you have to consider how well your task lends itself to parallelization. Parallel streams work well if the operations are independent and stateless, the data source can be easily split into segments for parallel processing, and the task is really worth running in parallel. In addition, you need to take the specifics of the implementation into account and make sure that important calculations use a separate thread pool rather than sharing the common pool with the rest of the application.
Questions and suggestions are welcome, as always. This article is part of our Java course, and we are interested in your opinion of the material.