<?xml version="1.0" encoding="utf-8"?> <config> <address>localhost</address> <port>9999</port> <workThreadCount>2</workThreadCount> <processThreadCount>2</processThreadCount> </config>
public class SocketServletContainer { private Channel channel; private ServerBootstrap networkServer; private QueueHandler queueHander; private Map<String, Servlet> servlets; private Config conf; private static SocketServletContainer server= null; private static List<SocketSession> list= new ArrayList<SocketSession>(); public List<SocketSession> getListSession() { return list; } static public SocketServletContainer getInstance() { if (server==null) { server= new SocketServletContainer(); } return server; } private SocketServletContainer() { conf= new Config("conf.xml"); // , - Exception. try { conf.read(); } catch(Exception e) { throw new ContainerInitializeException(e.toString()); } servlets= new HashMap<String, Servlet>(); } public void start() { // Netty ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS); ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(conf.getWorkThreadCount(), 400000000, 2000000000, 60, TimeUnit.SECONDS); networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec, conf.getWorkThreadCount())); networkServer.setOption("backlog", 500); networkServer.setOption("connectTimeoutMillis", 10000); networkServer.setPipelineFactory(new ServerPipelineFactory()); channel = networkServer.bind(new InetSocketAddress(conf.getAddress(), conf.getPort())); // queueHander= new QueueHandler(conf.getProcessThreadCount()); System.out.println("Ready"); } // «» public void stop() { if (channel.isOpen()) { ChannelFuture future= channel.close(); future.awaitUninterruptibly(); } queueHander.stop(); } public QueueHandler getQueueHandler() { return this.queueHander; } // public void registerServlet(Servlet servlet, String name) { // - HashMap. synchronized(servlets) { if (!servlets.containsKey(name)) { servlets.put(name, servlet); } } } public Servlet getServlet(String name) { return servlets.get(name); } }
/**
 * Base class for request handlers that are registered in the container under a name.
 */
public abstract class Servlet {

    /**
     * Handles one request.
     *
     * @param input   parameters of the incoming request
     * @param output  buffer the implementation fills with response parameters
     * @param session session of the client that sent the request
     */
    public abstract void doRequest(InputBuffer input, OutputBuffer output, SocketSession session);
}
public class SocketSession { private static byte[] idPool; public int generateId() { synchronized(idPool) { if (idPool==null) { idPool= new byte[20000]; for (int j=0;j<idPool.length;j++) { idPool[j]=0; } } for (int j=0;j<idPool.length;j++) { if (idPool[j]==0) { idPool[j]=1; return j; } } return -1; } } private int id; private Channel channel; // List . public SocketSession(Channel channel) { this.channel= channel; this.id= generateId(); // if (this.id==-1) { OutputBuffer out= new OutputBuffer(); out.setPar("error", "Connection limit error"); send(out, "System Servlet"); // System.err.println("Connection limit error"); return; } SocketServletContainer.getInstance().getListSession().add(this); } public int getId() { return id; } // . . public void send(OutputBuffer output, String servletName) { synchronized(channel) { channel.write(new Task(servletName, output.toString())); } } // , , - «» public void close() { synchronized(idPool) { idPool[this.id]= 0; } channel.close(); SocketServletContainer.getInstance().getListSession().remove(this); } }
/**
 * Request parameters parsed from a {@code key=value,key=value} string.
 */
public class InputBuffer {
    private final Map<String, String> map = new HashMap<String, String>();

    /**
     * Parses the source string. Entries are separated by {@code ','}; each entry is split
     * at its first {@code '='}, so a value may itself contain {@code '='} and may be empty.
     * Entries without {@code '='} are ignored.
     *
     * @param source the raw parameter string; {@code null} is treated as empty
     */
    public InputBuffer(String source) {
        if (source == null) {
            return;
        }
        String[] par = source.split(",");
        for (int j = 0; j < par.length; j++) {
            int separator = par[j].indexOf('=');
            if (separator < 0) {
                continue;
            }
            // Split at the first '=' only: everything after it is the value.
            map.put(par[j].substring(0, separator), par[j].substring(separator + 1));
        }
    }

    /**
     * @param key the parameter name
     * @return the parameter value, or {@code null} if the parameter is absent
     */
    public String getPar(String key) {
        return map.get(key);
    }
}
/**
 * Collects response parameters and renders them as {@code key=value} pairs joined by commas.
 */
public class OutputBuffer {
    private List<String> entries = new ArrayList<String>();

    /**
     * Appends one parameter; parameters keep the order in which they were added.
     *
     * @param key parameter name
     * @param par parameter value
     */
    public void setPar(String key, String par) {
        String entry = key + "=" + par;
        entries.add(entry);
    }

    /**
     * @return all parameters as {@code key=value} pairs separated by {@code ","}; an empty
     *         string when nothing was added
     */
    @Override
    public String toString() {
        StringBuilder joined = new StringBuilder();
        String separator = "";
        for (String entry : entries) {
            joined.append(separator).append(entry);
            // Every entry after the first is preceded by a comma.
            separator = ",";
        }
        return joined.toString();
    }
}
/**
 * Runs incoming tasks on a fixed-size thread pool.
 */
public class QueueHandler {
    private int threadPoolSize;
    private ExecutorService threadPool;

