<dependency> <groupId>org.codehaus.gpars</groupId> <artifactId>gpars</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-all</artifactId> <version>2.2.2</version> </dependency>
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("receive: " + msg); } }.start(); actor.send("Hello!"); System.in.read(); } } >> receive: Hello!
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("ping: " + msg); getSender().send(msg.toUpperCase()); } }.start(); System.out.println("pong: " + actor.sendAndWait("Hello!")); } } >> ping: Hello! >> pong: HELLO!
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("ping: " + msg); getSender().send(msg.toUpperCase()); } }.start(); actor.sendAndContinue("Hello!", new MessagingRunnable<String>() { protected void doRun(String msg) { System.out.println("pong: " + msg); } }); System.in.read(); } } >> ping: Hello! >> pong: HELLO!
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String arg) { getSender().send(arg.toUpperCase()); } public void onMessage(Long arg) { getSender().send(1000 + arg); } }.start(); System.out.println("42.0 -> " + actor.sendAndWait(42.0)); } } >> Hello! -> HELLO! >> 42 -> 1042
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String arg) { getSender().send(arg.toUpperCase()); } public void onMessage(Long arg) { getSender().send(1000 + arg); } }.start(); System.out.println("42.0 -> " + actor.sendAndWait(42.0)); } } >> An exception occurred in the Actor thread Actor Thread 1 >> groovy.lang.MissingMethodException: No signature of method: >> net.golovach.Demo_4$1.onMessage() is applicable for argument types: (java.lang.Double) values: [42.0] >> Possible solutions: onMessage(java.lang.Long), onMessage(java.lang.String) >> at org.codehaus.groovy.runtime.ScriptBytecodeAdapter ... >> ...
import groovyx.gpars.actor.*; import java.util.ArrayList; import java.util.List; public class StatelessActorTest { public static void main(String[] args) throws InterruptedException { Actor actor = new DynamicDispatchActor() { private final List<Double> state = new ArrayList<>(); public void onMessage(final Double msg) { state.add(msg); reply(state); } }.start(); System.out.println("answer: " + actor.sendAndWait(1.0)); System.out.println("answer: " + actor.sendAndWait(2.0)); System.out.println("answer: " + actor.sendAndWait(3.0)); System.out.println("answer: " + actor.sendAndWait(4.0)); System.out.println("answer: " + actor.sendAndWait(5.0)); } } >> answer: [1.0] >> answer: [1.0, 2.0] >> answer: [1.0, 2.0, 3.0] >> answer: [1.0, 2.0, 3.0, 4.0] >> answer: [1.0, 2.0, 3.0, 4.0, 5.0]
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(Arrays.asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(Arrays.asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new MessagingRunnable<Object>(this) { protected void doRun(final Object msg) { System.out.println("react: " + msg); } }); } }); } } } >> react: A >> react: 1.0 >> react: [1, 2, 3] >> react: B >> react: 2.0 >> react: [4, 5, 6]
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import java.util.List; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new MessagingRunnable<String>(this) { protected void doRun(final String msg) { System.out.println("Stage #0: " + msg); react(new MessagingRunnable<Double>() { protected void doRun(final Double msg) { System.out.println(" Stage #1: " + msg); react(new MessagingRunnable<List<Integer>>() { protected void doRun(final List<Integer> msg) { System.out.println(" Stage #2: " + msg + "\n"); } }); } }); } }); } }); } } } >> Stage #0: A >> Stage #1: 1.0 >> Stage #2: [1, 2, 3] >> >> Stage #0: B >> Stage #1: 2.0 >> Stage #2: [4, 5, 6]
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import java.util.List; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new Stage0(MyStatefulActor.this)); } }); } } private static class Stage0 extends MessagingRunnable<String> { private final DefaultActor owner; private Stage0(DefaultActor owner) {this.owner = owner;} protected void doRun(final String msg) { System.out.println("Stage #0: " + msg); owner.react(new Stage1(owner)); } } private static class Stage1 extends MessagingRunnable<Double> { private final DefaultActor owner; private Stage1(DefaultActor owner) {this.owner = owner;} protected void doRun(final Double msg) { System.out.println(" Stage #1: " + msg); owner.react(new Stage2()); } } private static class Stage2 extends MessagingRunnable<List<Integer>> { protected void doRun(final List<Integer> msg) { System.out.println(" Stage #2: " + msg + "\n"); } } }
// START // ----- // | // | // | // | +--------+ // +->| Stage0 | ---String----+ // +--------+ | // ^ v // | +--------+ // | | Stage1 | // List<Integer> +--------+ // | | // | +--------+ Double // +--| Stage2 |<-------+ // +--------+
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import groovyx.gpars.actor.impl.MessageStream; import static java.util.concurrent.TimeUnit.MILLISECONDS; public class Timer<T> extends DefaultActor { private final long timeout; private final T timeoutMsg; private final MessageStream replyTo; public Timer(long timeout, T timeoutMsg, MessageStream replyTo) { this.timeout = timeout; this.timeoutMsg = timeoutMsg; this.replyTo = replyTo; } protected void act() { loop(new Runnable() { public void run() { react(timeout, MILLISECONDS, new MessagingRunnable() { protected void doRun(Object argument) { if (Actor.TIMEOUT.equals(argument)) { replyTo.send(timeoutMsg); } terminate(); } }); } }); } }
import groovyx.gpars.actor.Actor; import groovyx.gpars.actor.impl.MessageStream; public class TimerDemo { public static void main(String[] args) throws Exception { Actor timerX = new Timer<>(1000, "X", new MessageStream() { public MessageStream send(Object msg) { System.out.println("timerX send timeout message: '" + msg + "'"); return this; } }).start(); Actor timerY = new Timer<>(1000, "Y", new MessageStream() { public MessageStream send(Object msg) { System.out.println("timerY send timeout message: '" + msg + "'"); return this; } }).start(); Thread.sleep(500); timerX.send("KILL"); System.in.read(); } } >> timerY send timeout message: 'Y'
//time:0 // // T-0 --"A"-----> +-------+ generationId=0 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:0.001 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A"] // T-2 +-------+ replyToList=[T-0]
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:25 +-----+ // // T-0 +-------+ generationId=0 // T-1 ---"B"----> |Batcher| argList=["A"] // T-2 +-------+ replyToList=[T-0]
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:25.001 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B"] // T-2 +-------+ replyToList=[T-0,T-1]
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:50 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B"] // T-2 ----"C"---> +-------+ replyToList=[T-0,T-1]
// +-----+ timeoutMsg=0 // +-"KILL"->|Timer| timeout=100 //time:50.001 | +-----+ // | // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B","C"] // T-2 +-------+ replyToList=[T-0,T-1,T-2]
//time:50.002 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ argList=["A","B","C"] // |anonymous| replyToList=[T-0,T-1,T-2] // +---------+
//time:50.003 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ resultList=["res#A","res#B","res#C"] // |anonymous| replyToList=[T-0,T-1,T-2] // +---------+
//time:50.004 // // T-0 <-----------+ +-------+ generationId=1 // T-1 <---------+ | |Batcher| argList=[] // T-2 <-------+ | | +-------+ replyToList=[] // | | | // | | +---"res#A"--- +---------+ // | +---"res#B"----- |anonymous| // +--"res#C"-------- +---------+
//time:50.005 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
//time:75 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 ----"D"---> +-------+ replyToList=[]
// +-----+ timeoutMsg=1 // |Timer| timeout=100 //time:75.001 +-----+ // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
// +--"TIMEOUT"-- // | // v // +-----+ timeoutMsg=1 // |Timer| timeout=100 //time:175 +-----+ // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
// +-----+ timeoutMsg=1 // +----1-----|Timer| timeout=100 //time:175.001 | +-----+ // v // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
//time:175.002 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ argList=["D"] // |anonymous| replyToList=[T-2] // +---------+
//time:175.003 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ resultList=["res#D"] // |anonymous| replyToList=[T-2] // +---------+
//time:175.004 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 <-------+ +-------+ replyToList=[] // | // | +---------+ // +--"res#D"----- |anonymous| // +---------+
//time:175.005 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
import java.util.List;

