📜 ⬆️ ⬇️

Experience of using the Puniverse Quasar library for actors

In the past, in 2017, there was a small project that almost ideally fell on the ideology of the actors , decided to experiment and try to use their implementation from the Parallel Universe. There was not much needed from the actors themselves - know yourself to keep the state and communicate with others, sometimes change by timer and do not fall.

The library seems to be quite mature, almost 3,000 stars on a githaba , more than 300 forks, a couple of recommendations on Habré ... Why not? Our project started in February 2017, wrote on Kotlin.



')

Briefly about the library


→ Developer
→ Documentation
→ GitHub

The main purpose of the library is lightweight streams (fibers), over which Go-like channels, Erlang-like actors, all sorts of reactive buns and other similar things “for asynchronous programming in Java and Kotlin” are implemented. Developed since 2013.

Build Setup


Since The project on the cotlin, the assembly will be on the gradle. An important point: for the work of lightweight streams, manipulations with Java byte-code (instrumentation), which are usually done with the help of a java-agent, are necessary. This quasar agent kindly provides. In practice, this means that:



First we need to add the quasar configuration:

configurations { quasar } 

Connect the dependencies:

 dependencies { compile("org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version") //   compile("co.paralleluniverse:quasar-core:$quasar_version:jdk8") //   quasar  compile("co.paralleluniverse:quasar-actors:$quasar_version") //   compile("co.paralleluniverse:quasar-kotlin:$quasar_version") //     quasar "co.paralleluniverse:quasar-core:$quasar_version:jdk8" //  java- //...   } 

We say that all gradle tasks should be run with a java-agent:

 tasks.withType(JavaForkOptions) { //uncomment if there are problems with fibers //systemProperty 'co.paralleluniverse.fibers.verifyInstrumentation', 'true' jvmArgs "-javaagent:${(++configurations.quasar.iterator())}" } 

The co.paralleluniverse.fibers.verifyInstrumentation co.paralleluniverse.fibers.verifyInstrumentation is responsible for checking in runtime correctness of bytecode manipulations. Of course, if this check is enabled, then everything starts to slow down :)

For the release, I also wrote a function for generating bat / sh files that launch the application with a java-agent. Nothing particularly interesting, just create a file and put the desired launch line there, with the correct version of quasar:

 def createRunScript(String scriptPath, String type) { def file = new File(scriptPath) file.createNewFile() file.setExecutable(true) def preamble = "@echo off" if (type == "sh") { preamble = "#!/bin/bash" } def deps = configurations.quasar.files.collect { "-Xbootclasspath/a:\"libs/${it.name}\"" }.join(" ") def flags = "-Dco.paralleluniverse.fibers.detectRunawayFibers=false" def quasarAgent = configurations.quasar.files.find { it.name.contains("quasar-core") }.name file.text = """$preamble java -classpath "./*.jar" -javaagent:"libs/$quasarAgent" $deps $flags -jar ${project.name}.jar """ } 

And task release, which creates a separate folder with everything you need:

 task release(dependsOn: ['build']) { group = "Build" def targetDir = "$buildDir/release" doLast { copy { from "$buildDir/libs/${project.name}.jar" into targetDir } copy { //   quasar,  javaagent    from(configurations.quasar.files) into "$targetDir/libs" }    copy { //   ,            from("src/main/resources/application.yml")      into targetDir    } //   createRunScript("$targetDir/${project.name}.bat", "bat") createRunScript("$targetDir/${project.name}.sh", "sh") } } 

See more example in my gist or in the official example for gradle. Theoretically, it seems like there is a possibility to change the byte code at the compilation stage and not use the java-agent. For this quasar has ant-task . However, even with the car of crutches and electrical tape, I could not adjust it.

Use of actors


Let us turn to the actors. In my understanding, the basis of the actors is a constant exchange of messages. However, out of the box, Quasar represents only the universal co.paralleluniverse.kotlin.Actor with the receive method. For a permanent exchange, we had to implement a small layer:

 abstract class BasicActor : Actor() { @Suspendable abstract fun onReceive(message: Any): Any? @Suspendable override fun doRun() { while (true) { receive { onReceive(it!!) } } } fun <T> reply(incomingMessage: RequestMessage<T>, result: T) { RequestReplyHelper.reply(incomingMessage, result) } } 

Which in fact only makes the eternal cycle of receiving messages.

In addition, with the transition to Kotlin 1.1, the library started having problems that have not been resolved so far (I quote a piece of their code):

 // TODO Was "(Any) -> Any?" but in 1.1 the compiler would call the base Java method and not even complain about ambiguity! Investigate and possibly report inline protected fun receive(proc: (Any?) -> Any?) { receive(-1, null, proc) } 

Because of this, our BasicActor had to make a wrapper for receive . Well, for clarity, the reply method and the extenstion ask method were made:

 @Suspendable fun <T> ActorRef<Any>.ask(message: RequestMessage<T>): T { return RequestReplyHelper.call(this, message) } 

Please note that in order to send a question message, it must necessarily be inherited from RequestMessage . This slightly limits the messages that can be exchanged in the question-answer format.

The @Suspendable annotation is very important - when using quasar, it must be hung up on all methods that apply to other actors or lightweight streams, otherwise you will get the SuspendExecution exception in SuspendExecution , and there will be no sense from “lightness”. From the point of view of the library's developers, it is obvious that this is necessary for the java-agent, but from the programmer-user’s point of view, this is inconvenient (it is possible to do this automatically , but it will be far from free).

Further, the implementation of the actor comes down to redefining the onReceive method, which can be done quite simply with when , doing something depending on the type of message:

 override fun onReceive(message: Any) = when (message) { is SomeMessage -> { // Do stuff val someotherActor = ActorRegistry.getActor("other actor") someotherActor.send(replyOrSomeCommand) } is SomeOtherMessage -> { process(message.parameter) //  smart-cast val replyFromGuru = guruActor.ask(Question("Does 42 equals 7*6?")) doSomething() } else -> throw UnknownMessageTypeException(message) } 

In order to get a reference to the actor, you need to refer to the static method ActorRegistry.getActor , which returns a reference to the actor by a string identifier.

It remains only to run the actors. To do this, you must first create an actor, then register, and finally run:

 val myActor = MySuperDuperActor() val actorRef = spawn(register(MY_ACTOR_ID, myActor)) 

(Why it was impossible to do it at once with one method is unclear).

Some problems


What do you think will happen if the actor falls with the exception?

And nothing. Well, the actor fell. Now he will not accept messages, but so what. Great default behavior!

In this regard, the observer actor had to be implemented, who monitors the state of the actors and drops the entire application if something went wrong (there were no requirements for fault tolerance, so they could afford it):

 class WatcherActor : BasicActor(), ILogging by Logging<WatcherActor>() { override fun handleLifecycleMessage(lcm: LifecycleMessage): Any? { return onReceive(lcm) } override fun onReceive(message: Any): Any? = when (message) { is ExitMessage -> { log.fatal("Actor ${message.actor.name} got an unhandled exception. Terminating the app. Reason: ", message.getCause()) exit(-2) } else -> { log.fatal("Got unknown message for WatcherActor: $message. Terminating the app") exit(-1) } } } 

But for this you have to run the actors with reference to the observer:

 @Suspendable fun registerAndWatch(actorId: String, actorObject: Actor<*, *>): ActorRef<*> { val ref = spawn(register(actorId, actorObject)) watcherActor.link(ref) return ref } 

In general, according to impressions, many moments were inconvenient or not obvious. Perhaps, “we just do not know how to cook” Quasar, but after Akka some moments look wildly. For example, a method for implementing a query by the type of ask from Akka, which is buried somewhere in the utilities and still requires linking the types of the message-question and the message-answer (although on the other hand, this is a good feature that reduces the number of potential errors).

Another serious problem arose with the completion of the actor. What are the standard methods for this? Maybe destroy, unspawn or unregister? And no. Only crutches:

 fun <T : Actor<Any?, Any?>> T.finish() { this.ref().send(ExitMessage(this.ref(), null)) this.unregister() } 

There is, of course, ActorRegistry.clear() , which removes ALL actors, but if you get into his gut, you can see the following:

 public static void clear() { if (!Debug.isUnitTest()) throw new IllegalStateException("Must only be called in unit tests"); if (registry instanceof LocalActorRegistry) ((LocalActorRegistry) registry).clear(); else throw new UnsupportedOperationException(); } 

Yeah, only in unit tests can be called. And how do they define it?

 boolean isUnitTest = false; StackTraceElement[] stack = Thread.currentThread().getStackTrace(); for (StackTraceElement ste : stack) { if (ste.getClassName().startsWith("org.junit") || ste.getClassName().startsWith("junit.framework") || ste.getClassName().contains("JUnitTestClassExecuter")) { isUnitTest = true; break; } } unitTest = isUnitTest; 

Those. if you suddenly use no junit - goodbye.

Wait a minute, wait, here is the ActorRegistry.shutdown() method, it will surely cause the closure of every actor! We look at the implementation of the abstract method in LocalActorRegistry :

  @Override public void shutdown() { } 


One more thing, the library can mysteriously fall with some NPE for no apparent reason / explanation:

https://github.com/puniverse/quasar/issues/182

In addition, if you use third-party libraries, problems may arise with them. For example, in one of the dependencies, we had a library that communicated with iron (not very high-quality), in which was Thread.sleep() . Quasar didn’t like it very much, and he spat on the logs with exceptions: they say, Thread.sleep() blocks the stream and this will have a bad effect on performance (see details here ). At the same time, there are no specific recipes for how to fix it (except for stupidly disabling the logging of such errors with the system flag) or at least “understand and forgive” only for third-party libraries.

And finally, Kotlin support leaves much to be desired - for example, checking the java-agent will swear at some of its methods (although the application itself can continue to work without visible problems):

https://github.com/puniverse/quasar/issues/238
https://github.com/puniverse/quasar/issues/288

In general, the work had to be debugged by logs - and that was pretty sad.

Conclusion


In general, the impressions of the library are neutral. According to impressions, the actors in it are implemented at the level of “demonstrating an idea” - it seems to work, but there are problems that usually come up during the first combat use. Although the library has potential.

We are still “very lucky”: an attentive reader might have noticed that the last release was in December 2016 (according to the documentation) or in July 2017 (according to the github). And in the company's blog, the last entry is generally in July 2016 (with the intriguing title Why Writing Correct Software Is Hard ). In general, the library is more dead than alive, so it’s better not to use it in production.

PS Here the attentive reader may also ask - what, then, did Akka not use? In principle, there were no criminal problems with it (although the chain of Kotlin-Java-Scala was actually obtained), but since the project was not critical, we decided to try the “native” solution.

Source: https://habr.com/ru/post/350182/


All Articles