
Introduction to Actors Based on Java / GPars, Part I

This article briefly covers the API of the GPars library and the solution of a moderately complex multi-threaded problem whose results may prove useful in practice (“in the national economy”).

This article was written while I was researching the actor libraries available to Java programmers, in preparation for teaching the course Multicore programming in Java.

I also teach Scala for Java Developers on udemy.com, an online learning platform similar to Coursera / edX.
This is the first article in a series whose goal is to compare the API, performance, and implementation of Akka actors with those of other libraries on a model problem. This article states the problem and presents a solution with GPars.

GPars is a Groovy library with extensive support for various approaches to parallel computing.
GPars pros


"Installation" GPars


Adding GPars and Groovy to a Maven project
<dependency>
    <groupId>org.codehaus.gpars</groupId>
    <artifactId>gpars</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-all</artifactId>
    <version>2.2.2</version>
</dependency>


Without Maven, just download GPars-1.1.0 (sources) and Groovy-2.2.2 (sources) from the repository and add them to the project.

Stateless actor


Let's start with simple examples.
Sending a message to an actor.
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("receive: " + msg);
            }
        }.start();
        actor.send("Hello!");
        System.in.read();
    }
}

>> receive: Hello!


Sending a message and waiting for the reply.
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("ping: " + msg);
                getSender().send(msg.toUpperCase());
            }
        }.start();
        System.out.println("pong: " + actor.sendAndWait("Hello!"));
    }
}

>> ping: Hello!
>> pong: HELLO!


Sending a message and attaching an asynchronous callback for the reply.
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("ping: " + msg);
                getSender().send(msg.toUpperCase());
            }
        }.start();
        actor.sendAndContinue("Hello!", new MessagingRunnable<String>() {
            protected void doRun(String msg) {
                System.out.println("pong: " + msg);
            }
        });
        System.in.read();
    }
}

>> ping: Hello!
>> pong: HELLO!
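If it helps to map this onto java.util.concurrent terms: sendAndWait() behaves like blocking on a future, and sendAndContinue() like registering a callback on it. The sketch below is only a plain-JDK analogy under that assumption (EchoService and ask() are hypothetical names, not GPars API; Java 8+ is assumed): a single-threaded executor stands in for the actor's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical plain-JDK analogy, not GPars API: a single thread handles requests
// one at a time, as an actor would, and each request gets a future for its reply.
final class EchoService {
    private final ExecutorService thread = Executors.newSingleThreadExecutor();

    // Plays the role of sending a message; the future will hold the reply.
    CompletableFuture<String> ask(String msg) {
        return CompletableFuture.supplyAsync(() -> msg.toUpperCase(), thread);
    }

    void shutdown() {
        thread.shutdown();
    }
}
```

With it, echo.ask("Hello!").join() corresponds to sendAndWait(), and echo.ask("Hello!").thenAccept(r -> System.out.println("pong: " + r)) corresponds to sendAndContinue().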


Pattern matching on the type of the received message.
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String arg) {
                getSender().send(arg.toUpperCase());
            }

            public void onMessage(Long arg) {
                getSender().send(1000 + arg);
            }
        }.start();
        System.out.println("Hello! -> " + actor.sendAndWait("Hello!"));
        // 42L is a Long, so it is dispatched to onMessage(Long)
        System.out.println("42 -> " + actor.sendAndWait(42L));
    }
}

>> Hello! -> HELLO!
>> 42 -> 1042


Pattern matching did not find a suitable handler.
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String arg) {
                getSender().send(arg.toUpperCase());
            }

            public void onMessage(Long arg) {
                getSender().send(1000 + arg);
            }
        }.start();
        System.out.println("42.0 -> " + actor.sendAndWait(42.0));
    }
}

>> An exception occurred in the Actor thread Actor Thread 1
>> groovy.lang.MissingMethodException: No signature of method:
>> net.golovach.Demo_4$1.onMessage() is applicable for argument types: (java.lang.Double) values: [42.0]
>> Possible solutions: onMessage(java.lang.Long), onMessage(java.lang.String)
>> at org.codehaus.groovy.runtime.ScriptBytecodeAdapter ...
>> ...


What we can see:
- “pattern matching” selects the suitable overloaded version of the onMessage(<one-arg>) method; if there is none, we get an exception;
- actors run on a pool of daemon threads, so we have to suspend main() somehow (I used System.in.read()) to prevent the JVM from exiting prematurely;
- methods such as reply() and getSender() show that an actor inheriting from DynamicDispatchActor has many methods in its namespace (reply, replyIfExists, getSender, terminate, ...).
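The first two observations (one message at a time on pooled threads, and a handler chosen by the runtime type of the message) can be imitated in plain Java to make the mechanics concrete. This is a hypothetical sketch, not how GPars works internally (the stack trace above shows that GPars relies on Groovy's dynamic method dispatch): MiniDispatchActor, on() and send() are made-up names, handlers are looked up by exact class only, and a missing handler surfaces as an exception, roughly like the MissingMethodException case. Java 8+ is assumed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical illustration, not GPars: a tiny "dynamic dispatch" actor built on the JDK.
final class MiniDispatchActor {
    // A single thread guarantees that messages are processed one at a time, in order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Handlers must be registered before the first send(); the map is not synchronized.
    private final Map<Class<?>, Function<Object, Object>> handlers = new LinkedHashMap<>();

    // Registers a handler for messages of exactly this class (no subtype matching).
    <T> MiniDispatchActor on(Class<T> type, Function<? super T, ?> handler) {
        handlers.put(type, msg -> handler.apply(type.cast(msg)));
        return this;
    }

    // Roughly the counterpart of sendAndWait() without blocking: the future holds the reply.
    CompletableFuture<Object> send(Object msg) {
        return CompletableFuture.supplyAsync(() -> {
            Function<Object, Object> handler = handlers.get(msg.getClass());
            if (handler == null) { // no suitable "overload" for this message type
                throw new IllegalArgumentException("No handler for " + msg.getClass().getName());
            }
            return handler.apply(msg);
        }, mailbox);
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

For example, new MiniDispatchActor().on(String.class, s -> s.toUpperCase()).on(Long.class, n -> 1000 + n) answers send(42L).join() with 1042, while send(42.0).join() fails with a CompletionException caused by IllegalArgumentException.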

Although the GPars authors call subclasses of DynamicDispatchActor stateless actors, they are ordinary Java objects that can have mutable fields and keep their state in them. Let's demonstrate this:
import groovyx.gpars.actor.*;

import java.util.ArrayList;
import java.util.List;

public class StatelessActorTest {
    public static void main(String[] args) throws InterruptedException {
        Actor actor = new DynamicDispatchActor() {
            private final List<Double> state = new ArrayList<>();

            public void onMessage(final Double msg) {
                state.add(msg);
                reply(state);
            }
        }.start();
        System.out.println("answer: " + actor.sendAndWait(1.0));
        System.out.println("answer: " + actor.sendAndWait(2.0));
        System.out.println("answer: " + actor.sendAndWait(3.0));
        System.out.println("answer: " + actor.sendAndWait(4.0));
        System.out.println("answer: " + actor.sendAndWait(5.0));
    }
}

>> answer: [1.0]
>> answer: [1.0, 2.0]
>> answer: [1.0, 2.0, 3.0]
>> answer: [1.0, 2.0, 3.0, 4.0]
>> answer: [1.0, 2.0, 3.0, 4.0, 5.0]


Stateful actor


By introducing the stateless/stateful division, the authors mean that a stateful actor lets you implement the State pattern naturally. Consider a simple example (subclasses of DefaultActor are stateful actors):
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();
        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));
        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));
        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new MessagingRunnable<Object>(this) {
                        protected void doRun(final Object msg) {
                            System.out.println("react: " + msg);
                        }
                    });
                }
            });
        }
    }
}

>> react: A
>> react: 1.0
>> react: [1, 2, 3]
>> react: B
>> react: 2.0
>> react: [4, 5, 6]


However, this does not look like the promised State pattern at all. Let's approach it from this side (Java is not the best language for such tricks; in Clojure / Scala this code would be much more compact):
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;

import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();
        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));
        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));
        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new MessagingRunnable<String>(this) {
                        protected void doRun(final String msg) {
                            System.out.println("Stage #0: " + msg);
                            react(new MessagingRunnable<Double>() {
                                protected void doRun(final Double msg) {
                                    System.out.println(" Stage #1: " + msg);
                                    react(new MessagingRunnable<List<Integer>>() {
                                        protected void doRun(final List<Integer> msg) {
                                            System.out.println(" Stage #2: " + msg + "\n");
                                        }
                                    });
                                }
                            });
                        }
                    });
                }
            });
        }
    }
}

>> Stage #0: A
>> Stage #1: 1.0
>> Stage #2: [1, 2, 3]
>>
>> Stage #0: B
>> Stage #1: 2.0
>> Stage #2: [4, 5, 6]


Or let's get rid of this terrible nesting of anonymous classes and “materialize” the states:
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;

import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();
        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));
        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));
        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new Stage0(MyStatefulActor.this));
                }
            });
        }
    }

    private static class Stage0 extends MessagingRunnable<String> {
        private final DefaultActor owner;

        private Stage0(DefaultActor owner) {
            this.owner = owner;
        }

        protected void doRun(final String msg) {
            System.out.println("Stage #0: " + msg);
            owner.react(new Stage1(owner));
        }
    }

    private static class Stage1 extends MessagingRunnable<Double> {
        private final DefaultActor owner;

        private Stage1(DefaultActor owner) {
            this.owner = owner;
        }

        protected void doRun(final Double msg) {
            System.out.println(" Stage #1: " + msg);
            owner.react(new Stage2());
        }
    }

    private static class Stage2 extends MessagingRunnable<List<Integer>> {
        protected void doRun(final List<Integer> msg) {
            System.out.println(" Stage #2: " + msg + "\n");
        }
    }
}

Yes, yes, I fully agree with you, Java is an extremely verbose language.

This is what the transition diagram looks like (we did not branch on the argument):
// START
// -----
//   |
//   |
//   |
//   |  +--------+
//   +->| Stage0 |---String----+
//      +--------+             |
//          ^                  v
//          |              +--------+
//          |              | Stage1 |
//    List<Integer>        +--------+
//          |                  |
//          |  +--------+    Double
//          +--| Stage2 |<-----+
//             +--------+
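For comparison, the same cycle can be written without actors as an explicit State pattern. This is a hypothetical single-threaded sketch (StageMachine and its methods are my names): each enum constant records the message type it accepts, and next() encodes the arrow to the following state. Rejecting a message of the wrong type with an exception is my own choice for the sketch, not something the GPars example does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Stage0 -> Stage1 -> Stage2 -> Stage0 cycle as an enum-based State pattern.
// Not thread-safe: in the actor version, the actor's mailbox serializes access instead.
final class StageMachine {
    enum Stage {
        STAGE0(String.class), STAGE1(Double.class), STAGE2(List.class);

        final Class<?> accepts; // the message type this state reacts to

        Stage(Class<?> accepts) {
            this.accepts = accepts;
        }

        // The next state in the cycle; STAGE2 wraps around to STAGE0.
        Stage next() {
            return values()[(ordinal() + 1) % values().length];
        }
    }

    private Stage current = Stage.STAGE0;
    private final List<String> log = new ArrayList<>();

    // Feeds one message; throws if it does not match the type expected by the current state.
    void accept(Object msg) {
        if (!current.accepts.isInstance(msg)) {
            throw new IllegalStateException(current + " does not accept " + msg);
        }
        log.add(current + ": " + msg);
        current = current.next();
    }

    Stage current() {
        return current;
    }

    List<String> log() {
        return log;
    }
}
```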


Timer


To solve my problem, I will need a timer: something that can be set to notify me when a certain period of time has elapsed. In “normal” Java we would use java.util.concurrent.ScheduledThreadPoolExecutor or, at worst, java.util.Timer. But we are in the world of actors!
My Timer is a stateful actor that waits for a message in react() with a timeout. If no message arrives within that period, the GPars infrastructure sends it the Actor.TIMEOUT message (just the string “TIMEOUT”), and the timer sends the timeoutMsg passed to its constructor to replyTo, normally its creator. To “turn off” the timer, send it any other message (I will send it the string "KILL"). In either case the timer then terminates.
import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Timer<T> extends DefaultActor {
    private final long timeout;
    private final T timeoutMsg;
    private final MessageStream replyTo;

    public Timer(long timeout, T timeoutMsg, MessageStream replyTo) {
        this.timeout = timeout;
        this.timeoutMsg = timeoutMsg;
        this.replyTo = replyTo;
    }

    protected void act() {
        loop(new Runnable() {
            public void run() {
                react(timeout, MILLISECONDS, new MessagingRunnable() {
                    protected void doRun(Object argument) {
                        // TIMEOUT means nobody "killed" the timer in time: notify replyTo
                        if (Actor.TIMEOUT.equals(argument)) {
                            replyTo.send(timeoutMsg);
                        }
                        // after a timeout or any other message (e.g. "KILL") the timer stops
                        terminate();
                    }
                });
            }
        });
    }
}


An example of using the timer.
I create two timers, timerX and timerY, which will send me the messages “X” and “Y”, respectively, after a delay of 1000 ms. But after 500 ms I change my mind and “kill” timerX.
import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.impl.MessageStream;

public class TimerDemo {
    public static void main(String[] args) throws Exception {
        Actor timerX = new Timer<>(1000, "X", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerX send timeout message: '" + msg + "'");
                return this;
            }
        }).start();
        Actor timerY = new Timer<>(1000, "Y", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerY send timeout message: '" + msg + "'");
                return this;
            }
        }).start();
        Thread.sleep(500);
        timerX.send("KILL");
        System.in.read();
    }
}

>> timerY send timeout message: 'Y'
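For comparison, here is the same timerX/timerY scenario in “normal” Java with ScheduledThreadPoolExecutor. It is a sketch under stated assumptions rather than a replacement for the actor-based Timer: cancelling the ScheduledFuture plays the role of sending "KILL", the callback runs on the scheduler's thread instead of arriving as a message, and SchedulerTimerDemo.run() is a hypothetical helper written for this comparison (Java 8+).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: the timerX/timerY scenario with ScheduledThreadPoolExecutor instead of actors.
final class SchedulerTimerDemo {
    // Starts two timers, cancels the first after killAfterMs and returns the messages that fired.
    static List<String> run(long timeoutMs, long killAfterMs) {
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
        List<String> fired = new CopyOnWriteArrayList<>();
        try {
            ScheduledFuture<?> timerX =
                    scheduler.schedule(() -> { fired.add("X"); }, timeoutMs, TimeUnit.MILLISECONDS);
            ScheduledFuture<?> timerY =
                    scheduler.schedule(() -> { fired.add("Y"); }, timeoutMs, TimeUnit.MILLISECONDS);
            Thread.sleep(killAfterMs);
            timerX.cancel(false); // counterpart of timerX.send("KILL")
            timerY.get();         // wait for timerY to fire instead of System.in.read()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return fired;
    }
}
```

SchedulerTimerDemo.run(1000, 500) mirrors the timings above and should return [Y].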


Problem statement and solution scheme


Consider the following very general problem.
1. We have many threads that call some function quite often.
2. The function comes in two variants: one processes a single argument, the other processes a list of arguments.
3. Processing a list of arguments consumes fewer system resources than processing each of its arguments separately.
4. The task is to place a Batcher between the threads and the function. Batcher collects the arguments from the threads into a “batch” and passes it to the function; the function processes the list, and Batcher “distributes” the results back to the senders.
5. Batcher passes the argument list on in two cases: when it has collected a batch of sufficient size, or when the waiting time has expired without a full batch being assembled and it is time to return results to the threads.
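A simple cost model makes condition 3 concrete. It is an illustrative assumption, not a measurement from this article: suppose every call of the function pays a fixed overhead $c_0$ (for example, a network round trip or a transaction) plus a per-argument cost $c_1$. Then for $n$ arguments

$$
T_{\text{separate}}(n) = n\,(c_0 + c_1), \qquad
T_{\text{batch}}(n) = c_0 + n\,c_1, \qquad
T_{\text{separate}}(n) - T_{\text{batch}}(n) = (n - 1)\,c_0 .
$$

With hypothetical values $c_0 = 10$ ms and $c_1 = 1$ ms, three separate calls cost 33 ms, while one batch of three costs 13 ms. Under this model the savings grow with the batch size, which is why Batcher tries to fill a batch; the timeout bounds how long a caller can be kept waiting for that.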

Let's look at the solution scheme.
Timeout: 100 ms; maximum batch size: 3 arguments.

At time 0, thread T-0 sends the argument “A”. Batcher is in its “clean” state, generation 0.
//time:0
//
// T-0 --"A"-----> +-------+  generationId=0
// T-1             |Batcher|  argList=[]
// T-2             +-------+  replyToList=[]


A moment later, Batcher has recorded that “A” must be processed and the result returned to thread T-0. A timer is started for generation 0.
//                 +-----+    timeoutMsg=0
//                 |Timer|    timeout=100
//time:0.001       +-----+
//
// T-0             +-------+  generationId=0
// T-1             |Batcher|  argList=["A"]
// T-2             +-------+  replyToList=[T-0]


At 25 ms, thread T-1 sends “B” for processing.
//                 +-----+    timeoutMsg=0
//                 |Timer|    timeout=100
//time:25          +-----+
//
// T-0             +-------+  generationId=0
// T-1 --"B"-----> |Batcher|  argList=["A"]
// T-2             +-------+  replyToList=[T-0]


A moment later, Batcher has recorded that “A” and “B” must be processed and the results returned to threads T-0 and T-1.
//                 +-----+    timeoutMsg=0
//                 |Timer|    timeout=100
//time:25.001      +-----+
//
// T-0             +-------+  generationId=0
// T-1             |Batcher|  argList=["A","B"]
// T-2             +-------+  replyToList=[T-0,T-1]


At 50 ms, thread T-2 sends “C” for processing.
//                 +-----+    timeoutMsg=0
//                 |Timer|    timeout=100
//time:50          +-----+
//
// T-0             +-------+  generationId=0
// T-1             |Batcher|  argList=["A","B"]
// T-2 --"C"-----> +-------+  replyToList=[T-0,T-1]


A moment later, Batcher has recorded that “A”, “B” and “C” must be processed and the results returned to threads T-0, T-1 and T-2. It sees that the batch is full and “kills” the timer.
//                           +-----+  timeoutMsg=0
//                 +-"KILL"->|Timer|  timeout=100
//time:50.001      |         +-----+
//                 |
// T-0             +-------+  generationId=0
// T-1             |Batcher|  argList=["A","B","C"]
// T-2             +-------+  replyToList=[T-0,T-1,T-2]


A moment later, Batcher hands the data over for computation to a separate (anonymous) actor, clears its state and changes the generation from 0 to 1.
//time:50.002
//
// T-0             +-------+    generationId=1
// T-1             |Batcher|    argList=[]
// T-2             +-------+    replyToList=[]
//
//                 +---------+  argList=["A","B","C"]
//                 |anonymous|  replyToList=[T-0,T-1,T-2]
//                 +---------+


A moment later (for this storyboard I assume that computations are instantaneous), the anonymous actor applies the function to the argument list: [“A”, “B”, “C”] -> [“res#A”, “res#B”, “res#C”].
//time:50.003
//
// T-0             +-------+    generationId=1
// T-1             |Batcher|    argList=[]
// T-2             +-------+    replyToList=[]
//
//                 +---------+  resultList=["res#A","res#B","res#C"]
//                 |anonymous|  replyToList=[T-0,T-1,T-2]
//                 +---------+


A moment later, the anonymous actor distributes the results to the threads.
//time:50.004
//
// T-0 <-----------+     +-------+  generationId=1
// T-1 <---------+ |     |Batcher|  argList=[]
// T-2 <-------+ | |     +-------+  replyToList=[]
//             | | |
//             | | +---"res#A"--- +---------+
//             | +---"res#B"----- |anonymous|
//             +---"res#C"------- +---------+


A moment later, the system is back in its original “clean” state.
//time:50.005
//
// T-0             +-------+  generationId=1
// T-1             |Batcher|  argList=[]
// T-2             +-------+  replyToList=[]


Later, at 75 ms, thread T-2 sends “D” for processing.
//time:75
//
// T-0             +-------+  generationId=1
// T-1             |Batcher|  argList=[]
// T-2 --"D"-----> +-------+  replyToList=[]


A moment later, Batcher has recorded that “D” must be processed and the result returned to thread T-2. A timer is also started for generation 1.
//                 +-----+    timeoutMsg=1
//                 |Timer|    timeout=100
//time:75.001      +-----+
//
// T-0             +-------+  generationId=1
// T-1             |Batcher|  argList=["D"]
// T-2             +-------+  replyToList=[T-2]


100 ms later (at 175 ms), the GPars infrastructure notifies the timer that the waiting period has expired.
//                    +--"TIMEOUT"--
//                    |
//                    v
//                 +-----+    timeoutMsg=1
//                 |Timer|    timeout=100
//time:175         +-----+
//
// T-0             +-------+  generationId=1
// T-1             |Batcher|  argList=["D"]
// T-2             +-------+  replyToList=[T-2]


A moment later, the timer notifies Batcher that generation 1 has timed out and commits suicide by calling terminate().
//                           +-----+  timeoutMsg=1
//                 +----1----|Timer|  timeout=100
//time:175.001     |         +-----+
//                 v
// T-0             +-------+  generationId=1
// T-1             |Batcher|  argList=["D"]
// T-2             +-------+  replyToList=[T-2]


An anonymous actor is created to perform the computation on the argument list (there is only one argument). The generation changes from 1 to 2.
//time:175.002
//
// T-0             +-------+    generationId=2
// T-1             |Batcher|    argList=[]
// T-2             +-------+    replyToList=[]
//
//                 +---------+  argList=["D"]
//                 |anonymous|  replyToList=[T-2]
//                 +---------+


The actor has done the job.
//time:175.003
//
// T-0             +-------+    generationId=2
// T-1             |Batcher|    argList=[]
// T-2             +-------+    replyToList=[]
//
//                 +---------+  resultList=["res#D"]
//                 |anonymous|  replyToList=[T-2]
//                 +---------+


The actor returns the result.
//time:175.004
//
// T-0             +-------+  generationId=2
// T-1             |Batcher|  argList=[]
// T-2 <-------+   +-------+  replyToList=[]
//             |
//             |              +---------+
//             +---"res#D"--- |anonymous|
//                            +---------+


The system is back in its original “clean” state.
//time:175.005
//
// T-0             +-------+  generationId=2
// T-1             |Batcher|  argList=[]
// T-2             +-------+  replyToList=[]
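The storyboard can be double-checked with a small deterministic simulation. The sketch below is hypothetical (BatchTimeline and its flush rule are my reading of condition 5 and of the storyboard, not code from the article): a batch is flushed either at the arrival that fills it up to maxBatchSize, or timeout milliseconds after its first argument arrived. Processing is treated as instantaneous, and an argument arriving exactly at the deadline is counted as late, an arbitrary tie-break that the real actor system resolves by message order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the Batcher storyboard: instantaneous processing, arrivals sorted by time.
final class BatchTimeline {
    // A flushed batch: when it was handed to processing and which arguments it contained.
    static final class Flush {
        final long time;
        final List<String> args;

        Flush(long time, List<String> args) {
            this.time = time;
            this.args = args;
        }

        @Override
        public String toString() {
            return "t=" + time + " " + args;
        }
    }

    static List<Flush> simulate(long[] times, String[] args, int maxBatchSize, long timeout) {
        List<Flush> flushes = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        long deadline = 0;
        for (int i = 0; i < times.length; i++) {
            // The pending batch timed out before this argument arrived.
            if (!batch.isEmpty() && times[i] >= deadline) {
                flushes.add(new Flush(deadline, batch));
                batch = new ArrayList<>();
            }
            if (batch.isEmpty()) {
                deadline = times[i] + timeout; // first argument of a new generation starts the timer
            }
            batch.add(args[i]);
            if (batch.size() == maxBatchSize) { // full batch: "kill" the timer and flush now
                flushes.add(new Flush(times[i], batch));
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            flushes.add(new Flush(deadline, batch)); // the last batch can only end by timeout
        }
        return flushes;
    }
}
```

For the arrivals above (A at 0, B at 25, C at 50 and D at 75 ms, with a batch size of 3 and a 100 ms timeout), simulate() returns [t=50 [A, B, C], t=175 [D]], matching the two batches in the storyboard.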


Solving the problem



BatchProcessor is the interface of the “function” that supports batch processing.
import java.util.List;

public interface BatchProcessor<ARG, RES> {
    List<RES> onBatch(List<ARG> argList) throws Exception;
}


Batcher is the class that packs arguments into batches; it is the core of the solution.
import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import java.util.*;

public class Batcher<ARG, RES> extends DynamicDispatchActor {
    // fixed parameters
    private final BatchProcessor<ARG, RES> processor;
    private final int maxBatchSize;
    private final long batchWaitTimeout;
    // current state
    private final List<ARG> argList = new ArrayList<>();
    private final List<MessageStream> replyToList = new ArrayList<>();
    private long generationId = 0;
    private Actor lastTimer;

    public Batcher(BatchProcessor<ARG, RES> processor, int maxBatchSize, long batchWaitTimeout) {
        this.processor = processor;
        this.maxBatchSize = maxBatchSize;
        this.batchWaitTimeout = batchWaitTimeout;
    }

    // an argument sent by a client thread: remember it together with its sender
    public void onMessage(final ARG elem) {
        argList.add(elem);
        replyToList.add(getSender());
        if (argList.size() == 1) {
            // first argument of a new batch: start a timer tagged with a fresh generation id
            // (generationId is also incremented in nextGeneration(), so ids advance by 2 per batch;
            // only equality with the current id matters)
            lastTimer = new Timer<>(batchWaitTimeout, ++generationId, this).start();
        } else if (argList.size() == maxBatchSize) {
            // the batch is full: "kill" the timer and flush immediately
            lastTimer.send("KILL");
            lastTimer = null;
            nextGeneration();
        }
    }

    // a timeout from a Timer; timeouts left over from earlier generations are ignored
    public void onMessage(final long timeOutId) {
        if (generationId == timeOutId) {
            nextGeneration();
        }
    }

    // hands the current batch to a one-shot anonymous actor and starts a new generation
    private void nextGeneration() {
        new DynamicDispatchActor() {
            public void onMessage(final Work<ARG, RES> work) throws Exception {
                List<RES> resultList = work.batcher.onBatch(work.argList);
                // the k-th result goes back to the sender of the k-th argument
                for (int k = 0; k < resultList.size(); k++) {
                    work.replyToList.get(k).send(resultList.get(k));
                }
                terminate();
            }
        }.start().send(new Work<>(processor, new ArrayList<>(argList), new ArrayList<>(replyToList)));
        argList.clear();
        replyToList.clear();
        generationId = generationId + 1;
    }

    // a snapshot of one batch (copies of the argument and sender lists)
    private static class Work<ARG, RES> {
        public final BatchProcessor<ARG, RES> batcher;
        public final List<ARG> argList;
        public final List<MessageStream> replyToList;

        public Work(BatchProcessor<ARG, RES> batcher, List<ARG> argList, List<MessageStream> replyToList) {
            this.batcher = batcher;
            this.argList = argList;
            this.replyToList = replyToList;
        }
    }
}


BatcherDemo demonstrates Batcher at work. The scenario matches the storyboard above.
import groovyx.gpars.actor.Actor;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;

import static java.util.concurrent.Executors.newCachedThreadPool;

public class BatcherDemo {
    public static final int BATCH_SIZE = 3;
    public static final long BATCH_TIMEOUT = 100;

    public static void main(String[] args) throws InterruptedException, IOException {
        final Actor actor = new Batcher<>(new BatchProcessor<String, String>() {
            public List<String> onBatch(List<String> argList) {
                System.out.println("onBatch(" + argList + ")");
                ArrayList<String> result = new ArrayList<>(argList.size());
                for (String arg : argList) {
                    result.add("res#" + arg);
                }
                return result;
            }
        }, BATCH_SIZE, BATCH_TIMEOUT).start();
        ExecutorService exec = newCachedThreadPool();
        exec.submit(new Callable<Void>() { // T-0
            public Void call() throws Exception {
                System.out.println(actor.sendAndWait("A"));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-1
            public Void call() throws Exception {
                Thread.sleep(25);
                System.out.println(actor.sendAndWait("B"));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-2
            public Void call() throws Exception {
                Thread.sleep(50);
                System.out.println(actor.sendAndWait("C"));
                Thread.sleep(25);
                System.out.println(actor.sendAndWait("D"));
                return null;
            }
        });
        exec.shutdown();
    }
}

>> onBatch([A, B, C])
>> res#A
>> res#B
>> res#C
>> onBatch([D])
>> res#D
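To compare the actor version with a conventional one, here is a hypothetical plain-JDK sketch of the same idea (JdkBatcher, submit() and close() are my names; Java 8+ assumed). A single-threaded ScheduledExecutorService plays the role of Batcher's mailbox: all state changes run on that one thread, so argList and replyToList need no locks, and the generation counter discards stale timeouts just as in Batcher. Instead of replying to a sender, each caller receives a CompletableFuture, and each batch is processed on a separate pool, like the anonymous actor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical plain-JDK counterpart of Batcher; not part of GPars.
final class JdkBatcher<A, R> {
    private final Function<List<A>, List<R>> processor; // stands in for BatchProcessor.onBatch
    private final int maxBatchSize;
    private final long batchWaitTimeoutMs;
    private final ScheduledExecutorService mailbox = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService workers = Executors.newCachedThreadPool();

    // Current state: touched only on the mailbox thread, so no locks are needed.
    private List<A> argList = new ArrayList<>();
    private List<CompletableFuture<R>> replyToList = new ArrayList<>();
    private long generationId = 0;
    private ScheduledFuture<?> lastTimer;

    JdkBatcher(Function<List<A>, List<R>> processor, int maxBatchSize, long batchWaitTimeoutMs) {
        this.processor = processor;
        this.maxBatchSize = maxBatchSize;
        this.batchWaitTimeoutMs = batchWaitTimeoutMs;
    }

    // Non-blocking counterpart of sendAndWait(arg): join() the future to block.
    CompletableFuture<R> submit(A arg) {
        CompletableFuture<R> reply = new CompletableFuture<>();
        mailbox.execute(() -> onArg(arg, reply));
        return reply;
    }

    private void onArg(A arg, CompletableFuture<R> reply) {
        argList.add(arg);
        replyToList.add(reply);
        if (argList.size() == 1) { // first argument of this generation: start its timer
            final long gen = generationId;
            lastTimer = mailbox.schedule(() -> onTimeout(gen), batchWaitTimeoutMs, TimeUnit.MILLISECONDS);
        }
        if (argList.size() == maxBatchSize) { // full batch: "kill" the timer and flush now
            lastTimer.cancel(false);
            nextGeneration();
        }
    }

    private void onTimeout(long gen) {
        if (gen == generationId) { // timeouts of earlier generations are ignored
            nextGeneration();
        }
    }

    private void nextGeneration() {
        final List<A> batch = argList;
        final List<CompletableFuture<R>> replies = replyToList;
        argList = new ArrayList<>();
        replyToList = new ArrayList<>();
        generationId++;
        workers.execute(() -> { // process off the mailbox thread, like the anonymous actor
            try {
                List<R> results = processor.apply(batch);
                if (results.size() != batch.size()) {
                    throw new IllegalStateException("expected " + batch.size() + " results");
                }
                for (int k = 0; k < results.size(); k++) {
                    replies.get(k).complete(results.get(k)); // k-th result to k-th caller
                }
            } catch (RuntimeException e) {
                replies.forEach(r -> r.completeExceptionally(e));
            }
        });
    }

    // Call only after all returned futures have completed.
    void close() {
        mailbox.shutdown();
        workers.shutdown();
    }
}
```

Submitting "A", "B" and "C" back to back with maxBatchSize = 3 and a 100 ms timeout should lead to a single processor call, and a lone "D" is flushed by the timeout roughly 100 ms later, as in BatcherDemo. One deliberate difference: the size check is not an else-branch of the first-argument check, so maxBatchSize = 1 flushes immediately instead of waiting for the timeout.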


Conclusion


In my view, actors are well suited to programming multi-threaded primitives that are finite automata with a complex transition diagram, which may, among other things, depend on the incoming arguments.

Some examples in this article are variations of code found in various places on the web, including gpars.org/guide.

In the second part we


UPD
Thanks to Fury for the comment:
GPars is written in a mix of Java + Groovy.
The source code shows that the following packages are written in Groovy:
- groovyx.gpars.csp.*
- groovyx.gpars.pa.*
- groovyx.gpars.* (partially)

Contacts



I teach Java online (here are the programming courses) and publish some of the training materials as part of reworking the Java Core course. You can watch videos of my classroom lectures on the YouTube channel; the channel's videos may be better organized in this article.

skype: GolovachCourses
email: GolovachCourses@gmail.com

Source: https://habr.com/ru/post/217899/

