
Java 8 in parallel. Learning to create subtasks and monitor their execution

We continue the series of articles on processing large amounts of data in parallel (a beautiful word, isn't it?).

The previous article introduced the Fork/Join Framework, a toolkit that lets you split processing into several parts and run them as parallel tasks. What's new in this article, you ask? More informative examples and new mechanisms for processing information reliably. Along the way, I will describe the resource-related and other peculiarities of working in this mode.


If you're interested, read on:

Start


Whatever divides well, divide. I wrote something like that in the previous article, suggesting that you split the processing into parts and load the processors as much as possible (if you have several, of course). That came out sounding a bit rude.

Of course you do: even home computers have been multi-core for a long time. Here lies the first peculiarity of working in this mode: the number of subtasks should be matched to the number of cores. According to numerous tests, the rule of thumb is that the number of subtasks should be either (number of cores + 0) or (number of cores + 1). These options were tested on several serious servers and several ordinary machines.
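A minimal sketch of applying this rule of thumb when creating the pool. The class name PoolSizing and the helper subtaskCount are hypothetical and exist only for illustration; ForkJoinPool(int) and getParallelism() are the standard JDK API.

```java
import java.util.concurrent.ForkJoinPool;

final class PoolSizing {
    // Hypothetical helper: cores + extra, where extra is 0 or 1 per the rule of thumb.
    static int subtaskCount(int cores, int extra) {
        if (cores < 1 || extra < 0) {
            throw new IllegalArgumentException("cores must be >= 1 and extra >= 0");
        }
        return cores + extra;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // The pool's parallelism level bounds how many worker threads run at once.
        ForkJoinPool pool = new ForkJoinPool(subtaskCount(cores, 0));
        System.out.println("cores=" + cores + ", parallelism=" + pool.getParallelism());
        pool.shutdown();
    }
}
```

Whether +0 or +1 works better is something to measure on the target machine; the sketch only makes the choice explicit.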

Restriction Mechanisms


By restriction mechanisms I mean all sorts of mechanisms (“cut-offs”) that let you handle and debug errors as quickly and conveniently as possible. In my projects I try to build in as many ways of debugging the code as I can, for example:

a) Try to implement both single-task and multi-task modes for your calculations. Why? Suppose your project has been delivered and possibly even tested. When something out of the ordinary happens, the first priority is to understand and fix the error quickly: force a switch to single-task mode and see the error right away on the application server console (if that is possible and you have access).

It is easier to find defects in single-task mode. With parallel mode enabled, when you stop the first subtask you will also see part of the second one's output, which is not always convenient to read.

b) Think through the mechanisms for accessing and loading data that is needed constantly.

Let me elaborate. Say you want to load about 100 huge tables into a Map<K, V> before processing. Yes, this is quick and convenient, but there are some unpleasant moments.

Suppose you start testing on big data. There is a problem in three contracts, blocks, or clients, it does not matter which (let's call them “positions”). You worked out what was wrong, fixed it, rebuilt the jar, restarted and... nothing! Once again we sit and wait several minutes for the calculation.

In this situation, mechanisms for selective data loading would help.

For example, the following is not the best option: it builds the map for all of the data. It is sometimes used, though.
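The switch between the two modes can be sketched roughly as follows. Everything here is an assumption made for illustration: the class ModeSwitch, the stand-in calculation calc, and the system property name calc.singleTask are hypothetical, not taken from the project described above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class ModeSwitch {
    static final int THRESHOLD = 1_000;

    // Stand-in for the real per-item calculation (hypothetical).
    static long calc(int i) {
        return (long) i * 2;
    }

    // Single-task mode: everything runs on the calling thread, in order.
    static long sequential(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += calc(i);
        }
        return sum;
    }

    // Multi-task mode: the half-open range [start, end) is split recursively.
    static final class RangeTask extends RecursiveTask<Long> {
        final int start;
        final int end;

        RangeTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                return sequential(start, end);
            }
            int middle = start + (end - start) / 2;
            RangeTask left = new RangeTask(start, middle);
            left.fork();                                       // run the left half asynchronously
            long right = new RangeTask(middle, end).compute(); // do the right half here
            return right + left.join();
        }
    }

    static long run(int start, int end, boolean singleTask) {
        if (singleTask) {
            return sequential(start, end);
        }
        return ForkJoinPool.commonPool().invoke(new RangeTask(start, end));
    }

    public static void main(String[] args) {
        // Start with -Dcalc.singleTask=true to force single-task mode.
        boolean singleTask = Boolean.getBoolean("calc.singleTask");
        System.out.println(run(0, 10_000, singleTask));
    }
}
```

Because both modes share the same sequential method, an error reproduced in single-task mode comes from the same code that runs inside the subtasks.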

    public Map<Long, String> getValueInDao(Date date) {
        Map<Long, String> valueMap = new HashMap<>();
        // Named query parameters.
        HashMap<String, Object> map = new LinkedHashMap<>();
        map.put("date", date);
        // No id filter: every row matching the date is loaded.
        List<Values> valuesList = this.findWithQuery(
                "select c from Values c where c.vf < :date and c.vd >= :date", map);
        for (Values tmpValues : valuesList) {
            valueMap.put(tmpValues.getId(), tmpValues.getDescr());
        }
        if (!valueMap.isEmpty()) {
            return valueMap;
        }
        return Collections.emptyMap();
    }

The next option is preferable (IMHO). In it, we restrict ourselves to the incoming ids, so we fetch only what we really need.

    public Map<Long, String> getValue(List<Long> idList, Date date) {
        final int chunkSize = 999; // at most 999 ids per IN list
        String query = "select c from Value c where c.id in (:idList)"
                + " and c.val < :date and c.val2 >= :date";
        HashMap<String, Object> map = new LinkedHashMap<>();
        List<Value> list = new ArrayList<>();
        // Query chunk by chunk; a short list is handled in a single pass.
        for (int from = 0; from < idList.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, idList.size());
            map.put("idList", idList.subList(from, to));
            map.put("date", date);
            list.addAll(this.findWithQuery(query, map));
            map.clear();
        }
        Map<Long, String> valueMap = new HashMap<>();
        for (Value tmpValue : list) {
            valueMap.put(tmpValue.getId(), tmpValue.getValue());
        }
        if (!valueMap.isEmpty()) {
            return valueMap;
        }
        return Collections.emptyMap();
    }
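The chunking step of this method can also be isolated and checked on its own. This is a sketch under assumptions: the class Chunks and the method partition are hypothetical names, and the article does not say why the chunk size is 999. Presumably it keeps each IN list under a database limit (Oracle, for example, rejects IN lists with more than 1000 expressions).

```java
import java.util.ArrayList;
import java.util.List;

final class Chunks {
    // Splits a list into consecutive views of at most chunkSize elements each.
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            chunks.add(items.subList(from, to)); // a view, not a copy
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long id = 0; id < 2_500; id++) {
            ids.add(id);
        }
        for (List<Long> chunk : partition(ids, 999)) {
            System.out.println("chunk of " + chunk.size() + " ids");
        }
    }
}
```

Note that subList returns views backed by the original list, so the id list should not be modified while the chunks are in use.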

c) Build in mechanisms for recording and outputting errors to tables, files, and other destinations from the start. If your algorithms are clearly structured, nothing prevents you from creating a class that is switched on by a key. For example, a “flag” checked before loading enables writing data to an error table, so you know exactly where the problem occurred.

    if (varKeyError == 1) {
        // write the data to the err table
    }
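Such a key-controlled error recorder might look roughly like this. It is only a sketch: the class ErrorJournal is hypothetical, and an in-memory queue stands in for the error table so that the example is self-contained. A thread-safe queue is used because subtasks may record errors concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class ErrorJournal {
    private final boolean enabled; // plays the role of the "flag" from the text
    private final Queue<String> entries = new ConcurrentLinkedQueue<>();

    ErrorJournal(boolean enabled) {
        this.enabled = enabled;
    }

    // Records where the error happened (area), for which position, and the message.
    void record(String area, long positionId, Exception e) {
        if (!enabled) {
            return;
        }
        entries.add(area + ";" + positionId + ";" + e.getMessage());
    }

    // Returns a snapshot of everything recorded so far.
    List<String> entries() {
        return new ArrayList<>(entries);
    }

    public static void main(String[] args) {
        ErrorJournal journal = new ErrorJournal(true);
        try {
            throw new IllegalStateException("no value for date");
        } catch (Exception e) {
            journal.record("loading", 42L, e);
        }
        journal.entries().forEach(System.out::println);
    }
}
```

In a real project the record method would write to the error table or file instead of the queue.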


Faster…


Having talked a little about possible peculiarities and errors, let's move on to our immediate goal, processing large amounts of information in parallel, and slightly extend the existing examples (brazenly borrowed from the previous article).

There we created several classes responsible for the input data and the processing, building on the RecursiveAction class. Let me recap what the example did. In the StreamSettings class, we keep splitting the data into parts until a part is smaller than the threshold, set as countValue = 500. As I explained earlier, you can use any restriction mechanism. For example, the expression valueId.size() / Runtime.getRuntime().availableProcessors() * 2 also works and can be used to search for an optimal value.
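One detail of that expression is worth checking with numbers: Java evaluates it left to right, so the integer division happens before the multiplication. The sketch below compares the expression as written with a differently parenthesized variant. The class Thresholds and both method names are hypothetical, and the second variant is my assumption about a possible alternative, not something the article prescribes.

```java
final class Thresholds {
    // The expression as written in the text: (size / cores) * 2.
    static int asWritten(int size, int cores) {
        return size / cores * 2;
    }

    // Assumed alternative: size / (cores * 2), roughly two leaf tasks per core.
    static int twoLeavesPerCore(int size, int cores) {
        return Math.max(1, size / (cores * 2));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int size = 10_000;
        System.out.println("as written:         " + asWritten(size, cores));
        System.out.println("two leaves per core: " + twoLeavesPerCore(size, cores));
    }
}
```

For 10 000 elements on 4 cores, the first variant gives a threshold of 5000 and the second 1250. Which one suits a particular workload is best found by measurement.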

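    import java.util.List;
    import java.util.concurrent.RecursiveAction;

    // ValueValue, CalcGo and raiseApplicationError come from the project itself.
    public class StreamSettings extends RecursiveAction {
        final int countValue = 500; // ranges smaller than this are processed directly
        final int countProcessors = Runtime.getRuntime().availableProcessors();
        List<ValueValue> data;
        int start, end;

        StreamSettings(List<ValueValue> valueId, int startNumber, int endNumber) {
            data = valueId;
            start = startNumber;
            end = endNumber;
        }

        @Override
        protected void compute() {
            if (end - start < countValue || countProcessors < 2) {
                // Small range or a single core: process sequentially.
                for (int i = start; i < end; i++) {
                    ValueValue value = data.get(i);
                    try {
                        new CalcGo().calcGo(value);
                    } catch (Exception e) {
                        raiseApplicationError(" " + e.getMessage(), e);
                    }
                }
            } else {
                // Split in half and run both halves as subtasks of this same class.
                int middle = (start + end) / 2;
                invokeAll(new StreamSettings(data, start, middle),
                          new StreamSettings(data, middle, end));
            }
        }
    }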

Let's continue our exploration and look at another processing option, this time based on the RecursiveTask class. The main difference is that the compute() method returns a result (which is very often exactly what is needed). In effect, we can wait for several subtasks to complete and then combine their results. The examples below are discussed in more detail.
The Stream class is responsible for splitting the work into subtasks. In the example, we find the midpoint and create one instance, Stream goVar1 = new Stream(forSplit, start, middle), for the range from the start to the midpoint, and another, Stream goVar2 = new Stream(forSplit, middle, end), for the range from the midpoint to the last element.

Unlike the previous variant, the StreamSettings class, invokeAll is not used here; instead, the fork() and join() methods are called.

    // Stream.java
    import java.util.concurrent.RecursiveTask;

    public class Stream extends RecursiveTask<Long> {
        final int countProcessors = Runtime.getRuntime().availableProcessors();
        final int countLimit = 1000;
        int start;
        int end;
        int forSplit;

        Stream(int componentValue, int startNumber, int endNumber) {
            forSplit = componentValue;
            start = startNumber;
            end = endNumber;
        }

        @Override
        protected Long compute() {
            long countSum = 0L;
            if (countProcessors == 1 || end - start <= countLimit) {
                System.out.println("=run=");
                System.out.println("=start=" + start);
                System.out.println("=end=" + end);
                // Half-open range [start, end), so the two halves do not both count "middle".
                for (int i = start; i < end; i++) {
                    countSum += 1;
                }
            } else {
                int middle = (start + end) / 2;
                Stream goVar1 = new Stream(forSplit, start, middle);
                Stream goVar2 = new Stream(forSplit, middle, end);
                // invokeAll(goVar1, goVar2) could replace the two fork() calls.
                goVar1.fork();
                goVar2.fork();
                // Join in reverse order of forking, as the ForkJoinTask docs advise.
                countSum = goVar2.join() + goVar1.join();
            }
            return countSum;
        }
    }

    // Start.java
    import java.util.concurrent.ForkJoinPool;

    public class Start {
        public static void main(String[] args) {
            final int componentValue = 2000;
            long beginT = System.nanoTime();
            ForkJoinPool fjp = new ForkJoinPool();
            Stream test = new Stream(componentValue, 0, componentValue);
            Long countSum = fjp.invoke(test);
            long endT = System.nanoTime();
            long timebetweenStartEnd = endT - beginT;
            System.out.println("=====time========" + timebetweenStartEnd);
            System.out.println("=====countSum========" + countSum);
        }
    }

By correctly arranging the launches (fork) and the waits for their results (join), you can build a convenient system for asynchronous execution of calculations.

Features of information processing


The topic is extensive and leaves room for significant improvements. The gain in time is approximately 1.7 times compared with sequential execution. You can use the available resources more efficiently and run multiple calculations in parallel.

Good luck to all. Leave your questions and suggestions in the comments. The next article will be devoted to an interesting tool called Heartbeat. No? Never heard of it? See you then.

Source: https://habr.com/ru/post/272479/

