Executor threadPool = Executors.newCachedThreadPool(); DatagramChannelFactory factory = new NioDatagramChannelFactory(threadPool); // UDP. bootstrap = new ConnectionlessBootstrap(factory); bootstrap.getPipeline().addLast("handler", new Handler()); // Handler . bootstrap.setOption("reuseAddress", true); // . , reuseAddress . , - . // ShutdowHook , Netty. , . Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { channel.close(); bootstrap.releaseExternalResources(); } })); String host = Config.getInstance().getString("listen.host", "127.0.0.1"); Integer port = Config.getInstance().getInt("listen.port", 8080); InetSocketAddress address = new InetSocketAddress(host, port); // , , . Netty . logger.info("Listening on " + host + ":" + port); // bind, . bootstrap.bind(address);
public class Handler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger(Handler.class); public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer)e.getMessage(); // ChannelBuffer udp-. // , , connection ID (long), action ID (int) transaction ID (int) 16 . if (channelBuffer.readableBytes() < 16) { logger.debug("Incorrect packet received from " + e.getRemoteAddress()); } long connectionId = channelBuffer.readLong(); // connectionId, . int actionId = channelBuffer.readInt(); // ID . : 0x00 Connect; 0x01 Announce; 0x02 Scrape; 0x03: Error. () . int transactionId = channelBuffer.readInt(); // ID . ID , . Action action = Action.byId(actionId); ClientRequest request; switch (action) { case CONNECT: request = new ConnectionRequest(); break; case ANNOUNCE: request = new AnnounceRequest(); break; case SCRAPE: request = new ScrapeRequest(); break; default: logger.debug("Incorrect action supplied"); ErrorResponse.send(e, transactionId, "Incorrect action"); return; } // , , . request.setContext(ctx); request.setMessageEvent(e); request.setChannelBuffer(channelBuffer); request.setConnectionId(connectionId); request.setAction(action); request.setTransactionId(transactionId); // . request.read(); } }
@Entity("peers") public class Peer { public @Id ObjectId id; public @Indexed byte[] infoHash; public byte[] peerId; public long downloaded; public long left; public long uploaded; public @Transient int event; public int ip; public short port; public @Transient int key; public @Transient int numWant; public @Transient short extensions; public long lastUpdate; @PrePersist private void prePersist() { this.lastUpdate = System.currentTimeMillis(); } }
morphia = new Morphia(); morphia.map(Peer.class); // , . mongo = new Mongo(host, port); datastore = morphia.createDatastore(mongo, "udptracker");
Query<Peer> peersQuery = Datastore.instance().find(Peer.class); peersQuery.field("infoHash").equal(peer.infoHash); peersQuery.field("peerId").notEqual(peer.peerId); // . peersQuery.limit(peer.numWant).offset(randomOffset); // numWant .
Source: https://habr.com/ru/post/130563/