
JDK concurrent package

The Java memory model guarantees the expected order of execution of multi-threaded code, provided that the code contains no data races. To protect code from races, various means of synchronizing threads and exchanging data between them have been devised.

The java.util.concurrent package, which is part of the HotSpot JDK, provides the following tools for writing multi-threaded code:


Atomic

The java.util.concurrent.atomic subpackage contains a set of classes for atomic operations on primitive types. Their contract guarantees that the compare-and-set operation executes as a single indivisible step. When setting a new value for such a variable, you also pass its expected old value (an optimistic-locking approach). If the variable's value has changed from the expected one since it was read, compareAndSet returns false .
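As a minimal Java sketch of this contract (the class name CasDemo is illustrative): the first compareAndSet succeeds because the expected value matches, the second fails because the value has already changed.

```java
import java.util.concurrent.atomic.AtomicLong;

public class CasDemo {
    public static void main(String[] args) {
        AtomicLong value = new AtomicLong(10);
        // Succeeds: the current value (10) matches the expected one
        boolean first = value.compareAndSet(10, 15);
        // Fails: the value is already 15, not the expected 10
        boolean second = value.compareAndSet(10, 20);
        System.out.println(first + " " + second + " " + value.get()); // true false 15
    }
}
```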

For example, take two arrays of long values: [1,2,3,4,5] and [-1,-2,-3,-4,-5] . Each thread iterates over its array sequentially and adds the elements to a single shared variable. The code (Groovy) with pessimistic locking looks like this:
    import java.util.concurrent.Callable
    import java.util.concurrent.Executors

    class Sum {
        static monitor = new Object()
        static volatile long sum = 0
    }

    class Summer implements Callable {
        long[] data

        Object call() throws Exception {
            data.each {
                synchronized (Sum.monitor) {
                    println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    Sum.sum += it
                }
            }
        }
    }

    Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1, 2, 3, 4, 5]),
        new Summer(data: [-1, -2, -3, -4, -5])
    ])
    print("Sum: ${Sum.sum}")


The output is as expected:

    pool-1-thread-1: add 1 to 0
    pool-1-thread-2: add -1 to 1
    pool-1-thread-1: add 2 to 0
    pool-1-thread-2: add -2 to 2
    pool-1-thread-1: add 3 to 0
    pool-1-thread-2: add -3 to 3
    pool-1-thread-1: add 4 to 0
    pool-1-thread-1: add 5 to 4
    pool-1-thread-2: add -4 to 9
    pool-1-thread-2: add -5 to 5
    Sum: 0


However, this approach has a significant performance drawback: the work that is useless to us (locking) consumes more resources than the useful work (the addition itself).


Consider using AtomicLong to implement optimistic locking when calculating the same sum:

    import java.util.concurrent.Callable
    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicLong

    class Sum {
        static final AtomicLong sum = new AtomicLong(0)
    }

    class Summer implements Callable {
        long[] data

        Object call() throws Exception {
            data.each {
                while (true) {
                    long localSum = Sum.sum.get()
                    if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                        println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                        break
                    } else {
                        println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    }
                }
            }
        }
    }

    Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1, 2, 3, 4, 5]),
        new Summer(data: [-1, -2, -3, -4, -5])
    ])
    print("Sum: ${Sum.sum}")


As can be seen from the results, there were not that many "missed" attempts:

    [MISS!] pool-1-thread-1: add 1 to -1
    pool-1-thread-2: add -1 to -1
    pool-1-thread-2: add -2 to -3
    [MISS!] pool-1-thread-1: add 1 to -3
    pool-1-thread-2: add -3 to -6
    pool-1-thread-1: add 1 to -5
    [MISS!] pool-1-thread-2: add -4 to -5
    pool-1-thread-1: add 2 to -7
    pool-1-thread-2: add -4 to -7
    pool-1-thread-1: add 3 to -9
    pool-1-thread-2: add -5 to -9
    pool-1-thread-1: add 4 to -5
    pool-1-thread-1: add 5 to 0
    Sum: 0


When deciding to use an optimistic lock, it is important that the operation on the variable being modified does not take long. The longer the operation, the more often compare-and-set will fail, and the more often the operation will have to be repeated.

Based on compare-and-set , a non-blocking read lock can also be implemented. In this case, the atomic variable stores the version of the object being processed. Having read the version before the calculation, we can verify it afterwards. A regular read-write lock is taken only if the version check fails.

    import java.util.concurrent.atomic.AtomicLong
    import java.util.concurrent.locks.ReadWriteLock
    import java.util.concurrent.locks.ReentrantReadWriteLock

    class Transaction {
        long debit
    }

    class Account {
        AtomicLong version = new AtomicLong()
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
        List<Transaction> transactions = new ArrayList<Transaction>()
    }

    long balance(Account account) {
        ReentrantReadWriteLock.ReadLock locked = null
        while (true) {
            long balance = 0
            long version = account.version.get()
            account.transactions.each { balance += it.debit }
            // the CAS acts as a volatile read/write for the JMM
            if (account.version.compareAndSet(version, version)) {
                if (locked) { locked.unlock() }
                return balance
            } else if (!locked) {
                // version check failed: fall back to the regular read lock
                locked = account.readWriteLock.readLock()
                locked.lock()
            }
        }
    }

    void modifyTransaction(Account account, int position, long newDebit) {
        def writeLock = account.readWriteLock.writeLock()
        writeLock.lock()
        account.version.incrementAndGet()
        account.transactions[position].debit = newDebit
        writeLock.unlock()
    }


Locks

ReentrantLock

Unlike synchronized blocks, ReentrantLock allows you to choose the moments of acquiring and releasing the lock more flexibly, since it uses regular Java method calls. ReentrantLock can also report the current state of the lock and can wait for a lock for a bounded time. It correctly handles recursive acquisition and release of the lock by a single thread. If you need fair locks (granting the monitor to threads in request order), ReentrantLock provides that mechanism as well.

Although synchronized and ReentrantLock locks are very similar, their implementations at the JVM level are quite different.
Without going into JMM details: use ReentrantLock instead of the JVM-provided synchronized lock only if threads contend for the monitor very often. When only one thread at a time enters a synchronized method, the performance of ReentrantLock is inferior to the built-in JVM locking mechanism.
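A short Java sketch of the extra flexibility described above (the class name is illustrative): a fair lock, a bounded wait via tryLock, and querying the lock's state.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(true); // true = fair lock
        // Wait for the lock for at most 100 ms instead of blocking forever
        if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                // The lock can report its own state
                System.out.println("held by current thread: " + lock.isHeldByCurrentThread());
            } finally {
                lock.unlock(); // release explicitly, unlike synchronized
            }
        }
    }
}
```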

ReentrantReadWriteLock

Complements ReentrantLock with the ability to take many read locks alongside a single write lock. A write lock can be downgraded to a read lock if necessary.
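The downgrade can be sketched in Java as follows (class and method names are illustrative): the read lock is acquired before the write lock is released, so no other writer can slip in between.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DowngradeDemo {
    private static final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private static int cache = 0;

    static int updateAndRead(int newValue) {
        rw.writeLock().lock();
        try {
            cache = newValue;
            // Downgrade: take the read lock while still holding the write lock
            rw.readLock().lock();
        } finally {
            rw.writeLock().unlock(); // writers stay blocked by our read lock
        }
        try {
            return cache; // read under the read lock only
        } finally {
            rw.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(updateAndRead(42)); // prints 42
    }
}
```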

StampedLock _jdk 1.8_

Implements optimistic and pessimistic read-write locks, with the possibility of subsequently upgrading or downgrading them. An optimistic lock is implemented through a "stamp" of the lock ( javadoc ):

    double distanceFromOriginV1() { // A read-only method
        long stamp;
        if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
            double currentX = x;
            double currentY = y;
            if (sl.validate(stamp))
                return Math.sqrt(currentX * currentX + currentY * currentY);
        }
        stamp = sl.readLock(); // fall back to read lock
        try {
            double currentX = x;
            double currentY = y;
            return Math.sqrt(currentX * currentX + currentY * currentY);
        } finally {
            sl.unlockRead(stamp);
        }
    }


Collections

ArrayBlockingQueue

A queue for passing messages from one thread to another, with an optional fairness policy. Supports blocking ( put() , take() ) and non-blocking ( offer() , poll() ) methods. Null values are not allowed. Queue capacity must be specified at creation.
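A minimal Java sketch of the blocking vs. non-blocking methods (the class name is illustrative): with a capacity of 2, the third non-blocking insert is rejected instead of blocking.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity is fixed at creation
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.put(1);                  // blocking insert
        boolean ok = queue.offer(2);   // non-blocking insert: true, there is room
        boolean full = queue.offer(3); // false: the queue is full
        System.out.println(queue.take() + " " + ok + " " + full); // 1 true false
    }
}
```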

ConcurrentHashMap

A key-value structure based on a hash function. There are no locks for reading. When writing, only a part of the map (a segment) is locked. The number of segments is the power of two closest to concurrencyLevel .
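A small Java sketch of lock-free reads combined with atomic writes (the class name is illustrative; merge() is available since JDK 1.8): each merge() is an atomic read-modify-write on a single bin, so concurrent counters need no external lock.

```java
import java.util.concurrent.ConcurrentHashMap;

public class MapDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        // merge() atomically combines the old value with the new one
        counts.merge("a", 1, Integer::sum);
        counts.merge("a", 1, Integer::sum);
        System.out.println(counts.get("a")); // 2
    }
}
```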

ConcurrentSkipListMap

A balanced multi-threaded key-value structure (O(log n)). The lookup is based on a skip list. The map must be able to compare its keys.

ConcurrentSkipListSet

ConcurrentSkipListMap with no values.

CopyOnWriteArrayList

A list that blocks writes but does not block reads. Any modification creates a new instance of the backing array in memory.

CopyOnWriteArraySet

A set with CopyOnWriteArrayList semantics: duplicate values are not allowed.

DelayQueue

A PriorityBlockingQueue that releases an element only after a certain delay (the delay is declared via the element's Delayed interface). DelayQueue can be used to implement a scheduler. Queue capacity is not fixed.
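A Java sketch of the Delayed contract (the Task class is a hypothetical element type): take() blocks until the head's delay expires, and the element with the smallest remaining delay comes out first.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayDemo {
    // A hypothetical task that becomes available `delayMs` after creation
    static class Task implements Delayed {
        final String name;
        final long readyAt;

        Task(String name, long delayMs) {
            this.name = name;
            this.readyAt = System.currentTimeMillis() + delayMs;
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("later", 100));
        queue.put(new Task("sooner", 10));
        // Blocks ~10 ms, then returns the element whose delay expired first
        System.out.println(queue.take().name); // sooner
    }
}
```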

LinkedBlockingDeque

A bidirectional BlockingQueue based on linked nodes (with cache-miss and cache-coherence overhead). Queue capacity is not fixed.

LinkedBlockingQueue

A unidirectional BlockingQueue based on linked nodes (with cache-miss and cache-coherence overhead). Queue capacity is not fixed.

LinkedTransferQueue

A unidirectional `BlockingQueue` based on linked nodes (with cache-miss and cache-coherence overhead). Queue capacity is not fixed. This queue additionally allows a producer to wait until an element is picked up by a consumer.

PriorityBlockingQueue

A unidirectional `BlockingQueue` that allows prioritization of messages (by comparing elements). Null values are not allowed.

SynchronousQueue

A unidirectional `BlockingQueue` in which put() follows transfer() logic: the inserting thread blocks until another thread takes the element. The queue has no internal capacity.
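A minimal Java sketch of this hand-off behavior (the class name is illustrative): put() does not return until the consumer thread has taken the element.

```java
import java.util.concurrent.SynchronousQueue;

public class SyncQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got: " + handoff.take());
            } catch (InterruptedException ignored) {}
        });
        consumer.start();
        handoff.put("message"); // blocks until the consumer takes it
        consumer.join();
    }
}
```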

Synchronization points

CountDownLatch

A barrier ( await() ) that waits for a specified number (or more) of countDown() calls. The barrier's state cannot be reset.
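A short Java sketch (the class name is illustrative): await() returns only after countDown() has been called the specified number of times.

```java
import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        Runnable worker = latch::countDown;
        new Thread(worker).start();
        new Thread(worker).start();
        latch.await(); // released only after two countDown() calls
        System.out.println("count: " + latch.getCount()); // count: 0
    }
}
```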

CyclicBarrier

A barrier ( await() ) that waits for a specified number of await() calls from other threads. When the number of waiting threads reaches the specified value, an optional callback is invoked and the lock is released. The barrier resets to its initial state once the waiting threads are released, and can be reused.
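A Java sketch of the reuse property (the class name is illustrative): the same two-party barrier trips twice, running its callback on each cycle.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) throws Exception {
        // Optional callback runs each time the barrier trips
        CyclicBarrier barrier = new CyclicBarrier(2, () -> System.out.println("all arrived"));
        Runnable party = () -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException ignored) {}
        };
        for (int cycle = 0; cycle < 2; cycle++) { // the barrier resets and is reused
            Thread t = new Thread(party);
            t.start();
            barrier.await(); // main thread is the second party
            t.join();
        }
    }
}
```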

Exchanger

A barrier (`exchange()`) for synchronizing two threads. At the moment of synchronization, the threads can exchange objects with volatile-transfer semantics.

Phaser

An extension of `CyclicBarrier` that allows participants to be registered and deregistered for each cycle of the barrier.

Semaphore

A barrier that allows only a specified number of threads to hold the monitor at once. In essence, it extends the `Lock` functionality so that several threads can be inside the protected block simultaneously.
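A minimal Java sketch (the class name is illustrative): with two permits, a third non-blocking acquire fails until a permit is released.

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore permits = new Semaphore(2); // at most two threads inside
        permits.acquire();
        permits.acquire();
        boolean third = permits.tryAcquire(); // false: no permits left
        permits.release();
        System.out.println(third + " " + permits.availablePermits()); // false 1
    }
}
```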

Executors


ExecutorService replaces new Thread(runnable) to simplify working with threads. ExecutorService helps reuse freed threads, organize queues of tasks for a thread pool, and subscribe to task results. Instead of the Runnable interface, pools use the Callable interface (which can return a result and throw exceptions).

    import java.util.concurrent.*

    ExecutorService pool = Executors.newFixedThreadPool(4)

    Future future = pool.submit(new Callable() {
        Object call() throws Exception {
            println("In thread")
            return "From thread"
        }
    })

    println("From main")
    println(future.get())

    try {
        pool.submit(new Callable() {
            Object call() throws Exception {
                throw new IllegalStateException()
            }
        }).get()
    } catch (ExecutionException e) {
        println("Got it: ${e.cause}")
    }

    pool.shutdown()


The invokeAll method returns control to the calling thread only when all tasks have completed. The invokeAny method returns the result of the first successfully completed task, cancelling the rest.
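A short Java sketch of the two methods (the class name is illustrative): invokeAll yields futures for every task, invokeAny yields a single result from whichever task finished first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> tasks = Arrays.asList(() -> 1, () -> 2);

        // Returns only when every task is done
        List<Future<Integer>> all = pool.invokeAll(tasks);
        int sum = 0;
        for (Future<Integer> f : all) sum += f.get();

        // Result of the first successfully completed task
        Integer any = pool.invokeAny(tasks);

        System.out.println(sum + " " + (any == 1 || any == 2)); // 3 true
        pool.shutdown();
    }
}
```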

ThreadPoolExecutor

A thread pool with the ability to specify the core and maximum number of threads in the pool, as well as the task queue.

ScheduledThreadPoolExecutor

Extends the functionality of ThreadPoolExecutor with the ability to execute tasks with a delay or on a regular schedule.
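A minimal Java sketch of a deferred task (the class name is illustrative): the Callable runs once, after the given delay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // Run once, 50 ms from now; scheduleAtFixedRate() would run it regularly
        ScheduledFuture<String> deferred =
            scheduler.schedule(() -> "done", 50, TimeUnit.MILLISECONDS);
        System.out.println(deferred.get()); // blocks until the task has run
        scheduler.shutdown();
    }
}
```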

ForkJoinPool

A lighter-weight thread pool for recursively self-spawning tasks. The pool expects `fork()` and `join()` calls on child tasks inside the parent task.

    import java.util.concurrent.ForkJoinPool
    import java.util.concurrent.RecursiveTask

    class LNode {
        List<LNode> childs = []
        def object
    }

    class Finder extends RecursiveTask<LNode> {
        LNode node
        Object expect

        protected LNode compute() {
            if (node?.object?.equals(expect)) {
                return node
            }
            node?.childs
                ?.collect { new Finder(node: it, expect: expect).fork() }
                ?.collect { it.join() }
                ?.find { it != null }
        }
    }

    ForkJoinPool es = new ForkJoinPool()
    def invoke = es.invoke(new Finder(
        node: new LNode(
            childs: [
                new LNode(object: "invalid"),
                new LNode(
                    object: "invalid",
                    childs: [new LNode(object: "test")]
                )
            ]
        ),
        expect: "test"
    ))
    print("${invoke?.object}")


Accumulators _jdk 1.8_

Accumulators ( LongAdder , LongAccumulator ) allow primitive operations (sum, maximum) over numeric values in a multithreaded environment without contention on a single CAS variable: updates are spread across internal cells and combined on read.
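A minimal Java sketch of the two accumulator classes (the class name is illustrative): LongAdder for sums, LongAccumulator with a custom combining function for maxima.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

public class AdderDemo {
    public static void main(String[] args) {
        LongAdder sum = new LongAdder();
        sum.add(5);
        sum.add(-3);

        // Custom combining function (max) with an identity value
        LongAccumulator max = new LongAccumulator(Math::max, Long.MIN_VALUE);
        max.accumulate(7);
        max.accumulate(3);

        System.out.println(sum.sum() + " " + max.get()); // 2 7
    }
}
```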

Source: https://habr.com/ru/post/187854/
