yum install libtool autoconf automake gcc-c++ make e2fsprogs
wget http://download.zeromq.org/zeromq-3.2.4.tar.gz tar -xzvf zeromq-3.2.4.tar.gz cd zeromq-3.2.4
./configure make sudo make install sudo ldconfig
wget https://github.com/zeromq/jzmq/archive/v2.2.2.zip unzip jzmq-2.2.2.zip cd jzmq-2.2.2 ./autogen.sh ./configure make sudo make install sudo ldconfig
java -XshowSettings:properties
java.library.path = /usr/java/packages/lib/amd64 /usr/lib64 /lib64 /lib /usr/lib
whereis libzmq whereis libjzmq
dd if=/dev/urandom of=testdata bs=1M count=1024
echo 3 > /proc/sys/vm/drop_caches time cp testdata testdata2 real 0m7.745s user 0m0.011s sys 0m1.223s
package com.coldsnipe.example; import org.zeromq.ZFrame; import org.zeromq.ZMQ; public class ZMQClient implements Runnable { // "" private static final int PIPELINE = 10; // 1 , 250. private static final int CHUNK_SIZE = 250000; private final ZMQ.Context zmqContext; public ZMQClient(ZMQ.Context zmqContext) { this.zmqContext = zmqContext; } @Override public void run() { // ZMQ.Socket, DEALER socket try (ZMQ.Socket socket = zmqContext.socket(ZMQ.DEALER)) { // // , 1 (drop) socket.setLinger(1000); // TCP socket.connect("tcp://127.0.0.1:6000"); // , Chunks int credit = PIPELINE; // long total = 0; // (Chunks) int chunks = 0; // long offset = 0; while (!Thread.currentThread().isInterrupted()) { // "", // while (credit > 0) { socket.sendMore("fetch"); socket.sendMore(Long.toString(offset)); socket.send(Integer.toString(CHUNK_SIZE)); offset += CHUNK_SIZE; credit--; } // ZFrame zFrame = ZFrame.recvFrame(socket); // , if (zFrame == null) { break; } chunks++; credit++; int size = zFrame.getData().length; total += size; zFrame.destroy(); // // if (size < CHUNK_SIZE) { break; } } System.out.format("%d chunks received, %d bytes %n", chunks, total); } } }
package com.coldsnipe.example; import org.zeromq.ZFrame; import org.zeromq.ZMQ; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; public class ZMQServer implements Runnable { private final ZMQ.Context zmqContext; // private final Path filePath; public ZMQServer(ZMQ.Context zmqContext, Path filePath) { this.zmqContext = zmqContext; this.filePath = filePath; } @Override public void run() { try { File file = filePath.toFile(); if (!file.exists()) { throw new RuntimeException("File does not exists: " + filePath); } // , Router identity , // try (FileChannel fileChannel = FileChannel.open(filePath)) { try (ZMQ.Socket socket = zmqContext.socket(ZMQ.ROUTER)) { // localhost 6000 socket.bind("tcp://*:6000"); socket.setLinger(1000); while (!Thread.currentThread().isInterrupted()) { // - identity frame ZFrame identity = ZFrame.recvFrame(socket); assert identity != null; // String command = socket.recvStr(); if (command.equals("fetch")) { // fetch, offset String offsetString = socket.recvStr(); long offset = Long.parseLong(offsetString); String chunkSizeString = socket.recvStr(); int chunkSize = Integer.parseInt(chunkSizeString); int currentChunkSize = chunkSize; // offset + // , offset if (file.length() < (offset + chunkSize)) { currentChunkSize = (int) (file.length() - offset); } if (currentChunkSize > 0) { ByteBuffer byteBuffer = ByteBuffer.allocate(currentChunkSize); fileChannel.read(byteBuffer, offset); byteBuffer.flip(); byte[] bytes = new byte[currentChunkSize]; byteBuffer.get(bytes); // ZFrame frameToSend = new ZFrame(bytes); // identity // , identity.send(socket, ZFrame.MORE); frameToSend.send(socket, 0); } } } } } } catch (IOException e) { throw new RuntimeException(e); } } }
package com.coldsnipe.example; import org.junit.Test; import org.zeromq.ZMQ; import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; public class ZMQExampleTest { @Test public void testDataExchange() throws Exception { // ZMQ.Context, 1! // ZMQ.Context zmqContext = ZMQ.context(1); // final URL fileUrl = ZMQExampleTest.class.getClassLoader().getResource("testdata"); assert fileUrl != null; Path filePath = Paths.get(fileUrl.toURI()); // , // long startTime = System.nanoTime(); Thread clientThread = new Thread(new ZMQClient(zmqContext)); Thread serverThread = new Thread(new ZMQServer(zmqContext, filePath)); clientThread.start(); serverThread.start(); clientThread.join(); long estimatedTime = System.nanoTime() - startTime; float timeForTest = estimatedTime / 1000000000F; System.out.format("Elapsed time: %fs%n", timeForTest); // , // . zmqContext.term(); } }
4295 chunks received, 1073741824 bytes Elapsed time: 1.429522s
else if(command.equals("get")) { // ID String id = socket.recvStr(); // BigInteger identityString = new BigInteger(identity.getData()); DataBlob dataBlob = blobMap.get(identityString); if (dataBlob!= null){ // , DataBlob dataBlob.closeSource(); blobMap.remove(identityString); } // DataBlob dataBlob = dataProvider.getBlob(id); if (dataBlob == null) { log.error("Received wrong get call on server socket, ID [{}]", id); } else { // , DataHeader dataHeader = new DataHeader(id, dataBlob.getSize()); byte[] bytesToSend = FrameHeaderEncoder.encode(dataHeader); ZFrame frameToSend = new ZFrame(bytesToSend); // DataBlob blobMap.put(new BigInteger(identity.getData()), dataBlob); // identity.send(socket, ZFrame.MORE); frameToSend.send(socket, 0); } } else if (command.equals("end")){ BigInteger identityString = new BigInteger(identity.getData()); // DataBlob DataBlob dataBlob = blobMap.remove(identityString); if (dataBlob != null) { dataBlob.closeSource(); } }
public interface DataProvider { public DataBlob getBlob(String dataId); } public abstract class DataBlob { private final long size; private final String dataId; protected DataBlob(long size, String dataId) { this.size = size; this.dataId= dataId; } public abstract byte[] getData(long position, int length); public abstract void closeSource(); public long getSize() { return size; } public String getDataId() { return getDataId; } }
public class FileDataBlob extends DataBlob { private final FileChannel fileChannel; public FileDataBlob(long size, String dataId, Path filePath) { super(size, dataId); try { this.fileChannel = FileChannel.open(filePath); } catch (IOException e) { throw new DataBlobException(e); } } @Override public byte[] getData(long position, int length) { ByteBuffer byteBuffer = ByteBuffer.allocate(length); try { fileChannel.read(byteBuffer, position); byteBuffer.flip(); byte[] bytes = new byte[length]; byteBuffer.get(bytes); return bytes; } catch (IOException e) { throw new DataBlobException(e); } } @Override public void closeSource() { try { fileChannel.close(); } catch (IOException e) { throw new DataBlobException(e); } } }
Source: https://habr.com/ru/post/200674/
All Articles