📜 ⬆️ ⬇️

Tugging a Camel or integration with Camel. Part 2

Apache Camel Integration Scenarios



How many application integration patterns (EIP) do you know? How many of them can you use?

Cute "camel" is here again, which means I present to you the continuation of a series of articles on Apache Camel. In this article there are both the most necessary and very interesting patterns of integration. I will tell about how they fall on our integration.

If you are familiar with the templates, but decide whether to contact the "camel", then our examples will help you figure it out. If you are interested in the path from usage scenarios to integration implementation, then this article is just about that. I ask under the cat.


Let me remind you, we built our service bus on Apache Camel. The prehistory was described in the previous part . Now Camel for us is a given, with which we must fight. In our "zoo" was added, except for the last, we have two systems. The first is our main system, which is built on the classic three-tier “client-server” architecture. It is a BPMS system, the complexity of which is due to the long-term process of “dopilivaniya” small “hotelok.” The second is a small and simple, like a felt boot, system of the same architecture. We will call it a sales office. Disassemble the use cases will be just for her example.
')

Questionnaire system with the need for integration.


Our sales office is an application that does not have complicated logic. It implements the process of registering customers and entering the documents required for the order. This system is open to customers after registration, so the response time and throughput requirements are of a higher order than the main system. Restrictions on security, bandwidth, independent configuration, and the need to maintain a separate development cycle led to the separation of a sales office as a standalone application. To service the sales process, we needed its integration into our customer's IT infrastructure.
Familiarity with the integration will begin with the options for using the sales office. We have identified two main roles in the business process of our application: customer and administrator. It is these user groups that work with it.

Detailed use case diagram

The scheme dismantled the main uses. Pay attention to the right side of the scheme, a few precedents go beyond the sales office and require certain actions from the main system. It is these precedents we will analyze further.

To form a holistic view of our office, let’s analyze the scheme. Our buyers submit applications for participation in auctions. Carrying out auctions is the main process of the sales office. To participate in them, buyers must register and fill out applications, filling out a large number of boring registration forms. The administrator performs the functions of preparing and conducting auctions. For this purpose, a special application form - a questionnaire is prepared for each of the auctions; therefore, the class of such systems is sometimes called personal ones. The auction process is completed for the sales office by transmitting information on submitted applications to the main system for selecting the winner and processing the transaction.

Qualitative requirements for integration processes:

The following architectural tasks arose when parsing the precedents identified:
  1. Transport organization;
  2. Distribution of functions between subsystems;
  3. Directory synchronization;
  4. Transfer of dependent entities;
  5. Monitoring the exchange process;
  6. File transfer

Let us analyze the solutions of these architectural problems by the example of usage scenarios and correlate them with EIP templates.

The organization of transport.


The transport includes both the systems described above and Camel. The latter has already been said a lot in the previous part of the article, so let's go further.
All three systems are connected by an ActiveMQ broker using AMQP protocol.

Let me remind you that systems exchange packets using JMS. With the payload of these packages, we decided to make XML and serialize into it objects of one of the systems using JAXB. But what objects to take as a basis, while at the same time minimize the time spent on creating integration? There are two systems, which means there can be two formats. We decided to stop at the sales office objects, this system is more lightweight, the domain domain objects are connected with other architectural layers of this application with only JPA annotations. Allocation of transport objects on its basis was not difficult. An alternative solution (to use the objects of the main system) seemed almost impossible due to the presence of a large amount of metadata and complex links with other business entities that go beyond the boundaries of the sales office service we are interested in. Another remaining option is the creation of new transport facilities. He was not even considered, as he demanded the implementation of the procedure and import and export in both systems, which in the first case for the sales office subsystem could be refused.

Perhaps you were surprised that we chose XML as the payload of our packages? But, I assure you, there were reasons for this. Serialization using the JAXB standard is included in the JVM — it simplifies working with it and does not require additional modules. At the time of integration development, we already had experience with JAXB, so no overhead was required for familiarization. Another “bun” is that XML is a text format, which means that in case of emergency situations you can interfere with its structure and make the necessary adjustments. But there were disadvantages: it is known that the structure of the XML format significantly increases the amount of data. However, the information we planned to share, according to initial estimates, did not exceed 100 MB for reference information, and 1 MB for applications for applications. This, you see, is not very big numbers. In addition, the exchange of information had to occur at a time, and there were no requirements for efficiency.

