
A high-performance SUN/ONC RPC server on Java NIO

The article about dCache discussed how it can be used as an NFS server. But compatibility with existing clients alone is not enough for the system to be usable: performance has to be up to par as well. The workhorse of the NFS protocol is the ONC RPC protocol, and in dCache we use our own implementation based on the grizzly nio framework.

A bit of history for the young

ONC RPC (Open Network Computing Remote Procedure Call) is a protocol created by Sun Microsystems in the late '80s and published in 1995 together with NFSv2. ONC RPC quickly spread and was widely used until it was displaced by fashionable alternatives such as CORBA, SOAP, and later REST and JSON-RPC in the early 2000s. Nevertheless, ONC RPC is still used where simplicity and speed matter more than fashion: in network file systems.

Implementation


To avoid reinventing the wheel, we initially used the Remote Tea implementation, but we soon ran into limitations that could not easily be worked around: IPv6, GSSAPI, NIO. So we had to build our own wheel after all, though not from scratch: we kept compatibility with Remote Tea as far as possible and adapted the code that had already been written.


Grizzly-nio

As a foundation we took grizzly-nio, which is used in GlassFish. Like all modern NIO frameworks, grizzly is built around event handling and the chain-of-responsibility pattern: we describe a chain of filters that are invoked on a particular event.

package org.glassfish.grizzly.filterchain;

import java.io.IOException;

public interface Filter {

    public void onAdded(FilterChain fc);

    public void onRemoved(FilterChain fc);

    public void onFilterChainChanged(FilterChain fc);

    public NextAction handleRead(FilterChainContext fcc) throws IOException;

    public NextAction handleWrite(FilterChainContext fcc) throws IOException;

    public NextAction handleConnect(FilterChainContext fcc) throws IOException;

    public NextAction handleAccept(FilterChainContext fcc) throws IOException;

    public NextAction handleEvent(FilterChainContext fcc, FilterChainEvent fce) throws IOException;

    public NextAction handleClose(FilterChainContext fcc) throws IOException;

    public void exceptionOccurred(FilterChainContext fcc, Throwable thrwbl);
}


The handleXXXX methods return a NextAction, which can be a StopAction or a ContinueAction. If a filter returns StopAction, processing of the chain stops. We are mainly interested in handleRead and handleWrite, which are invoked when a network connection is ready for reading or writing.

@Override
public NextAction handleRead(FilterChainContext ctx) throws IOException {
    Buffer messageBuffer = ctx.getMessage();
    if (!isMessageArrived(messageBuffer)) {
        // not all of the data has arrived yet:
        // stop the chain and keep what we have so far
        return ctx.getStopAction(messageBuffer);
    }
    // the complete message has arrived
    ctx.setMessage(getMessage(messageBuffer));
    return ctx.getInvokeAction();
}


Production code
import java.io.IOException;
import java.nio.ByteOrder;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.memory.BuffersBuffer;

public class RpcMessageParserTCP extends BaseFilter {

    /**
     * RPC fragment record marker mask
     */
    private final static int RPC_LAST_FRAG = 0x80000000;

    /**
     * RPC fragment size mask
     */
    private final static int RPC_SIZE_MASK = 0x7fffffff;

    @Override
    public NextAction handleRead(FilterChainContext ctx) throws IOException {
        Buffer messageBuffer = ctx.getMessage();
        if (messageBuffer == null) {
            return ctx.getStopAction();
        }

        if (!isAllFragmentsArrived(messageBuffer)) {
            return ctx.getStopAction(messageBuffer);
        }

        ctx.setMessage(assembleXdr(messageBuffer));

        final Buffer reminder = messageBuffer.hasRemaining()
                ? messageBuffer.split(messageBuffer.position()) : null;
        return ctx.getInvokeAction(reminder);
    }

    @Override
    public NextAction handleWrite(FilterChainContext ctx) throws IOException {
        Buffer b = ctx.getMessage();
        int len = b.remaining() | RPC_LAST_FRAG;

        Buffer marker = GrizzlyMemoryManager.allocate(4);
        marker.order(ByteOrder.BIG_ENDIAN);
        marker.putInt(len);
        marker.flip();
        marker.allowBufferDispose(true);
        b.allowBufferDispose(true);
        Buffer composite = GrizzlyMemoryManager.createComposite(marker, b);
        composite.allowBufferDispose(true);
        ctx.setMessage(composite);
        return ctx.getInvokeAction();
    }

    private boolean isAllFragmentsArrived(Buffer messageBuffer) throws IOException {
        final Buffer buffer = messageBuffer.duplicate();
        buffer.order(ByteOrder.BIG_ENDIAN);

        while (buffer.remaining() >= 4) {
            int messageMarker = buffer.getInt();
            int size = getMessageSize(messageMarker);

            /*
             * fragment size is bigger than what we have received
             */
            if (size > buffer.remaining()) {
                return false;
            }

            /*
             * complete fragment received
             */
            if (isLastFragment(messageMarker)) {
                return true;
            }

            /*
             * seek to the end of the current fragment
             */
            buffer.position(buffer.position() + size);
        }
        return false;
    }

    private static int getMessageSize(int marker) {
        return marker & RPC_SIZE_MASK;
    }

    private static boolean isLastFragment(int marker) {
        return (marker & RPC_LAST_FRAG) != 0;
    }

    private Xdr assembleXdr(Buffer messageBuffer) {
        Buffer currentFragment;
        BuffersBuffer multipleFragments = null;

        boolean messageComplete;
        do {
            int messageMarker = messageBuffer.getInt();

            int size = getMessageSize(messageMarker);
            messageComplete = isLastFragment(messageMarker);

            int pos = messageBuffer.position();
            currentFragment = messageBuffer.slice(pos, pos + size);
            currentFragment.limit(size);

            messageBuffer.position(pos + size);
            if (!messageComplete && multipleFragments == null) {
                /*
                 * we use composite buffers only when required,
                 * as they are not free
                 */
                multipleFragments = GrizzlyMemoryManager.create();
            }

            if (multipleFragments != null) {
                multipleFragments.append(currentFragment);
            }
        } while (!messageComplete);

        return new Xdr(multipleFragments == null ? currentFragment : multipleFragments);
    }
}


If we stopped the chain because not all the data had arrived, the next call to handleRead will receive a composite buffer (one made up of several buffers).
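What the parser above decodes is standard RPC record marking over TCP: every fragment is preceded by a 4-byte big-endian marker whose high bit flags the last fragment and whose lower 31 bits carry the fragment size (the RPC_LAST_FRAG and RPC_SIZE_MASK constants above). A minimal, self-contained sketch of encoding and decoding such a marker with plain JDK classes:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class RecordMarkerDemo {

    private static final int RPC_LAST_FRAG = 0x80000000;
    private static final int RPC_SIZE_MASK = 0x7fffffff;

    /** Build the 4-byte marker that precedes a fragment of the given size. */
    static ByteBuffer encodeMarker(int size, boolean lastFragment) {
        ByteBuffer marker = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
        marker.putInt(lastFragment ? size | RPC_LAST_FRAG : size);
        marker.flip();
        return marker;
    }

    public static void main(String[] args) {
        int marker = encodeMarker(517, true).getInt();
        // prints: size=517, last=true
        System.out.println("size=" + (marker & RPC_SIZE_MASK)
                + ", last=" + ((marker & RPC_LAST_FRAG) != 0));
    }
}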

A primitive server then looks like this; RpcMessageParserTCP is the parser shown above, and MyProtocolFilter is a placeholder for your own application filter:

public static void main(String[] args) throws IOException {
    FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
    filterChainBuilder.add(new TransportFilter());
    filterChainBuilder.add(new RpcMessageParserTCP()); // the message parser from above
    filterChainBuilder.add(new MyProtocolFilter());    // placeholder for your own filter

    final TCPNIOTransport transport = TCPNIOTransportBuilder.newInstance().build();
    transport.setProcessor(filterChainBuilder.build());
    transport.bind(HOST, PORT);
    transport.start();

    System.in.read();
}

On the project page you can find many examples. By default, grizzly creates as many threads as there are processors on the machine. This approach has proven itself in practice: on a machine with 24 cores, our NFS server easily serves about a thousand clients.
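If the defaults do not fit, the thread counts and I/O strategy can be tuned on the transport builder. A sketch against the Grizzly 2.x API (the numbers here are illustrative, not recommendations):

import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.WorkerThreadIOStrategy;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;

public class TunedTransport {

    public static TCPNIOTransport build() {
        return TCPNIOTransportBuilder.newInstance()
                // selector threads hand completed reads off to a worker pool
                .setIOStrategy(WorkerThreadIOStrategy.getInstance())
                .setSelectorRunnersCount(4) // threads running select()
                .setWorkerThreadPoolConfig(ThreadPoolConfig.defaultConfig()
                        .setCorePoolSize(24)   // e.g. one worker per core
                        .setMaxPoolSize(24))
                .build();
    }
}

The alternative SameThreadIOStrategy avoids the handoff to a worker pool and processes the message directly on the selector thread; that is the strategy the oncrpc4j example below selects.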

The project itself is actively developed, and the development team responds quickly both to bug reports and to submitted patches and suggestions.

oncrpc4j

All the ONC RPC code is packaged as an easy-to-use standalone library. Two typical integration options are supported: a service embedded in an application, or a service initialized as a Spring bean.

Embedded in an application

import java.io.IOException;
import org.dcache.xdr.OncRpcException;
import org.dcache.xdr.OncRpcProgram;
import org.dcache.xdr.OncRpcSvc;
import org.dcache.xdr.OncRpcSvcBuilder;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.RpcDispatchable;
import org.dcache.xdr.XdrVoid;

public class Svcd {

    private static final int DEFAULT_PORT = 1717;
    private static final int PROG_NUMBER = 111017;
    private static final int PROG_VERS = 1;

    public static void main(String[] args) throws Exception {

        RpcDispatchable dummy = new RpcDispatchable() {
            @Override
            public void dispatchOncRpcCall(RpcCall call)
                    throws OncRpcException, IOException {
                call.reply(XdrVoid.XDR_VOID);
            }
        };

        OncRpcSvc service = new OncRpcSvcBuilder()
                .withTCP()
                .withAutoPublish()
                .withPort(DEFAULT_PORT)
                .withSameThreadIoStrategy()
                .build();
        service.register(new OncRpcProgram(PROG_NUMBER, PROG_VERS), dummy);
        service.start();
    }
}
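A real service would dispatch on the procedure number rather than answering everything with a void reply. A hedged sketch: getProcedure, retrieveCall, and failProcedureUnavailable are the RpcCall helpers I would expect here, and XdrString a Remote-Tea-style string wrapper; verify the names against your oncrpc4j version.

import java.io.IOException;
import org.dcache.xdr.OncRpcException;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.RpcDispatchable;
import org.dcache.xdr.XdrString;
import org.dcache.xdr.XdrVoid;

public class EchoSvc implements RpcDispatchable {

    // procedure numbers come from your protocol definition (made up here)
    private static final int PROC_NULL = 0;
    private static final int PROC_ECHO = 1;

    @Override
    public void dispatchOncRpcCall(RpcCall call) throws OncRpcException, IOException {
        switch (call.getProcedure()) {
            case PROC_NULL:
                call.reply(XdrVoid.XDR_VOID);
                break;
            case PROC_ECHO:
                XdrString msg = new XdrString();
                call.retrieveCall(msg); // decode the XDR-encoded argument
                call.reply(msg);        // and send it straight back
                break;
            default:
                call.failProcedureUnavailable();
        }
    }
}

Since the builder above uses withAutoPublish(), the service registers itself with the local rpcbind/portmapper and should show up in the output of rpcinfo -p.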


Spring integration

I'm not afraid of XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <bean id="my-rpc-svc" class="me.mypackage.Svcd">
        <description>My RPC service</description>
    </bean>

    <bean id="my-rpc" class="org.dcache.xdr.OncRpcProgram">
        <description>My RPC program number</description>
        <constructor-arg index="0" value="1110001" />
        <constructor-arg index="1" value="1" />
    </bean>

    <bean id="rpcsvc-builder" class="org.dcache.xdr.OncRpcSvcFactoryBean">
        <description>Onc RPC service builder</description>
        <property name="port" value="1717"/>
        <property name="useTCP" value="true"/>
    </bean>

    <bean id="oncrpcsvc" class="org.dcache.xdr.OncRpcSvc"
          init-method="start" destroy-method="stop">
        <description>My RPC service</description>
        <constructor-arg ref="rpcsvc-builder"/>
        <property name="programs">
            <map>
                <entry key-ref="my-rpc" value-ref="my-rpc-svc"/>
            </map>
        </property>
    </bean>
</beans>
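For completeness, the my-rpc-svc bean wired into the programs map above has to implement RpcDispatchable, just like in the embedded example. A minimal sketch of such a class:

package me.mypackage;

import java.io.IOException;
import org.dcache.xdr.OncRpcException;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.RpcDispatchable;
import org.dcache.xdr.XdrVoid;

public class Svcd implements RpcDispatchable {
    @Override
    public void dispatchOncRpcCall(RpcCall call) throws OncRpcException, IOException {
        // answer every call with a void reply, like the NULL procedure
        call.reply(XdrVoid.XDR_VOID);
    }
}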



Performance

[Graph: performance comparison of the Java implementation, the C implementation, and the Linux kernel NFS server]

As the graph shows, the Java code is not only no slower than the one written in C, it even overtakes the Linux kernel (because of a kernel bug that, I hope, has been fixed by now).

Where to steal the code and how to contribute


The code is available on GitHub under the LGPL license.

Source: https://habr.com/ru/post/199196/

