/**
 * Channel lifecycle handler: reacts to channels being opened and closed and
 * logs exceptions that reach this handler.
 */
public class ConnectionHandler extends SimpleChannelHandler
{
    /**
     * Called when a channel is opened. Closes the channel immediately when
     * the guard condition holds.
     *
     * @param ctx handler context (unused here)
     * @param e   state event carrying the newly opened channel
     */
    @Override
    public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception
    {
        // NOTE(review): the guard condition is missing from this listing and
        // must be supplied before the class can compile.
        if( )
        {
            // Refuse the connection by closing the freshly opened channel.
            e.getChannel().close();
        }
    }

    /**
     * Called when a channel is closed. No action is taken in the visible code.
     *
     * @param ctx handler context (unused here)
     * @param e   state event carrying the closed channel
     */
    @Override
    public void channelClosed( ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception
    {
        // NOTE(review): presumably per-connection cleanup belongs here — confirm.
    }

    /**
     * Logs an exception raised while handling a channel.
     *
     * @param ctx handler context (unused here)
     * @param e   event carrying the cause of the failure
     */
    @Override
    public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e ) throws Exception
    {
        Throwable cause = e.getCause();
        // Pass the throwable as the trailing argument so the exception type and
        // stack trace are logged, not just the (possibly null) message.
        // Assumes `log` follows the SLF4J-style "{}" + trailing Throwable convention.
        log.error( "message: {}", cause.getMessage(), cause );
    }
}
public class FrameHandler extends ReplayingDecoder<DecoderState>
{
public enum DecoderState
{
READ_LENGTH,
READ_CONTENT;
}
private int length;
public ProtocolPacketFramer()
{
super( DecoderState.READ_LENGTH );
}
@Override
protected Object decode( ChannelHandlerContext chc, Channel chnl, ChannelBuffer cb, DecoderState state ) throws Exception
{
switch ( state )
{
case READ_LENGTH:
length = cb.readInt();
checkpoint( DecoderState.READ_CONTENT );
case READ_CONTENT:
ChannelBuffer frame = cb.readBytes( length );
checkpoint( DecoderState.READ_LENGTH );
return frame;
default:
throw new Error( "Shouldn't reach here." );
}
}
}
/**
 * Turns each received frame into a {@code Packet}, queues it on the session
 * and schedules that session for processing by the reader.
 */
public class ProtocolPacketHandler extends SimpleChannelHandler
{
    /**
     * Handles one decoded frame.
     *
     * @param ctx handler context (unused here)
     * @param e   event whose message is the frame as a {@code ChannelBuffer}
     */
    @Override
    public void messageReceived( ChannelHandlerContext ctx, MessageEvent e ) throws Exception
    {
        // Copy exactly the readable bytes. ChannelBuffer.array() would expose
        // the raw backing array, which may be larger than or offset from the
        // readable region, and is unsupported for buffers without one.
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        byte[] msg = new byte[buffer.readableBytes()];
        buffer.getBytes( buffer.readerIndex(), msg );

        // NOTE(review): `session` is not declared anywhere in this listing;
        // presumably it is looked up for the current channel — confirm.
        Packet packet = new Packet();
        packet.setData( msg );
        packet.setSender( session );

        // Queue the packet on the session's own read queue ...
        session.addReadPacketQueue( packet );

        // ... and hand the session to the reader for processing.
        Server.getReader().addSessionToProcess( session );
    }
}
/**
 * Consumes sessions that have pending packets. The handler submits itself to
 * a fixed-size thread pool once per worker, so {@link #run()} executes on
 * several threads that all take from the same session queue.
 */
public final class ReadQueueHandler implements Runnable
{
    /** Sessions waiting to have a packet read from their read queue. */
    private final BlockingQueue<Session> sessionQueue;
    /** Pool whose threads execute {@link #run()}. */
    private final ExecutorService threadPool;
    /** Number of worker loops started on the pool. */
    private final int threadPoolSize;

    /**
     * Creates the handler and immediately starts its worker loops.
     *
     * @param threadPoolSize number of pool threads and of worker loops
     */
    public ReadQueueHandler( int threadPoolSize )
    {
        this.threadPoolSize = threadPoolSize;
        this.threadPool = Executors.newFixedThreadPool( threadPoolSize );
        this.sessionQueue = new LinkedBlockingQueue<Session>();
        // Must stay last: it hands `this` to other threads.
        initThreadPool();
    }

    /** Submits this handler to the pool once per worker thread. */
    private void initThreadPool()
    {
        for ( int i = 0; i < this.threadPoolSize; i++ )
        {
            this.threadPool.execute( this );
        }
    }

    /**
     * Queues a session for processing; {@code null} is ignored.
     *
     * @param session session that has at least one packet to read
     */
    public void addSessionToProcess( Session session )
    {
        if ( session != null )
        {
            this.sessionQueue.add( session );
        }
    }

    /**
     * Worker loop: waits for a session, takes one packet from its read queue
     * and extracts the packet data. Exits when interrupted.
     */
    @Override
    public void run()
    {
        // NOTE(review): `isActive`, `packet` and `data` are not declared in
        // this listing; their declarations are presumably elided — confirm.
        while ( isActive )
        {
            try
            {
                // Blocks until some session has been queued.
                Session session = this.sessionQueue.take();
                // Take the next packet queued on that session.
                packet = session.getReadPacketQueue().take();
                data = packet.getData();
            }
            catch ( InterruptedException ex )
            {
                // Restore the interrupt flag and stop this worker so the pool
                // can shut down cleanly.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
// Source: https://habr.com/ru/post/136765/
// All Articles