Few details about the architecture of the formation and analysis of messages.

Before that, we described the mechanisms for transmitting and working with messages, without touching Camel directly. It is time to tell what functions were assigned to it, and how it was built into the integration system.

So, the functions:
  1. Independent configuration of message routing;
  2. Transmission of messages from office queues to the main system queues and vice versa;
  3. Matching formats for endpoint unification.

To simplify the connection of systems with a single message broker, the names of queues and topics should be unified. We use such an agreement: [ ].[ ].[ ]
However, it soon turned out that the fewer points in a complex system, the easier the setting. All the functionality for parsing messages and routing is easily transferred to the service bus. So for our main system, only one endpoint is left to send data: bpms.office.export

Distribution of functions between subsystems.


Let's look at this distribution in terms of integration architecture. The sales office is responsible for:

Service Bus:

Main system:

Go ahead, now real examples.

Synchronization of reference books.


The process of synchronization of reference books is based on the precedents mentioned earlier. The scheme is as follows:
It can be seen that, compared with the use case diagram, synchronization here is performed on the sales office side. We have already mentioned this before, but again, sales offices can be many, which means that regulatory information should be uploaded to each of them. Since the requirements for reliability of transmission, as well as ensuring guaranteed delivery are not so strict here, you can use JMS channels like “publish-subscribe”. The following is a diagram of data transmission in the form of EIP templates.

The diagram shows that two routes are used in the service bus. Messages pass through a single output channel of the main system (BPMS), are filtered and transmitted to a specialized channel responsible for working with reference information. Here, the message format is consistent with the office system, they are combined into directories (aggregated) and transmitted to the sales office system via another JMS channel.

The scheme uses several EIP patterns:
  1. Message channel - message channel;
  2. Endpoint - the end point;
  3. Message filter - message filter;
  4. Message translator - template for the message format conversion component;
  5. Aggregator - a template that allows you to combine multiple messages into one.

Why so hard? Let's go in order. Using the message filter is necessary to simplify the configuration and appeared here after merging the output endpoints into one. Cons of the approach:

This possibility cannot be completely excluded, but the effect can be reduced by creating additional buffer channels for each type of message. The ActiveMQ broker maintains a separate setting for the allocation of resources for each such channel, so you can take care of the necessary memory and disk space, taking into account the exchange rate.

Let's return to our route, then the messages are transmitted just over such a buffer JMS channel. The next interesting point is the component that unites the packages. It is required due to the fact that to perform synchronization requires a complete set of directory elements. The easiest way to ensure completeness is to collect all the elements in one message. The critical minus message merge is the large volume of the message received, which can be a problem for both the broker and the service bus.
Fortunately, our directories are not so large, and simplicity came out on top. But this problem should not be underestimated; a limited amount of RAM is allocated for each channel in the broker settings, exceeding which can lead to a critical error.

Sample routes:
 from("jms:topic:bpms.office.request").routeId("catalog-synchronization-filter") .filter( header("destination").isEqualTo( "portal.export.catalog" ) ) .setHeader("catalogCode", simple("${header.catalog}")) .inOnly("jms:topic:catalog.synchronized"); from("jms:topic:catalog.synchronized").routeId("catalog-synchronization-topic") .filter( header("catalogCode").in( ... "GRNTI","OKATO","OKFS","OKOGU","OKOPF","OKVED", … ) ) .setHeader("catalogCode") .groovy( "switch( request.getHeaders().get('catalogCode') ){" + ... " case \"GRNTI\": return \"GRNTICatalog\"" + " case \"OKATO\": return \"okatoCatalog\"" + " case \"OKFS\": return \"okfsCatalog\"" + " case \"OKOGU\": return \"okoguCatalog\"" + " case \"OKOPF\": return \"okopfCatalog\"" + " case \"OKVED\": return \"okvedCatalog\"" + ... "}") .inOnly( "direct:office.synchronization"); from("direct:office.synchronized").routeId("catalog-import-office-filter") .aggregate(header("catalogCode"), new CatalogItemAggregationStrategy()).completionTimeout(3000) .inOnly("jms:topic:office.catalog.synchronization?timeToLive=200000"); 

