
Transferring files in Java using ØMQ and JZMQ

Greetings! We are a small company of like-minded people developing ArkStore, a product designed to manage data regardless of its format and storage method. In this blog we will try to share the experience we have accumulated over almost two years of development. For the first article I decided to cover the IO layer and a product called ØMQ (or ZeroMQ). I will try to explain how to start using ØMQ and how it can be used to transfer large amounts of data.

Why ØMQ

During development we faced the question of how to reliably transfer large amounts of data. Ideally the entire IO layer would be fully asynchronous, use little memory, and stay reasonably simple. Since our whole architecture was built on Akka, we initially used Spray IO (or Akka IO). But we ran into a number of problems that had no adequate solution; for example, a bug I discovered forced us to add extra Heartbeat messages or to transmit a large amount of service information.

In the end we decided to look at message brokers: ActiveMQ, RabbitMQ and ØMQ. In principle any of them would have solved our task, but we settled on ØMQ. ActiveMQ seemed too heavy, and RabbitMQ would have introduced a master node into an architecture that was designed to be distributed, with no clear leader.

ØMQ supports three basic data transfer patterns. More details about these patterns and about ØMQ itself can be found in the wonderful manual available on the official website.

In this article we consider only the first pattern, Request-Reply.

Building ØMQ and JZMQ

Before we go to the code, we need to build ØMQ itself and the binding libraries (Java Bindings).

Linux

For CentOS the process looks like this (it should differ only slightly on other *nix systems).

Make sure that we have all the necessary libraries:
yum install libtool autoconf automake gcc-c++ make e2fsprogs 

Download and unpack the latest stable version of ØMQ from the developer's site (3.2.4 at the time of writing).
    wget http://download.zeromq.org/zeromq-3.2.4.tar.gz
    tar -xzvf zeromq-3.2.4.tar.gz
    cd zeromq-3.2.4

Build and install ØMQ. The libraries will go to the /usr/local/lib directory; we will need this path later.
    ./configure
    make
    sudo make install
    sudo ldconfig

Now that ØMQ is built, we need to build JZMQ. To do this, download the latest version from the Git repository (master or a tag; the latest tag at the time of writing is 2.2.2).
    # -O gives the archive the name that the unzip step expects
    wget -O jzmq-2.2.2.zip https://github.com/zeromq/jzmq/archive/v2.2.2.zip
    unzip jzmq-2.2.2.zip
    cd jzmq-2.2.2
    ./autogen.sh
    ./configure
    make
    sudo make install
    sudo ldconfig

These libraries will also go to the /usr/local/lib directory. Why is this important? To use native libraries, Java needs to know where to find them, and this is controlled by the java.library.path parameter. There are several ways to set it: we can pass it when the application is launched, -Djava.library.path="/usr/local/lib", or set it while the program is running. We can also rely on the default java.library.path. To find out which values are set by default, run the following command:

 java -XshowSettings:properties 

In my case it is:
    java.library.path = /usr/java/packages/lib/amd64
        /usr/lib64
        /lib64
        /lib
        /usr/lib

For Java to find the native libraries, we just need to move them to one of these directories or create links to them there. Which approach to use is up to you.
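To check this from Java itself, a small helper can walk the directories of java.library.path and look for the JZMQ binding. This is only a sketch: the class name NativeLibraryLocator is made up for illustration, and it assumes the binding is loaded under the library name "jzmq", so that the file is libjzmq.so on Linux and jzmq.dll on Windows.

```java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical helper, not part of JZMQ.
final class NativeLibraryLocator {

    /**
     * Searches every directory of a java.library.path style string for a file
     * with the given name and returns the first match.
     */
    static Optional<Path> find(String libraryPath, String fileName) {
        for (String dir : libraryPath.split(File.pathSeparator)) {
            if (dir.isEmpty()) {
                continue;
            }
            Path candidate = Paths.get(dir, fileName);
            if (Files.isRegularFile(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String libraryPath = System.getProperty("java.library.path", "");
        // mapLibraryName turns "jzmq" into the platform-specific file name.
        // The library name "jzmq" is an assumption of this sketch.
        String fileName = System.mapLibraryName("jzmq");
        System.out.println(fileName + " -> "
                + find(libraryPath, fileName).map(Path::toString).orElse("not found"));
    }
}
```

If the helper reports "not found", either move or link the library into one of the listed directories, or start the JVM with -Djava.library.path pointing at the install directory.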

To find out where the libraries were installed after make install, run the following commands:
    whereis libzmq
    whereis libjzmq

Windows

Prebuilt DLL files for libzmq can be downloaded from the official site, and a guide to building JZMQ for Windows is available as well. Unfortunately, I could not build the libraries with CMake, so I had to build libzmq and jzmq using Visual Studio 2013. It is important that the libraries are built for the architecture of your JVM (32 or 64 bit).

If the directory containing libzmq.dll and jzmq.dll is added to the PATH, the JVM should find them automatically.

Program

Wow, we have finally installed and configured ØMQ and JZMQ on our machine! It is time to put them to work. As an example, we will implement the file transfer protocol described in the manual and improve it slightly.

To begin, let us describe the requirements for our protocol:

Let's prepare one gigabyte of "guaranteed random" ™ data.
 dd if=/dev/urandom of=testdata bs=1M count=1024 

Let's measure how long the OS takes to copy the data. These figures are fairly approximate, but at least we will have some sort of comparison point.
    echo 3 > /proc/sys/vm/drop_caches
    time cp testdata testdata2

    real 0m7.745s
    user 0m0.011s
    sys  0m1.223s

Let's get down to the code. The client:

    package com.coldsnipe.example;

    import org.zeromq.ZFrame;
    import org.zeromq.ZMQ;

    public class ZMQClient implements Runnable {

        // Number of chunk requests the client keeps in flight (its "credit")
        private static final int PIPELINE = 10;
        // Size of one chunk: 250 KB
        private static final int CHUNK_SIZE = 250000;

        private final ZMQ.Context zmqContext;

        public ZMQClient(ZMQ.Context zmqContext) {
            this.zmqContext = zmqContext;
        }

        @Override
        public void run() {
            // Create a ZMQ.Socket of type DEALER
            try (ZMQ.Socket socket = zmqContext.socket(ZMQ.DEALER)) {
                // On close, wait at most 1 second for unsent messages,
                // then drop them
                socket.setLinger(1000);
                // Connect to the server over TCP
                socket.connect("tcp://127.0.0.1:6000");
                // Credit: how many more chunks we may request right now
                int credit = PIPELINE;
                // Total number of bytes received
                long total = 0;
                // Number of chunks received
                int chunks = 0;
                // Offset of the next chunk to request
                long offset = 0;

                while (!Thread.currentThread().isInterrupted()) {
                    // While we have credit, request the next chunks
                    while (credit > 0) {
                        socket.sendMore("fetch");
                        socket.sendMore(Long.toString(offset));
                        socket.send(Integer.toString(CHUNK_SIZE));
                        offset += CHUNK_SIZE;
                        credit--;
                    }
                    // Receive the next chunk from the server
                    ZFrame zFrame = ZFrame.recvFrame(socket);
                    // No frame means the receive was interrupted, so stop
                    if (zFrame == null) {
                        break;
                    }
                    chunks++;
                    credit++;
                    int size = zFrame.getData().length;
                    total += size;
                    zFrame.destroy();
                    // A chunk shorter than CHUNK_SIZE is the last one
                    if (size < CHUNK_SIZE) {
                        break;
                    }
                }
                System.out.format("%d chunks received, %d bytes %n", chunks, total);
            }
        }
    }

The client connects to the server and sends requests for data. Each message consists of several parts:
  1. The command: what we want from the server. In this case there is only one command, fetch, which requests data.
  2. The command parameters (if any): for fetch these are the offset from the beginning of the file and the size of the chunk.
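The layout of a fetch message can be shown without a socket at all. The sketch below models the three frames the client sends (command, offset, chunk size, each as text) as plain byte arrays and parses them back. The class FetchRequest is a hypothetical illustration, not part of JZMQ or of the code in this article.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one "fetch" request as a list of frames.
final class FetchRequest {
    final long offset;
    final int chunkSize;

    FetchRequest(long offset, int chunkSize) {
        this.offset = offset;
        this.chunkSize = chunkSize;
    }

    /** Frames in the order the client sends them: command, offset, chunk size. */
    List<byte[]> toFrames() {
        List<byte[]> frames = new ArrayList<>();
        frames.add(bytes("fetch"));
        frames.add(bytes(Long.toString(offset)));
        frames.add(bytes(Integer.toString(chunkSize)));
        return frames;
    }

    /** Parses the three frames back, rejecting anything that is not a fetch. */
    static FetchRequest fromFrames(List<byte[]> frames) {
        if (frames.size() != 3 || !"fetch".equals(text(frames.get(0)))) {
            throw new IllegalArgumentException("not a fetch request");
        }
        return new FetchRequest(
                Long.parseLong(text(frames.get(1))),
                Integer.parseInt(text(frames.get(2))));
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    private static String text(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        FetchRequest parsed = fromFrames(new FetchRequest(500000L, 250000).toFrames());
        System.out.println("offset=" + parsed.offset + ", chunkSize=" + parsed.chunkSize);
    }
}
```

On the server side a ROUTER socket sees one extra frame in front of these three: the identity of the client that sent the request.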

The client sends these commands on "credit": it sends as many fetch commands as it has credit left. Credit increases only once the client has successfully processed a chunk. In the example the client does nothing with the data, but we could add a handler that saves the data, or simulate work using sleep. Even if the client processes the data very slowly, there will be no more than ten chunks of 250 KB each in its queue. At the same time, the client is not left idle waiting for new data from the server.
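The credit mechanism is easy to try out without any networking. The sketch below replays the client loop against an in-memory queue that stands in for the server, and counts chunks, bytes, and the largest number of requests in flight. The class CreditFlowSimulation is hypothetical. One assumption differs from the real exchange: a request at or past the end of the file simply ends the simulated transfer, whereas the server in this article sends no reply to such a request.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, socket-free replay of the credit-based client loop.
final class CreditFlowSimulation {

    /** Outcome of one simulated transfer. */
    static final class Result {
        final int chunks;
        final long totalBytes;
        final int maxInFlight;

        Result(int chunks, long totalBytes, int maxInFlight) {
            this.chunks = chunks;
            this.totalBytes = totalBytes;
            this.maxInFlight = maxInFlight;
        }
    }

    static Result simulate(long fileSize, int chunkSize, int pipeline) {
        // Offsets of requests that were sent but not answered yet
        Deque<Long> inFlight = new ArrayDeque<>();
        int credit = pipeline;
        long offset = 0;
        long total = 0;
        int chunks = 0;
        int maxInFlight = 0;

        while (true) {
            // Spend all available credit on new fetch requests
            while (credit > 0) {
                inFlight.addLast(offset);
                offset += chunkSize;
                credit--;
            }
            maxInFlight = Math.max(maxInFlight, inFlight.size());

            // "Server": answer the oldest request with at most chunkSize bytes
            long requested = inFlight.removeFirst();
            long size = Math.max(0, Math.min(chunkSize, fileSize - requested));
            if (size == 0) {
                break; // assumption of this sketch: nothing left to read ends the transfer
            }
            chunks++;
            credit++; // one chunk processed, one more request allowed
            total += size;
            if (size < chunkSize) {
                break; // a short chunk is the last one
            }
        }
        return new Result(chunks, total, maxInFlight);
    }

    public static void main(String[] args) {
        Result r = simulate(1073741824L, 250000, 10);
        System.out.format("%d chunks received, %d bytes, at most %d requests in flight%n",
                r.chunks, r.totalBytes, r.maxInFlight);
    }
}
```

For a one-gigabyte file with 250000-byte chunks and a pipeline of 10, the simulation counts 4295 chunks and never has more than 10 requests outstanding.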

The server:

    package com.coldsnipe.example;

    import org.zeromq.ZFrame;
    import org.zeromq.ZMQ;

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;

    public class ZMQServer implements Runnable {

        private final ZMQ.Context zmqContext;
        // Path to the file that the server hands out
        private final Path filePath;

        public ZMQServer(ZMQ.Context zmqContext, Path filePath) {
            this.zmqContext = zmqContext;
            this.filePath = filePath;
        }

        @Override
        public void run() {
            try {
                File file = filePath.toFile();
                if (!file.exists()) {
                    throw new RuntimeException("File does not exist: " + filePath);
                }
                // Open the file and a ROUTER socket. ROUTER puts an identity
                // frame in front of every message, so we can tell clients apart
                try (FileChannel fileChannel = FileChannel.open(filePath)) {
                    try (ZMQ.Socket socket = zmqContext.socket(ZMQ.ROUTER)) {
                        // Listen on TCP port 6000 on all interfaces
                        socket.bind("tcp://*:6000");
                        socket.setLinger(1000);
                        while (!Thread.currentThread().isInterrupted()) {
                            // The first frame is the identity frame
                            ZFrame identity = ZFrame.recvFrame(socket);
                            assert identity != null;
                            // The next frame is the command
                            String command = socket.recvStr();
                            if (command.equals("fetch")) {
                                // For fetch, read the offset and the chunk size
                                String offsetString = socket.recvStr();
                                long offset = Long.parseLong(offsetString);
                                String chunkSizeString = socket.recvStr();
                                int chunkSize = Integer.parseInt(chunkSizeString);
                                int currentChunkSize = chunkSize;
                                // If offset + chunkSize runs past the end of the
                                // file, send only the bytes from offset to the end
                                if (file.length() < (offset + chunkSize)) {
                                    currentChunkSize = (int) (file.length() - offset);
                                }
                                if (currentChunkSize > 0) {
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(currentChunkSize);
                                    fileChannel.read(byteBuffer, offset);
                                    byteBuffer.flip();
                                    byte[] bytes = new byte[currentChunkSize];
                                    byteBuffer.get(bytes);
                                    // Wrap the data in a frame
                                    ZFrame frameToSend = new ZFrame(bytes);
                                    // Send the identity frame first so that the
                                    // reply goes to the client that asked for it
                                    identity.send(socket, ZFrame.MORE);
                                    frameToSend.send(socket, 0);
                                }
                            }
                        }
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

Our server can serve only one file (whose path it received at startup) and respond to just one command, fetch. It can tell clients apart, but every client gets the same single file. I will describe how to improve this a little later; for now, the test and the measurement results.

    package com.coldsnipe.example;

    import org.junit.Test;
    import org.zeromq.ZMQ;

    import java.net.URL;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class ZMQExampleTest {

        @Test
        public void testDataExchange() throws Exception {
            // Create the ZMQ.Context with one I/O thread. A process should
            // have only one context, shared by all of its sockets
            ZMQ.Context zmqContext = ZMQ.context(1);
            // Locate the test file on the classpath
            final URL fileUrl = ZMQExampleTest.class.getClassLoader().getResource("testdata");
            assert fileUrl != null;
            Path filePath = Paths.get(fileUrl.toURI());

            // Start the client and the server in separate threads and
            // measure the time until the client has received everything
            long startTime = System.nanoTime();
            Thread clientThread = new Thread(new ZMQClient(zmqContext));
            Thread serverThread = new Thread(new ZMQServer(zmqContext, filePath));
            clientThread.start();
            serverThread.start();
            clientThread.join();
            long estimatedTime = System.nanoTime() - startTime;
            float timeForTest = estimatedTime / 1000000000F;
            System.out.format("Elapsed time: %fs%n", timeForTest);

            // Terminate the context once the transfer is finished
            zmqContext.term();
        }
    }

Our test creates a ZMQ.Context, starts the client and the server, and measures the time needed to transfer the data. The context deserves a separate mention. It is the hidden conductor that manages our sockets inside the process and decides how and when to send data. A simple rule follows from this: one context per process.

Run the test and look at the result.
    4295 chunks received, 1073741824 bytes
    Elapsed time: 1.429522s

Reading one gigabyte took about 1.43 seconds. It is hard for me to say how good this figure is, but compared with Spray IO, ØMQ works 30-40% faster, the IO load is close to 100% (85-90% with Spray), and the CPU load is lower by almost a third.
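To put the two timings on one scale, they can be converted to throughput. The sketch below does only that arithmetic; the class name Throughput is made up. The two numbers are not strictly comparable: cp was measured with the page cache dropped and also writes a copy to disk, while the test only reads the file, and it is an open question here whether the cache was cold for that run.

```java
// Hypothetical helper that converts a size and a duration into MiB/s.
final class Throughput {

    static double mibPerSecond(long bytes, double seconds) {
        return bytes / (1024.0 * 1024.0) / seconds;
    }

    public static void main(String[] args) {
        long bytes = 1073741824L; // size of the test file
        // Durations taken from the measurements in this article
        System.out.printf("ZeroMQ test: %.1f MiB/s%n", mibPerSecond(bytes, 1.429522));
        System.out.printf("cp:          %.1f MiB/s%n", mibPerSecond(bytes, 7.745));
    }
}
```

This gives roughly 716 MiB/s for the ØMQ transfer and roughly 132 MiB/s for cp.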

Improving the protocol

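So far our protocol knows only one command, which is enough for a test. In real conditions, though, we want our server to be able to transfer many different files and to provide service information to the client. To do this, we introduce two new commands: get, which selects the data to transfer by its ID and returns a header with the data size, and end, which tells the server that the client is done so that the resource can be released.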

Message handlers in this case might look like this.
    else if (command.equals("get")) {
        // ID of the requested data
        String id = socket.recvStr();
        // The client's identity is the key for the DataBlob it has open
        BigInteger identityKey = new BigInteger(identity.getData());
        DataBlob previousBlob = blobMap.get(identityKey);
        if (previousBlob != null) {
            // The client already had data open, so close that DataBlob
            previousBlob.closeSource();
            blobMap.remove(identityKey);
        }
        // Look up the requested data
        DataBlob dataBlob = dataProvider.getBlob(id);
        if (dataBlob == null) {
            log.error("Received wrong get call on server socket, ID [{}]", id);
        } else {
            // The data exists: build a header with its ID and size
            DataHeader dataHeader = new DataHeader(id, dataBlob.getSize());
            byte[] bytesToSend = FrameHeaderEncoder.encode(dataHeader);
            ZFrame frameToSend = new ZFrame(bytesToSend);
            // Remember the DataBlob for this client
            blobMap.put(identityKey, dataBlob);
            // Send the header to the client
            identity.send(socket, ZFrame.MORE);
            frameToSend.send(socket, 0);
        }
    } else if (command.equals("end")) {
        BigInteger identityKey = new BigInteger(identity.getData());
        // Remove the client's DataBlob and close its source
        DataBlob dataBlob = blobMap.remove(identityKey);
        if (dataBlob != null) {
            dataBlob.closeSource();
        }
    }
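The handler relies on DataHeader and FrameHeaderEncoder, which are not shown in this article. Purely as an illustration, here is one possible shape for them. The wire layout (a 4-byte ID length, the ID in UTF-8, then an 8-byte size) and the decode method are assumptions of this sketch, not the format ArkStore actually uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical header: the ID of the requested data and its total size.
final class DataHeader {
    final String dataId;
    final long size;

    DataHeader(String dataId, long size) {
        this.dataId = dataId;
        this.size = size;
    }
}

// Hypothetical codec. Assumed layout, big-endian:
// [int: ID length][ID bytes in UTF-8][long: data size]
final class FrameHeaderEncoder {

    static byte[] encode(DataHeader header) {
        byte[] id = header.dataId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + id.length + Long.BYTES);
        buffer.putInt(id.length).put(id).putLong(header.size);
        return buffer.array();
    }

    static DataHeader decode(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] id = new byte[buffer.getInt()];
        buffer.get(id);
        return new DataHeader(new String(id, StandardCharsets.UTF_8), buffer.getLong());
    }

    public static void main(String[] args) {
        DataHeader decoded = decode(encode(new DataHeader("testdata", 1073741824L)));
        System.out.println(decoded.dataId + " " + decoded.size);
    }
}
```

With a header like this, the client learns the total size from the reply to get and can stop issuing fetch requests once it has received that many bytes.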

In order for us to work with many files, we added new objects to the server.
    public interface DataProvider {
        // Returns the DataBlob for the given ID
        public DataBlob getBlob(String dataId);
    }

    public abstract class DataBlob {

        private final long size;
        private final String dataId;

        protected DataBlob(long size, String dataId) {
            this.size = size;
            this.dataId = dataId;
        }

        // Reads length bytes starting at position
        public abstract byte[] getData(long position, int length);

        // Releases the underlying resource
        public abstract void closeSource();

        public long getSize() {
            return size;
        }

        public String getDataId() {
            return dataId;
        }
    }

The class implementing DataProvider manages data retrieval: its getBlob method returns a new DataBlob, which is essentially a handle to a resource.

The implementation of the DataBlob for a file may look like this:
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;

    // DataBlobException is our own unchecked exception (not shown here)
    public class FileDataBlob extends DataBlob {

        private final FileChannel fileChannel;

        public FileDataBlob(long size, String dataId, Path filePath) {
            super(size, dataId);
            try {
                this.fileChannel = FileChannel.open(filePath);
            } catch (IOException e) {
                throw new DataBlobException(e);
            }
        }

        @Override
        public byte[] getData(long position, int length) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(length);
            try {
                fileChannel.read(byteBuffer, position);
                byteBuffer.flip();
                // Copy only the bytes that were actually read; near the end
                // of the file this can be fewer than length
                byte[] bytes = new byte[byteBuffer.remaining()];
                byteBuffer.get(bytes);
                return bytes;
            } catch (IOException e) {
                throw new DataBlobException(e);
            }
        }

        @Override
        public void closeSource() {
            try {
                fileChannel.close();
            } catch (IOException e) {
                throw new DataBlobException(e);
            }
        }
    }
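One detail worth keeping in mind: a single FileChannel.read call is allowed to return fewer bytes than the buffer has room for. A getData implementation that must return full chunks could loop until the buffer is full or the file ends. The helper below is a sketch of that idea; the class ChunkReader is hypothetical and not part of the code in this article.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical helper for reading one chunk completely.
final class ChunkReader {

    /**
     * Reads up to length bytes starting at position. The result is shorter
     * than length only when the end of the file is reached first.
     */
    static byte[] readChunk(FileChannel channel, long position, int length) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(length);
        while (buffer.hasRemaining()) {
            // Continue right after the bytes that were already read
            int read = channel.read(buffer, position + buffer.position());
            if (read < 0) {
                break; // end of file
            }
        }
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }
}
```

Called as ChunkReader.readChunk(fileChannel, position, length), it returns an empty array when position is at or beyond the end of the file.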

By adding these two commands, we let the client choose which data it wants to receive and tell it the size of the requested data. We can improve the protocol further, for example by adding the ability to send multiple files in a row, or a Heartbeat to detect dead clients and free their resources.

Conclusion

In this article I wanted to show how ØMQ can be used in Java. In our project ØMQ is currently the main message broker, not only for files but also for metadata, and it shows fairly good results (we have not yet observed any problems with it).

In upcoming articles I will try to talk about other technologies used in ArkStore; Akka and the Semantic Web are next in line. Thank you for your attention, I hope this was useful to at least someone!

Source: https://habr.com/ru/post/200674/

