
Exploring and testing distributed computing in Hazelcast

We continue to test Hazelcast. In the previous post we got acquainted with its queues. In this one we take a closer look at its distributed task execution.

It is much more efficient to work with data as close to it as possible than to pull it to yourself, compute and/or modify it, and then send it back to the distributed storage. Hazelcast gives us this ability in the form of a distributed implementation of ExecutorService. You can control which servers store which data by grouping the data under a common key, and you can run tasks on the right servers using the same key.
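To make the idea of "the same key leads to the same server" concrete, here is a minimal sketch of key-based routing. It is a simplified model, not Hazelcast code: the class KeyRouting and its methods are hypothetical, hashCode() stands in for Hazelcast's hashing of the serialized key, and the round-robin assignment of partitions to nodes is an assumption made for brevity. The constant 271 is Hazelcast's default partition count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of key-based routing; not Hazelcast code.
class KeyRouting {
    // Hazelcast's default partition count, used here as a plain constant.
    static final int PARTITION_COUNT = 271;

    // Map a key to a partition. hashCode() is an assumption of this sketch;
    // floorMod keeps the result non-negative for negative hash codes.
    static int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // Assumption: partitions are spread round-robin over the nodes.
    static int ownerOf(Object key, int nodeCount) {
        return partitionOf(key) % nodeCount;
    }

    public static void main(String[] args) {
        int nodes = 2;
        Map<Integer, Integer> keysPerNode = new HashMap<>();
        for (int key = 0; key < 100_000; key++) {
            keysPerNode.merge(ownerOf(key, nodes), 1, Integer::sum);
        }
        // Both the data for a key and a task addressed by that key
        // resolve to the same node index.
        System.out.println("keys per node: " + keysPerNode);
    }
}
```

Because the data and the task are routed by the same function of the key, a task addressed by a key lands on the node that holds that key's data.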

Let's find out whether this is true and whether there are any pitfalls.


All tests are available on GitHub.

So, let's go. We use the latest stable version of hazelcast, 3.2.3.
(The speed and count measurements in the tests are approximate. The configuration of the test machine is not published. The data is sufficient for comparing the tests with each other, which is the goal.)

Test 1 - working the old way


The purpose of this test is to measure performance without distributed computing, as a baseline for later comparisons.

We take two nodes and two storages (Map). We emulate the worst case, in which none of the data is on the local node. That is, hazelcast is forced to fetch the data from another node and, after changing it, save it on another node as well. We deliberately create all the problems that distributed tasks are supposed to solve in the later tests.

We expect to throw 100k tasks into the queue quickly, after which they will be executed in 10 threads.

Result:

INFO: 100,000 713 sec: 0.538
INFO: Done tasks: 100000 sec: 15.470

The expectations were met. The 100k tasks were submitted quickly, in half a second. During the submission, only 713 tasks were completed. All tasks completed in 15.5 seconds.
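The two log lines report how many tasks had finished by the time submission ended, and how long everything took. A minimal sketch of such a measurement, using a plain local thread pool instead of hazelcast, might look like this. The class SubmitBenchmark and its output format are hypothetical and are not taken from the tests on GitHub.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical harness: submit tasks, note how many finished during
// submission, then wait for the rest and report the total time.
class SubmitBenchmark {
    // Returns {submitted, doneWhenSubmissionEnded, doneAtEnd}.
    static int[] run(int tasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger done = new AtomicInteger();
        long start = System.nanoTime();

        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet); // trivial task: just count
        }
        int doneDuringSubmit = done.get();
        System.out.printf("submitted: %d done so far: %d sec: %.3f%n",
                tasks, doneDuringSubmit, (System.nanoTime() - start) / 1e9);

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        System.out.printf("done tasks: %d sec: %.3f%n",
                done.get(), (System.nanoTime() - start) / 1e9);
        return new int[] {tasks, doneDuringSubmit, done.get()};
    }

    public static void main(String[] args) {
        run(100_000, 10);
    }
}
```

The second number in the first line is the interesting one: a small value means submission really is "fire and forget", while a value close to the total means the submitting thread was held back.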

Test 2 - distributed computing


Now we turn on distributed computing in hazelcast. We again run the worst case, where the required data is on the second node. But now hazelcast should detect this, send the task to the second node and run it there.

We expect to submit 100k tasks quickly, after which they will again be executed in 10 threads (we set this in the hazelcast configuration), but faster than in the first test. In addition, we record which node each task ran on.

Result:

INFO: 100000 99998 sec: 6.308
INFO: Done tasks: hz1: 0 hz2: 100000 sec: 6.319

We see that all the tasks ran on the second node (hz2: 100000) and that the speedup is more than twofold (6.3 seconds versus 15.5). That is very good. Moreover, this difference will grow as the amount of real data being processed grows; for now we store only integers.

But something puzzling is happening with the submission. It was very slow: by the time it ended, almost all the tasks had already completed. Let's set this riddle aside for a moment and look at a more realistic test.

Test 3 - closer to reality


In this test we no longer artificially create tasks only for the second node. In real life there will be lucky cases when the data is on the same node where the computation is initiated.

We expect the speed to be even higher, and the tasks to be distributed close to uniformly between the nodes.

Result:

INFO: 100000 99857 sec: 5.241
INFO: Done tasks: hz1: 50818 hz2: 49182 sec: 5.252

The expectations were met. We see a nearly even distribution of tasks between the nodes (hz1: 50818 hz2: 49182), and execution is about a second faster.

Test 4 - running slow tasks


Tasks differ. Some of them may use I/O resources while they run. So let's try slow tasks (we add a delay of 10 milliseconds to each). We are also interested in the effect discovered earlier: almost all the tasks complete while they are still being submitted. Perhaps the reason is that the tasks are fast and hazelcast adds overhead compared to the first test?

We do not expect the submission of tasks to slow down. We do expect the execution time of all tasks to grow.

Result:

INFO: 100000 99978 sec: 59.442
INFO: Done tasks: hz1: 49522 hz2: 50478 sec: 59.455

Submission also slowed down catastrophically. This is strange, especially since we submit with executorService.executeOnKeyOwner, which, unlike executorService.submitToKeyOwner, does not imply waiting for the result of execution. It should fire the task and forget about it.

We read the source. The first point (you can skip the code; the conclusions are below):

    public void executeOnKeyOwner(Runnable command, Object key) {
        Callable<?> callable = createRunnableAdapter(command);
        submitToKeyOwner(callable, key);
    }


Hazelcast translates our call into a call to submitToKeyOwner, which returns a Future.

The second point (again, you can skip the code; the conclusions are below):

    public static final int SYNC_FREQUENCY = 100;
    private final AtomicInteger consecutiveSubmits = new AtomicInteger();
    private volatile long lastSubmitTime;
    ...
    ...
    boolean sync = !preventSync && checkSync();
    CallableTaskOperation op = new CallableTaskOperation(name, uuid, task);
    ICompletableFuture future = invoke(partitionId, op);
    if (sync) {
        Object response;
        try {
            response = future.get();
    ...
    ...
    /**
     * This is a hack to prevent overloading the system with unprocessed tasks. Once backpressure is added, this can
     * be removed.
     */
    private boolean checkSync() {
        boolean sync = false;
        long last = lastSubmitTime;
        long now = Clock.currentTimeMillis();
        if (last + 10 < now) {
            consecutiveSubmits.set(0);
        } else if (consecutiveSubmits.incrementAndGet() % SYNC_FREQUENCY == 0) {
            sync = true;
        }
        lastSubmitTime = now;
        return sync;
    }


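In other words, if tasks are submitted to the ExecutorService with gaps of no more than 10 ms between them, hazelcast makes every 100th submit synchronous: the submitting thread waits for that task to finish.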
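The effect of the checkSync() logic quoted above is easy to see in isolation. The sketch below copies that logic into a stand-alone class, with one change that is purely an assumption of this sketch: the current time is passed in as a parameter instead of being read from Clock, so the behaviour can be replayed deterministically. The class SubmitThrottle and the helper countSyncs are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone copy of the quoted checkSync() logic. The clock is a
// parameter here (an assumption of this sketch) to make runs repeatable.
class SubmitThrottle {
    static final int SYNC_FREQUENCY = 100;
    private final AtomicInteger consecutiveSubmits = new AtomicInteger();
    private volatile long lastSubmitTime;

    boolean checkSync(long now) {
        boolean sync = false;
        long last = lastSubmitTime;
        if (last + 10 < now) {
            // More than 10 ms since the previous submit: the streak is over.
            consecutiveSubmits.set(0);
        } else if (consecutiveSubmits.incrementAndGet() % SYNC_FREQUENCY == 0) {
            // Every 100th submit of an uninterrupted streak becomes synchronous.
            sync = true;
        }
        lastSubmitTime = now;
        return sync;
    }

    // Replays `submits` calls spaced `gapMillis` apart and counts how many
    // of them would have been forced to wait for their result.
    static int countSyncs(int submits, long gapMillis) {
        SubmitThrottle throttle = new SubmitThrottle();
        long now = 1_000;
        int syncs = 0;
        for (int i = 0; i < submits; i++) {
            if (throttle.checkSync(now)) {
                syncs++;
            }
            now += gapMillis;
        }
        return syncs;
    }

    public static void main(String[] args) {
        System.out.println("back-to-back submits: " + countSyncs(100_000, 0) + " synchronous");
        System.out.println("11 ms between submits: " + countSyncs(100_000, 11) + " synchronous");
    }
}
```

With back-to-back submits roughly one call in a hundred blocks on future.get(); with a pause longer than 10 ms before each submit the counter is reset every time and no call blocks.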




Test 5 -

, 4. , hazelcast . . 100 .

hazelcast . .

Result:

INFO: 100000 25223 sec: 13.350
INFO: Done tasks: hz1: 49257 hz2: 50743 sec: 52.837

During the submission, about 25k tasks were completed.

, 50 OOM

oom


Test 6 - OOM

Hazelcast . . 10.

Result:

WARNING: [10.0.0.3]:5701 [dev] [3.2.3] While executing RunnableAdapter{task=Test6$RunnableTask@219307f2} on Executor[exe]
java.util.concurrent.RejectedExecutionException: Executor[exe] is overloaded!
    at com.hazelcast.util.executor.CachedExecutorServiceDelegate.execute(CachedExecutorServiceDelegate.java:98)

In the source:

    public void execute(Runnable command) {
        if (!taskQ.offer(command)) {
            throw new RejectedExecutionException("Executor[" + name + "] is overloaded!");
        }
        ...

, - . ( executeOnKeyOwner ), .
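The excerpt above rejects a task as soon as the bounded queue refuses it. The same behaviour can be reproduced with the standard library alone. This is only an analogy, not Hazelcast code: the sketch uses java.util.concurrent.ThreadPoolExecutor with an ArrayBlockingQueue and the default AbortPolicy, and the class BoundedQueueDemo, the single worker thread and the latch are assumptions chosen to make the outcome deterministic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Analogy built on the JDK executor: a full bounded queue makes
// execute() throw RejectedExecutionException instead of piling up tasks.
class BoundedQueueDemo {
    // Submits `tasks` blocked tasks to a one-thread pool and returns
    // how many of them were rejected.
    static int countRejected(int tasks, int queueCapacity) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity)); // default handler: AbortPolicy
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // queue was full: the task is lost unless the caller handles it
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // One task runs, ten wait in the queue, the remaining ones are rejected.
        System.out.println("rejected: " + countRejected(20, 10));
    }
}
```

A bounded queue trades memory safety for lost work: the node no longer accumulates unprocessed tasks without limit, but the caller has to notice and handle the rejections.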

Result:

INFO: 100000 32604 sec: 16.931
INFO: Done tasks: hz1: 26304 hz2: 26281 sec: 27.269


Test 7 -

We replace executeOnKeyOwner with submitToKeyOwner, which returns a Future, and wrap the task in a Callable.

Result:

INFO: 100000 31790 sec: 16.713
INFO: Done tasks: hz1: 25898 hz2: 25873 sec: 27.046
INFO: Canceled tasks: 0 execution exceptions: 48229 rejected tasks: 48229
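Counters like the ones in this log can be collected by keeping every returned Future and calling get() on it. The sketch below shows the tallying pattern with a local thread pool. It is not the test from GitHub: the class FutureTally is hypothetical, and the rejection is simulated by throwing RejectedExecutionException inside every second task, on the assumption (suggested by the equal "execution exceptions" and "rejected tasks" figures) that a rejection reaches the caller wrapped in an ExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical helper: classify the outcome of each submitted task.
class FutureTally {
    // Returns {completed, cancelled, executionExceptions, rejected}.
    static int[] tally(List<Future<Integer>> futures) {
        int completed = 0, cancelled = 0, failed = 0, rejected = 0;
        for (Future<Integer> future : futures) {
            try {
                future.get();
                completed++;
            } catch (CancellationException e) {
                cancelled++;
            } catch (ExecutionException e) {
                failed++;
                if (e.getCause() instanceof RejectedExecutionException) {
                    rejected++; // the task never ran; it can be resubmitted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {completed, cancelled, failed, rejected};
    }

    // Simulation: every odd task fails as if the executor had been overloaded.
    static int[] demo(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            final int n = i;
            Callable<Integer> task = () -> {
                if (n % 2 == 1) {
                    throw new RejectedExecutionException("simulated overload");
                }
                return n;
            };
            futures.add(pool.submit(task));
        }
        int[] result = tally(futures);
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = demo(10);
        System.out.printf("completed: %d canceled: %d execution exceptions: %d rejected: %d%n",
                r[0], r[1], r[2], r[3]);
    }
}
```

Unlike the fire-and-forget variant, this approach lets the submitter know exactly which tasks were lost, at the cost of holding a Future for every task.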




hazelcast , 10 99; , ; , . OOM; . ;


Source: https://habr.com/ru/post/229149/

