The producer / consumer task
This article is intended for beginners who have recently started getting acquainted with multithreading in Java.
Not so long ago I did an internship at EPAM, and one of the tasks a young Padawan had to complete was a multithreaded one. To be fair, nobody forced us to solve it with low-level tools such as wait() and notify() when ArrayBlockingQueue, Semaphore and the rest of the powerful java.util.concurrent API can do the job without any great tricks. Moreover, reinventing the wheel is considered bad practice when a ready-made API that hides the overwhelming complexity of concurrency already exists for almost any scenario. I implemented both variants: the low-level one and the one that uses the API. Today I will show the first option.
To begin with, I suggest getting acquainted with the following article.
Link to this project.
The task was the following:
- There are transport ships that sail up to a tunnel and then go on to the berths, where they are loaded with various kinds of goods.
- They pass through a narrow tunnel that can hold only 5 ships at a time. “Sail up to the tunnel” means that the ships have to appear from somewhere. Their number may be limited, say 10 or 100, or it may be infinite. The part that makes ships “sail up” is called the ship generator.
- The type and capacity of a ship vary depending on the goods to be loaded onto it. For the task specification I came up with 3 types of ships (bread, banana and clothing) and 3 capacities: 10, 50 and 100 units of goods. 3 types of ships * 3 capacities = 9 different kinds of ships.
- There are 3 types of berths for loading ships: Bread, Banana and Clothes. Each berth takes, or calls to itself, the ship it needs and starts loading it. A berth loads 10 units of goods per second, so a ship with a capacity of 50 units is loaded in 5 seconds.
The requirements were the following:
- Split the task correctly into parallel parts.
- Synchronize the threads and maintain data integrity. After all, limiting the access of threads to a shared resource is not difficult; making them work in concert is much harder.
- The work of the ship generator should not depend on the work of the berths, and vice versa.
- The shared resource must be thread-safe (if the implementation has one).
- Threads should not be active if there are no tasks.
- Threads should not hold a mutex if there are no tasks.
So, let's go!
To begin with, I drew a diagram to make it clearer what was happening.

Before solving any problem, it is considered good practice to visualize it on paper, especially when it comes to a multithreaded application.
Let's break our application into components, in our case into classes. This is the structure of the project.
Class - Ship
The class itself does not contain any logic; it is a POJO.
Source code:
public class Ship {
    private int count;
    private Size size;
    private Type type;

    public Ship(Size size, Type type) {
        this.size = size;
        this.type = type;
    }

    public void add(int count) {
        this.count += count;
    }

    public boolean countCheck() {
        if (count >= size.getValue()) {
            return false;
        }
        return true;
    }

    public int getCount() {
        return count;
    }

    public Type getType() {
        return type;
    }

    public Size getSize() {
        return size;
    }
}
Since ships may differ from each other in size and type, I decided to create enum classes, Size and Type, to define the properties of a ship. Size has predefined values.
The Ship class also has an int count field that tracks how many goods have already been loaded. Once this counter reaches the capacity given by the ship's size, the boolean countCheck() method returns false; in other words, the ship is fully loaded.
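The listings for Size and Type are not reproduced in the text, so here is a minimal sketch of what they might look like. Only getValue(), the capacities 10, 50 and 100, and the Type constants DRESS, BANANA and MEAL (used later in Main) come from the article; the constant names SMALL, MIDDLE and LARGE and the helper class EnumSketch are assumptions made for illustration.

```java
// Hypothetical reconstruction: constant names in Size are assumed, not taken from the project.
enum Type {
    DRESS, BANANA, MEAL
}

enum Size {
    SMALL(10), MIDDLE(50), LARGE(100);

    private final int value;   // capacity of the ship in units of goods

    Size(int value) {
        this.value = value;
    }

    int getValue() {
        return value;
    }
}

class EnumSketch {
    /** Seconds a berth needs for a ship of the given size at 10 units per second. */
    static int loadingSeconds(Size size) {
        return size.getValue() / 10;
    }

    public static void main(String[] args) {
        // 3 cargo types * 3 capacities = 9 kinds of ship
        System.out.println(Type.values().length * Size.values().length);
        // a 50-unit ship takes 5 seconds to load
        System.out.println(loadingSeconds(Size.MIDDLE));
    }
}
```

Keeping the capacity inside the enum means that Ship.countCheck() can compare its counter with size.getValue() without any extra lookup table.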
Class - Tunnel
Source code:
import java.util.ArrayList;
import java.util.List;

public class Tunnel {
    private List<Ship> store;
    private static final int maxShipsInTunel = 5;
    private static final int minShipsInTunel = 0;
    private int shipsCounter = 0;

    public Tunnel() {
        store = new ArrayList<>();
    }

    // Adds the ship if there is room; otherwise waits and then returns false without adding it.
    public synchronized boolean add(Ship element) {
        try {
            if (shipsCounter < maxShipsInTunel) {
                notifyAll();
                store.add(element);
                String info = String.format("%s + The ship arrived in the tunnel: %s %s %s", store.size(), element.getType(), element.getSize(), Thread.currentThread().getName());
                System.out.println(info);
                shipsCounter++;
            } else {
                System.out.println(store.size() + "> There is no place for a ship in the tunnel: " + Thread.currentThread().getName());
                wait();
                return false;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }

    // Removes and returns a ship of the requested type; if there is none, waits and then returns null.
    public synchronized Ship get(Type shipType) {
        try {
            if (shipsCounter > minShipsInTunel) {
                notifyAll();
                for (Ship ship : store) {
                    if (ship.getType() == shipType) {
                        shipsCounter--;
                        System.out.println(store.size() + "- The ship is taken from the tunnel: " + Thread.currentThread().getName());
                        store.remove(ship);
                        return ship;
                    }
                }
            }
            System.out.println("0 < There are no ships in the tunnel");
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}
The tunnel stores the ships in a List. Its capacity is 5 ships. Ships have to be added to the tunnel and taken out of it somehow; the two methods add() and get() do this. The get() method finds a ship of the requested type, removes it from the list and returns it. As you can see, both add() and get() check the number of ships in the tunnel. If there are already 5 ships, adding another one fails, and vice versa: if there are 0 ships, taking one from the list fails as well. Tunnel is a shared resource in a multithreading context. Shared resources in multithreading are evil. All the problems and complexity of multithreaded programming come precisely from shared resources, so they should be avoided wherever possible.
What exactly is the problem with our shared resource?
First, adding and removing ships must be coordinated among the threads. Without that coordination there is a high likelihood of a race condition and data loss. We solve this with the synchronized keyword.
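To make the effect of synchronized concrete, here is a small sketch that is separate from the project: several threads increment one shared counter. The names SafeCounter and RaceSketch are made up for this illustration. Because increment() is synchronized, the read-modify-write of the field cannot interleave between threads and no increments are lost; without the keyword the final value could come out lower.

```java
class SafeCounter {
    private int value;

    // synchronized makes the read-modify-write of value atomic for this instance
    synchronized void increment() {
        value++;
    }

    synchronized int get() {
        return value;
    }
}

class RaceSketch {
    /** Starts the given number of threads, each incrementing the same counter, and returns the total. */
    static int countWithThreads(int threads, int incrementsPerThread) {
        SafeCounter counter = new SafeCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();   // wait until every worker has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
    }
}
```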
Secondly, the task specification said:
- Threads should not be active if there are no tasks.
- Threads should not hold a mutex if there are no tasks.
For this there is also a solution, in the form of wait() and notifyAll().
For the add() method and the thread that executes it, “no tasks” means that the tunnel already holds 5 ships. In that case the thread should stop its activity and wait. To stop the thread and release the mutex we call wait(). The get() method follows similar rules, only for it “no tasks” means that there are 0 ships in the tunnel. So they wait, but how do we wake them up and tell them to get back to work? This is where the notifyAll() method comes to the rescue. Its job is to move the waiting threads out of the WAITING state so that they can become RUNNABLE again once they reacquire the monitor. When add() runs and there are fewer than 5 ships, it wakes up the thread working with get(). And vice versa: when get() runs and there is at least one ship, it wakes up the thread working with add(). Some kind of recursion... But be careful! There is a chance of catching a DEADLOCK and ending up in an endless wait. (http://www.quizful.net/interview/java/Deadlock)
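The Tunnel listing checks its condition once with if and gives up (returning false or null) after being woken. A common variant of the same wait()/notifyAll() idea re-checks the condition in a while loop, which is the usage the Object.wait() documentation recommends, since a thread can be woken when its condition still does not hold. The sketch below illustrates that variant and is not the project's code: the names GuardedTunnel and GuardedTunnelDemo are hypothetical, and a ship is represented simply by the name of its type.

```java
import java.util.ArrayList;
import java.util.List;

class GuardedTunnel {
    private static final int MAX_SHIPS = 5;                 // tunnel capacity from the task
    private final List<String> store = new ArrayList<>();   // a ship is just its type name here

    /** Blocks while the tunnel is full, then adds the ship. */
    synchronized void add(String shipType) throws InterruptedException {
        while (store.size() >= MAX_SHIPS) {
            wait();               // releases the monitor until another thread calls notifyAll()
        }
        store.add(shipType);
        notifyAll();              // a loader may be waiting for this type
    }

    /** Blocks until a ship of the requested type is present, then removes and returns it. */
    synchronized String take(String shipType) throws InterruptedException {
        while (!store.contains(shipType)) {
            wait();
        }
        store.remove(shipType);   // removes the first matching element
        notifyAll();              // the generator may be waiting for free space
        return shipType;
    }

    synchronized int size() {
        return store.size();
    }
}

class GuardedTunnelDemo {
    /** Fills the tunnel, lets a loader thread free one slot, adds one more ship, returns the final size. */
    static int run() {
        GuardedTunnel tunnel = new GuardedTunnel();
        Thread loader = new Thread(() -> {
            try {
                tunnel.take("BREAD");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            for (int i = 0; i < 5; i++) {
                tunnel.add("BREAD");       // after the loop the tunnel is full
            }
            loader.start();
            tunnel.add("BANANA");          // waits, if necessary, until the loader has taken a ship
            loader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return tunnel.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 5
    }
}
```

With this shape the caller never has to handle a false or null result: add() and take() return only once the operation has actually happened.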
Moving on...
Class - ShipGenerator
Now we need something that will generate ships, and will do so in an independent thread. This is the ShipGenerator class.
Source code:
import java.util.Random;

public class ShipGenerator implements Runnable {
    private Tunnel tunnel;
    private int shipCount;

    public ShipGenerator(Tunnel tunnel, int shipCount) {
        this.tunnel = tunnel;
        this.shipCount = shipCount;
    }

    @Override
    public void run() {
        int count = 0;
        while (count < shipCount) {
            Thread.currentThread().setName(" Generator ship");
            count++;
            tunnel.add(new Ship(getRandomSize(), getRandomType()));
            try {
                // emulates the time a ship needs to sail up to the tunnel
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private Type getRandomType() {
        Random random = new Random();
        return Type.values()[random.nextInt(Type.values().length)];
    }

    private Size getRandomSize() {
        Random random = new Random();
        return Size.values()[random.nextInt(Size.values().length)];
    }
}
To hand the ship-generation task to a thread, the class must implement the Runnable interface. The constructor takes 2 parameters: the Tunnel and the number of ships to generate. Next, we override the run() method. This is the method that the thread will actually execute, so this is where we put the ship-generation logic. The logic is simple: we generate ships at random, using Random, and put them into the shared resource, the Tunnel.
A small digression. I said that we add a task for a thread because many people mistakenly believe that by implementing the Runnable interface they create a thread. In fact, they create a task for a thread. In other words, creating 1000 classes that implement Runnable does not mean creating 1000 threads. It means creating 1000 tasks. The number of threads that will perform these tasks depends on whatever runs them; in this project it is a thread pool sized by the number of cores on your machine.
Class - PierLoader
Source code:
public class PierLoader implements Runnable {
    private Tunnel tunnel;
    private Type shipType;

    public PierLoader(Tunnel tunnel, Type shipType) {
        this.tunnel = tunnel;
        this.shipType = shipType;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.currentThread().setName("Loader " + shipType);
The ship has been generated and added to the tunnel; now it has to be taken out of the tunnel and loaded with goods. To do this we create the PierLoader class, in other words a berth. As we know, there are 3 types of ships, so we need 3 berths working independently of each other. By analogy with ShipGenerator, we implement the Runnable interface. The constructor takes 2 parameters: the Tunnel and the Type (the type of ships this berth accepts). Instead of add() we call the get() method with the specific ship type.
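The rest of the berth's loop is not visible in the listing, so the following is only a guess at the general shape of the loading step, based on the task description (10 units per second until countCheck() reports that the ship is full). CargoShip is a hypothetical stand-in for the article's Ship, and LoadingSketch and its parameters are assumptions made so that the example is self-contained.

```java
// Hypothetical stand-in for the article's Ship, reduced to what the loading step needs.
class CargoShip {
    private final int capacity;
    private int count;

    CargoShip(int capacity) {
        this.capacity = capacity;
    }

    void add(int units) {
        count += units;
    }

    boolean countCheck() {
        return count < capacity;   // true while there is still room on board
    }

    int getCount() {
        return count;
    }
}

class LoadingSketch {
    /** Loads the ship in portions of unitsPerTick and returns the number of ticks it took. */
    static int load(CargoShip ship, int unitsPerTick, long tickMillis) {
        int ticks = 0;
        while (ship.countCheck()) {
            try {
                Thread.sleep(tickMillis);   // emulates the time needed to load one portion
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;                      // stop loading if the thread is asked to stop
            }
            ship.add(unitsPerTick);
            ticks++;
        }
        return ticks;
    }

    public static void main(String[] args) {
        CargoShip ship = new CargoShip(50);
        // 50 units at 10 units per tick: 5 ticks (5 seconds with a 1000 ms tick)
        System.out.println(load(ship, 10, 1000));
    }
}
```

In a berth's run() method, a step like this would presumably follow each successful tunnel.get(shipType) call.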
Note that I use sleep() only to emulate the work of the loaders and the ship generator (loading goods and sailing supposedly take time), not to delay threads in order to line them up with other threads. Never do that. There are many reasons; the most obvious one: what happens if the load on thread A grows and it runs much longer (3 seconds instead of the 1 second you expected) than the time you put thread B to sleep for (1 second) so that it would wait for thread A? The JVM can even behave differently depending on the OS. Also keep in mind that sleep() does not release a lock the thread holds; the lock stays held for the whole sleep, unlike wait(), which tells the thread to stop working and releases the lock.
Class - Main
Source code:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        System.out.println("Available number of cores: " + Runtime.getRuntime().availableProcessors());

        Tunnel tunnel = new Tunnel();
        ShipGenerator shipGenerator = new ShipGenerator(tunnel, 10);
        PierLoader pierLoader1 = new PierLoader(tunnel, Type.DRESS);
        PierLoader pierLoader2 = new PierLoader(tunnel, Type.BANANA);
        PierLoader pierLoader3 = new PierLoader(tunnel, Type.MEAL);

        // one pool thread per available processor
        ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        service.execute(shipGenerator);
        service.execute(pierLoader1);
        service.execute(pierLoader2);
        service.execute(pierLoader3);
        service.shutdown();
    }
}
This is the simplest class. We create an instance of the Tunnel class. Next, we create the ShipGenerator, passing the Tunnel object and the number of ships to generate to its constructor.
In the same way, we create 3 PierLoader objects for loading the 3 types of ships. To the constructor we pass our shared resource, the Tunnel, and the type of ship this PierLoader should work with.
Then we hand all of this to an ExecutorService. First, we create a thread pool for running the tasks. The number of threads is determined with Runtime.getRuntime().availableProcessors(), which returns the number of processors available to the JVM as an int. It does not make sense to create 1000 threads if you have only 8 cores available, since only 8 will be running at any one time.
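The difference between tasks and threads can be checked with a small experiment that is separate from the project: submit many Runnable tasks to a fixed pool and record which threads actually ran them. The class and method names below (PoolSketch, distinctThreads) are made up for this sketch.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolSketch {
    /** Runs taskCount tasks on a fixed pool and returns how many distinct threads executed them. */
    static int distinctThreads(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();   // thread-safe set
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // each Runnable is only a task; the pool decides which thread runs it
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                                           // accept no new tasks
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);           // wait for queued tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // 1000 tasks, but never more than 4 threads
        System.out.println(distinctThreads(4, 1000));
    }
}
```

One caveat that follows from how a fixed pool works: a task that never finishes occupies its pool thread forever. The four tasks in Main include loaders that loop in while (true), so on a machine that reports fewer than four processors some of them would stay in the queue and never start. Sizing the pool by the number of long-running tasks would avoid this.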
Well, that's all! The multithreaded application is ready. You can play with the number of tasks for the threads, the ship generation time and the loading time. The result will be different every time.
Conclusion
In conclusion, I would advise you to read "Thinking in Java" by Bruce Eckel (published in Russian as "The Philosophy of Java"), the chapter “Parallel Execution”, and “Java. Programming Methods” by N. Blinov, the chapter "Threads of Execution". They describe the main practices of working with multithreading in Java and the complexity involved.
Before writing a multithreaded application, draw it on a piece of paper and make a sketch of your project. Avoid shared resources. Shared resources are evil. It is better when threads know nothing and hear nothing about each other. Use a ready-made API rather than doing everything by hand. This will reduce development time and increase the reliability of your application. Test and profile the application. Perhaps you will find some other bottleneck that slows it down and keeps your multithreaded application from reaching its full potential.
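As an illustration of the "ready-made API" advice, here is one possible way to express the tunnel with java.util.concurrent instead of wait() and notifyAll(). It is a sketch under assumptions, not the second variant mentioned at the start of the article: the names Cargo, QueueTunnel and QueueTunnelDemo are hypothetical, and a ship is represented by a plain String. A Semaphore with 5 permits limits the total number of ships in the tunnel, and one BlockingQueue per cargo type lets each berth wait only for its own kind of ship.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

enum Cargo { DRESS, BANANA, MEAL }

class QueueTunnel {
    private final Semaphore freeSlots = new Semaphore(5);   // at most 5 ships in the tunnel
    private final Map<Cargo, BlockingQueue<String>> byType = new EnumMap<>(Cargo.class);

    QueueTunnel() {
        // the map is filled once here and only read afterwards
        for (Cargo cargo : Cargo.values()) {
            byType.put(cargo, new LinkedBlockingQueue<>());
        }
    }

    /** Blocks while the tunnel is full, then places the ship in the queue for its type. */
    void add(Cargo type, String shipName) throws InterruptedException {
        freeSlots.acquire();
        byType.get(type).put(shipName);
    }

    /** Blocks until a ship of the requested type arrives, then frees its slot in the tunnel. */
    String take(Cargo type) throws InterruptedException {
        String ship = byType.get(type).take();
        freeSlots.release();
        return ship;
    }

    int freeSlots() {
        return freeSlots.availablePermits();
    }
}

class QueueTunnelDemo {
    /** Adds two ships, takes one, and returns the number of free slots left in the tunnel. */
    static int run() {
        QueueTunnel tunnel = new QueueTunnel();
        try {
            tunnel.add(Cargo.BANANA, "banana-1");
            tunnel.add(Cargo.MEAL, "meal-1");
            tunnel.take(Cargo.BANANA);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return tunnel.freeSlots();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 4
    }
}
```

There is no synchronized block and no explicit notification in this version; blocking and waking up are handled inside Semaphore and BlockingQueue.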