
Asynchronous Symphony: JavaFX Tasks and Netty Sockets

Happy Friday, everyone!

We have finally gotten around to a book about Netty, which many people recommended to us, including the appreciative readers of our Habr blog.


To be honest, we have not published anything narrowly focused on Java for a long time. But Netty attracts lively interest on Habr, so we decided to post an overview article on it (the author got the idea for the post from this book) and to run a proper survey. Come in and have your say!


This article explains how to integrate the Netty client/server framework into a JavaFX application. You can write a distributed application using simple request/response protocols such as HTTP or RMI, but these protocols are often inefficient and too limited for applications that need continuous server updates, push notifications, or long-running operations. Netty provides an efficient network implementation based on asynchronous processing and stateful connections. With this design you need no extra tricks; for example, the client does not have to poll the server for updates.

When integrating Netty with JavaFX, you need to make sure that interactions with the UI happen on the FX Thread without blocking the UI. To do this, wrap the Netty calls in the FX Task class. A Task runs long-running work on a background thread, and in most cases you can simply let Netty wait for a response there. This is done by calling sync(), which blocks the calling thread but does not cause the application to hang.

This example is based on an echo client/server program that I found in the book "Netty in Action" by Norman Maurer and Marvin Allen Wolfthal. After the connection is established, the client takes a java.lang.String and sends it to the server. The server converts this string with toUpperCase() and sends the result back to the client. The client displays the string in the user interface.
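To make the protocol concrete, here is a minimal sketch of the same round trip written with blocking java.net sockets instead of Netty. The class name BlockingEchoSketch, its methods, and the newline-delimited framing are assumptions made for illustration only; the project itself exchanges Netty ByteBuf messages.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical stand-in for the EchoServer/EchoClient pair: blocking sockets,
// one line of UTF-8 text per message (an assumption, not the article's framing).
final class BlockingEchoSketch {

    // Accepts a single connection on a daemon thread and echoes one line in upper case.
    static void serveOnce(ServerSocket server) {
        Thread t = new Thread(() -> {
            try (Socket s = server.accept();
                 BufferedReader in = reader(s);
                 PrintWriter out = writer(s)) {
                String line = in.readLine();
                if (line != null) {
                    out.println(line.toUpperCase(Locale.ROOT));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Connects, sends one line, and blocks until the reply line arrives.
    static String roundTrip(InetAddress host, int port, String text) throws IOException {
        try (Socket s = new Socket(host, port);
             PrintWriter out = writer(s);
             BufferedReader in = reader(s)) {
            out.println(text);
            return in.readLine();
        }
    }

    // Starts a one-shot server on a free loopback port and performs one round trip.
    static String demo(String text) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            serveOnce(server);
            return roundTrip(loopback, server.getLocalPort(), text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        // autoFlush = true, so println() pushes the line onto the wire immediately
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    public static void main(String[] args) {
        System.out.println(demo("hello")); // prints HELLO
    }
}
```

The blocking version needs one thread per connection, which is the cost that Netty's event loop avoids.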

All the code for this post is on GitHub.

Project

For convenience, I packed all server and client code into one Maven project. The following UML class diagram shows which classes are in our program.



FX Echo Client Class Diagram

EchoServer and EchoClient contain the main() functions, which are the entry points for the server and client processes. EchoServer contains the Netty code for bootstrapping, binding, and creating a pipeline with the custom EchoServerHandler. EchoClient creates the user interface, which is backed by the EchoClientController; the controller contains the Netty code for connecting, disconnecting, sending, and receiving. The EchoClientController also creates the client pipeline using the EchoClientHandler.

The diagram shows the connect / send / receive / disconnect sequence. It is not normalized, so some operations ("Enter Text", "Netty Connect") are notional and do not appear in the code. Data exchange in the program is mostly implemented with standard JavaFX binding and Netty Futures.



So, schematically, the sequence looks like this.

  1. The user clicks the Connect button.
  2. The EchoClientController bootstraps and connects to the EchoServer.
  3. The user enters text and presses the Send button.
  4. The writeAndFlush() operation is called on the channel. The channelRead() and channelReadComplete() methods of the EchoServerHandler are invoked.
  5. The channelRead() method of the EchoServerHandler calls write(), and the channelReadComplete() method calls flush().
  6. The EchoClientHandler receives the data.
  7. The EchoClientHandler sets the StringProperty bound to the UI. The TextField in the UI is updated automatically.
  8. The user presses the Disconnect button.
  9. The EchoClientController closes its Channel and shuts down the EventLoopGroup (not shown in the diagram).
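The split between write() and flush() in the server handler can be illustrated with a plain java.io analogy: written data is only queued until a flush pushes it to its destination. The sketch below is an assumption-laden stand-in, not Netty code; WriteThenFlushSketch and its method are hypothetical names, and a ByteArrayOutputStream plays the role of the socket.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of "write now, flush later" using java.io buffering.
final class WriteThenFlushSketch {

    // Returns {bytes visible at the sink before flush, bytes visible after flush}.
    static int[] bytesBeforeAndAfterFlush(String text) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();              // stands in for the socket
        BufferedOutputStream buffered = new BufferedOutputStream(sink, 8192);  // stands in for the outbound buffer
        try {
            // Comparable to ctx.write(): the bytes are queued, not yet delivered
            // (true here as long as the text is shorter than the 8192-byte buffer).
            buffered.write(text.getBytes(StandardCharsets.UTF_8));
            int before = sink.size();

            // Comparable to ctx.flush(): everything queued so far is pushed out.
            buffered.flush();
            int after = sink.size();

            return new int[] { before, after };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = bytesBeforeAndAfterFlush("HELLO");
        System.out.println(sizes[0] + " -> " + sizes[1]); // prints 0 -> 5
    }
}
```

Deferring the flush lets several writes leave in a single batch.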


Client Code

Since all the code is on GitHub, I will focus on the interaction between JavaFX and Netty on the client side. I omit the trivial EchoClient subclass of the JavaFX Application, which creates the Stage and loads the EchoClient.fxml file. The client code of interest is in the EchoClientController class.

connect()

The connect() method takes a host and a port from the UI and creates a Netty channel, which is then saved as an EchoClientController field.

From EchoClientController.java

    @FXML HBox hboxStatus;
    @FXML ProgressIndicator piStatus;
    @FXML Label lblStatus;

    private BooleanProperty connected = new SimpleBooleanProperty(false);
    private StringProperty receivingMessageModel = new SimpleStringProperty("");
    private Channel channel;

    @FXML
    public void connect() {

        if( connected.get() ) {
            return;  // already connected; nothing to do
        }

        String host = tfHost.getText();
        int port = Integer.parseInt(tfPort.getText());

        group = new NioEventLoopGroup();

        Task<Channel> task = new Task<Channel>() {

            @Override
            protected Channel call() throws Exception {

                updateMessage("Bootstrapping");
                updateProgress(0.1d, 1.0d);

                Bootstrap b = new Bootstrap();
                b
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress( new InetSocketAddress(host, port) )
                    .handler( new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler(receivingMessageModel));
                        }
                    });

                ChannelFuture f = b.connect();
                Channel chn = f.channel();

                updateMessage("Connecting");
                updateProgress(0.2d, 1.0d);

                f.sync();  // blocks this background thread until the connection completes

                return chn;
            }

            @Override
            protected void succeeded() {
                channel = getValue();
                connected.set(true);
            }

            @Override
            protected void failed() {
                Throwable exc = getException();
                logger.error( "client connect error", exc );
                Alert alert = new Alert(AlertType.ERROR);
                alert.setTitle("Client");
                alert.setHeaderText( exc.getClass().getName() );
                alert.setContentText( exc.getMessage() );
                alert.showAndWait();

                connected.set(false);
            }
        };

        // Bind the status bar to the task's state
        hboxStatus.visibleProperty().bind( task.runningProperty() );
        lblStatus.textProperty().bind( task.messageProperty() );
        piStatus.progressProperty().bind(task.progressProperty());

        new Thread(task).start();
    }


The Netty calls for bootstrapping and connecting are wrapped in a JavaFX Task. The Task is a key concept in JavaFX programming, and I have a rule: any code that could potentially run for longer than a second goes into a Task. As a result, almost everything I write ends up in Tasks, except for manipulating Java objects in memory.

The Task exposes several properties: runningProperty, messageProperty, and progressProperty. I bind them to UI elements: an HBox container, a Label, and a ProgressIndicator. Thanks to JavaFX binding, there is no need to register listeners and call setter methods on the UI controls.

The call() method returns the channel. In this implementation I do not care about Netty's asynchronous behavior, because I am already running in a new Thread(), so I can wait until the sync() call returns. The returned channel is stored in a field in the succeeded() method. If Netty throws an exception, the failed() method is called, and the error is logged and shown to the user in a dialog box.
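One detail worth noting: in connect(), the port text is parsed with Integer.parseInt() before the Task is created, so a malformed value would not reach failed(). A small validation helper could guard against that. The sketch below is only a suggestion; PortInput and parsePort are hypothetical names that do not appear in the project.

```java
import java.util.OptionalInt;

// Hypothetical helper: turns the text of a port field into a valid TCP port, if possible.
final class PortInput {

    // Returns the port when the text is an integer in 1..65535, otherwise an empty OptionalInt.
    static OptionalInt parsePort(String text) {
        if (text == null) {
            return OptionalInt.empty();
        }
        try {
            int port = Integer.parseInt(text.trim());
            return (port >= 1 && port <= 65535) ? OptionalInt.of(port) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            return OptionalInt.empty(); // not a number at all
        }
    }

    public static void main(String[] args) {
        System.out.println(parsePort("8080").isPresent());
        System.out.println(parsePort("not a port").isPresent());
    }
}
```

A controller could call such a helper first and show a message instead of starting the Task when the result is empty.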

The succeeded() and failed() methods run on the FX Thread, and updateMessage() and updateProgress() can be called from any thread because they apply their updates on the FX Thread; call() itself does not run on it. The call() method should not touch the UI in any way. It should deal only with the long-running Netty operation.
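The threading split described here (blocking work on one thread, result handling on a single UI thread) can be sketched without JavaFX. In the following stand-in, a single-threaded executor named "ui-thread" plays the role of the FX Thread and a CompletableFuture plays the role of the Task; all names are assumptions for illustration and none of this is the JavaFX API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for Task.call() / Task.succeeded() using plain executors.
final class BackgroundThenUiSketch {

    // Returns {name of the thread that did the blocking work, name of the thread that handled the result}.
    static String[] run() {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-thread"));
        try {
            String[] seen = new String[2];
            CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> {
                    // Plays the role of call(): free to block, never touches the UI.
                    seen[0] = Thread.currentThread().getName();
                    sleepQuietly(50); // stands in for a blocking sync()
                    return "channel";
                }, worker)
                .thenAcceptAsync(value -> {
                    // Plays the role of succeeded(): runs on the single UI thread.
                    seen[1] = Thread.currentThread().getName();
                }, ui);
            done.join(); // only this demo waits here; a real UI thread would not block
            return seen;
        } finally {
            ui.shutdown();
            worker.shutdown();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println(threads[0] + " -> " + threads[1]); // prints worker-thread -> ui-thread
    }
}
```

JavaFX's Task gives the same separation plus the bindable running, message, and progress properties, which is why the article uses it rather than raw executors.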

send()

The send() method uses the saved Channel object to call writeAndFlush(). Through the Netty framework, this writeAndFlush() call eventually triggers the EchoClientHandler delegate when the server's reply arrives.

Also from EchoClientController.java

    @FXML
    public void send() {

        if( !connected.get() ) {
            return;
        }

        final String toSend = tfSend.getText();

        Task<Void> task = new Task<Void>() {

            @Override
            protected Void call() throws Exception {

                ChannelFuture f = channel.writeAndFlush( Unpooled.copiedBuffer(toSend, CharsetUtil.UTF_8) );
                f.sync();  // wait so that a failed write surfaces as an exception

                return null;
            }

            @Override
            protected void failed() {

                Throwable exc = getException();
                logger.error( "client send error", exc );
                Alert alert = new Alert(AlertType.ERROR);
                alert.setTitle("Client");
                alert.setHeaderText( exc.getClass().getName() );
                alert.setContentText( exc.getMessage() );
                alert.showAndWait();

                connected.set(false);
            }
        };

        hboxStatus.visibleProperty().bind( task.runningProperty() );
        lblStatus.textProperty().bind( task.messageProperty() );
        piStatus.progressProperty().bind(task.progressProperty());

        new Thread(task).start();
    }


Notice the similarity with connect(). The newly created task is bound to the same three progress objects. There is no succeeded() method, and the failed() method contains the same logic as the error handler in the connect() implementation.

The task returns nothing (its result type is Void). In the optimistic scenario the call completes right away, but if it does not, I want to wait for the error. Since call() already runs in my new thread, I can afford to wait in sync().

disconnect()

The disconnect() method wraps its work in a Task in the same way as the two previous methods. In this method, shutting down the Netty connection takes two separate steps, each reported with its own updateMessage()/updateProgress() pair. The sync() on close() does not take long. The shutdownGracefully() call takes significantly longer. However, in my experiments, the UI never hung.

    @FXML
    public void disconnect() {

        if( !connected.get() ) {
            return;
        }

        Task<Void> task = new Task<Void>() {

            @Override
            protected Void call() throws Exception {

                updateMessage("Disconnecting");
                updateProgress(0.1d, 1.0d);

                channel.close().sync();  // step 1: close the channel

                updateMessage("Closing group");
                updateProgress(0.5d, 1.0d);

                group.shutdownGracefully().sync();  // step 2: stop the event loop group

                return null;
            }

            @Override
            protected void succeeded() {
                connected.set(false);
            }

            @Override
            protected void failed() {

                connected.set(false);

                Throwable t = getException();
                logger.error( "client disconnect error", t );
                Alert alert = new Alert(AlertType.ERROR);
                alert.setTitle("Client");
                alert.setHeaderText( t.getClass().getName() );
                alert.setContentText( t.getMessage() );
                alert.showAndWait();
            }
        };

        hboxStatus.visibleProperty().bind( task.runningProperty() );
        lblStatus.textProperty().bind( task.messageProperty() );
        piStatus.progressProperty().bind(task.progressProperty());

        new Thread(task).start();
    }


Reading

Reading from the server is handled by the EchoClientHandler object. When this object is created, it receives a reference to a StringProperty, a model element to which the user interface is also bound. I could pass UI elements directly to the handler, but that would violate separation of concerns and make it harder to apply the notification to several views at once. Because the handler updates a StringProperty, any number of UI elements can bind to it, and an update from the handler refreshes all of them.
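The idea of one model value feeding several views can be sketched without JavaFX. The class below is a deliberately tiny, hypothetical imitation of an observable string (ObservableText is not a JavaFX type and lacks binding, invalidation, and thread checks); it only shows why the handler needs to know about the model and nothing else.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, minimal observable value: every registered listener is told about each new value.
// Like a JavaFX property, it is meant to be read and written from a single thread.
final class ObservableText {

    private final List<Consumer<String>> listeners = new ArrayList<>();
    private String value = "";

    // Registers a "view" that wants to be notified of changes.
    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    String get() {
        return value;
    }

    // Stores the new value and notifies all listeners in registration order.
    void set(String newValue) {
        value = newValue;
        for (Consumer<String> listener : listeners) {
            listener.accept(newValue);
        }
    }

    public static void main(String[] args) {
        ObservableText model = new ObservableText();
        model.addListener(v -> System.out.println("text field shows: " + v));
        model.addListener(v -> System.out.println("status bar shows: " + v));
        model.set("HELLO"); // the "handler" updates only the model; both views react
    }
}
```

With the real StringProperty, the views would use bind() rather than hand-written listeners, but the handler-side code stays the same: a single set() call on the model.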

Here is the code for EchoClientHandler.java. Note the FX Thread protection in the channelRead0() method.

    @Sharable
    public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private Logger logger = LoggerFactory.getLogger( EchoClientHandler.class );

        private final StringProperty receivingMessageModel;

        public EchoClientHandler(StringProperty receivingMessageModel) {
            this.receivingMessageModel = receivingMessageModel;
        }

        @Override
        protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception {
            final String cm = in.toString(CharsetUtil.UTF_8);
            // Called on a Netty thread, so hand the model update over to the FX Thread
            Platform.runLater( () -> receivingMessageModel.set(cm) );
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error( "error in echo client", cause );
            ctx.close();
        }
    }


A final note on the binding sequence: we do not know when channelRead0() will be called (here we rely on Netty's asynchronous delivery), but when it is called, we set the model object. I update the model object inside Platform.runLater(), which ensures the change happens on the FX Thread. FX, being a binding framework, then updates every bound UI element, such as the TextField.

Final Notes on Client Code

When integrating Netty with JavaFX, the most important thing is to use Tasks. Thanks to Tasks, the UI does not hang, and thanks to the properties they provide, all work can be tracked visually. Tasks also remove the need for asynchronous handling of Netty at the application level, so a Task can block for as long as necessary without blocking the user interface. When notifications about new data arrive, try to update the UI through JavaFX binding, mediated by a dedicated model object, rather than making individual calls on specific controls.

Server code

I simply give the entire server code here without commentary, since this article focuses on the client-side aspects of Netty. A very similar example appears in the Manning book.

From EchoServer.java

    public class EchoServer {

        private final int port;

        public EchoServer(int port) {
            this.port = port;
        }

        public static void main(String[] args) throws Exception {
            if( args.length != 1 ) {
                System.err.println("usage: java EchoServer port");
                System.exit(1);
            }

            int port = Integer.parseInt(args[0]);
            new EchoServer(port).start();
        }

        public void start() throws Exception {

            final EchoServerHandler echoServerHandler = new EchoServerHandler();

            EventLoopGroup group = new NioEventLoopGroup();

            try {
                ServerBootstrap b = new ServerBootstrap();
                b
                    .group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast( echoServerHandler );
                        }
                    });

                ChannelFuture f = b.bind().sync();

                f.channel().closeFuture().sync();

            } finally {
                group.shutdownGracefully().sync();
            }
        }
    }

EchoServerHandler.java

    @Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {

        private Logger logger = LoggerFactory.getLogger( EchoServerHandler.class );

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            ByteBuf in = (ByteBuf)msg;
            String in_s = in.toString(CharsetUtil.UTF_8);
            String uc = in_s.toUpperCase();
            if( logger.isInfoEnabled() ) {
                logger.info("[READ] read " + in_s + ", writing " + uc);
            }
            in.setBytes(0, uc.getBytes(CharsetUtil.UTF_8));
            ctx.write(in);  // writes the bytes back to the sender (no flush yet)
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            if( logger.isDebugEnabled() ) {
                logger.debug("[READ COMPLETE]");
            }
            ctx.flush();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            if(logger.isDebugEnabled() ) {
                logger.debug("[CHANNEL ACTIVE]");
            }
            ctx.channel().closeFuture().addListener(f -> logger.debug("[CLOSE]"));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error( "error in echo server", cause);
            ctx.close();
        }
    }


Although modern high-speed computers can afford to spend some cycles on polling, an efficient network layer keeps your application responsive and dynamic, and it also spares the server unnecessary work.

Source: https://habr.com/ru/post/301298/

