📜 ⬆️ ⬇️

Java.util.concurrent. * Synchronizer reference

The purpose of this publication is not a complete analysis of the synchronizers from the java.util.concurrent package. I am writing it, first of all, as a reference book, which will facilitate entry into the topic and show the possibilities of practical application of classes for synchronization of threads (hereinafter thread = thread).

In java.util.concurrent there are many different classes that can be divided into groups according to their functionality: Concurrent Collections, Executors, Atomics, etc. Synchronizers will be one of these groups.


')
Synchronizers are auxiliary utilities for synchronizing threads, which enable the developer to regulate and / or restrict the work of streams and provide a higher level of abstraction than the main language primitives (monitors).


Semaphore


Semaphore Synchronizer implements the Semaphore sync pattern. Most often, semaphores are needed when you need to restrict access to a shared resource. The constructor of this class ( Semaphore(int permits) or Semaphore(int permits, boolean fair) ) always passes the number of threads that the semaphore will allow to simultaneously use the specified resource.



Access is controlled by a counter: initially, the counter value is int permits , when a thread enters a given block of code, the counter value decreases by one, when the stream leaves it, it increases. If the counter value is zero, then the current flow is blocked until someone leaves the block (as an example of life with permits = 1 , you can bring a queue to the office in the clinic: when the patient leaves the office, the lamp flashes, and the next patient enters ).

Official documentation for Semaphore.

