Hi, Habr!
In this article, I describe a way to build a REST service that receives files and saves them to a messaging system in streaming mode, without storing the entire file on the service side. I also describe the reverse scenario, in which the client receives a file held in the messaging system as the response.
For clarity, I provide code samples from a service developed on JEE7 for the IBM WebSphere Liberty application server, with IBM MQ acting as the messaging system.
Nevertheless, the described method also suits other similar platforms: any JMS API provider can act as the messaging system, and any Java application server (for example, Apache Tomcat with a JAX-RS implementation added) can act as the application server.
Formulation of the problem
We needed a solution that could both receive large files (> 100 MB) from a client and transfer them to another, geographically remote system, and, in the opposite direction, return files from that system to the client as a response. Because the network channel between the client's network and the application's network is unreliable, a messaging system is used to guarantee delivery between them.
The high-level solution includes three components:
- REST service: lets the client upload a file or request one.
- MQ: carries messages between the two networks.
- Application: stores the files and returns them on request.
This article describes how to implement the REST service, whose tasks are:
- Receiving a file from the client.
- Transferring the received file to MQ.
- Returning a file from MQ to the client as a response.
Solution Method
Because the files are large, they cannot be held entirely in RAM. In addition, MQ imposes its own limit: a single message cannot exceed 100 MB. My solution is therefore based on the following principles:
- Receiving a file and storing it in the MQ queue is done in streaming mode, without holding the entire file in memory.
- The file is placed in the MQ queue as a set of small messages.
Graphically, the location of the file on the client, REST service and MQ is shown below:
On the client side, the whole file resides on the file system; the REST service holds only one portion of the file at a time; and on the MQ side, each portion of the file is stored as a separate message.
REST service development
To illustrate the proposed method, I will develop a demo REST service with two methods:
- upload: receives a file from the client, writes it to the MQ queue, and returns the message group identifier (in base64 format) as the response.
- download: receives a message group identifier (in base64 format) from the client and returns the file stored in the MQ queue.
Method of receiving a file from the client (upload)
The method's task is to obtain the stream of the incoming file and write it to the MQ queue.
Receiving the incoming file stream
To receive the incoming file, the method takes as its input parameter an object implementing the com.ibm.websphere.jaxrs20.multipart.IMultipartBody interface, which gives access to the input stream of the file:
    @PUT
    @Path("upload")
    public Response upload(IMultipartBody body) {
        ...
        IAttachment attachment = body.getAttachment("file");
        InputStream inputStream = attachment.getDataHandler().getInputStream();
        ...
    }
The IMultipartBody interface is in the JAR archive com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, which ships with IBM Liberty Server in the folder <WLP_INSTALLATION_PATH>/dev/api/ibm.
Note:
- WLP_INSTALLATION_PATH is the path to the WebSphere Liberty Profile directory.
- The client is expected to send the file in a parameter named "file".
- If you use another application server, you can use the alternative library from Apache CXF.
Streaming the file into MQ
The method takes the stream of the incoming file, the name of the MQ queue to write to, and the identifier of the message group used to link the messages together. The group identifier is generated on the service side, for example with the org.apache.commons.lang3.RandomStringUtils utility:
    String groupId = RandomStringUtils.randomAscii(24);
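If Apache Commons Lang is not available, a comparable identifier can be produced with the JDK alone. The sketch below is only an illustration under assumptions: the GroupIds class and its methods are hypothetical names, not part of the article's service, and the choice of the URL-safe Base64 alphabet is mine (the article's Utils helper is not shown, so its alphabet is unknown). It generates printable ASCII characters and encodes them so that the value can travel in a query parameter without further escaping.

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Base64;

/** Hypothetical helper; not part of the article's service. */
final class GroupIds {
    private static final SecureRandom RANDOM = new SecureRandom();

    private GroupIds() { }

    /** Returns {@code length} random printable ASCII characters (codes 32..126). */
    static String randomAscii(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append((char) (32 + RANDOM.nextInt(95)));
        }
        return sb.toString();
    }

    /** Encodes the group ID with the URL-safe Base64 alphabet ('-' and '_' instead of '+' and '/'). */
    static String encode(String groupId) {
        return Base64.getUrlEncoder().encodeToString(groupId.getBytes(StandardCharsets.US_ASCII));
    }

    /** Reverses {@link #encode(String)}. */
    static String decode(String encoded) {
        return new String(Base64.getUrlDecoder().decode(encoded), StandardCharsets.US_ASCII);
    }
}
```

A 24-character identifier encodes to 32 Base64 characters with no padding, because 24 is a multiple of 3.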
The algorithm for saving an incoming file in MQ consists of the following steps:
- Initialize the MQ connection objects.
- Read the incoming file portion by portion until it has been read completely:
  - Each portion of file data is written to MQ as a separate message.
  - Each message of the file has its own sequence number (the "JMSXGroupSeq" property).
  - All messages of the file share the same group value (the "JMSXGroupID" property).
  - The last message carries a flag marking it as final (the "JMS_IBM_Last_Msg_In_Group" property).
- The SEGMENT_SIZE constant holds the portion size, for example 1 MB.
    public void write(InputStream inputStream, String queueName, String groupId)
            throws IOException, JMSException {
        try (
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession();
            MessageProducer producer = session.createProducer(session.createQueue(queueName));
        ) {
            byte[] buffer = new byte[SEGMENT_SIZE];
            BytesMessage message = null;
            for (int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
                // Read the next portion before sending the previous one,
                // so that the final message can be flagged.
                readBytesSize = inputStream.read(buffer);
                if (message != null) {
                    if (readBytesSize < 1) {
                        message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
                    }
                    producer.send(message);
                }
                if (readBytesSize > 0) {
                    message = session.createBytesMessage();
                    message.setStringProperty("JMSXGroupID", groupId);
                    message.setIntProperty("JMSXGroupSeq", sequenceNumber);
                    // read() may fill only part of the buffer, so write just the bytes read.
                    message.writeBytes(buffer, 0, readBytesSize);
                }
            }
        }
    }
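The look-ahead used in the write method (a message is sent only after the next read shows whether more data follows) can be tried without a queue manager. The following is a minimal JDK-only sketch, not the service code: Segment, SegmentWriter and the in-memory list standing in for the queue are hypothetical, and the fields merely mirror the three message properties. It assumes Java 9 or later for InputStream.readNBytes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for one MQ message: payload plus the grouping properties. */
final class Segment {
    final String groupId;  // plays the role of JMSXGroupID
    final int sequence;    // plays the role of JMSXGroupSeq, starting at 1
    final byte[] data;     // one portion of the file
    boolean last;          // plays the role of JMS_IBM_Last_Msg_In_Group

    Segment(String groupId, int sequence, byte[] data) {
        this.groupId = groupId;
        this.sequence = sequence;
        this.data = data;
    }
}

final class SegmentWriter {
    private SegmentWriter() { }

    /** Splits the stream into segments of at most segmentSize bytes; the list stands in for the queue. */
    static List<Segment> split(InputStream in, String groupId, int segmentSize) throws IOException {
        List<Segment> queue = new ArrayList<>();
        byte[] buffer = new byte[segmentSize];
        Segment pending = null;   // built but not yet "sent"
        int sequence = 1;
        int read;
        // For a positive length, readNBytes returns 0 only at end of stream.
        while ((read = in.readNBytes(buffer, 0, segmentSize)) > 0) {
            if (pending != null) {
                queue.add(pending);   // more data followed, so it was not the last one
            }
            pending = new Segment(groupId, sequence++, Arrays.copyOf(buffer, read));
        }
        if (pending != null) {
            pending.last = true;      // nothing followed: flag and "send" the final segment
            queue.add(pending);
        }
        return queue;
    }
}
```

Only one buffer and one pending segment are alive at any moment, so memory use depends on the segment size and not on the file size. In the sketch the list keeps every segment for inspection; the real service hands each message to MQ instead.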
Method of sending file to client (download)
The method receives the message group identifier in base64 format, reads the messages of that group from the MQ queue, and sends them to the client as a streaming response.
Receiving the message group identifier
The method receives the message group identifier as an input parameter:
    @GET
    @Path("download")
    public Response download(@QueryParam("groupId") String groupId) {
        ...
    }
Stream client response
To stream a file that is stored in MQ as a set of individual messages, create a class that implements the javax.ws.rs.core.StreamingOutput interface:
    public class MQStreamingOutput implements StreamingOutput {
        private String groupId;
        private String queueName;

        public MQStreamingOutput(String groupId, String queueName) {
            super();
            this.groupId = groupId;
            this.queueName = queueName;
        }

        @Override
        public void write(OutputStream outputStream) throws IOException, WebApplicationException {
            try {
                new MQWorker().read(outputStream, queueName, groupId);
            } catch (NamingException | JMSException e) {
                e.printStackTrace();
                // Rethrow so that the failure reaches the JAX-RS runtime.
                throw new IOException(e);
            } finally {
                outputStream.flush();
                outputStream.close();
            }
        }
    }
The class implements the write method, which receives a reference to the outgoing stream that the messages from MQ will be written to. I also added two fields to the class: the queue name and the identifier of the group whose messages will be read.
An object of this class is passed as the entity when building the response to the client:
    @GET
    @Path("download")
    public Response download(@QueryParam("groupId") String groupId) {
        ResponseBuilder responseBuilder = null;
        try {
            MQStreamingOutput streamingOutput = new MQStreamingOutput(
                    new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
            responseBuilder = Response.ok(streamingOutput);
        } catch (Exception e) {
            e.printStackTrace();
            // responseBuilder is still null here, so create a new builder.
            responseBuilder = Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
        }
        return responseBuilder.build();
    }
Streaming the file from MQ
The algorithm for reading messages from MQ into the outgoing stream consists of the following steps:
- Initialize the MQ connection objects.
- Read messages from MQ in a loop until the message carrying the end-of-group flag (the "JMS_IBM_Last_Msg_In_Group" property) has been read:
  - Before each read from the queue, set a filter (messageSelector) that specifies the message group identifier and the sequence number of the message within the group.
  - Write the content of the message that was read to the outgoing stream.
    public void read(OutputStream outputStream, String queueName, String groupId)
            throws IOException, JMSException {
        try (
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession();
        ) {
            connection.start();
            Queue queue = session.createQueue(queueName);
            int sequenceNumber = 1;
            for (boolean isMessageExist = true; isMessageExist; ) {
                // Select exactly one message: this group, next sequence number.
                String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''")
                        + "' AND JMSXGroupSeq=" + sequenceNumber++;
                try (
                    MessageConsumer consumer = session.createConsumer(queue, messageSelector);
                ) {
                    BytesMessage message = (BytesMessage) consumer.receiveNoWait();
                    if (message == null) {
                        // No matching message is available right now: stop reading.
                        isMessageExist = false;
                    } else {
                        byte[] buffer = new byte[(int) message.getBodyLength()];
                        message.readBytes(buffer);
                        outputStream.write(buffer);
                        if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
                            isMessageExist = false;
                        }
                    }
                }
            }
        }
    }
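The control flow of the read method can also be exercised without MQ. The sketch below is a JDK-only illustration under assumptions: StoredSegment and SegmentReader are hypothetical names, a list stands in for the queue, and a linear search plays the role of the message selector. The selector helper builds the same text as the method above, including the doubling of single quotes.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

/** Hypothetical stand-in for a message stored in the queue. */
final class StoredSegment {
    final String groupId;
    final int sequence;
    final boolean last;
    final byte[] data;

    StoredSegment(String groupId, int sequence, boolean last, byte[] data) {
        this.groupId = groupId;
        this.sequence = sequence;
        this.last = last;
        this.data = data;
    }
}

final class SegmentReader {
    private SegmentReader() { }

    /** Builds the selector text in the same way as the read method above. */
    static String selector(String groupId, int sequence) {
        return "JMSXGroupID='" + groupId.replace("'", "''") + "' AND JMSXGroupSeq=" + sequence;
    }

    /**
     * Writes the group's segments to out in sequence order and returns how many were written.
     * Unlike a real consumer, this sketch does not remove anything from the list.
     */
    static int copyGroup(List<StoredSegment> queue, String groupId, OutputStream out) throws IOException {
        int written = 0;
        for (int sequence = 1; ; sequence++) {
            StoredSegment found = null;
            for (StoredSegment s : queue) {   // plays the role of the message selector
                if (s.groupId.equals(groupId) && s.sequence == sequence) {
                    found = s;
                    break;
                }
            }
            if (found == null) {
                return written;               // like receiveNoWait() returning null
            }
            out.write(found.data);
            written++;
            if (found.last) {
                return written;               // terminator reached
            }
        }
    }
}
```

One design point this makes visible: when the next segment is not found, the loop ends silently, and the client receives a shorter file. With receiveNoWait() the same happens if a message has not arrived yet; a timed receive(timeout) is one possible alternative when messages may still be in transit.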
Calling the REST service
To check the service, I will use the curl tool.
Sending a file
    curl -X PUT -F file=@<path_to_file> http://localhost:9080/Demo/rest/service/upload
The response is a base64 string containing the message group identifier, which is passed to the next method to retrieve the file.
Receiving a file
    curl -X GET "http://localhost:9080/Demo/rest/service/download?groupId=<base64_group_id>" -o <output_file>
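The same download request can be prepared from Java. This is a hedged sketch that assumes Java 11 or later (java.net.http); DownloadRequests and forGroup are hypothetical names, and the base URL is simply the one used in the curl examples. It percent-encodes the identifier, because the standard Base64 alphabet contains '+', '/' and '=', which are not safe to place in a query string unescaped.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical client-side helper; not part of the article's service. */
final class DownloadRequests {
    private DownloadRequests() { }

    /** Builds a GET request for the download method with a percent-encoded groupId parameter. */
    static HttpRequest forGroup(String baseUrl, String groupIdBase64) {
        String query = "groupId=" + URLEncoder.encode(groupIdBase64, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/download?" + query))
                .GET()
                .build();
    }
}
```

Sending the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofFile(path)) writes the response body to a file as it arrives, so the client does not need to hold the whole file in memory either.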
Conclusion
This article described an approach to developing a REST service that streams large data in both directions: it receives the data and stores it in a messaging system queue, and it reads the data back from the queue to return it as a response. This approach reduces resource usage and thereby increases the throughput of the solution.
Additional materials
Learn more about the IMultipartBody interface used to receive the incoming file stream: link.
An alternative library for streaming files in REST services: Apache CXF.
The StreamingOutput interface for streaming a REST response to the client: link.