the details
 private static class CatalogItemAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { String newBody = newExchange.getIn().getBody(String.class); if (oldExchange != null) { String oldCode = (String)oldExchange.getIn().getHeader("catalogCode"); String newCode = (String)newExchange.getIn().getHeader("catalogCode"); if( StringUtils.equals( oldCode, newCode) ) { String oldBody = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody( StringUtils.substringBeforeLast( oldBody, "\n</catalogs>") + StringUtils.substringAfter( newBody, "?>") + "\n</catalogs>" ); return oldExchange; } } StringBuilder builder = new StringBuilder( newBody ); builder.insert( builder.indexOf("\n") + 1, "<catalogs>\n"); builder.append( "</catalogs>\n" ); newExchange.getIn().setBody( builder.toString()); return newExchange; } } 



Transfer of dependent entities.


To begin with, let's draw up a diagram of the process of publishing a lot with an eye on the precedent diagram.

As you can see, the transfer process is relatively simple. You can shift the process diagram to EIP, you get the scheme already known from the previous part:

There is a weak point in simplicity, pay attention to the detailed layout of lots in BPMN notation.

So it turns out that you can unload a lot only after the auction is created. The auction publishing process is depicted in the first diagram of this section in gray. To solve the problem of related processes, two solutions were proposed:
  1. Implement support for distributed transactions and wrap a group of messages in a transaction;
  2. Transfer all data in one message.

The second option seemed to be easier and faster to implement, and stopped at it. We needed to create stubs and transmit minimal information about missing objects. Example messages:
 <office> <lot> <id>fs000000000is3ro0d708me9ms</id> <code>2011-1.5-051-001</code> <theme></theme> <aims>  </aims> <auction> <id>fs000000000gtk9f5oa05h426o</id> <topic> </topic> </auction> </lot> </office> 

The example shows that the message contains information about the lot and minimal information about the auction.
As a result, the solution we used - to transfer related objects to preserve the completeness of changes - was a good alternative to distributed transactions. Go ahead.

Monitoring the exchange process.


Let us analyze this problem by the example of the process of obtaining and recording the results, which remains key for both integration and sales office. The process diagram is shown in the figure:

The process begins in the main system by sending a request for proposals for a lot; this part is not shown in the diagram because a similar route has already been discussed in the previous section. Further, when the request arrives at the remote office, the collection and verification of the bids submitted by the buyers for the lot begins. Only fully completed and verified applications are processed at the time of the request from the main system. Applications are divided into fragments. Fragmentation allows you to use less memory and speed up processing. When the package with applications gets into the main system, it starts parsing and creating on its basis the entities of the main system (import). We did not experiment here with parallelism in order not to encounter database consistency problems in parallel transactions. All lengthy operations of sending and importing applications are divided into phases, each phase is completed by sending a notification and the user has the opportunity to observe the import process.
The scheme of transfer of applications in EIP patterns.

Example of routes infusions:

 from("jms:topic:bpms.office.request").routeId("bpms-request-order") .filter( header("destination").isEqualTo( "office.order.request" ) ) .inOnly("jms:topic:bpms-to-office.order.request"); … from("jms:queue:office.order.export").routeId("bpms-responce-order") .log("going to bpms import: ${headers.JMSDestination}") .wireTap("direct:order.audit") .choice() .when( header("customer").isEqualTo("bpms.import")) .log("filling conumer trying recieve: ${headers.JMSDestination}, ${headers.importType}") .inOnly("jms:queue:from.office.to.bpms.order.import") .otherwise() .to("log:office?multiline=true"); 

The process of sending a request is generally the same as in other cases. The new pattern used in it is " WireTap ". This component allows you to add an observer to the message exchange process; in our example, it forwards the packet with the application to the audit channel. Example:

 from("direct:order.audit") .split( xpath("//*[local-name()='demand']") ) .process( new Processor() { @Override public void process(Exchange item ) throws Exception { Message in = item.getIn(); Message out = item.getOut(); out.setHeaders( in.getHeaders() ); out.setHeader("cliendId", in.getMessageId()); out.setHeader("level", "DEBUG"); out.setBody( String.format( "  â„–%s\n " + " :%s\n " + " callback:%s\n\n", xpath("//*[local-name()='fullNumber']/text()").evaluate(item, String.class), in.getHeader("customer"), in.getHeader("callbackUUID")) ); } }) .inOnly("jms:topic:system.audit"); 

