
High-performance NIO server on Netty

Preamble

Hello. I am the lead developer of the largest Minecraft server in the CIS (I won't advertise it; those who need to know already do). For almost a year we have been writing our own server implementation, designed for more than 40 players (we want to see at least 500). So far everything has gone well, but lately the system has run up against its rather unfortunate network implementation (1 thread for input, 1 for output, plus 1 for processing): with 300 players online, more than 980 threads (plus system ones) are running. Combined with the performance of Java's default blocking IO, this causes a huge performance drop, and already at 100 players the server spends most of its time reading from and writing to the network.

So I decided to switch to NIO. I came across Netty quite by accident, and its structure seemed just perfect for integrating into an existing working solution. Unfortunately, there are hardly any Netty manuals, not only in Russian but in English as well, so I had to experiment a lot and dig through the library code to find the best approach.

Here I will try to describe the server side of networking with Netty; maybe it will be useful to someone.

Server creation


    // Netty 3.x API (org.jboss.netty.*)
    ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(
            1, 400000000, 2000000000, 60, TimeUnit.SECONDS);
    ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(
            4 /* number of worker threads */, 400000000, 2000000000, 60, TimeUnit.SECONDS);

    ServerBootstrap networkServer = new ServerBootstrap(
            new NioServerSocketChannelFactory(bossExec, ioExec, 4 /* the same number of worker threads */));
    networkServer.setOption("backlog", 500);
    networkServer.setOption("connectTimeoutMillis", 10000);
    networkServer.setPipelineFactory(new ServerPipelineFactory());

    // Keep both networkServer and channel: they are needed to stop the server later
    Channel channel = networkServer.bind(new InetSocketAddress(address, port));

OrderedMemoryAwareThreadPoolExecutor is used to run Netty's tasks; judging by the experience of our French colleagues, it is the most efficient choice. You can use other executors, for example Executors.newFixedThreadPool(n). Never use Executors.newCachedThreadPool(): it creates unreasonably many threads and you get almost no gain from Netty. There is no point in using more than 4 worker threads, because they more than cope with a huge load (programmers from Xebia-France handled over 100,000 simultaneous connections on 4 threads). There should be one boss thread for each listening port. The channel returned by the bind function, as well as the ServerBootstrap, must be saved so that you can stop the server later.
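To make the difference between the two executor types concrete, here is a minimal sketch that uses only java.util.concurrent and no Netty at all. The class name PoolGrowthDemo and the figure of 50 tasks are made up for illustration. It shows general thread-pool behaviour under a burst of blocking tasks; it does not measure how Netty itself uses these pools.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the server described in the article.
class PoolGrowthDemo {

    /** Submits `tasks` jobs that all block, then reports the largest thread count the pool reached. */
    static int largestPoolSize(ExecutorService pool, int tasks) {
        // Both Executors.newFixedThreadPool and newCachedThreadPool return a ThreadPoolExecutor.
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // simulate a task that is busy for a while
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = tpe.getLargestPoolSize();
        release.countDown(); // let every task finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return largest;
    }

    public static void main(String[] args) {
        System.out.println("fixed(4): " + largestPoolSize(Executors.newFixedThreadPool(4), 50));
        System.out.println("cached:   " + largestPoolSize(Executors.newCachedThreadPool(), 50));
    }
}
```

With 50 simultaneously busy tasks the fixed pool stays at 4 threads and queues the rest, while the cached pool starts one thread per task.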

PipelineFactory


How connections and client packets are handled is determined by the PipelineFactory, which creates a pipeline for each client when its connection is opened. The pipeline defines the handlers for events that occur on the channel. In our case, this is ServerPipelineFactory:

    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;

    public class ServerPipelineFactory implements ChannelPipelineFactory {
        @Override
        public ChannelPipeline getPipeline() throws Exception {
            // A fresh set of handlers is created for every client connection
            PacketFrameDecoder decoder = new PacketFrameDecoder();
            PacketFrameEncoder encoder = new PacketFrameEncoder();
            return Channels.pipeline(decoder, encoder, new PlayerHandler(decoder, encoder));
        }
    }

In this code, PacketFrameDecoder, PacketFrameEncoder, and PlayerHandler are event handlers that we define ourselves. The Channels.pipeline() function creates a new pipeline with the handlers passed to it. Be careful: events pass through the handlers in the order in which you passed them to the pipeline function!
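The ordering rule can be pictured with a toy model. The sketch below is an assumption-laden stand-in: MiniPipeline is a hypothetical class, handlers are plain functions on strings, and every handler sees every event (in Netty 3 a handler only sees the direction it implements). It only illustrates that inbound ("upstream") events walk the handler list from first to last, while outbound ("downstream") events walk it from last to first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical toy model of an ordered handler chain; not Netty's ChannelPipeline.
class MiniPipeline {
    private final List<UnaryOperator<String>> handlers = new ArrayList<>();

    MiniPipeline addLast(UnaryOperator<String> handler) {
        handlers.add(handler);
        return this;
    }

    /** Inbound event (data read from the socket): first handler to last. */
    String fireUpstream(String event) {
        for (UnaryOperator<String> handler : handlers) {
            event = handler.apply(event);
        }
        return event;
    }

    /** Outbound event (data written by the application): last handler to first. */
    String fireDownstream(String event) {
        for (int i = handlers.size() - 1; i >= 0; i--) {
            event = handlers.get(i).apply(event);
        }
        return event;
    }

    /** Builds a chain in the same order as the article: decoder, encoder, handler. */
    static MiniPipeline demo() {
        return new MiniPipeline()
                .addLast(e -> e + ">decoder")
                .addLast(e -> e + ">encoder")
                .addLast(e -> e + ">handler");
    }

    public static void main(String[] args) {
        System.out.println(demo().fireUpstream("bytes"));
        System.out.println(demo().fireDownstream("packet"));
    }
}
```

Each stand-in handler appends its own name, so the returned string records the order in which the event was visited.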

Protocol


I will describe the protocol briefly so that what follows is clear.

Data is exchanged using objects of classes that extend the Packet class, which defines two functions: get(ChannelBuffer input) and send(ChannelBuffer output). The first reads the necessary data from the channel, and the second writes the packet data to the channel.

    import java.io.IOException;
    import org.jboss.netty.buffer.ChannelBuffer;

    // getPacket(int) and getId() belong to the author's code base and are not shown here
    public abstract class Packet {
        public static Packet read(ChannelBuffer buffer) throws IOException {
            int id = buffer.readUnsignedShort(); // Read the packet ID to find out which packet this is
            Packet packet = getPacket(id);       // Get the packet object for this ID
            if (packet == null)
                throw new IOException("Bad packet ID: " + id); // No such packet exists: throw an exception
            packet.get(buffer);                  // Read the packet data from the buffer
            return packet;
        }

        public static void write(Packet packet, ChannelBuffer buffer) {
            buffer.writeChar(packet.getId()); // Write the packet ID
            packet.send(buffer);              // Write the packet data
        }

        // Functions that every packet must implement
        public abstract void get(ChannelBuffer buffer);
        public abstract void send(ChannelBuffer buffer);
    }

A couple of example packets for clarity:

    // Packet with which the client sends its login to the server
    public class Packet1Login extends Packet {
        public String login;

        public void get(ChannelBuffer buffer) {
            int length = buffer.readShort();
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < length; ++i)
                builder.append(buffer.readChar());
            login = builder.toString();
        }

        public void send(ChannelBuffer buffer) {
            // Empty body, since the server never sends this packet
        }
    }

    // Packet with which the server disconnects the client for some reason,
    // or with which the client disconnects from the server
    public class Packet255KickDisconnect extends Packet {
        public String reason;

        public Packet255KickDisconnect() {}

        // Constructor used by the disconnect examples later in the article
        public Packet255KickDisconnect(String reason) {
            this.reason = reason;
        }

        public void get(ChannelBuffer buffer) {
            int length = buffer.readShort();
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < length; ++i)
                builder.append(buffer.readChar());
            reason = builder.toString();
        }

        public void send(ChannelBuffer buffer) {
            buffer.writeShort(reason.length());
            for (int i = 0; i < reason.length(); ++i) {
                buffer.writeChar(reason.charAt(i));
            }
        }
    }

ChannelBuffer is very similar to DataInputStream and DataOutputStream rolled into one. Most functions are, if not identical, then very similar. Notice that I do not bother checking whether there are enough bytes in the buffer to read, as if I were working with blocking IO. More on this below.
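To see what the login packet looks like on the wire, here is a small round trip written against java.nio.ByteBuffer instead of Netty's ChannelBuffer, so it runs without the library. The class name LoginWireFormat is hypothetical, and the packet ID of 1 is an assumption inferred from the name Packet1Login. The layout follows the code above: a 2-byte ID, a 2-byte character count, then 2 bytes per character, all big-endian.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-alone sketch of the wire layout; uses ByteBuffer, not ChannelBuffer.
class LoginWireFormat {
    static final int LOGIN_ID = 1; // assumed from the class name Packet1Login

    /** Encodes: [ID: 2 bytes][length in chars: 2 bytes][chars: 2 bytes each]. */
    static ByteBuffer encode(String login) {
        ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + 2 * login.length());
        buffer.putChar((char) LOGIN_ID);
        buffer.putShort((short) login.length());
        for (int i = 0; i < login.length(); i++) {
            buffer.putChar(login.charAt(i));
        }
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    /** Decodes a complete login packet produced by encode(). */
    static String decode(ByteBuffer buffer) {
        int id = buffer.getChar();
        if (id != LOGIN_ID) {
            throw new IllegalArgumentException("Bad packet ID: " + id);
        }
        int length = buffer.getShort();
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < length; i++) {
            builder.append(buffer.getChar());
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        ByteBuffer wire = encode("Steve");
        System.out.println(wire.remaining() + " bytes on the wire");
        System.out.println("decoded login: " + decode(wire));
    }
}
```

A five-character login therefore occupies 2 + 2 + 10 = 14 bytes.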

Work with the client


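Work with the client is handled mainly by the PlayerHandler class:

    import java.util.logging.Level;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

    public class PlayerHandler extends SimpleChannelUpstreamHandler {
        private final PacketFrameDecoder decoder;
        private final PacketFrameEncoder encoder;
        private PlayerWorkerThread worker;

        // Matches the call in ServerPipelineFactory
        public PlayerHandler(PacketFrameDecoder decoder, PacketFrameEncoder encoder) {
            this.decoder = decoder;
            this.encoder = encoder;
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            // The connection has been opened. Create a Worker for the player: the object that
            // will process the player's data. The channel (e.getChannel()) is passed to it
            // so that it can write packets to the channel.
            worker = new PlayerWorkerThread(this, e.getChannel());
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            // The connection has been closed. Use this to free resources and do whatever else
            // must happen when a player leaves. Without it you may not notice that a player
            // is gone if the connection simply dropped without a disconnect packet.
            worker.disconnectedFromChannel();
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            // The decoder has produced a Packet from the client; hand it over to the worker.
            if (e.getChannel().isOpen())
                worker.acceptPacket((Packet) e.getMessage());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            // An exception occurred on the channel. Log it and close the channel.
            Server.logger.log(Level.WARNING, "Exception from downstream", e.getCause());
            ctx.getChannel().close();
        }
    }

A Worker can send data to a player simply by calling channel.write(packet), where channel is the player's channel that was passed to it on connection, and packet is an object of the Packet class. The encoder is responsible for encoding the packets.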

Decoder and Encoder


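These are actually the most important part of the system: they are responsible for building Packet objects from the user's stream and for writing those packets back to the stream.

The encoder is very simple; it sends packets to the player:

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

    public class PacketFrameEncoder extends OneToOneEncoder {
        @Override
        protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
            if (!(obj instanceof Packet))
                return obj; // Not a packet: pass it on unchanged
            Packet p = (Packet) obj;

            // Create a dynamic buffer to write the packet data into. If you know the packet
            // length exactly, you do not need a dynamic buffer: ChannelBuffers also provides
            // fixed-length buffers, which can be more efficient.
            ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
            Packet.write(p, buffer); // Write the packet into the buffer
            return buffer;           // Return the buffer, which will be written to the channel
        }
    }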


The decoder is much more complicated. The problem is that the buffer that arrived from the client may simply not contain enough bytes to read the whole packet. This is where the ReplayingDecoder class helps us. We only need to implement its decode function and read the data from the stream in it, without worrying about anything:

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
    import org.jboss.netty.handler.codec.replay.VoidEnum;

    public class PacketFrameDecoder extends ReplayingDecoder<VoidEnum> {
        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            ctx.sendUpstream(e);
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            ctx.sendUpstream(e);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) throws Exception {
            // Read as if all the bytes were already there; ReplayingDecoder handles shortages
            return Packet.read(buffer);
        }
    }

The question is, how does this work? Very simply: before calling the decode function, the decoder marks the current reader index. If there is not enough data in the buffer during a read, an exception is thrown. The buffer is then returned to the marked position, and decode is repeated when more data arrives from the user. After a successful read (the returned value is not null), the decoder tries to call decode again on the data remaining in the buffer, if at least one more byte is left.
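The mark, fail, rewind and retry cycle described above can be imitated with a plain java.nio.ByteBuffer. This is only a sketch of the idea under stated assumptions: ReplaySketch is a hypothetical class, it relies on ByteBuffer's own BufferUnderflowException rather than Netty's internal signal, and it decodes a single length-prefixed string rather than a real Packet.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Hypothetical stand-in for the replay idea; not Netty's ReplayingDecoder.
class ReplaySketch {

    /** Tries to read [length: 2 bytes][chars]; returns null and rewinds if data is missing. */
    static String tryDecode(ByteBuffer in) {
        in.mark(); // remember where this attempt started
        try {
            int length = in.getShort();
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < length; i++) {
                builder.append(in.getChar()); // no size checks, just read
            }
            return builder.toString();
        } catch (BufferUnderflowException notEnoughData) {
            in.reset(); // roll back to the mark; the attempt will be replayed later
            return null;
        }
    }

    /** Feeds a message in two fragments and records the result of each decode attempt. */
    static String demo() {
        ByteBuffer accumulated = ByteBuffer.allocate(64);

        // First fragment: the length (2 chars) and only the first char
        accumulated.putShort((short) 2).putChar('o');
        accumulated.flip();
        String first = tryDecode(accumulated);

        // Second fragment arrives: keep the unread bytes and append the rest
        accumulated.compact();
        accumulated.putChar('k');
        accumulated.flip();
        String second = tryDecode(accumulated);

        return first + "/" + second;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The first attempt runs out of bytes and yields null; the second attempt starts again from the same position and succeeds once the missing bytes have arrived.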

Is all of this slow, given that it throws an exception? It is slower than, for example, checking the amount of data in the buffer and working out whether it is enough to read a packet. But it uses a cached exception, so no time is wasted filling in a stack trace or even creating a new exception object. Read more about some other efficiency-enhancing features of ReplayingDecoder here.

You can also experiment with FrameDecoder if, for example, you can determine a packet's size in advance from its ID.
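The alternative, checking the size up front instead of replaying, might look roughly like this. The sketch again uses java.nio.ByteBuffer so that it runs without Netty. The class SizeCheckSketch and its table of packet sizes are invented for illustration: the article's real packets contain strings and do not have fixed sizes.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of size-checked framing; not Netty's FrameDecoder.
class SizeCheckSketch {
    // Made-up table: packet ID -> fixed body size in bytes
    static final Map<Integer, Integer> BODY_SIZE = new HashMap<>();
    static {
        BODY_SIZE.put(0, 4);
        BODY_SIZE.put(10, 1);
    }

    /** Returns one complete frame (ID + body), or null without consuming anything. */
    static byte[] tryFrame(ByteBuffer in) {
        if (in.remaining() < 2) {
            return null; // not even the ID has arrived yet
        }
        int id = in.getChar(in.position()); // absolute read: peek without moving the position
        Integer bodySize = BODY_SIZE.get(id);
        if (bodySize == null) {
            throw new IllegalArgumentException("Bad packet ID: " + id);
        }
        if (in.remaining() < 2 + bodySize) {
            return null; // wait for more data; nothing was consumed
        }
        byte[] frame = new byte[2 + bodySize];
        in.get(frame);
        return frame;
    }

    /** Feeds packet 0 one byte short, then completes it. */
    static String demo() {
        ByteBuffer in = ByteBuffer.allocate(16);
        in.putChar((char) 0).put(new byte[] {1, 2, 3}); // ID 0 needs 4 body bytes, only 3 here
        in.flip();
        byte[] early = tryFrame(in);

        in.compact();
        in.put((byte) 4); // the missing byte arrives
        in.flip();
        byte[] full = tryFrame(in);

        return (early == null ? "incomplete" : "frame") + "," + full.length;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

No exception is involved here: the decoder simply declines to read until the whole frame is present, which is the trade-off the paragraph above hints at.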

That seems to be all


The results were excellent. First, the server no longer spawns thousands of threads: 4 Netty threads plus 4 data-processing threads cope perfectly well with 250+ clients (testing continues). Second, the CPU load has become much lower and has stopped growing linearly with the number of connections. Third, the response time has decreased in some cases.

I hope this will be useful to someone. I tried to convey as much important information as possible, and I may have overdone it. After all, there can't be too many examples, right? Ask your questions and don't judge too harshly: this is my first post on Habr.

Postscript: a few more useful things


Netty has a few more interesting features that deserve a special mention:

First, stopping the server:

    ChannelFuture future = channel.close();
    future.awaitUninterruptibly();

Here channel is the channel that the bind function returned at the beginning. future.awaitUninterruptibly() waits until the channel is closed, and only then does the code continue.

The most interesting part: ChannelFuture. When we send a packet to a channel with channel.write(packet), the call returns a ChannelFuture, a special object that tracks the state of the operation. Through it, you can check whether the operation has completed.

For example, suppose we want to send a disconnect packet to the client and close the channel right after it. If we do

    channel.write(new Packet255KickDisconnect("!"));
    channel.close();

then with 99% probability we will get a ClosedChannelException and the packet will not reach the client. But you can do this:

    ChannelFuture future = channel.write(new Packet255KickDisconnect("!"));
    try {
        future.await(10000); // Wait at most 10 seconds for the packet to be sent
    } catch (InterruptedException e) {}
    channel.close();

Everything will work, except that this can block the executing thread until the packet has been sent to the user. Therefore, you can attach a listener to a ChannelFuture: an object that is notified when the event has occurred and then performs some action. For closing the connection there is a ready-made listener, ChannelFutureListener.CLOSE. Example of use:

    ChannelFuture future = channel.write(new Packet255KickDisconnect("!"));
    future.addListener(ChannelFutureListener.CLOSE);

The effect is the same, but without blocking. Working out how to create your own listener is not difficult: it has only one function. Open any ready-made class to see how; I will not give an example here.
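For readers who want the shape of the idea without opening the Netty sources, here is a library-free sketch of a single-method completion listener. Everything in it is a stand-in: WriteListener and WriteFuture are hypothetical types that merely mimic the pattern, and they are not Netty's ChannelFutureListener or ChannelFuture.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that mimic the listener pattern; not Netty classes.
class ListenerSketch {

    /** A listener with exactly one function, called when the operation finishes. */
    interface WriteListener {
        void operationComplete(boolean success);
    }

    /** Minimal future: remembers listeners and notifies them on completion. */
    static class WriteFuture {
        private final List<WriteListener> listeners = new ArrayList<>();
        private Boolean result; // null until the operation has finished

        synchronized void addListener(WriteListener listener) {
            if (result != null) {
                listener.operationComplete(result); // already done: notify immediately
            } else {
                listeners.add(listener);
            }
        }

        synchronized void complete(boolean success) {
            result = success;
            for (WriteListener listener : listeners) {
                listener.operationComplete(success);
            }
            listeners.clear();
        }
    }

    /** Registers a "close after write" listener and records the order of events. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        WriteFuture future = new WriteFuture();

        future.addListener(success ->
                log.add(success ? "sent; closing channel" : "write failed; closing channel"));
        log.add("caller continues without blocking");

        future.complete(true); // in a real server the I/O thread would trigger this
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The caller registers the action and moves on; the close step runs only once the write has been reported as finished.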

More important information

As was rightly noted in the comments, I should warn you that it is better not to use blocking operations or waits in handlers (the ones attached to the pipeline). Otherwise you risk losing the processing thread for good, or simply slowing down the processing of other clients' events.

Also, in a handler you must never "wait for the future", i.e. call .await() or .awaitUninterruptibly() on any ChannelFuture. First, it will not work: these methods cannot be called from handlers, and the system will not let you do such nonsense and will throw an exception. Second, even if it did not, your thread could again die, leaving other clients without service.

In general, everything done in ChannelHandlers should be as simple and non-blocking as possible. Never process the data directly in them: put the packets in a queue and process them in another thread.
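A minimal sketch of that hand-off, using only java.util.concurrent, could look as follows. PacketQueueSketch is a hypothetical class; packets are represented as plain strings, and the "processing" is just a counter, so this shows the queueing pattern rather than the author's PlayerWorkerThread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "enqueue in the handler, process elsewhere".
class PacketQueueSketch {
    // Unique sentinel object, compared by identity, that tells the worker to stop
    private static final String STOP = new String("stop");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger processed = new AtomicInteger();
    private final Thread worker = new Thread(this::drain, "packet-worker");

    void start() {
        worker.start();
    }

    /** Called from the network handler: only enqueues, so the I/O thread returns at once. */
    void acceptPacket(String packet) {
        queue.add(packet);
    }

    /** Runs on the worker thread: this is where slow game logic would live. */
    private void drain() {
        try {
            while (true) {
                String packet = queue.take(); // blocking here is fine, it is not an I/O thread
                if (packet == STOP) {
                    return;
                }
                processed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Asks the worker to finish the queued packets, waits for it, and returns the count. */
    int stopAndCount() {
        queue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    static int demo() {
        PacketQueueSketch sketch = new PacketQueueSketch();
        sketch.start();
        for (int i = 0; i < 100; i++) {
            sketch.acceptPacket("packet-" + i);
        }
        return sketch.stopAndCount();
    }

    public static void main(String[] args) {
        System.out.println("processed " + demo() + " packets");
    }
}
```

The unbounded LinkedBlockingQueue keeps the example short; a real server would probably want a bounded queue or some other form of back-pressure.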

Source: https://habr.com/ru/post/136456/

