
A container-style Java server with persistent connection support

Disclaimer


Everything described in this article is personal, practical experience and does not claim to be the ultimate truth.

Preamble


Hello. I am fond of computer games. The area in which I constantly try to improve and learn something new is browser-based multiplayer games.
For the prototype of one such idea I used Apache Tomcat as the servlet container, with the client talking to the server over HTTP. For this type of game the scheme is quite effective and quite simple to implement.
But one of the premature optimizations (yes, this is bad, but I decided to allow myself this one) was the idea of using a persistent connection between server and client, since in such a scheme no time is spent opening and closing a connection on every request. To implement it I considered Tomcat's WebSocket API, but it became more interesting to reinvent the wheel myself, so here is the story of that development.

Instruments


So, the following tools were used to implement this idea:


Architecture



First, consider the logic of the application:

The container is represented by the main class, SocketServletContainer. It serves to start and stop the container and also contains methods for managing servlets. I want to note that in this article the term servlet denotes an object containing a method with server-side code; it has nothing to do with the Servlet specification from the JCP. It is simply easier for me to call such objects servlets.

Next, there is the base class Servlet, from which all user servlets are inherited, and the connection session class (SocketSession), which stores information about the session and sends messages to the user (why I did it this way, I will explain later). Classes for the incoming and outgoing buffers (InputBuffer and OutputBuffer, respectively) were also implemented.

It was also necessary to implement the auxiliary class Config, which is responsible for parsing the XML configuration file. The QueueHandler and TaskHandler classes also deserve a mention.

QueueHandler is the request queue handler and contains a method for submitting an instance of the Task class for processing.
TaskHandler implements the Runnable interface; its run method contains the processing of the submitted request.
The Task class contains information about the incoming request (which servlet to call and the parameters passed to the server) and methods for working with the network (read/write).

Now consider the organization of work with the network:

I will not describe working with Netty in detail, because that has already been done before me (special thanks to Rena4ka for the article on Netty). Read it on Habr or in the documentation on the official site, whichever is more convenient for you. I will cover only the part needed for someone with no Netty experience to understand the basic principles.
The ServerPipelineFactory class is a ChannelPipeline factory and is required for Netty to function. I also had to implement three classes: Decoder, Encoder, and NioHandler.
The first two handle the packets that arrive at the server. The decoder is responsible for correctly parsing a packet received from the network and returns an instance of the Task class. The encoder is responsible for correctly writing a Task instance to the network and sending it to the client.
NioHandler is essentially the network manager: it accepts connections, submits tasks for processing, and manages sessions.

Protocol


Communication between client and server needs its own protocol. I decided to make it fairly simple and text-based.
As a result, the client sends the server a query string that looks like this: servlet_name[sysDiv]query_parameters.
The format of the query parameter list: name1=value1,name2=value2,...

Example: "TS[sysDiv]message=Hello habrahabr.ru".

It should be noted that the protocol is symmetric in the sense that the client receives a string naming the servlet that generated the response, followed by the list of returned parameters.
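
For example, with the TS servlet shown later in the article (which echoes the message parameter back together with the session id), a request and the corresponding response might look like this (the session id 0 here is just an assumption for the first connected client):

request:  TS[sysDiv]message=Hello habrahabr.ru
response: TS[sysDiv]request=Hello habrahabr.ru0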

And now we proceed directly to the consideration of the code of our container. But first things first.

Configuration file format


<?xml version="1.0" encoding="utf-8"?>
<config>
    <address>localhost</address>
    <port>9999</port>
    <workThreadCount>2</workThreadCount>
    <processThreadCount>2</processThreadCount>
</config>


workThreadCount is the number of threads that receive messages from the network and write to the network (needed to initialize Netty).
processThreadCount is the number of threads processing the common queue of requests that have arrived at the server. This is where the query strings are actually parsed, all the server-side code runs, and the responses are formed.

SocketServletContainer


This class is a singleton, because it is the "central" class and it is more convenient to reach it from the other classes of the program. And, of course, it implies one instance of the server per application (so a thread-safe singleton implementation is not required here), which, in my opinion, is logical.
public class SocketServletContainer {
    private Channel channel;
    private ServerBootstrap networkServer;
    private QueueHandler queueHander;
    private Map<String, Servlet> servlets;
    private Config conf;

    private static SocketServletContainer server = null;
    private static List<SocketSession> list = new ArrayList<SocketSession>();

    public List<SocketSession> getListSession() {
        return list;
    }

    static public SocketServletContainer getInstance() {
        if (server == null) {
            server = new SocketServletContainer();
        }
        return server;
    }

    private SocketServletContainer() {
        conf = new Config("conf.xml");
        // If the configuration cannot be read, fail with an exception.
        try {
            conf.read();
        } catch (Exception e) {
            throw new ContainerInitializeException(e.toString());
        }
        servlets = new HashMap<String, Servlet>();
    }

    public void start() {
        // Netty bootstrap
        ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS);
        ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(conf.getWorkThreadCount(), 400000000, 2000000000, 60, TimeUnit.SECONDS);
        networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec, conf.getWorkThreadCount()));
        networkServer.setOption("backlog", 500);
        networkServer.setOption("connectTimeoutMillis", 10000);
        networkServer.setPipelineFactory(new ServerPipelineFactory());
        channel = networkServer.bind(new InetSocketAddress(conf.getAddress(), conf.getPort()));
        // Request queue handler
        queueHander = new QueueHandler(conf.getProcessThreadCount());
        System.out.println("Ready");
    }

    // "Graceful" shutdown of the container
    public void stop() {
        if (channel.isOpen()) {
            ChannelFuture future = channel.close();
            future.awaitUninterruptibly();
        }
        queueHander.stop();
    }

    public QueueHandler getQueueHandler() {
        return this.queueHander;
    }

    // Registers a servlet under the given name
    public void registerServlet(Servlet servlet, String name) {
        // Registration may happen from different threads - guard the HashMap.
        synchronized (servlets) {
            if (!servlets.containsKey(name)) {
                servlets.put(name, servlet);
            }
        }
    }

    public Servlet getServlet(String name) {
        return servlets.get(name);
    }
}

Servlet


Everything here is simple and clear. The doRequest method is called when a packet arrives that names this servlet.
Note: the session is passed to the doRequest method so that a servlet can obtain the List of all existing sessions and send each of them a message, for example when implementing a chat (a sketch of such a broadcast servlet follows the base class below).
abstract public class Servlet {
    abstract public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session);
}
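
As a minimal sketch of the chat idea mentioned above (the ChatServlet name and the "message"/"from" parameters are my own hypothetical choices; the container API is the one described in this article), a broadcast servlet might look like this:

// Hypothetical broadcast servlet: forwards the incoming "message" parameter to every connected session.
public class ChatServlet extends Servlet {
    @Override
    public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session) {
        OutputBuffer broadcast = new OutputBuffer();
        broadcast.setPar("from", String.valueOf(session.getId()));
        broadcast.setPar("message", input.getPar("message"));
        // Send the message to every currently connected session, including the sender.
        // In a real implementation access to the shared session list would need synchronization.
        for (SocketSession s : SocketServletContainer.getInstance().getListSession()) {
            s.send(broadcast, "ChatServlet");
        }
        // This goes back to the caller automatically via the output buffer.
        output.setPar("status", "delivered");
    }
}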

SocketSession


Each session has its own unique id. There is a pool of ids for 20,000 connected clients. If this limit is exceeded, then when trying to create a session the server will log an error, send an error message to the client and close the channel.
It is better to determine the pool size empirically; ideally it should be slightly larger than the maximum number of clients that can be connected to your server simultaneously.
public class SocketSession {
    // The id pool is created eagerly (a byte[] is zero-filled by default);
    // synchronizing on a null reference would throw a NullPointerException.
    private static final byte[] idPool = new byte[20000];

    public int generateId() {
        synchronized (idPool) {
            for (int j = 0; j < idPool.length; j++) {
                if (idPool[j] == 0) {
                    idPool[j] = 1;
                    return j;
                }
            }
            return -1;
        }
    }

    private int id;
    private Channel channel;

    // On creation the session generates its id and registers itself in the common session list.
    public SocketSession(Channel channel) {
        this.channel = channel;
        this.id = generateId();
        // The id pool is exhausted
        if (this.id == -1) {
            OutputBuffer out = new OutputBuffer();
            out.setPar("error", "Connection limit error");
            send(out, "System Servlet");
            // Log the error
            System.err.println("Connection limit error");
            return;
        }
        SocketServletContainer.getInstance().getListSession().add(this);
    }

    public int getId() {
        return id;
    }

    // Sends a message to the client. Writing to the channel is synchronized,
    // because several processing threads may send to the same session at once.
    public void send(OutputBuffer output, String servletName) {
        synchronized (channel) {
            channel.write(new Task(servletName, output.toString()));
        }
    }

    // Closes the session: returns the id to the pool, closes the channel
    // and removes the session from the common list.
    public void close() {
        synchronized (idPool) {
            if (this.id >= 0) {
                idPool[this.id] = 0;
            }
        }
        channel.close();
        SocketServletContainer.getInstance().getListSession().remove(this);
    }
}

InputBuffer


The constructor receives the source string, which must contain the list of query parameters in the format described above.
public class InputBuffer {
    private Map<String, String> map = new HashMap<String, String>();

    public InputBuffer(String source) {
        String[] par = source.split(",");
        for (int j = 0; j < par.length; j++) {
            if (!par[j].contains("=")) {
                continue;
            }
            String[] data = par[j].split("=");
            if (data.length < 2) {
                System.err.println("Parsing Error");
                continue;
            }
            map.put(data[0], data[1]);
        }
    }

    public String getPar(String key) {
        return map.get(key);
    }
}

OutputBuffer


The class interface is self-explanatory. An important point is that the toString() method has to be overridden, since it is exactly this method that is used to form the response in the SocketSession class.
public class OutputBuffer {
    private List<String> list = new ArrayList<String>();

    public void setPar(String key, String par) {
        list.add(key + "=" + par);
    }

    @Override
    public String toString() {
        StringBuilder res = new StringBuilder();
        for (int j = 0; j < list.size(); j++) {
            res.append(list.get(j));
            if (j != list.size() - 1) {
                res.append(",");
            }
        }
        return res.toString();
    }
}

Config


I will not provide the implementation of this class, because its interface is clear from how it is used in SocketServletContainer, and there are plenty of XML parser examples for Java on the Internet; I hope the reader will find the one that suits them best.
I personally used a DOM parser.
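
For completeness, here is a minimal sketch of what such a Config class might look like with the standard DOM parser from the JDK. The tag names follow the configuration format above; the field and getter names are assumed from the usage in SocketServletContainer, and error handling is reduced to rethrowing exceptions.

import java.io.File;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

// A minimal Config sketch: reads conf.xml with the standard DOM parser.
public class Config {
    private final String fileName;
    private String address;
    private int port;
    private int workThreadCount;
    private int processThreadCount;

    public Config(String fileName) {
        this.fileName = fileName;
    }

    public void read() throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new File(fileName));
        address = text(doc, "address");
        port = Integer.parseInt(text(doc, "port"));
        workThreadCount = Integer.parseInt(text(doc, "workThreadCount"));
        processThreadCount = Integer.parseInt(text(doc, "processThreadCount"));
    }

    // Returns the text content of the first element with the given tag name.
    private static String text(Document doc, String tag) {
        return doc.getElementsByTagName(tag).item(0).getTextContent().trim();
    }

    public String getAddress() { return address; }
    public int getPort() { return port; }
    public int getWorkThreadCount() { return workThreadCount; }
    public int getProcessThreadCount() { return processThreadCount; }
}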

QueueHandler


This class is also very simple. Inside, it contains a thread pool that executes the tasks (TaskHandler). I delegated scheduling to the reliable and well-proven thread pool implementation; the pool is created with the Executors.newFixedThreadPool(n) factory.

When the stop method is called, the tasks already in the queue will still be processed, but new TaskHandlers will no longer be accepted.
public class QueueHandler {
    private ExecutorService threadPool;
    private int threadPoolSize;

    public QueueHandler(int size) {
        threadPoolSize = size;
        threadPool = Executors.newFixedThreadPool(threadPoolSize);
    }

    public void stop() {
        threadPool.shutdown();
    }

    public void addTaskToProcess(Task task, SocketSession session) {
        threadPool.execute(new TaskHandler(task, session));
    }
}

TaskHandler


Everything here is also very simple. The player's session and the task to be processed (Task) are passed to the constructor.
public class TaskHandler implements Runnable {
    private Task task;
    private SocketSession session;

    public TaskHandler(Task task, SocketSession session) {
        this.task = task;
        this.session = session;
    }

    @Override
    public void run() {
        // Look up the servlet by the name given in the request
        Servlet servlet = SocketServletContainer.getInstance().getServlet(task.getServletName());
        OutputBuffer output = new OutputBuffer();
        // If the servlet is not found, report an error to the client.
        if (servlet == null) {
            output.setPar("error", "servlet not found");
            session.send(output, "Error Message");
            return;
        }
        // Run the user's server-side code
        servlet.doRequest(new InputBuffer(task.getBuffer()), output, session);
        // Send the response.
        session.send(output, task.getServletName());
    }
}

Task


The Task object has the "servlet name" and "buffer" fields. The buffer is the query parameter string.
The static read/write methods are needed to obtain class instances from the channel and to write them to the channel, so that Netty can work with them.
public class Task {
    private String servletName = "";
    private String buffer = "";

    public Task(String servletName, String buffer) {
        this.servletName = servletName;
        this.buffer = buffer;
    }

    public Task() {
    }

    public String getServletName() {
        return servletName;
    }

    public String getBuffer() {
        return buffer;
    }

    // Reads the task from the network buffer
    public void get(ChannelBuffer buffer) {
        int length = buffer.readInt();
        byte[] bytes = new byte[length];
        buffer.readBytes(bytes);
        String input = new String(bytes);
        String[] data = input.split(java.util.regex.Pattern.quote("[sysDiv]"));
        if (data.length < 2) {
            System.err.println("Parsing error");
            return;
        }
        this.servletName = data[0];
        this.buffer = data[1];
    }

    // Writes the task to the network buffer
    public void send(ChannelBuffer buffer) {
        String output = this.servletName + "[sysDiv]" + this.buffer;
        buffer.writeInt(output.getBytes().length);
        buffer.writeBytes(output.getBytes());
    }

    public static Task read(ChannelBuffer buffer) {
        Task task = new Task();
        task.get(buffer);
        return task;
    }

    public static void write(Task task, ChannelBuffer buffer) {
        task.send(buffer);
    }
}

Network part

As promised, I will not go into detail about working with Netty; I will just give the code and explain the points that concern the implementation of the logic.

ServerPipelineFactory


public class ServerPipelineFactory implements ChannelPipelineFactory {
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline(new Encoder(), new Decoder(), new NioHandler());
    }
}

Decoder


A packet arrives at the server in the following format: the first 4 bytes are the length of the "useful" data, followed by the data itself. The decoder handles the reading, so that in the layers above we do not have to worry about the data not having arrived completely yet.
public class Decoder extends ReplayingDecoder<DecoderState> {

    public enum DecoderState {
        READ_LENGTH, READ_CONTENT;
    }

    public Decoder() {
        super(DecoderState.READ_LENGTH);
    }

    private int length;

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        ctx.sendUpstream(e);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        ctx.sendUpstream(e);
    }

    @Override
    protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, DecoderState state) {
        switch (state) {
            case READ_LENGTH:
                length = buffer.readInt();
                checkpoint(DecoderState.READ_CONTENT);
                // intentional fall-through: the content may already be available in the same buffer
            case READ_CONTENT:
                ChannelBuffer frame = buffer.readBytes(length);
                // Read the Task from the frame; it is passed further up the pipeline
                Task task = Task.read(frame);
                checkpoint(DecoderState.READ_LENGTH);
                return task;
            default:
                throw new Error("Shouldn't reach here");
        }
    }
}

Encoder


public class Encoder extends OneToOneEncoder {
    @Override
    protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception {
        // If this is not a Task, pass the object on unchanged
        if (!(obj instanceof Task)) {
            return obj;
        }
        Task task = (Task) obj;
        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
        // Write the task into the outgoing buffer
        Task.write(task, buffer);
        return buffer;
    }
}

NioHandler


This object handles the main network events: client connections, incoming messages, and disconnections.
public class NioHandler extends SimpleChannelUpstreamHandler {
    private SocketSession session;

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        // Create a session for the newly connected client
        session = new SocketSession(e.getChannel());
        System.out.println("Has connect");
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        session.close();
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        if (e.getChannel().isOpen()) {
            // The decoded Task is handed over to the QueueHandler for processing.
            Task message = (Task) e.getMessage();
            SocketServletContainer.getInstance().getQueueHandler().addTaskToProcess(message, session);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        // Something went wrong: close the session, log the exception and close the channel.
        session.close();
        e.getCause().printStackTrace(System.err);
        ctx.getChannel().close();
    }
}


Servlet example


public class TS extends Servlet {
    @Override
    public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session) {
        output.setPar("request", input.getPar("message") + session.getId());
    }
}


How it works or the main class of the application


Actually, the application itself is only a few lines of code; everything is simple and transparent.
public class App {
    public static void main(String[] args) throws ContainerInitializeException {
        SocketServletContainer server = SocketServletContainer.getInstance();
        server.registerServlet(new TS(), "TS");
        server.start();
    }
}

Some tests


Well, the container is written and it works. I did not bother creating a client-side wrapper for it and limited myself to writing to the socket directly; it looks like this:
Socket socket = new Socket("127.0.0.1", 9999);
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
DataInputStream dis = new DataInputStream(socket.getInputStream());
String buffer = "TS[sysDiv]message=IloveJava";
// The outer length is the frame size consumed by the Decoder (inner length prefix + payload);
// the inner length is the one read by Task.get().
dos.writeInt(buffer.getBytes().length + 4);
dos.writeInt(buffer.getBytes().length);
dos.write(buffer.getBytes());
dos.flush();
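
For completeness, reading the response back on the same socket might look roughly like this (a sketch based on the protocol described above: the server writes a single length-prefixed string produced by Task.send()):

// Read the length-prefixed response: [int length][servlet_name[sysDiv]parameters]
int length = dis.readInt();
byte[] data = new byte[length];
dis.readFully(data);
String response = new String(data);
// For the TS servlet above this prints something like: TS[sysDiv]request=IloveJava0
System.out.println(response);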

In general, as mentioned at the very beginning of the article, building such a system was one of the premature optimizations that I decided to allow myself. So it would be foolish not to run a couple of tests now that all of this has been written.

Actually, I decided to compare this solution with a servlet container working over HTTP.
For the tests, a servlet running in Tomcat was written, as well as a servlet running inside the container created here.

Note: I deliberately compared the performance of the HTTP protocol against the socket-based solution, since WebSocket, which Tomcat supports perfectly well, was not something I considered for this game project.

Features of the test:

And the result is as follows: on average, processing one "empty" servlet request took Tomcat 0.99 ms.
The container described in this article handled the same task in 0.09 ms.

We have two results that differ by an order of magnitude. And given that the idea of using sockets came to me not for speed, but because the server needs to be able to reach the client on its own, the result can be considered more than satisfactory.

TODO:


There is also a short list of things that could still be implemented for such a system, which I will give here as well:

  1. Validation of input data. A validate(String mask) method could be added to the input buffer which, based on a data type mask for the corresponding parameters, would automatically convert them to the desired (not just String) type. It might look something like: validate("message:String,count:int");
  2. Data encryption. This is exactly why the data is written as a byte[] buffer rather than via writeUTF8(), even though the protocol is textual. One could define an interface Crypto {} with two methods, code() and encode(), and pass an implementation of it to SocketServletContainer() so the cryptography algorithm can be conveniently changed or selected (a rough sketch follows this list).
  3. Working with annotations (as done in Tomcat) and deferred initialization of servlets.
  4. "Safer" parsing of the input buffer, with escaping of the delimiters.
  5. A bunch of other useful little things that might be needed in such a system.
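
As a rough, hypothetical sketch of point 2: the interface and method names come from the list above, everything else is my assumption, and a real implementation would of course use something stronger than the pass-through shown here.

// Hypothetical encryption hook for the container: code() would be applied to outgoing bytes,
// encode() to incoming ones (the method names follow the TODO item above).
public interface Crypto {
    byte[] code(byte[] data);
    byte[] encode(byte[] data);
}

// A do-nothing implementation that could be passed to the container by default;
// Task.send()/Task.get() would run their byte arrays through it before writing / after reading.
class NoOpCrypto implements Crypto {
    public byte[] code(byte[] data) { return data; }
    public byte[] encode(byte[] data) { return data; }
}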

Instead of conclusion


The results shown by the container fully met my expectations. Using NIO made it possible to be economical with threads and to tune the container to the available hardware so that it works as efficiently as possible.
The functionality the container provides makes it quite convenient to develop an application without worrying about "low-level" things such as packet parsing and so on (as someone used to developing on Tomcat, I found everything quite comfortable :)).

Still, I did not dare to use such a solution as the basis of a real project, because for me having sockets would certainly be very useful (for server-to-client push), but not, in principle, critical, whereas the performance and reliability of Tomcat, proven over the years by thousands of developers, raise no questions.
I plan to use the implemented system in "narrow" but non-critical places where the HTTP protocol is not a good fit; for example, it is perfect for implementing a chat.

I hope this article is interesting to readers and useful to someone. I tried to convey a complete picture, perhaps getting a little carried away with the reasoning and the amount of code. I will be glad to hear your questions and suggestions about the material.

Source: https://habr.com/ru/post/188088/