This route is much more interesting than the previous one, the messages here are applications serialized in XML. We split the incoming packet into separate requests using a special component splitter and xpath expression. Next, we no longer need the entire application, so we leave only the information needed for the audit, and send the package to the JMS general audit channel jms:topic:system.audit . This channel accumulates notifications on the status of transfer and import of applications, and on all abnormal situations. Messages are returned to the main system and are associated with the initiator of receiving results on the “ callbackUUID ” property. An example of a route returning a part of audit messages to the main system:
 from("jms:topic:system.audit") .filter( PredicateBuilder.and( header("callbackUUID").isNotNull(), header("fcntp.audit").isNull() ) ) .setHeader("system.audit", simple( "true", Boolean.class )) .inOnly("fcntpJms:topic:fcntp.audit?timeToLive=10000"); 

Advantages of the approach:

Newly used EIP templates:
  1. Wire Tap - routing a copy of the message;
  2. Splitter is a component that allows you to split a large message into fragments.


File transfer


File transfer - the task is not very simple, especially if solved by means of JMS. Let's sort this case. There was nothing formalized in the requirements for this task, it was necessary to transfer data of unlimited size. In Camel, there was no ready-made solution to transfer the file from one server to another. The implementation of splitting files into fixed pieces followed by gluing together encountered a number of difficulties:

We decided to go the other way: in Camel, copying files between local folders was implemented simply and flexibly, this mechanism suited us. The complete file processing scheme is as follows: the file is put by one system into a network folder, Camel finds it and copies it to another folder, and after copying it informs the main system that the file has been transferred. Here is the diagram:

To complete the picture - here is the route that we use:
 RouteDefinition fileExport = (RouteDefinition)from("file:{{office.transport.dir}}?delete=true&exclude=.*\\.tmp") .onException(IOException.class) .maximumRedeliverise(1) .handled(true) .useOriginalMessage() .wireTap("jms:topic:system.audit") .transform(exceptionMessage()) .end() .to("file:{{office.failed.dir}}").end() .process( TimsetampProcsseor.newInstance("sendtimsetamp") ) .wireTap("jms:queue:office.files.import") .newExchangeHeader("fileName", simple("${headers.CamelFileName}")) .newExchangeBody( constant("progress") ) .end() .to("file:{{bpms.transport.dir}}"); // notify bpms by topic:office.files.import fileExport .transform().constant("ok") .delay( 20 ) .process( TimsetampProcessor.newInstance("sendtimsetamp") ) .setHeader("fileName", simple("${headers.CamelFileName}")) // send notify .inOnly("jms:queue:office.files.import"); 

As you can see in the picture above, the files are copied in three steps. On the first, the files are copied from the file storage to the transport folder of the sales office. Routing in the service bus explains the route, it starts only when the files become available in the transport folder of the remote office subsystem. The service bus constantly scans this folder and, as soon as the file gets there, it immediately moves it to the transport folder of the main system. Next, the service bus creates a notification about the file movement and sends it to the office.files.import . In this route, we use the exception handling mechanism; it ensures that if the file gets into the transport folder of the sales office, the main system will receive an alert regardless of the success or failure of the file movement. It's time to sum up.


Results


Using the example of one of our systems, we became acquainted with the main integration scenarios. All examples are excerpts from the real application and now they are used by us. Our examples show how simple Camel routs lie on integration patterns. The main architectural tasks given in the article are covered by the means offered by Camel. Even such complex tasks as, for example, copying files can be solved using Camel. Let me remind you that the article focuses on message-based integration. I hope that you, after reading the article, were able to make sure that the “Camel” is really good.

I plan to write about errors and how to solve them in the next article (a kind of “To be continued ...”). Therefore, to new meetings.

Useful links:

  1. Supported Apache Camel components and protocols;
  2. Integration patterns
  3. Ploznaya literature on the topic
  4. Source code examples from the book Camel in Action

Source: https://habr.com/ru/post/231861/


All Articles