The java.util.concurrent package, which is part of the HotSpot JDK, provides the following tools for writing multi-threaded code.

java.util.concurrent.atomic

A set of classes for atomic operations on primitive types. The contract of these classes guarantees that the compare-and-set operation is performed in "one unit of processor time". When you set a new value for such a variable, you also pass in the old value you expect to find there (an optimistic locking approach). If the variable has been changed since it was read, the method returns false.
Consider an example: two threads work simultaneously with two arrays of long values, [1,2,3,4,5] and [-1,-2,-3,-4,-5]. Each thread sequentially iterates over its array and adds the elements to a single shared variable. The code (Groovy) with pessimistic locking looks like this:

import java.util.concurrent.*

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}

class Summer implements Callable {
    long[] data

    Object call() throws Exception {
        data.each {
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it
            }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
    new Summer(data: [1,2,3,4,5]),
    new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0
Now let's use AtomicLong to implement optimistic locking while calculating the same sum:

import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong

class Sum {
    static volatile AtomicLong sum = new AtomicLong(0)
}

class Summer implements Callable {
    long[] data

    Object call() throws Exception {
        data.each {
            while (true) {
                long localSum = Sum.sum.get()
                if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                    println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    break
                } else {
                    println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                }
            }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
    new Summer(data: [1,2,3,4,5]),
    new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0
The more threads that modify the value at the same time, the more often a compare-and-set miss will occur, and the more often the operation will have to be repeated. With compare-and-set, a non-blocking read lock can also be implemented: the atomic variable stores the version of the object being processed. Having read the version before the calculation, we verify it afterwards; a regular read-write lock is taken only if the version check fails.

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReadWriteLock
import java.util.concurrent.locks.ReentrantReadWriteLock

class Transaction {
    long debit
}

class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List<Transaction> transactions = new ArrayList<Transaction>()
}

long balance(Account account) {
    ReentrantReadWriteLock.ReadLock locked = null
    while (true) {
        long balance = 0
        long version = account.version.get()
        account.transactions.each { balance += it.debit }
        // compareAndSet acts as a volatile read/write for the JMM
        if (account.version.compareAndSet(version, version)) {
            if (locked) { locked.unlock() }
            return balance
        } else {
            // version changed during the calculation: fall back to a real read lock
            locked = account.readWriteLock.readLock()
            locked.lock()
        }
    }
}

void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    writeLock.lock()
    account.version.incrementAndGet()
    account.transactions[position].debit = newDebit
    writeLock.unlock()
}
ReentrantLock

Unlike synchronized blocks, ReentrantLock lets you choose the moments of acquiring and releasing the lock more flexibly, since it uses regular Java calls. ReentrantLock also lets you obtain information about the current state of the lock and "wait" for a lock for a limited amount of time. It correctly supports recursive acquisition and release of the lock by a single thread. If you need fair locks (which respect the order in which threads requested the monitor), ReentrantLock provides this mechanism as well.
Although synchronized and ReentrantLock locks look very similar, their implementation at the JVM level is quite different. Using ReentrantLock instead of the JVM-provided synchronized locking is worth it only when threads contend for the monitor very often. When only a single thread enters a synchronized method at a time, ReentrantLock performs worse than the built-in JVM locking mechanism.
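A minimal Java sketch of these capabilities (the Counter class, the fairness flag and the 100 ms timeout are illustrative, not from the original article):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class Counter {
    private final ReentrantLock lock = new ReentrantLock(true); // true = fair (FIFO) lock
    private long value;

    void increment() {
        lock.lock();            // explicit acquisition instead of a synchronized block
        try {
            value++;            // reentrant: the same thread may call lock() again here
        } finally {
            lock.unlock();      // always release in finally
        }
    }

    boolean tryIncrement() throws InterruptedException {
        // "wait" for the lock for a limited time instead of blocking forever
        if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                value++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;           // isLocked()/getQueueLength() expose the lock's current state
    }
}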
ReentrantReadWriteLock adds the ability to lock separately for reading and for writing. A write lock can be "downgraded" to a read lock when necessary. StampedLock (JDK 8) additionally supports optimistic reads, as in this Java example:

import java.util.concurrent.locks.StampedLock;

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    double distanceFromOriginV1() { // A read-only method
        long stamp;
        if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic read
            double currentX = x;
            double currentY = y;
            if (sl.validate(stamp)) // no writer since tryOptimisticRead()
                return Math.sqrt(currentX * currentX + currentY * currentY);
        }
        stamp = sl.readLock(); // fall back to a full read lock
        try {
            double currentX = x;
            double currentY = y;
            return Math.sqrt(currentX * currentX + currentY * currentY);
        } finally {
            sl.unlockRead(stamp);
        }
    }
}
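For completeness, a minimal sketch of the corresponding write path (not from the original article); the writer takes an exclusive stamp, which is what makes in-flight optimistic reads fail validation:

// inside the same Point class
void move(double deltaX, double deltaY) { // an exclusively locked method
    long stamp = sl.writeLock();
    try {
        x += deltaX;
        y += deltaY;
    } finally {
        sl.unlockWrite(stamp);
    }
}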
The package also provides a number of thread-safe collections and queues.

ArrayBlockingQueue: a queue with blocking (put(), take()) and non-blocking (offer(), poll()) methods. Null values are not allowed. The queue capacity must be specified at creation.
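A minimal Java producer/consumer sketch (the ProducerConsumer class name and the capacity of 2 are illustrative, not from the original article):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
        // Capacity is fixed at creation; null elements are rejected.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                queue.put("a");                      // blocks while the queue is full
                queue.put("b");
                boolean accepted = queue.offer("c"); // non-blocking: false if the queue is full
                System.out.println("offer accepted: " + accepted);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        System.out.println(queue.take());            // blocks while the queue is empty
        producer.join();
        System.out.println(queue.poll());            // non-blocking: null if the queue is empty
    }
}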
ConcurrentHashMap: lookup is based on a hash function. There are no locks for reading. When writing, only a part of the map (a segment) is locked. The number of segments is limited to the power of two closest to concurrencyLevel.
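A minimal Java sketch of atomic per-key updates (the WordCount class is illustrative; merge requires Java 8 or later):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class WordCount {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
        for (String word : new String[]{"a", "b", "a"}) {
            counts.merge(word, 1, Integer::sum); // atomic per-key update, no external locking
        }
        counts.putIfAbsent("c", 0);              // inserts only if the key is still absent
        System.out.println(counts);              // e.g. {a=2, b=1, c=0}
    }
}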
ConcurrentSkipListSet: a ConcurrentSkipListMap with no values.

CopyOnWriteArraySet: a CopyOnWriteArrayList without values.
DelayQueue: a PriorityBlockingQueue that allows an element to be taken only after a certain delay (the delay is declared through the Delayed interface implemented by the element). DelayQueue can be used to implement a scheduler. The queue capacity is not fixed.
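A minimal Java sketch of an element implementing Delayed (the DelayedTask class and the millisecond delays are illustrative):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    final String name;
    final long readyAt; // absolute time in milliseconds

    DelayedTask(String name, long delayMs) {
        this.name = name;
        this.readyAt = System.currentTimeMillis() + delayMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("later", 200));
        queue.put(new DelayedTask("soon", 50));
        // take() blocks until the head element's delay has expired
        System.out.println(queue.take().name); // "soon"
        System.out.println(queue.take().name); // "later"
    }
}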
LinkedBlockingQueue: a BlockingQueue built on linked nodes (which adds cache-miss and cache-coherence overhead). The queue capacity is not fixed.

LinkedTransferQueue: likewise a BlockingQueue built on linked nodes with a non-fixed capacity; in addition to the put() methods it provides transfer() logic, letting a producer hand an element directly to a waiting consumer.
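A minimal Java sketch of transfer() semantics, assuming LinkedTransferQueue is the queue meant above: the producer blocks until a consumer has actually received the element.

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        // transfer() blocks until a consumer actually receives the element,
        // unlike put(), which returns as soon as the element is enqueued.
        queue.transfer("payload");
        consumer.join();
    }
}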
CountDownLatch: blocks the calling threads (the await() method) until the required (or a greater) number of countDown() calls has been made. Its state cannot be reset.
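A minimal Java sketch (the worker count of 3 is illustrative):

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3;
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("worker " + id + " finished");
                done.countDown();          // each worker decrements the counter once
            }).start();
        }
        done.await();                      // blocks until the counter reaches zero
        System.out.println("all workers finished");
    }
}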
CyclicBarrier: blocks the calling threads (the await() method) until the specified number of threads have called await(). When the count is reached, an optional callback is invoked and the waiting threads are released. The barrier then resets to its initial state and can be reused.
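A minimal Java sketch with the optional callback (the party count of 3 is illustrative):

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        // The callback runs once per "generation", when the last thread arrives.
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("phase complete"));
        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("thread " + id + " waiting");
                    barrier.await();       // released when 3 threads have called await()
                    System.out.println("thread " + id + " released");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}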
ExecutorService

ExecutorService came to replace new Thread(runnable) and simplifies working with threads. ExecutorService helps to reuse freed threads, organize task queues for a thread pool, and subscribe to the result of a task. Instead of the Runnable interface the pool accepts the Callable interface, which can return a result and throw checked exceptions.

import java.util.concurrent.*

ExecutorService pool = Executors.newFixedThreadPool(4)

Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
println(future.get())

try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {
    println("Got it: ${e.cause}")
}

pool.shutdown()
The invokeAll method returns control to the calling thread only after all tasks have completed. The invokeAny method returns the result of the first successfully completed task, cancelling all the remaining ones.

ScheduledThreadPoolExecutor: a ThreadPoolExecutor with the ability to execute tasks after a delay or at regular intervals.
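A minimal Java sketch of deferred and periodic execution (the delays are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // One-shot deferred task: runs once after a 100 ms delay.
        scheduler.schedule(() -> System.out.println("deferred"), 100, TimeUnit.MILLISECONDS);
        // Periodic task: runs every 200 ms after no initial delay.
        scheduler.scheduleAtFixedRate(() -> System.out.println("tick"), 0, 200, TimeUnit.MILLISECONDS);
        Thread.sleep(700);
        scheduler.shutdown();
    }
}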
ForkJoinPool executes ForkJoinTask subclasses (such as RecursiveTask), which can fork() subtasks and join() their results. The following Groovy example searches a tree of nodes for a given value:

import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

class LNode {
    List<LNode> childs = []
    def object
}

class Finder extends RecursiveTask<LNode> {
    LNode node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
    node: new LNode(
        childs: [
            new LNode(object: "invalid"),
            new LNode(
                object: "invalid",
                childs: [new LNode(object: "test")]
            )
        ]
    ),
    expect: "test"
))
print("${invoke?.object}")
Source: https://habr.com/ru/post/187854/