
Java Multi-Threading Labs: Parallel Copy

A good lab exercise on multithreading (simple, understandable, non-trivial, and useful in practice) is a rarity. Here is one problem statement and four lab tasks on elementary multithreading in Java.

I also teach the course Scala for Java Developers on the udemy.com online education platform (similar to Coursera / edX).

Problem statement


This is a single-threaded implementation of a byte-by-byte copier from an InputStream to an OutputStream. Copying takes place in the thread that calls the copy(...) method.
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class CopyUtil {
        public static void copy(InputStream src, OutputStream dst) throws IOException {
            try (InputStream src0 = src; OutputStream dst0 = dst) {
                int b;
                while ((b = src.read()) != -1) {
                    dst.write(b);
                }
            }
        }
    }


This is a single-threaded implementation that copies from an InputStream to an OutputStream in byte[] chunks rather than byte by byte. Copying takes place in the thread that calls the copy(...) method.
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class CopyUtil {
        public static void copy(InputStream src, OutputStream dst) throws IOException {
            byte[] buff = new byte[128];
            try (InputStream src0 = src; OutputStream dst0 = dst) {
                int count;
                while ((count = src.read(buff)) != -1) {
                    dst.write(buff, 0, count);
                }
            }
        }
    }

This is a multithreaded implementation of the byte[]-chunk copier from an InputStream to an OutputStream. Reading and writing each run in a separate new thread, and the two threads are connected by a bounded blocking queue that carries data from the reader to the writer.
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicReference;

    public class CopyUtil {
        public static void copy(final InputStream src, final OutputStream dst) throws IOException {
            // reader-to-writer byte[] channel
            final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(64);
            // exception channel from the reader/writer threads to the caller
            final AtomicReference<Throwable> ex = new AtomicReference<>();
            final ThreadGroup group = new ThreadGroup("read-write") {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    ex.compareAndSet(null, e);
                }
            };
            // reader from 'src'
            Thread reader = new Thread(group, () -> {
                try (InputStream src0 = src) {              // 'src0' for auto-closing
                    while (true) {
                        byte[] data = new byte[128];        // new data buffer
                        int count = src.read(data, 1, 127); // read up to 127 bytes
                        data[0] = (byte) count;             // byte 0 is the length field
                        buffer.put(data);                   // send to writer
                        if (count == -1) {break;}           // src is exhausted
                    }
                } catch (Exception e) {
                    ex.compareAndSet(null, e);              // keep the first failure (it was silently dropped before)
                    group.interrupt();                      // interrupt writer
                }
            });
            reader.start();
            // writer to 'dst'
            Thread writer = new Thread(group, () -> {
                try (OutputStream dst0 = dst) {             // 'dst0' for auto-closing
                    while (true) {
                        byte[] data = buffer.take();        // get new data from reader
                        if (data[0] == -1) {break;}         // this was the last packet
                        dst.write(data, 1, data[0]);        // write the payload after the length byte
                    }
                } catch (Exception e) {
                    ex.compareAndSet(null, e);              // keep the first failure
                    group.interrupt();                      // interrupt reader
                }
            });
            writer.start();
            // wait for the read/write operations to complete
            try {
                reader.join(); // wait for reader
                writer.join(); // wait for writer
            } catch (InterruptedException e) {throw new IOException(e);}
            if (ex.get() != null) {throw new IOException(ex.get());}
        }
    }


You can use the following test to verify the copy is correct.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Random;

    public class Test {
        public static void main(String[] args) throws IOException {
            Random rnd = new Random(0);
            byte[] testData = new byte[64 * 1024];
            rnd.nextBytes(testData);
            ByteArrayOutputStream dst = new ByteArrayOutputStream();
            CopyUtil.copy(new ByteArrayInputStream(testData), dst);
            if (!Arrays.equals(testData, dst.toByteArray())) {
                throw new AssertionError("Lab solution is wrong!");
            } else {
                System.out.println("OK!");
            }
        }
    }


Task # 1


In the last, two-threaded solution we start two threads, one for reading and one for writing. Rewrite the code so that reading is done in a new thread and writing is done by the thread that called copy(...). Incidentally, this makes it possible to get rid of the pair of join calls, since the thread at the receiving end of the buffer knows when the data has run out.
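One possible shape for this task is sketched below; it is an illustration under stated assumptions, not the reference solution. The class name CallerWritesCopy and the EOF marker object are hypothetical, and the length-in-byte-0 protocol of the original is replaced by exact-size arrays plus an identity-compared end marker.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

class CallerWritesCopy {
    // Hypothetical end-of-stream marker; compared by identity, never written out.
    private static final byte[] EOF = new byte[0];

    static void copy(InputStream src, OutputStream dst) throws IOException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(64);
        AtomicReference<IOException> readerError = new AtomicReference<>();

        // Only the reader gets a new thread.
        Thread reader = new Thread(() -> {
            try {
                try (InputStream in = src) {
                    byte[] chunk = new byte[128];
                    int count;
                    while ((count = in.read(chunk)) != -1) {
                        queue.put(Arrays.copyOf(chunk, count)); // exact-size copy
                    }
                } catch (IOException e) {
                    readerError.set(e); // reported by the caller after EOF arrives
                }
                queue.put(EOF); // sent on success and after a read failure
            } catch (InterruptedException e) {
                // the writer failed and interrupted us; just exit
            }
        }, "copy-reader");
        reader.start();

        // The calling thread is the writer; it stops at EOF, so no join is needed.
        try (OutputStream out = dst) {
            for (byte[] data = queue.take(); data != EOF; data = queue.take()) {
                out.write(data);
            }
        } catch (InterruptedException e) {
            reader.interrupt();
            Thread.currentThread().interrupt();
            throw new IOException("copy interrupted", e);
        } catch (IOException e) {
            reader.interrupt(); // unblock a reader waiting on a full queue
            throw e;
        }
        if (readerError.get() != null) {
            throw new IOException("reading failed", readerError.get());
        }
    }
}
```

The test from the problem statement can exercise this sketch if CopyUtil is replaced with CallerWritesCopy. Note that interrupting the reader only helps when it is blocked on the queue; a reader stuck inside a blocking read on the source is not woken up by this sketch.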

Task # 2


In the last, two-threaded solution the reader constantly creates new byte[] buffers and passes them to the writer, which then leaves them for the GC to collect. Create a separate return queue of empty buffers from the writer to the reader.
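A minimal sketch of such a return queue follows, as one way to approach the task rather than the intended answer. The names PooledCopy, Chunk, filled, and empty are hypothetical, the pool size of 8 is arbitrary, and for brevity the writer runs in the calling thread.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

class PooledCopy {
    /** Hypothetical reusable buffer: payload plus the number of valid bytes. */
    private static final class Chunk {
        final byte[] data = new byte[128];
        int length; // valid bytes in 'data', or -1 for end of stream
    }

    static void copy(InputStream src, OutputStream dst) throws IOException {
        final int poolSize = 8; // arbitrary; all buffers are allocated once, up front
        BlockingQueue<Chunk> filled = new ArrayBlockingQueue<>(poolSize); // reader -> writer
        BlockingQueue<Chunk> empty = new ArrayBlockingQueue<>(poolSize);  // writer -> reader
        for (int i = 0; i < poolSize; i++) {
            empty.add(new Chunk());
        }
        AtomicReference<IOException> readerError = new AtomicReference<>();

        Thread reader = new Thread(() -> {
            try {
                Chunk chunk = empty.take();
                try (InputStream in = src) {
                    while ((chunk.length = in.read(chunk.data)) != -1) {
                        filled.put(chunk);
                        chunk = empty.take(); // blocks until the writer returns a buffer
                    }
                } catch (IOException e) {
                    readerError.set(e);
                    chunk.length = -1; // turn the current buffer into the end marker
                }
                filled.put(chunk); // length == -1 here: end of stream
            } catch (InterruptedException e) {
                // the writer failed and interrupted us; just exit
            }
        }, "copy-reader");
        reader.start();

        try (OutputStream out = dst) {
            for (Chunk chunk = filled.take(); chunk.length != -1; chunk = filled.take()) {
                out.write(chunk.data, 0, chunk.length);
                empty.put(chunk); // hand the buffer back instead of leaving it to the GC
            }
        } catch (InterruptedException e) {
            reader.interrupt();
            Thread.currentThread().interrupt();
            throw new IOException("copy interrupted", e);
        } catch (IOException e) {
            reader.interrupt();
            throw e;
        }
        if (readerError.get() != null) {
            throw new IOException("reading failed", readerError.get());
        }
    }
}
```

Because exactly poolSize buffers exist and both queues have that capacity, neither put can block; the only blocking points are the two take calls, which is what throttles a fast reader against a slow writer.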

Task # 3


In all three code examples we transferred data from one reader to one writer. Implement a multithreaded solution that transfers data from one reader to many writers. All writers receive identical data. The reader and the writers each work in their own separate thread. Do not create a separate copy of the data for each writer: let all writers read from the same buffers, with each buffer held in several queues at once (a separate queue runs from the reader to each writer).
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class CopyUtil {
        public static void copy(InputStream src, OutputStream... dst) throws IOException {
            // some code
        }
    }
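The star topology described in this task could look roughly like the sketch below. It is only one possible approach: the class name FanOutCopy, the EOF marker, and the fail helper are hypothetical, and buffers are not reused here (every writer sees the same array object, which nobody modifies after it is published).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class FanOutCopy {
    // Hypothetical end-of-stream marker; compared by identity.
    private static final byte[] EOF = new byte[0];

    static void copy(InputStream src, OutputStream... dst) throws IOException {
        List<BlockingQueue<byte[]>> queues = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        AtomicReference<IOException> error = new AtomicReference<>();
        // On failure: remember the first error and wake every blocked thread.
        Consumer<IOException> fail = e -> {
            error.compareAndSet(null, e);
            threads.forEach(Thread::interrupt);
        };

        for (OutputStream target : dst) {
            BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(64); // one queue per writer
            queues.add(queue);
            threads.add(new Thread(() -> {
                try (OutputStream out = target) {
                    for (byte[] data = queue.take(); data != EOF; data = queue.take()) {
                        out.write(data);
                    }
                } catch (InterruptedException e) {
                    // another thread failed; just exit
                } catch (IOException e) {
                    fail.accept(e);
                }
            }, "copy-writer"));
        }
        // The reader is added (and therefore started) last, so every writer is
        // already running before any data, and any failure, can occur.
        threads.add(new Thread(() -> {
            try (InputStream in = src) {
                byte[] chunk = new byte[128];
                int count;
                while ((count = in.read(chunk)) != -1) {
                    byte[] data = Arrays.copyOf(chunk, count); // one array shared by all writers
                    for (BlockingQueue<byte[]> q : queues) q.put(data);
                }
                for (BlockingQueue<byte[]> q : queues) q.put(EOF);
            } catch (InterruptedException e) {
                // a writer failed; just exit
            } catch (IOException e) {
                fail.accept(e);
            }
        }, "copy-reader"));

        threads.forEach(Thread::start);
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            threads.forEach(Thread::interrupt);
            Thread.currentThread().interrupt();
            throw new IOException("copy interrupted", e);
        }
        if (error.get() != null) {
            throw new IOException("copy failed", error.get());
        }
    }
}
```

In this sketch the slowest writer sets the pace: once its queue is full, the reader blocks on put and the faster writers simply wait for more data.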


Task # 4


Do the previous task # 3, but instead of a star topology, where the reader sits at the center and rays run out to the writers, form a ring topology, in which the reader and the writers line up in a circle and pass each buffer around it. The reader passes the buffer to the first writer, the first writer to the second, ... and the last writer back to the reader. After that the reader can reuse the buffer.
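The ring can be sketched as a list of inboxes, one per stage, with each stage forwarding to the next index modulo the ring size. This is a hedged illustration, not the reference solution: RingCopy, Chunk, inbox, and fail are hypothetical names, and the pool size of 8 is arbitrary.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class RingCopy {
    /** Hypothetical reusable buffer that travels around the ring. */
    private static final class Chunk {
        final byte[] data = new byte[128];
        int length; // valid bytes, or -1 for end of stream
    }

    static void copy(InputStream src, OutputStream... dst) throws IOException {
        final int poolSize = 8;             // arbitrary number of circulating buffers
        final int stages = dst.length + 1;  // stage 0 is the reader, 1..n are the writers
        List<BlockingQueue<Chunk>> inbox = new ArrayList<>();
        for (int i = 0; i < stages; i++) inbox.add(new ArrayBlockingQueue<>(poolSize));
        for (int i = 0; i < poolSize; i++) inbox.get(0).add(new Chunk()); // reader starts with all buffers

        List<Thread> threads = new ArrayList<>();
        AtomicReference<IOException> error = new AtomicReference<>();
        Consumer<IOException> fail = e -> {
            error.compareAndSet(null, e);
            threads.forEach(Thread::interrupt);
        };

        for (int i = 1; i < stages; i++) {
            OutputStream target = dst[i - 1];
            BlockingQueue<Chunk> own = inbox.get(i);
            BlockingQueue<Chunk> next = inbox.get((i + 1) % stages); // last writer -> reader
            threads.add(new Thread(() -> {
                try (OutputStream out = target) {
                    while (true) {
                        Chunk chunk = own.take();
                        int length = chunk.length; // read before forwarding: the reader may refill it
                        if (length != -1) out.write(chunk.data, 0, length);
                        next.put(chunk);           // pass the buffer (or the end marker) along
                        if (length == -1) break;
                    }
                } catch (InterruptedException e) {
                    // another stage failed; just exit
                } catch (IOException e) {
                    fail.accept(e);
                }
            }, "ring-writer-" + i));
        }
        BlockingQueue<Chunk> free = inbox.get(0);
        BlockingQueue<Chunk> first = inbox.get(1 % stages);
        // Reader is started last, so all writers are running before data flows.
        threads.add(new Thread(() -> {
            try (InputStream in = src) {
                int count;
                do {
                    Chunk chunk = free.take(); // a buffer that has completed a full lap
                    count = in.read(chunk.data);
                    chunk.length = count;
                    first.put(chunk);
                } while (count != -1);
            } catch (InterruptedException e) {
                // a writer failed; just exit
            } catch (IOException e) {
                fail.accept(e);
            }
        }, "ring-reader"));

        threads.forEach(Thread::start);
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            threads.forEach(Thread::interrupt);
            Thread.currentThread().interrupt();
            throw new IOException("copy interrupted", e);
        }
        if (error.get() != null) throw new IOException("copy failed", error.get());
    }
}
```

Since a buffer reaches the reader again only after every writer has handled it, no stage can touch a buffer that another stage is still using, and with exactly poolSize buffers in circulation none of the put calls can block.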

Contacts


I am developing an online Java Core programming course.
email: GolovachCourses@gmail.com
skype: GolovachCourses

Source: https://habr.com/ru/post/226559/