    /**
     * @param size number of threads in the pool
     */
    public QueueHandler(int size) {
        this.threadPoolSize = size;
        this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize);
    }

    /**
     * Shuts the pool down.
     */
    public void stop() {
        this.threadPool.shutdown();
    }

    /**
     * Wraps the task in a {@link TaskHandler} and submits it to the pool.
     *
     * @param task    the decoded request
     * @param session the session the request came from
     */
    public void addTaskToProcess(Task task, SocketSession session) {
        TaskHandler job = new TaskHandler(task, session);
        this.threadPool.execute(job);
    }
}
public class TaskHandler implements Runnable{ private Task task; private SocketSession session; public TaskHandler(Task task, SocketSession session) { this.task= task; this.session= session; } @Override public void run() { // Servlet servlet= SocketServletContainer.getInstance().getServlet(task.getServletName()); OutputBuffer output= new OutputBuffer(); // , . if (servlet==null) { output.setPar("error", "servlet not found"); session.send(output, "Error Message"); return; } // servlet.doRequest(new InputBuffer(task.getBuffer()),output, session); // . session.send(output, task.getServletName()); } }
public class Task { private String servletName=""; private String buffer=""; public Task(String servletName, String buffer) { this.servletName= servletName; this.buffer= buffer; } public Task() { } public String getServletName() { return servletName; } public String getBuffer() { return buffer; } // public void get(ChannelBuffer buffer) { int length= buffer.readInt(); byte[] bytes= new byte[length]; buffer.readBytes(bytes); String input= new String(bytes); String[] data= input.split(java.util.regex.Pattern.quote("[sysDiv]")); if (data.length<2) { System.err.println("Parsing error"); return; } this.servletName= data[0]; this.buffer= data[1]; } // public void send(ChannelBuffer buffer) { String output= this.servletName + "[sysDiv]"+ this.buffer; buffer.writeInt(output.getBytes().length); buffer.writeBytes(output.getBytes()); } public static Task read(ChannelBuffer buffer) { Task task= new Task(); task.get(buffer); return task; } public static void write(Task task, ChannelBuffer buffer) { task.send(buffer); } }
/**
 * Builds the handler pipeline for a channel: encoder, decoder, then the request handler.
 */
public class ServerPipelineFactory implements ChannelPipelineFactory {

    /**
     * @return a pipeline holding fresh {@link Encoder}, {@link Decoder} and
     *         {@link NioHandler} instances, in that order
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        Encoder outbound = new Encoder();
        Decoder inbound = new Decoder();
        NioHandler dispatcher = new NioHandler();
        return Channels.pipeline(outbound, inbound, dispatcher);
    }
}
public class Decoder extends ReplayingDecoder<DecoderState> { public enum DecoderState { READ_LENGTH, READ_CONTENT; } public Decoder() { super(DecoderState.READ_LENGTH); } private int length; @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, DecoderState state) { switch (state) { case READ_LENGTH: length = buffer.readInt(); checkpoint(DecoderState.READ_CONTENT); case READ_CONTENT: ChannelBuffer frame= buffer.readBytes(length); // task Task task= Task.read(frame); checkpoint(DecoderState.READ_LENGTH); return task; default: throw new Error( "Shouldn't reach here" ); } } }
public class Encoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception { // Task, if(!(obj instanceof Task)) { return obj; } Task task= (Task)obj; ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); // task Task.write(task, buffer); return buffer; } }
public class NioHandler extends SimpleChannelUpstreamHandler { private SocketSession session; @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // session= new SocketSession(e.getChannel()); System.out.println("Has connect"); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { session.close(); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { if(e.getChannel().isOpen()) { // Task QueueHandler. Task message= (Task)e.getMessage(); SocketServletContainer.getInstance().getQueueHandler().addTaskToProcess(message, session); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { // . , . session.close(); e.getCause().printStackTrace(System.err); ctx.getChannel().close(); } }
/**
 * Sample servlet: replies with the request's {@code message} parameter followed by the
 * session id, in the response parameter {@code request}.
 */
public class TS extends Servlet {

    @Override
    public void doRequest(InputBuffer input, OutputBuffer output, SocketSession session) {
        String message = input.getPar("message");
        int sessionId = session.getId();
        output.setPar("request", message + sessionId);
    }
}
/**
 * Entry point: registers the sample servlet under the name {@code "TS"} and starts the
 * container.
 */
public class App {

    public static void main(String[] args) throws ContainerInitializeException {
        SocketServletContainer container = SocketServletContainer.getInstance();
        TS sampleServlet = new TS();
        container.registerServlet(sampleServlet, "TS");
        container.start();
    }
}
// Client-side example: connect to the server and send one request to the "TS" servlet.
socket = new Socket("127.0.0.1", 9999);
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
DataInputStream dis = new DataInputStream(socket.getInputStream());
String buffer = "TS[sysDiv]message=IloveJava";
// Encode once with an explicit charset, so both length prefixes and the written bytes
// agree and do not depend on the platform default.
byte[] payload = buffer.getBytes(java.nio.charset.Charset.forName("UTF-8"));
// Two length prefixes are sent: the outer one (content length + 4) frames the inner
// length together with the content; the inner one is the length of the content itself.
dos.writeInt(payload.length + 4);
dos.writeInt(payload.length);
dos.write(payload);
dos.flush();
Source: https://habr.com/ru/post/188088/
All Articles