/**
 * Callback that processes a whole batch of arguments in one call.
 *
 * @param <ARG> type of a single request argument
 * @param <RES> type of a single result
 */
public interface BatchProcessor<ARG, RES> {

    /**
     * Processes the given batch.
     *
     * @param argList the arguments collected into this batch
     * @return the results for the batch
     * @throws Exception if the batch cannot be processed
     */
    List<RES> onBatch(List<ARG> argList) throws Exception;
}
import groovyx.gpars.actor.*; import groovyx.gpars.actor.impl.MessageStream; import java.util.*; public class Batcher<ARG, RES> extends DynamicDispatchActor { // fixed parameters private final BatchProcessor<ARG, RES> processor; private final int maxBatchSize; private final long batchWaitTimeout; // current state private final List<ARG> argList = new ArrayList<>(); private final List<MessageStream> replyToList = new ArrayList<>(); private long generationId = 0; private Actor lastTimer; public Batcher(BatchProcessor<ARG, RES> processor, int maxBatchSize, long batchWaitTimeout) { this.processor = processor; this.maxBatchSize = maxBatchSize; this.batchWaitTimeout = batchWaitTimeout; } public void onMessage(final ARG elem) { argList.add(elem); replyToList.add(getSender()); if (argList.size() == 1) { lastTimer = new Timer<>(batchWaitTimeout, ++generationId, this).start(); } else if (argList.size() == maxBatchSize) { lastTimer.send("KILL"); lastTimer = null; nextGeneration(); } } public void onMessage(final long timeOutId) { if (generationId == timeOutId) {nextGeneration();} } private void nextGeneration() { new DynamicDispatchActor() { public void onMessage(final Work<ARG, RES> work) throws Exception { List<RES> resultList = work.batcher.onBatch(work.argList); for (int k = 0; k < resultList.size(); k++) { work.replyToList.get(k).send(resultList.get(k)); } terminate(); } }.start().send(new Work<>(processor, new ArrayList<>(argList), new ArrayList<>(replyToList))); argList.clear(); replyToList.clear(); generationId = generationId + 1; } private static class Work<ARG, RES> { public final BatchProcessor<ARG, RES> batcher; public final List<ARG> argList; public final List<MessageStream> replyToList; public Work(BatchProcessor<ARG, RES> batcher, List<ARG> argList, List<MessageStream> replyToList) { this.batcher = batcher; this.argList = argList; this.replyToList = replyToList; } } }
import groovyx.gpars.actor.Actor; import java.io.IOException; import java.util.*; import java.util.concurrent.*; import static java.util.concurrent.Executors.newCachedThreadPool; public class BatcherDemo { public static final int BATCH_SIZE = 3; public static final long BATCH_TIMEOUT = 100; public static void main(String[] args) throws InterruptedException, IOException { final Actor actor = new Batcher<>(new BatchProcessor<String, String>() { public List<String> onBatch(List<String> argList) { System.out.println("onBatch(" + argList + ")"); ArrayList<String> result = new ArrayList<>(argList.size()); for (String arg : argList) { result.add("res#" + arg); } return result; } }, BATCH_SIZE, BATCH_TIMEOUT).start(); ExecutorService exec = newCachedThreadPool(); exec.submit(new Callable<Void>() { // T-0 public Void call() throws Exception { System.out.println(actor.sendAndWait(("A"))); return null; } }); exec.submit(new Callable<Void>() { // T-1 public Void call() throws Exception { Thread.sleep(25); System.out.println(actor.sendAndWait(("B"))); return null; } }); exec.submit(new Callable<Void>() { // T-2 public Void call() throws Exception { Thread.sleep(50); System.out.println(actor.sendAndWait(("C"))); Thread.sleep(25); System.out.println(actor.sendAndWait(("D"))); return null; } }); exec.shutdown(); } } >> onBatch([A, B, C]) >> res#A >> res#B >> res#C >> onBatch([D]) >> res#D
Source: https://habr.com/ru/post/217899/
All Articles