Semaphore usage example
Consider the following example. There is parking, which at the same time can accommodate no more than 5 cars. If the parking lot is full, the newly arrived car must wait until at least one place is vacated. After that, he will be able to park.

 import java.util.concurrent.Semaphore; public class Parking { //   - true,  - false private static final boolean[] PARKING_PLACES = new boolean[5]; //  "",     //aquire()       private static final Semaphore SEMAPHORE = new Semaphore(5, true); public static void main(String[] args) throws InterruptedException { for (int i = 1; i <= 7; i++) { new Thread(new Car(i)).start(); Thread.sleep(400); } } public static class Car implements Runnable { private int carNumber; public Car(int carNumber) { this.carNumber = carNumber; } @Override public void run() { System.out.printf(" â„–%d   .\n", carNumber); try { //acquire()          , //   ,        , //     SEMAPHORE.acquire(); int parkingNumber = -1; //     synchronized (PARKING_PLACES){ for (int i = 0; i < 5; i++) if (!PARKING_PLACES[i]) { //   PARKING_PLACES[i] = true; //  parkingNumber = i; //  ,   System.out.printf(" â„–%d    %d.\n", carNumber, i); break; } } Thread.sleep(5000); //  ,   synchronized (PARKING_PLACES) { PARKING_PLACES[parkingNumber] = false;//  } //release(), ,   SEMAPHORE.release(); System.out.printf(" â„–%d  .\n", carNumber); } catch (InterruptedException e) { } } } } 

The result of the program
Car number 1 drove to the parking lot.
Car number 1 parked at location 0.
Car number 2 drove up to the parking lot.
Car number 2 parked in place 1.
Car number 3 drove up to the parking lot.
Car number 3 parked on the spot 2.
Car number 4 drove to the parking lot.
Car number 4 parked on the spot 3.
Car number 5 drove to the parking lot.
Car number 5 parked on the spot 4.
Car number 6 drove to the parking lot.
Car number 7 drove to the parking lot.
Car number 1 left the parking lot.
Car number 6 parked at location 0.
Car number 2 left the parking lot.
Car number 7 parked in place 1.
Car number 3 left the parking lot.
Car number 4 left the parking lot.
Car number 5 left the parking lot.
Car number 6 left the parking lot.
Car number 7 left the parking lot.

A semaphore is great for solving this problem: it does not allow the car (stream) to park (go into a given code block and use a shared resource) if there are no parking spaces (the counter is 0) It is worth noting that the Semaphore class supports capturing and releasing more than one permissions at a time, but this is not necessary in this task.


CountDownLatch


CountDownLatch (countdown lock) provides the ability for any number of threads in a code block to wait until a certain number of operations are executed in other threads before they are “released” to continue their operations. The CountDownLatch ( CountDownLatch(int count) ) constructor must pass the number of operations that must be performed in order for the lock to “release” the blocked threads.



The blocking of threads is removed with the help of a counter: any valid stream, when performing a certain operation, decreases the value of the counter. When the counter reaches 0, all waiting threads are unlocked and continue to run (for example, a collection of an excursion group can serve as an example of the CountDownLatch: until a certain number of people are gathered, the excursion will not begin).

Official documentation for CountDownLatch.

CountDownLatch usage example
Consider the following example. We want to hold a car race. Five cars take part in the race. To start the race you need to fulfill the following conditions:
  1. Each of the five cars drove up to the starting straight;
  2. The command “To the start!” Was given;
  3. The command "Attention!" Was given;
  4. The command "Marsh!" Was given.
It is important that all cars start at the same time.
 import java.util.concurrent.CountDownLatch; public class Race { // CountDownLatch  8 "" private static final CountDownLatch START = new CountDownLatch(8); //    private static final int trackLength = 500000; public static void main(String[] args) throws InterruptedException { for (int i = 1; i <= 5; i++) { new Thread(new Car(i, (int) (Math.random() * 100 + 50))).start(); Thread.sleep(1000); } while (START.getCount() > 3) //,     Thread.sleep(100); //  .  ,  100ms Thread.sleep(1000); System.out.println(" !"); START.countDown();// ,    1 Thread.sleep(1000); System.out.println("!"); START.countDown();// ,    1 Thread.sleep(1000); System.out.println("!"); START.countDown();// ,    1 //   ,     //  } public static class Car implements Runnable { private int carNumber; private int carSpeed;//,     public Car(int carNumber, int carSpeed) { this.carNumber = carNumber; this.carSpeed = carSpeed; } @Override public void run() { try { System.out.printf(" â„–%d    .\n", carNumber); //     -   //   1 START.countDown(); // await()  ,  ,   ,  // CountDownLatch    0 START.await(); Thread.sleep(trackLength / carSpeed);//    System.out.printf(" â„–%d !\n", carNumber); } catch (InterruptedException e) { } } } } 

The result of the program
Car number 1 approached the starting line.
Car number 2 approached the starting line.
Car number 3 approached the starting line.
Car number 4 approached the starting line.
Car number 5 approached the starting line.
On your marks!
Attention!
March!
Car number 4 finished!
Car number 1 finished!
Car number 3 finished!
Car number 5 finished!
Car number 2 finished!

CountDownLatch can be used in a variety of synchronization schemes: for example, so that while one thread does work, make other threads wait or, on the contrary, make the stream wait for others to do the work.


CyclicBarrier


CyclicBarrier implements the pattern of synchronization Barrier . A cyclic barrier is a synchronization point at which a specified number of parallel flows are encountered and blocked. As soon as all profit streams are completed, an optional action is performed (or not performed if the barrier was initialized without it), and, after it has been completed, the barrier breaks and the waiting streams are “released”. The barrier constructor ( CyclicBarrier(int parties) and CyclicBarrier(int parties, Runnable barrierAction) ) must transmit the number of parties that must “meet”, and, optionally, the action that should occur when the parties meet, but before "Released."



The barrier is similar to CountDownLatch, but the main difference between them is that you cannot reuse the “lock” after its counter reaches zero, and you can use the barrier again even after it breaks. CyclicBarrier is an alternative to the join() method, which “collects” threads only after they have been executed.

Official CyclicBarrier documentation.

CyclicBarrier usage example
Consider the following example. There is a ferry. The ferry can ship three cars at a time. In order not to drive the ferry once again, you need to send it when at least three cars are collected at the crossing.

 import java.util.concurrent.CyclicBarrier; public class Ferry { private static final CyclicBarrier BARRIER = new CyclicBarrier(3, new FerryBoat()); //      ,   ,  //    .  ,   . public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 9; i++) { new Thread(new Car(i)).start(); Thread.sleep(400); } } //,        public static class FerryBoat implements Runnable { @Override public void run() { try { Thread.sleep(500); System.out.println("  !"); } catch (InterruptedException e) { } } } //,     public static class Car implements Runnable { private int carNumber; public Car(int carNumber) { this.carNumber = carNumber; } @Override public void run() { try { System.out.printf(" â„–%d    .\n", carNumber); //        ,    await() //    ,        BARRIER.await(); System.out.printf(" â„–%d  .\n", carNumber); } catch (Exception e) { } } } } 

The result of the program
Car number 0 approached the ferry.
Car number 1 approached the ferry.
Car number 2 approached the ferry.
Car number 3 approached the ferry.
Ferry ferried cars!
Car number 2 continued to move.
Car number 1 continued driving.
Car number 0 continued to move.
Car number 4 approached the ferry.
Car number 5 approached the ferry.
Car number 6 approached the ferry.
Ferry ferried cars!
Car number 5 continued to move.
Car number 4 continued to move.
Car number 3 continued to move.
Car number 7 drove up to the ferry.
Car number 8 approached the ferry.
Ferry ferried cars!
Car number 8 continued to move.
Car number 6 continued to move.
Car number 7 continued to move.

When the three streams reach the await() method, the barrier action is triggered and the ferry forwards three cars from the accumulated ones. After this, a new cycle begins.


Exchanger <V>


An exchanger may be needed in order to exchange data between two threads at a specific point in the operation of both streams. The exchanger is a generic class, it is parameterized by the type of object to pass.



The exchanger is the synchronization point of a pair of threads: the flow that calls the exchange() method on the exchange() blocked and another thread is waiting. When another thread calls the same method, objects will be exchanged: each of them will receive the argument of the other in the exchange() method. It is worth noting that the exchanger supports the transfer of null values. This makes it possible to use it to transfer an object in one direction, or simply as a synchronization point for two streams.

Official Exchanger documentation.

Exchanger usage example
Consider the following example. There are two trucks: one goes from point A to point D, the other from point B to point C. The roads AD and BC intersect at point E. From points A and B you need to deliver parcels to points C and D. For this, trucks at point E must meet and exchange the relevant parcels.

 import java.util.concurrent.Exchanger; public class Delivery { // ,     String private static final Exchanger<String> EXCHANGER = new Exchanger<>(); public static void main(String[] args) throws InterruptedException { String[] p1 = new String[]{"{ A->D}", "{ A->C}"};//   1-  String[] p2 = new String[]{"{ B->C}", "{ B->D}"};//   2-  new Thread(new Truck(1, "A", "D", p1)).start();// 1-     D Thread.sleep(100); new Thread(new Truck(2, "B", "C", p2)).start();// 2-      } public static class Truck implements Runnable { private int number; private String dep; private String dest; private String[] parcels; public Truck(int number, String departure, String destination, String[] parcels) { this.number = number; this.dep = departure; this.dest = destination; this.parcels = parcels; } @Override public void run() { try { System.out.printf("  â„–%d : %s  %s.\n", number, parcels[0], parcels[1]); System.out.printf(" â„–%d    %s   %s.\n", number, dep, dest); Thread.sleep(1000 + (long) Math.random() * 5000); System.out.printf(" â„–%d    .\n", number); parcels[1] = EXCHANGER.exchange(parcels[1]);//  exchange()     //    exchange(),      System.out.printf("  â„–%d     %s.\n", number, dest); Thread.sleep(1000 + (long) Math.random() * 5000); System.out.printf(" â„–%d   %s  : %s  %s.\n", number, dest, parcels[0], parcels[1]); } catch (InterruptedException e) { } } } } 

The result of the program
In the truck number 1 loaded: {parcel A-> D} and {parcel A-> C}.
Truck number 1 left from point A to point D.
In the truck number 2 loaded: {parcel B-> C} and {parcel B-> D}.
Truck number 2 drove from point B to point C.
Truck number 1 arrived at point E.
Truck number 2 arrived at point E.
In the truck number 2 moved the parcel for point C.
In the truck number 1 moved the parcel for point D.
Truck number 2 arrived in C and delivered: {parcel B-> C} and {parcel A-> C}.
Truck number 1 arrived in D and delivered: {parcel A-> D} and {parcel B-> D}.

As we can see, when one truck (one stream) arrives at point E (reaches the synchronization point), it waits until another truck (another stream) arrives at point E (reaches the synchronization point). After this, the parcel (String) is exchanged and both trucks (streams) continue their journey (work).


Phaser


Phaser (phaser), like CyclicBarrier, is an implementation of the Barrier synchronization pattern, but, unlike CyclicBarrier, provides more flexibility. This class allows you to synchronize streams that represent a single phase or stage of performing a common action. Like CyclicBarrier, the Phaser is the synchronization point at which the participating threads meet. When all parties have arrived, Phaser moves to the next phase and waits for its completion again.

If we compare Phaser and CyclicBarrier, we can highlight the following important features of the Phaser:

A Phaser object is created using one of the constructors:

 Phaser() Phaser(int parties) 

The parties parameter indicates the number of parties that will perform the action phases. The first constructor creates the Phaser object without any sides, and the barrier in this case is also “closed”. The second constructor registers the number of parties transmitted to the constructor. The barrier opens when all parties have arrived, or if the last participant is removed. (The Phaser class still has constructors to which the Phaser parent object is passed, but we will not consider them.)

Basic methods:

Official Phaser documentation.
Phaser Case Study
Consider the following example. There are five stops. The first four of them can stand passengers and wait for the bus. The bus leaves the park and stops at each stop for a while. After the final stop the bus goes to the park. We need to pick up passengers and disembark them at the right stops.
 import java.util.ArrayList; import java.util.concurrent.Phaser; public class Bus { private static final Phaser PHASER = new Phaser(1);//    // 0  6 -   , 1 - 5  public static void main(String[] args) throws InterruptedException { ArrayList<Passenger> passengers = new ArrayList<>(); for (int i = 1; i < 5; i++) { //    if ((int) (Math.random() * 2) > 0) passengers.add(new Passenger(i, i + 1));//     if ((int) (Math.random() * 2) > 0) passengers.add(new Passenger(i, 5)); //     } for (int i = 0; i < 7; i++) { switch (i) { case 0: System.out.println("   ."); PHASER.arrive();//  0  1  -  break; case 6: System.out.println("   ."); PHASER.arriveAndDeregister();//  ,   break; default: int currentBusStop = PHASER.getPhase(); System.out.println(" â„– " + currentBusStop); for (Passenger p : passengers) //,      if (p.departure == currentBusStop) { PHASER.register();// ,      p.start(); //   } PHASER.arriveAndAwaitAdvance();//    } } } public static class Passenger extends Thread { private int departure; private int destination; public Passenger(int departure, int destination) { this.departure = departure; this.destination = destination; System.out.println(this + "    â„– " + this.departure); } @Override public void run() { try { System.out.println(this + "   ."); while (PHASER.getPhase() < destination) //      () PHASER.arriveAndAwaitAdvance(); //        Thread.sleep(1); System.out.println(this + "  ."); PHASER.arriveAndDeregister(); //     } catch (InterruptedException e) { } } @Override public String toString() { return "{" + departure + " -> " + destination + '}'; } } } 

The result of the program
The passenger {1 -> 2} is waiting at the stop number 1
The passenger {1 -> 5} is waiting at the stop number 1
The passenger {2 -> 3} is waiting at the stop number 2
The passenger {2 -> 5} is waiting at the stop number 2
Passenger {3 -> 4} is waiting at stop number 3
The passenger {4 -> 5} is waiting at the stop number 4
The passenger {4 -> 5} is waiting at the stop number 4
.
â„– 1
{1 -> 5} .
{1 -> 2} .
â„– 2
{2 -> 3} .
{1 -> 2} .
{2 -> 5} .
â„– 3
{2 -> 3} .
{3 -> 4} .
â„– 4
{4 -> 5} .
{3 -> 4} .
{4 -> 5} .
â„– 5
{1 -> 5} .
{2 -> 5} .
{4 -> 5} .
{4 -> 5} .
.


By the way, the function of phaser can reproduce the work of CountDownLatch.
Phaser example from CountDownLatch
 import java.util.concurrent.Phaser; public class NewRace { private static final Phaser START = new Phaser(8); private static final int trackLength = 500000; public static void main(String[] args) throws InterruptedException { for (int i = 1; i <= 5; i++) { new Thread(new Car(i, (int) (Math.random() * 100 + 50))).start(); Thread.sleep(100); } while (START.getRegisteredParties() > 3) //,     Thread.sleep(100); //  .  ,  100ms Thread.sleep(100); System.out.println(" !"); START.arriveAndDeregister(); Thread.sleep(100); System.out.println("!"); START.arriveAndDeregister(); Thread.sleep(100); System.out.println("!"); START.arriveAndDeregister(); } public static class Car implements Runnable { private int carNumber; private int carSpeed; public Car(int carNumber, int carSpeed) { this.carNumber = carNumber; this.carSpeed = carSpeed; } @Override public void run() { try { System.out.printf(" â„–%d    .\n", carNumber); START.arriveAndDeregister(); START.awaitAdvance(0); Thread.sleep(trackLength / carSpeed); System.out.printf(" â„–%d !\n", carNumber); } catch (InterruptedException e) { } } } } 





If someone came in handy, then I am very happy =)

For more information about the Phaser here .
You can read more about synchronizers and see examples here .

An excellent review of java.util.concurrent can be found here .

Source: https://habr.com/ru/post/277669/


All Articles