Good afternoon, Habr readers!
We need to tie various services and existing systems together into managed processes. We do not need extreme speed (this is not about reacting to stock quotes), but there are many processes, and the number of components (systems) involved is also considerable. We do not want point-to-point integration. We want something clean and manageable.
After reviewing the market, we decided to build a counterpart of Amazon Simple Workflow, since we cannot use the service directly. The properties of that framework that suit us:
- Low entry threshold (good programmers are expensive these days). The threshold is low in terms of getting started, since everything is done at a high level, almost at the level of calling interfaces. Managing a process asynchronously well does, of course, take some experience
- Because task parameters and execution results are stored, we get material for analysis and a basis for regression testing of the task performers
- Process control logic is concentrated in specific places (in the Coordinator). This may not be obvious at first glance, but it is a great advantage over the alternative, in which each actor carries its own logic about which component to call next (where to pass control). That alternative often makes the system complex and its components impossible to reuse
This is the minimum we wanted, but practice shows that there are more advantages. The project was named Taskurotta, after "Task" and the Finnish word for a gopher, which nobody sees, yet it is there. The source code is open and available on GitHub. The project is built on Hazelcast, which provides the shared memory space and execution environment across servers, and on Dropwizard, for quick and convenient implementation of REST services. Thanks also go to our friends at Amazon, who were first and made an excellent product that inspired us to develop our own. Documentation is still lacking, but we will fix that soon.
Let's move from theory to practice with a real example.
Suppose we need to develop an application that sends a string message to a user. As input we receive a user ID and the message text. From the user's profile (looked up by ID) we get the delivery preference: email or phone (SMS). The phone number and email address are also stored in the profile. We then send the message over the chosen transport. If sending fails (because of a wrong address or number), this must be marked in the profile to prevent further attempts.
Let's add a non-functional requirement: the services that send messages already exist, are located in other subnets, and must be reused.
PS: All the source code of this example is also available on GitHub, in taskurotta/taskurotta-getstarted.
Taskurotta lets you implement system components (Actors) that interact with each other in a way familiar to any developer, by calling each other's methods, but asynchronously. Actors come in two types: Performers and Coordinators. Performers must do exactly the task they are given. They are the most independent modules and therefore the most reusable. Performers may interact with the outside world (any input/output streams), performing a task in whatever way and for as long as it takes. Coordinators, by contrast, do not perform tasks that involve the outside world. They should work as quickly as possible and never stall on direct interaction with a database, the network or other potentially unstable components. Their duty is to assign tasks to Performers, coordinate their actions and thereby implement (describe) the process. Coordinators can also assign tasks to other Coordinators, which realizes the paradigm of reusable subprocesses.
The Coordinator's job is to hand out the currently known tasks as quickly as possible, i.e. it must not block waiting for a result. It should build dependencies between the tasks it knows about and, where necessary, create asynchronous decision points for determining further actions.
For our process, the Coordinator should do the following:
- Request a user profile
- Wait for a profile
- Send message to user
- Wait for the departure result
We could encode this sequence with a state machine, with a message passed along and modified by several performers, or in other familiar but clumsy ways, but we will not. We will do it simply and cleanly with the help of the Promise entity and our system, which tracks the Coordinator's actions.
Promise<Profile> profilePromise = userProfileService.get(userId);
Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
The example shows that calling a service returns not a real object but a Promise, a reference to the result of a task. We can pass this Promise as an argument to other services (i.e. tasks). Calls to other services are intercepted by the system (there is no real synchronous call), and a dependency between them is built. Tasks are not handed to services for execution until all their Promise arguments are ready, i.e. until all the necessary preceding tasks have completed.
Thus, the process is managed jointly by the coordinator and our system. The coordinator builds dependencies between tasks, and the system assumes, among other things, the function of waiting for the execution of preliminary tasks and the subsequent launch of tasks dependent on them.
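To make the idea of "a task is held back until its Promise arguments are ready" more tangible, here is a minimal plain-Java sketch. It is not Taskurotta code: SketchPromise and SketchEngine are hypothetical names invented for this illustration, and everything runs in one thread, whereas the real system distributes tasks between actors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a Promise: a handle to a result that may not exist yet.
final class SketchPromise<T> {
    private T value;
    private boolean ready;

    boolean isReady() { return ready; }

    T get() {
        if (!ready) throw new IllegalStateException("result is not ready yet");
        return value;
    }

    void resolve(T result) { value = result; ready = true; }
}

// Hypothetical engine: holds tasks back until all of their Promise arguments are ready.
final class SketchEngine {
    private static final class Pending {
        final List<SketchPromise<?>> deps;
        final Runnable body;
        Pending(List<SketchPromise<?>> deps, Runnable body) { this.deps = deps; this.body = body; }
    }

    private final List<Pending> pending = new ArrayList<>();
    final List<String> executionLog = new ArrayList<>();

    // Registers a task and returns a Promise for its result without running anything.
    <T> SketchPromise<T> submit(String name, List<SketchPromise<?>> deps, Supplier<T> work) {
        SketchPromise<T> result = new SketchPromise<>();
        pending.add(new Pending(deps, () -> {
            executionLog.add(name);
            result.resolve(work.get());
        }));
        return result;
    }

    // Repeatedly runs every task whose dependencies are all resolved.
    void runAll() {
        boolean progressed = true;
        while (progressed && !pending.isEmpty()) {
            progressed = false;
            for (Pending task : new ArrayList<>(pending)) {
                if (task.deps.stream().allMatch(SketchPromise::isReady)) {
                    pending.remove(task);
                    task.body.run();
                    progressed = true;
                }
            }
        }
    }
}

final class PromiseDependencyDemo {
    public static void main(String[] args) {
        SketchEngine engine = new SketchEngine();
        SketchPromise<String> profile =
                engine.submit("getProfile", List.of(), () -> "email:user@example.org");
        SketchPromise<Boolean> sent =
                engine.submit("sendToTransport", List.of(profile), () -> profile.get().startsWith("email:"));
        engine.runAll();
        System.out.println(engine.executionLog + " sent=" + sent.get());
    }
}
```

Note that submit() only records the task and its dependencies. Nothing is executed until runAll() finds that every dependency is resolved, which mirrors the division of labour described above: the coordinator declares, the system waits and launches.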
Let's now look at what these asynchronous decision points for determining further actions are.
We need to make sure that the notification was sent successfully. If it was not, we need to block further notifications to that user.
In this case, after sending the notification and before any further action, the result of sending must be analyzed. That is, we wait for the task to complete, analyze the result and, depending on it, block the user or not. To solve this, the coordinator can create a task for itself: a decision point to which the necessary Promise is passed. Below is how it looks.
public void start(String userId, String message) {
    Promise<Profile> profilePromise = userProfileService.get(userId);
    Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
    decider.blockOnFail(userId, sendResultPromise);
}

@Asynchronous
public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
    logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);
    if (!sendResultPromise.get()) {
        userProfileService.blockNotification(userId);
    }
}
The start() method starts the process. It then sets three tasks. The first, fetching the profile, goes to the Performer. The second and third the Coordinator sets for itself, for further analysis of results (the sendToTransport and blockOnFail calls). Thus the Coordinator waits for the first task to be solved, but without blocking. As soon as it is solved, Taskurotta calls the coordinator's sendToTransport method, passing it a ready Promise object from which the real data can be obtained with get(). After the sendToTransport task completes, the blockOnFail task starts, in which we ask userProfileService to block messages for the userId if sending the notification failed.
Using decision points, you can implement various process behaviors:
- Forking into parallel branches
- Later merging of independent process branches at one point using Promise and the @NoWait annotation
- Asynchronous recursion
- Parallel execution of similar tasks, for example verifying the digital signatures of all files and waiting for the results at a single decision point
- etc.
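As an analogy for the last case in the list (many similar tasks in parallel, results collected at one decision point), here is a small sketch using the standard CompletableFuture API rather than Taskurotta. The verifySignature check is a made-up placeholder, not a real signature verification.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FanOutFanInSketch {
    // Hypothetical check standing in for a real signature verification.
    static boolean verifySignature(String fileName) {
        return fileName.endsWith(".sig");
    }

    // Starts one asynchronous check per file and merges the results at one point.
    static CompletableFuture<Boolean> verifyAll(List<String> files) {
        List<CompletableFuture<Boolean>> checks = files.stream()
                .map(f -> CompletableFuture.supplyAsync(() -> verifySignature(f)))
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(checks.toArray(new CompletableFuture[0]))
                // the "decision point": runs only after every check has finished
                .thenApply(ignored -> checks.stream().allMatch(CompletableFuture::join));
    }

    public static void main(String[] args) {
        System.out.println(verifyAll(List.of("a.sig", "b.sig")).join());
    }
}
```

The thenApply callback plays the role of the coordinator's decision point: it is invoked only once all the parallel results are available, so the join() calls inside it never block.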
PS: The blockOnFail task is invoked through the decider object. This is an artificial (proxy) object that intercepts the call and does not actually invoke the blockOnFail method. We need to set a task, not call the method synchronously.
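The interception idea can be sketched with a standard JDK dynamic proxy. This is only an illustration under our own assumptions: the SelfTasks interface and the string format of the recorded task are invented here, and the sketch says nothing about how Taskurotta actually builds its proxies.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical interface for the coordinator's own asynchronous methods.
interface SelfTasks {
    void blockOnFail(String userId);
}

final class InterceptingProxySketch {
    // Builds a proxy that records every call as a task description instead of executing it.
    static SelfTasks recordingProxy(List<String> recordedTasks) {
        return (SelfTasks) Proxy.newProxyInstance(
                SelfTasks.class.getClassLoader(),
                new Class<?>[] { SelfTasks.class },
                (proxy, method, callArgs) -> {
                    recordedTasks.add(method.getName() + Arrays.toString(callArgs));
                    return null; // nothing is executed here; the task is only registered
                });
    }

    public static void main(String[] args) {
        List<String> tasks = new ArrayList<>();
        recordingProxy(tasks).blockOnFail("user-1");
        System.out.println(tasks);
    }
}
```

Calling blockOnFail on the proxy returns immediately. The only effect is a new entry in the task list, which is the behavior the PS above describes.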
Since, according to the scenario, Performers for sending email and SMS already exist, we only need to create the Performer that works with the profile. This Performer has two tasks:
- Return the profile by user ID
- Make a mark in the profile about the impossibility of sending messages for a specific user
We start by declaring its interface. The Coordinator will work with this interface. From here on, comments and other non-essential parts of the code are omitted for brevity.
@Worker
public interface UserProfileService {
    public Profile get(String userId);
    public void blockNotification(String userId);
}
The @Worker annotation defines this interface as a Performer. The annotation has optional attributes that define the name and version of the contract. By default, the name is the full name of the interface and the version is "1.0". Performers of different versions can work simultaneously for different processes without any conflicts.
Let us move on to the implementation of the interface.
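The default rule (name falls back to the full interface name, version to "1.0") can be illustrated with plain reflection. The SketchWorker annotation below is our own hypothetical copy of the described attributes, not Taskurotta's annotation, and the "name#version" format is an assumption made only for this sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation that mimics the described attributes; not Taskurotta's own.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchWorker {
    String name() default "";
    String version() default "1.0";
}

@SketchWorker
interface DefaultNamedService { }

@SketchWorker(name = "profile", version = "2.0")
interface ExplicitlyNamedService { }

final class ActorIdSketch {
    // Resolves "name#version", falling back to the full interface name when no name is given.
    static String actorId(Class<?> contract) {
        SketchWorker meta = contract.getAnnotation(SketchWorker.class);
        if (meta == null) throw new IllegalArgumentException(contract + " is not annotated");
        String name = meta.name().isEmpty() ? contract.getName() : meta.name();
        return name + "#" + meta.version();
    }

    public static void main(String[] args) {
        System.out.println(actorId(DefaultNamedService.class));
        System.out.println(actorId(ExplicitlyNamedService.class));
    }
}
```

Because the identifier includes the version, two implementations of the same contract with different versions resolve to different identifiers, which is one plausible way for versions to coexist without conflicts.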
public class UserProfileServiceImpl implements UserProfileService {
    private static final Logger logger = LoggerFactory.getLogger(UserProfileServiceImpl.class);

    @Override
    public Profile get(String userId) {
        return ProfileUtil.createRandomProfile(userId);
    }

    @Override
    public void blockNotification(String userId) {
        logger.info(".blockNotification(userId = [{}])", userId);
    }
}
Here we have omitted the initialization of the profile manager (ProfileUtil). It could work with a database, LDAP or another registry. The example shows that the Performer receives tasks (calls) and delegates them to the real module.
This completes the creation of the Performer.
To solve our problem, the Coordinator must pass a reference to the not-yet-received user profile (a Promise object) to the decision point. There it will choose a transport, or send nothing if message delivery for this user is already blocked.
However, the Performer's interface, like the Performer itself, receives arguments and returns the result synchronously, so its declaration returns a plain data object rather than a Promise. This is correct: the Performer does not need to know how it is used. For example, our profile Performer can be used when the user ID is already known, or when it is not yet known and you need to pass a reference to another task that obtains this ID from somewhere. This brings us to the interface for interacting with the Performer. This interface is defined by the Coordinator itself for its own needs, i.e. it lives in the Coordinator's package (project). Let's add an interface for working with the profile Performer:
@WorkerClient(worker = UserProfileService.class)
public interface UserProfileServiceClient {
    public Promise<Profile> get(String userId);
    public void blockNotification(String userId);
}
The interface is annotated with @WorkerClient. The annotation parameter refers to the class of the Performer's real interface. This establishes the link between the existing interface and the interface a particular Coordinator needs. Let's call it the "Performer's client interface". The client interface must contain all the methods the Coordinator needs (unused ones need not be declared), with identical argument signatures. Any argument may be of type Promise if you want to pass the result of an unfinished task as an argument.
Now we come to the most interesting part: creating the coordinator. First, here is the coordinator interface through which Taskurotta clients will start the processes they need.
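The matching rule for client interfaces can be written down as a small reflection check. This is a sketch of the rule as stated in the text, not of Taskurotta's actual validation. PromiseStub, ProfileWorker, ProfileWorkerClient and BrokenClient are hypothetical types made up for the example.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical Promise type, used only to mark "result of an unfinished task".
interface PromiseStub<T> { }

interface ProfileWorker {
    String get(String userId);
    void block(String userId);
}

// Declares only the method it needs; the argument is passed as a Promise.
interface ProfileWorkerClient {
    PromiseStub<String> get(PromiseStub<String> userId);
}

// Does not match: the worker has no get(...) with two arguments.
interface BrokenClient {
    PromiseStub<String> get(String userId, int extra);
}

final class ClientContractSketch {
    // True when every client method has a worker method with the same name and the
    // same argument types, where any client argument may instead be a Promise.
    static boolean matches(Class<?> client, Class<?> worker) {
        return Arrays.stream(client.getMethods()).allMatch(c ->
                Arrays.stream(worker.getMethods()).anyMatch(w -> sameCall(c, w)));
    }

    private static boolean sameCall(Method c, Method w) {
        if (!c.getName().equals(w.getName()) || c.getParameterCount() != w.getParameterCount()) {
            return false;
        }
        for (int i = 0; i < c.getParameterCount(); i++) {
            Class<?> clientParam = c.getParameterTypes()[i];
            if (clientParam != PromiseStub.class && clientParam != w.getParameterTypes()[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(matches(ProfileWorkerClient.class, ProfileWorker.class));
        System.out.println(matches(BrokenClient.class, ProfileWorker.class));
    }
}
```

The check deliberately ignores return types, since the client side returns a Promise where the Performer returns plain data.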
@Decider
public interface NotificationDecider {
    @Execute
    public void start(String userId, String message);
}
This interface is marked with @Decider, i.e. as a coordinator. The annotation has the same attributes as @Worker: name and version. By default, the name is the full name of the interface and the version is "1.0".
The start method is marked with @Execute, which means the process can be started through this method.
Now let's move on to the coordinator implementation:
public class NotificationDeciderImpl implements NotificationDecider {
    private static final Logger logger = LoggerFactory.getLogger(NotificationDeciderImpl.class);

    private UserProfileServiceClient userProfileService;
    private MailServiceClient mailService;
    private SMSServiceClient smsService;
    // proxy of this coordinator: calls through it set tasks instead of running synchronously
    private NotificationDeciderImpl decider;

    @Override
    public void start(String userId, String message) {
        logger.info(".start(userId = [{}], message = [{}])", userId, message);
        Promise<Profile> profilePromise = userProfileService.get(userId);
        Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
        decider.blockOnFail(userId, sendResultPromise);
    }

    // decision point: called only when profilePromise is ready
    @Asynchronous
    public Promise<Boolean> sendToTransport(Promise<Profile> profilePromise, String message) {
        logger.info(".sendToTransport(profilePromise = [{}], message = [{}])", profilePromise, message);
        Profile profile = profilePromise.get();
        switch (profile.getDeliveryType()) {
            case SMS: {
                return smsService.send(profile.getPhone(), message);
            }
            case EMAIL: {
                return mailService.send(profile.getEmail(), message);
            }
        }
        return Promise.asPromise(Boolean.TRUE);
    }

    // decision point: called only when sendResultPromise is ready
    @Asynchronous
    public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
        logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);
        if (!sendResultPromise.get()) {
            userProfileService.blockNotification(userId);
        }
    }
}
In this code we have also omitted the initialization of the private fields. The complete working example can be found in the taskurotta-getstarted project. Here we only note that the values of the private fields are obtained through a special factory of proxy objects for the Coordinator.
The sample implementation has two points at which the coordinator waits for the results of unfinished tasks: the sendToTransport and blockOnFail methods. These methods are called only when all their Promise arguments are ready, i.e. when the corresponding tasks have completed.
The fields of type MailServiceClient and SMSServiceClient are likewise client interfaces to the corresponding Performers. Their initialization can also be seen in the taskurotta-getstarted project.
At this point all the Performers and the Coordinator are implemented. Let's proceed to launching the Actors (i.e. Performers and Coordinators).
Tasks can be executed both inside application servers and in a standalone Java application (this example uses the standalone variant from the taskurotta/bootstrap module). What the standalone application does:
- Registers with the Taskurotta server
- Starts a pool of N threads to perform tasks
- Receives tasks from the Taskurotta server
- Executes them
- Sends the results to the Taskurotta server
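The steps above can be sketched in plain Java. In this simplified model an in-memory queue stands in for the Taskurotta server, registration is omitted, and the loop stops when the queue is empty (a real actor would keep polling). ActorLoopSketch and runActor are hypothetical names for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class ActorLoopSketch {
    // Drains the task queue with a pool of poolSize threads and collects the results.
    static <T, R> List<R> runActor(BlockingQueue<T> server, int poolSize, Function<T, R> handler) {
        Queue<R> sentResults = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.execute(() -> {
                T task;
                // poll() stands in for "receive a task from the server"
                while ((task = server.poll()) != null) {
                    // add() stands in for "send the result back to the server"
                    sentResults.add(handler.apply(task));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sentResults);
    }

    public static void main(String[] args) {
        BlockingQueue<String> server = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        System.out.println(runActor(server, 2, String::toUpperCase));
    }
}
```

Since several threads take tasks concurrently, the order of results is not guaranteed. Only the set of results is.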
To launch the standalone Java application, the bootstrap package is used, specifically the ru.taskurotta.bootstrap.Main class. It takes the location of a YAML configuration file as an argument.
How do you try it out? Very simply. Below is a step-by-step build of the server and the actors from source, followed by their launch. Note that minor changes are needed if you are not on Linux.
Suppose you already have the required tools installed (the commands below use git, mvn and java). First, we build the Taskurotta server:
git clone https://github.com/taskurotta/taskurotta.git
cd taskurotta/
Run the build. To speed it up, we skip the tests.
mvn clean install -DskipTests
Now we will launch a cluster of two nodes (we use one machine for demonstration purposes and therefore different ports in the launch parameters). In a real environment, you can run the required number of machines with the same configuration.
We start the first cluster node:
java -Xmx64m -Ddw.http.port=8081 -Ddw.http.adminPort=9081 -Ddw.logging.file.currentLogFilename="assemble/target/server1.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml
We start the second node. (We intentionally limit the memory in order to detect possible leaks early. This example configuration uses no database, so large volumes would require more memory.)
java -Xmx64m -Ddw.http.port=8082 -Ddw.http.adminPort=9082 -Ddw.logging.file.currentLogFilename="assemble/target/server2.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml
When the two servers connect to each other, a message like this appears in the log:
Members [2] {
    Member [192.168.1.2]:7777
    Member [192.168.1.2]:7778 this
}
Open the console in the browser:
http://localhost:8081/index.html for the first node, or
http://localhost:8082/index.html for the second node.
You can use either node to work with the console. They display mostly the same information. Currently, not all console features work in this configuration (without a database). Everything works in the configuration with Oracle and MongoDB. See the deployment options in the documentation.
Now let's run our process. To do that, we clone the taskurotta-getstarted repository.
git clone https://github.com/taskurotta/taskurotta-getstarted.git
cd taskurotta-getstarted/
mvn clean install
For the actors to have something to do, processes must be started. Let's start, say, 91 of them.
java -cp target/getstarted-process-1.0-SNAPSHOT.jar ru.taskurotta.example.starter.NotificationModule http://localhost:8081 91
Check in the console at http://localhost:8081/index.html. Select the "Queues" tab. We will see that the coordinator has 91 tasks, which corresponds to the 91 started processes.

Now run the coordinator. Only the coordinator is defined in its YAML configuration file, with no performers. Therefore, after it starts, the processes will not run to completion, and we will see the performers' tasks waiting in the queue.
java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-decider.yml
In the configuration file, the first node of our cluster is set as the Taskurotta server for the coordinator:
spreader:
  - Spreader:
      class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig
      instance:
        endpoint: "http://localhost:8081"
        threadPoolSize: 10
        readTimeout: 0
        connectTimeout: 3000
You can refresh the list of queues in the console and see that tasks are now waiting for performers.
Now let's run the performers (leaving the coordinator running) and, for demonstration, point them at the second cluster node. Since the cluster nodes form a shared memory and a shared environment for executing internal tasks, it does not matter which server receives a performer's request.
java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-workers.yml
The performers are configured to talk to the second cluster node:
spreader:
  - Spreader:
      class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig
      instance:
        endpoint: "http://localhost:8082"
        threadPoolSize: 10
        readTimeout: 0
        connectTimeout: 3000
As a result, all processes should run to completion, which can be seen from the queues in the management console.

That is all I wanted to share for now. We welcome suggestions and constructive criticism.