
Creating a non-blocking TCP server using OTP principles

Introduction


It is assumed that the reader of this guide is already familiar with the gen_server and gen_fsm behaviors, TCP socket interaction through the gen_tcp module, active and passive socket modes, and OTP supervision principles.

OTP provides convenient tools for building reliable applications. In part, this is accomplished by abstracting common functionality into behaviors such as gen_server and gen_fsm, which are tied together by a hierarchy of OTP supervisors.

There are several known TCP server designs. The one we consider here consists of one listening process that spawns a new FSM process for each connected client. Although OTP supports TCP connections through the gen_tcp module, it has no standard behavior for building a non-blocking TCP server on OTP principles. By a non-blocking server we mean that neither the listening process nor the FSM processes make blocking calls, so they respond quickly to incoming messages (for example, configuration changes or restart requests) without causing timeouts. Note that blocking in the Erlang context means blocking an Erlang process, not the operating system process.
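To make the distinction concrete, here is a minimal sketch of what a blocking call does to an Erlang process. The module name blocking_demo is hypothetical and not part of the server. A helper process calls gen_tcp:accept/2 with a 500 ms timeout on a socket nobody connects to, and it can only answer a ping message after that call returns.

```erlang
-module(blocking_demo).
-export([run/0]).

%% Returns the number of milliseconds it took the helper process to
%% answer a ping that was sent while it was blocked in gen_tcp:accept/2.
run() ->
    {ok, L} = gen_tcp:listen(0, [binary, {active, false}]),
    Parent = self(),
    T0 = erlang:monotonic_time(millisecond),
    Pid = spawn(fun() ->
                    %% Nobody connects, so this call blocks for the full 500 ms.
                    {error, timeout} = gen_tcp:accept(L, 500),
                    %% Only now can the process look at its mailbox.
                    receive ping -> Parent ! pong end
                end),
    Pid ! ping,
    receive pong -> ok end,
    gen_tcp:close(L),
    erlang:monotonic_time(millisecond) - T0.
```

Calling blocking_demo:run() should return a value of roughly 500, because the ping sits unread in the mailbox for the whole duration of the accept call. A gen_server blocked in the same way could not answer calls or system messages during that time.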
In this tutorial, we show how to create a non-blocking TCP server using gen_server and gen_fsm, which provide control over application behavior and fully comply with OTP principles.

A reader who is not familiar with OTP is advised to look at Joe Armstrong's guide on building fault-tolerant servers with the blocking calls gen_tcp:connect/3 and gen_tcp:accept/1, without using OTP.

Server structure


The design of our server includes the main supervisor of the application, tcp_server_app, with the one_for_one restart strategy and two child processes. The first is a listening process, implemented as a gen_server, which waits for asynchronous notifications about client connections. The second is another supervisor, tcp_client_sup, which is responsible for starting an FSM process for each client connection and for logging abnormal terminations through standard SASL error reports.

For the sake of simplicity, the client request handler (tcp_echo_fsm) implements an echo server that sends each client request back.

Application Behavior and Supervisors


In order to create our application, we need to write modules that implement the callback functions of the “Supervisor” and “Application” behaviors. Although traditionally these functions are implemented in separate modules, given their brevity, we combine them into one.

As an added bonus, we will implement the get_app_env function, which shows how to handle the configuration parameters, as well as the command line parameters of the emulator at startup.
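As a sketch of how the listen_port parameter could be supplied through the application environment, a system configuration file might look like this. The file name sys.config is an assumption; the application name tcp_server and the parameter name listen_port are the ones used in this guide.

```erlang
%% sys.config (hypothetical file name), loaded with: erl -config sys
[
 {tcp_server, [
     %% Overrides the compiled-in default port.
     {listen_port, 2222}
 ]}
].
```

The same parameter could instead be given as an emulator flag, for example erl -listen_port 2222, in which case init:get_argument/1 returns the value as a string rather than an integer.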

Two clauses of the init/1 function are needed for the two levels of the supervisor hierarchy. Since the levels use different restart strategies, each level gets its own clause.

After the application starts, the callback function tcp_server_app:start/2 calls supervisor:start_link/3, which creates the application's main supervisor by calling tcp_server_app:init([Port, Module]). This supervisor creates the tcp_listener process and the child supervisor tcp_client_sup, which is responsible for handling client connections. The Module argument of the init function is the name of the FSM handler for client connections (in this case, tcp_echo_fsm).
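The client supervisor relies on the simple_one_for_one strategy, where children are not listed up front but are added on demand with supervisor:start_child/2. The following minimal sketch shows the mechanism in isolation. The module dyn_sup_demo and its start_worker/0 function are hypothetical names, and the worker is a bare process rather than a real FSM.

```erlang
-module(dyn_sup_demo).
-behaviour(supervisor).
-export([run/0, init/1, start_worker/0]).

%% Starts the supervisor, adds two children dynamically and reports
%% whether they are distinct processes and how many children exist.
run() ->
    {ok, Sup} = supervisor:start_link(?MODULE, []),
    {ok, P1} = supervisor:start_child(Sup, []),
    {ok, P2} = supervisor:start_child(Sup, []),
    Children = supervisor:which_children(Sup),
    %% Clean up without taking the calling process down.
    unlink(Sup),
    exit(Sup, shutdown),
    {P1 =/= P2, length(Children)}.

%% One child template; every start_child/2 call creates a new instance.
init([]) ->
    {ok, {{simple_one_for_one, 5, 60},
          [{undefined, {?MODULE, start_worker, []},
            temporary, 2000, worker, [?MODULE]}]}}.

%% Runs inside the supervisor process, so spawn_link/1 links the worker to it.
start_worker() ->
    Pid = spawn_link(fun() -> receive stop -> ok end end),
    {ok, Pid}.
```

Calling dyn_sup_demo:run() should return {true, 2}. In the server, tcp_server_app:start_client/0 plays the role of the start_child/2 calls above.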

TCP Server Application (tcp_server_app.erl):
    -module(tcp_server_app).
    -author('saleyn@gmail.com').

    -behaviour(application).

    %% Internal API
    -export([start_client/0]).

    %% Application and Supervisor callbacks
    -export([start/2, stop/1, init/1]).

    -define(MAX_RESTART,    5).
    -define(MAX_TIME,      60).
    -define(DEF_PORT,    2222).

    %% A startup function for spawning new client connection handling FSM.
    %% To be called by the TCP listener process.
    start_client() ->
        supervisor:start_child(tcp_client_sup, []).

    %%----------------------------------------------------------------------
    %% Application behaviour callbacks
    %%----------------------------------------------------------------------
    start(_Type, _Args) ->
        ListenPort = get_app_env(listen_port, ?DEF_PORT),
        supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]).

    stop(_S) ->
        ok.

    %%----------------------------------------------------------------------
    %% Supervisor behaviour callbacks
    %%----------------------------------------------------------------------
    init([Port, Module]) ->
        {ok,
            {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
                [
                  % TCP Listener
                  {   tcp_server_sup,                          % Id       = internal id
                      {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A}
                      permanent,                               % Restart  = permanent | transient | temporary
                      2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
                      worker,                                  % Type     = worker | supervisor
                      [tcp_listener]                           % Modules  = [Module] | dynamic
                  },
                  % Client instance supervisor
                  {   tcp_client_sup,
                      {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]},
                      permanent,                               % Restart  = permanent | transient | temporary
                      infinity,                                % Shutdown = brutal_kill | int() >= 0 | infinity
                      supervisor,                              % Type     = worker | supervisor
                      []                                       % Modules  = [Module] | dynamic
                  }
                ]
            }
        };

    init([Module]) ->
        {ok,
            {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
                [
                  % TCP Client
                  {   undefined,                               % Id       = internal id
                      {Module,start_link,[]},                  % StartFun = {M, F, A}
                      temporary,                               % Restart  = permanent | transient | temporary
                      2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
                      worker,                                  % Type     = worker | supervisor
                      []                                       % Modules  = [Module] | dynamic
                  }
                ]
            }
        }.

    %%----------------------------------------------------------------------
    %% Internal functions
    %%----------------------------------------------------------------------
    get_app_env(Opt, Default) ->
        %% application:get_application/0 returns {ok, App} | undefined,
        %% so the application name has to be unwrapped before the lookup.
        App = case application:get_application() of
                  {ok, Name} -> Name;
                  undefined  -> tcp_server
              end,
        case application:get_env(App, Opt) of
        {ok, Val} -> Val;
        undefined ->
            %% Fall back to an emulator flag such as "-listen_port 2222".
            %% init:get_argument/1 returns {ok, [[Value, ...], ...]} | error,
            %% and values given on the command line arrive as strings.
            case init:get_argument(Opt) of
            {ok, [[Val | _] | _]} -> Val;
            _                     -> Default
            end
        end.


Listening process


One of the drawbacks of the gen_tcp module is that it provides an interface only for blocking acceptance of connections.

Examining the prim_inet module reveals an interesting fact: the command that tells the network driver to accept a client connection is asynchronous. This is not documented, which means the OTP team can change it at any time, but we will use this functionality in our server.

The listening process is implemented as a gen_server.
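The asynchronous accept can be tried in isolation before reading the full listener. The sketch below uses a hypothetical module name, async_accept_demo, and relies on the same undocumented prim_inet:async_accept/2 call, so it may stop working in a future OTP release. It requests an accept, connects to its own listening socket, and waits for the notification from the driver.

```erlang
-module(async_accept_demo).
-export([run/0]).

%% Returns 'accepted' when the driver notification arrives.
run() ->
    {ok, L} = gen_tcp:listen(0, [binary, {active, false}]),
    {ok, Port} = inet:port(L),
    %% Returns immediately; -1 means "no timeout".
    {ok, Ref} = prim_inet:async_accept(L, -1),
    %% The process is not blocked, so it is free to do other work.
    %% Here it simply connects to its own listening socket.
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, [binary, {active, false}]),
    Result =
        receive
            {inet_async, L, Ref, {ok, S}} ->
                %% Register the raw socket so that gen_tcp can be used on it.
                true = inet_db:register_socket(S, inet_tcp),
                gen_tcp:close(S),
                accepted;
            {inet_async, L, Ref, Error} ->
                Error
        after 2000 ->
            timeout
        end,
    gen_tcp:close(C),
    gen_tcp:close(L),
    Result.
```

The accepted connection arrives as an ordinary message, which is exactly what allows a gen_server to handle it in handle_info/2 alongside its other messages.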
TCP Listener Process (tcp_listener.erl):
    -module(tcp_listener).
    -author('saleyn@gmail.com').

    -behaviour(gen_server).

    %% External API
    -export([start_link/2]).

    %% gen_server callbacks
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
             code_change/3]).

    -record(state, {
                    listener,       % Listening socket
                    acceptor,       % Asynchronous acceptor's internal reference
                    module          % FSM handling module
                   }).

    %%--------------------------------------------------------------------
    %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason}
    %%
    %% @doc Called by a supervisor to start the listening process.
    %% @end
    %%----------------------------------------------------------------------
    start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
        gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).

    %%%------------------------------------------------------------------------
    %%% Callback functions from gen_server
    %%%------------------------------------------------------------------------

    %%----------------------------------------------------------------------
    %% @spec (Port::integer()) -> {ok, State}           |
    %%                            {ok, State, Timeout}  |
    %%                            ignore                |
    %%                            {stop, Reason}
    %%
    %% @doc Called by gen_server framework at process startup.
    %%      Create listening socket.
    %% @end
    %%----------------------------------------------------------------------
    init([Port, Module]) ->
        process_flag(trap_exit, true),
        Opts = [binary, {packet, 2}, {reuseaddr, true},
                {keepalive, true}, {backlog, 30}, {active, false}],
        case gen_tcp:listen(Port, Opts) of
        {ok, Listen_socket} ->
            %% Create first accepting process
            {ok, Ref} = prim_inet:async_accept(Listen_socket, -1),
            {ok, #state{listener = Listen_socket,
                        acceptor = Ref,
                        module   = Module}};
        {error, Reason} ->
            {stop, Reason}
        end.

    %%-------------------------------------------------------------------------
    %% @spec (Request, From, State) -> {reply, Reply, State}          |
    %%                                 {reply, Reply, State, Timeout} |
    %%                                 {noreply, State}               |
    %%                                 {noreply, State, Timeout}      |
    %%                                 {stop, Reason, Reply, State}   |
    %%                                 {stop, Reason, State}
    %% @doc Callback for synchronous server calls.  If `{stop, ...}' tuple
    %%      is returned, the server is stopped and `terminate/2' is called.
    %% @end
    %% @private
    %%-------------------------------------------------------------------------
    handle_call(Request, _From, State) ->
        {stop, {unknown_call, Request}, State}.

    %%-------------------------------------------------------------------------
    %% @spec (Msg, State) ->{noreply, State}          |
    %%                      {noreply, State, Timeout} |
    %%                      {stop, Reason, State}
    %% @doc Callback for asynchronous server calls.  If `{stop, ...}' tuple
    %%      is returned, the server is stopped and `terminate/2' is called.
    %% @end
    %% @private
    %%-------------------------------------------------------------------------
    handle_cast(_Msg, State) ->
        {noreply, State}.

    %%-------------------------------------------------------------------------
    %% @spec (Msg, State) ->{noreply, State}          |
    %%                      {noreply, State, Timeout} |
    %%                      {stop, Reason, State}
    %% @doc Callback for messages sent directly to server's mailbox.
    %%      If `{stop, ...}' tuple is returned, the server is stopped and
    %%      `terminate/2' is called.
    %% @end
    %% @private
    %%-------------------------------------------------------------------------
    handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
                #state{listener=ListSock, acceptor=Ref, module=Module} = State) ->
        try
            case set_sockopt(ListSock, CliSocket) of
            ok              -> ok;
            {error, Reason} -> exit({set_sockopt, Reason})
            end,

            %% New client connected - spawn a new process using the simple_one_for_one
            %% supervisor.
            {ok, Pid} = tcp_server_app:start_client(),
            gen_tcp:controlling_process(CliSocket, Pid),
            %% Instruct the new FSM that it owns the socket.
            Module:set_socket(Pid, CliSocket),

            %% Signal the network driver that we are ready to accept another connection
            case prim_inet:async_accept(ListSock, -1) of
            {ok,    NewRef} -> ok;
            {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
            end,

            {noreply, State#state{acceptor=NewRef}}
        catch exit:Why ->
            error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
            {stop, Why, State}
        end;

    handle_info({inet_async, ListSock, Ref, Error},
                #state{listener=ListSock, acceptor=Ref} = State) ->
        error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
        {stop, Error, State};

    handle_info(_Info, State) ->
        {noreply, State}.

    %%-------------------------------------------------------------------------
    %% @spec (Reason, State) -> any
    %% @doc  Callback executed on server shutdown. It is only invoked if
    %%       `process_flag(trap_exit, true)' is set by the server process.
    %%       The return value is ignored.
    %% @end
    %% @private
    %%-------------------------------------------------------------------------
    terminate(_Reason, State) ->
        gen_tcp:close(State#state.listener),
        ok.

    %%-------------------------------------------------------------------------
    %% @spec (OldVsn, State, Extra) -> {ok, NewState}
    %% @doc  Convert process state when code is changed.
    %% @end
    %% @private
    %%-------------------------------------------------------------------------
    code_change(_OldVsn, State, _Extra) ->
        {ok, State}.

    %%%------------------------------------------------------------------------
    %%% Internal functions
    %%%------------------------------------------------------------------------

    %% Taken from prim_inet.  We are merely copying some socket options from the
    %% listening socket to the new client socket.
    set_sockopt(ListSock, CliSocket) ->
        true = inet_db:register_socket(CliSocket, inet_tcp),
        case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
        {ok, Opts} ->
            case prim_inet:setopts(CliSocket, Opts) of
            ok    -> ok;
            Error -> gen_tcp:close(CliSocket), Error
            end;
        Error ->
            gen_tcp:close(CliSocket), Error
        end.


In this module, init/1 takes two parameters: the port number on which the listening process should listen and the name of the client connection handler module. The initialization function opens the socket in passive mode, so that we keep control over the flow of data received from the client.

The most interesting part of this code is the prim_inet:async_accept/2 call. To make it work, we had to copy some internal OTP code into the set_sockopt/2 function, which registers the socket and copies some options to the client socket.

As soon as a client connects, the network driver notifies the listening process with the message {inet_async, ListSock, Ref, {ok, CliSocket}}. At this point we start an instance of the client request handler and pass ownership of CliSocket to it.

Client Message Processing


While tcp_listener is a generic implementation, tcp_echo_fsm is nothing more than a stub FSM that illustrates how to write a TCP server. The module must export two functions: start_link/0 for the tcp_client_sup supervisor, and set_socket/2 for the listening process, which uses it to notify the client handler that it now owns the socket and can start receiving messages by setting the {active, once} or {active, true} option.

We would like to emphasize the synchronization pattern used between the listening process and the client FSM, which prevents messages from being lost by delivery to the wrong (listening) process. The process that owns the listening socket keeps it open in passive mode. The accepted client socket inherits its options (including passive mode) from the listening socket. Ownership of the socket is then passed to the client process by calling gen_tcp:controlling_process/2, followed by set_socket/2, which notifies the client process that it can start receiving messages from the socket. Until the socket is switched to active mode, all received data stays in the socket buffer.
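The handoff described above can be sketched with two plain processes. This is an illustration under simplifying assumptions, not the server's code: the module name handoff_demo and the handler_got message are hypothetical, a blocking gen_tcp:accept/1 is used for brevity, and a bare process stands in for the FSM.

```erlang
-module(handoff_demo).
-export([run/0]).

%% Returns the data that the handler process received after the handoff.
run() ->
    Opts = [binary, {packet, 2}, {active, false}],
    {ok, L} = gen_tcp:listen(0, Opts),
    {ok, Port} = inet:port(L),
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, Opts),
    %% The accepted socket inherits passive mode from the listening socket.
    {ok, S} = gen_tcp:accept(L),
    %% Data sent before the handoff waits in the socket buffer.
    ok = gen_tcp:send(C, <<"early">>),
    Parent = self(),
    Handler = spawn(fun() -> handler(Parent) end),
    %% Step 1: transfer ownership. Step 2: tell the handler it may read.
    ok = gen_tcp:controlling_process(S, Handler),
    Handler ! {socket_ready, S},
    Got = receive {handler_got, Bin} -> Bin after 2000 -> timeout end,
    gen_tcp:close(C),
    gen_tcp:close(L),
    Got.

handler(Parent) ->
    receive
        {socket_ready, S} ->
            %% Only the new owner switches the socket to active mode.
            ok = inet:setopts(S, [{active, once}]),
            receive
                {tcp, S, Bin} -> Parent ! {handler_got, Bin}
            after 2000 ->
                Parent ! {handler_got, timeout}
            end,
            gen_tcp:close(S)
    end.
```

Calling handoff_demo:run() should return <<"early">>: the packet was sent while the accepting process still owned the socket, yet it is delivered to the handler, because nothing is read from the socket until the new owner enables active mode.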

When ownership of the socket is transferred to the client FSM process, which is in the WAIT_FOR_SOCKET state, the FSM sets {active, once} mode so that the network driver delivers one message at a time. This OTP practice maintains control over the flow of data and keeps TCP traffic from piling up among the other messages in the process queue.

FSM states are implemented as functions in the tcp_echo_fsm module that are named, by convention, after the states. The FSM has two states: WAIT_FOR_SOCKET, the initial state in which the FSM waits for ownership of the socket, and WAIT_FOR_DATA, in which it waits for a TCP message from the client. In the latter state the FSM also handles the special timeout message, which signals that there has been no activity from the client and causes the process to close the connection.
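The effect of {active, once} can be seen in a small receive loop. The sketch below uses a hypothetical module name, active_once_demo, and a blocking accept for brevity. The socket delivers one message, falls back to passive mode, and delivers the next message only after the option is set again.

```erlang
-module(active_once_demo).
-export([run/0]).

%% Sends three framed messages and receives them one at a time.
run() ->
    Opts = [binary, {packet, 2}, {active, false}],
    {ok, L} = gen_tcp:listen(0, Opts),
    {ok, Port} = inet:port(L),
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, Opts),
    {ok, S} = gen_tcp:accept(L),
    [ok = gen_tcp:send(C, Msg) || Msg <- [<<"one">>, <<"two">>, <<"three">>]],
    Received = loop(S, 3, []),
    gen_tcp:close(S),
    gen_tcp:close(C),
    gen_tcp:close(L),
    Received.

loop(_S, 0, Acc) ->
    lists:reverse(Acc);
loop(S, N, Acc) ->
    %% Ask the driver for exactly one more message; after delivering it
    %% the socket returns to passive mode on its own.
    ok = inet:setopts(S, [{active, once}]),
    receive
        {tcp, S, Bin} -> loop(S, N - 1, [Bin | Acc])
    after 2000 ->
        {timeout, lists:reverse(Acc)}
    end.
```

Calling active_once_demo:run() should return the three binaries in the order they were sent. With {active, true} all three messages could land in the mailbox at once, with no way for the receiver to slow the sender down.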

Client FSM (tcp_echo_fsm.erl):
    -module(tcp_echo_fsm).
    -author('saleyn@gmail.com').

    -behaviour(gen_fsm).

    -export([start_link/0, set_socket/2]).

    %% gen_fsm callbacks
    -export([init/1, handle_event/3,
             handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

    %% FSM States
    -export([
        'WAIT_FOR_SOCKET'/2,
        'WAIT_FOR_DATA'/2
    ]).

    -record(state, {
                    socket,    % client socket
                    addr       % client address
                   }).

    -define(TIMEOUT, 120000).

    %%%------------------------------------------------------------------------
    %%% API
    %%%------------------------------------------------------------------------

    %%-------------------------------------------------------------------------
    %% @spec () -> {ok,Pid} | ignore | {error,Error}
    %% @doc To be called by the supervisor in order to start the server.
    %%      If init/1 fails with Reason, the function returns {error,Reason}.
    %%      If init/1 returns {stop,Reason} or ignore, the process is
    %%      terminated and the function returns {error,Reason} or ignore,
    %%      respectively.
    %% @end
    %%-------------------------------------------------------------------------
    start_link() ->
        gen_fsm:start_link(?MODULE, [], []).

    set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
        gen_fsm:send_event(Pid, {socket_ready, Socket}).

    %%%------------------------------------------------------------------------
    %%% Callback functions from gen_fsm
    %%%------------------------------------------------------------------------

    %%-------------------------------------------------------------------------
    %% Func: init/1
    %% Returns: {ok, StateName, StateData}          |
    %%          {ok, StateName, StateData, Timeout} |
    %%          ignore                              |
    %%          {stop, StopReason}
    %% @private
    %%-------------------------------------------------------------------------
    init([]) ->
        process_flag(trap_exit, true),
        {ok, 'WAIT_FOR_SOCKET', #state{}}.

    %%-------------------------------------------------------------------------
    %% Func: StateName/2
    %% Returns: {next_state, NextStateName, NextStateData}          |
    %%          {next_state, NextStateName, NextStateData, Timeout} |
    %%          {stop, Reason, NewStateData}
    %% @private
    %%-------------------------------------------------------------------------
    'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
        % Now we own the socket
        inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
        {ok, {IP, _Port}} = inet:peername(Socket),
        {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};
    'WAIT_FOR_SOCKET'(Other, State) ->
        error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]),
        %% Allow to receive async messages
        {next_state, 'WAIT_FOR_SOCKET', State}.

    %% Notification event coming from client
    'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
        ok = gen_tcp:send(S, Data),
        {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};

    'WAIT_FOR_DATA'(timeout, State) ->
        error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
        {stop, normal, State};

    'WAIT_FOR_DATA'(Data, State) ->
        io:format("~p Ignoring data: ~p\n", [self(), Data]),
        {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.

    %%-------------------------------------------------------------------------
    %% Func: handle_event/3
    %% Returns: {next_state, NextStateName, NextStateData}          |
    %%          {next_state, NextStateName, NextStateData, Timeout} |
    %%          {stop, Reason, NewStateData}
    %% @private
    %%-------------------------------------------------------------------------
    handle_event(Event, StateName, StateData) ->
        {stop, {StateName, undefined_event, Event}, StateData}.

    %%-------------------------------------------------------------------------
    %% Func: handle_sync_event/4
    %% Returns: {next_state, NextStateName, NextStateData}            |
    %%          {next_state, NextStateName, NextStateData, Timeout}   |
    %%          {reply, Reply, NextStateName, NextStateData}          |
    %%          {reply, Reply, NextStateName, NextStateData, Timeout} |
    %%          {stop, Reason, NewStateData}                          |
    %%          {stop, Reason, Reply, NewStateData}
    %% @private
    %%-------------------------------------------------------------------------
    handle_sync_event(Event, _From, StateName, StateData) ->
        {stop, {StateName, undefined_event, Event}, StateData}.

    %%-------------------------------------------------------------------------
    %% Func: handle_info/3
    %% Returns: {next_state, NextStateName, NextStateData}          |
    %%          {next_state, NextStateName, NextStateData, Timeout} |
    %%          {stop, Reason, NewStateData}
    %% @private
    %%-------------------------------------------------------------------------
    handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
        % Flow control: enable forwarding of next TCP message
        inet:setopts(Socket, [{active, once}]),
        ?MODULE:StateName({data, Bin}, StateData);

    handle_info({tcp_closed, Socket}, _StateName,
                #state{socket=Socket, addr=Addr} = StateData) ->
        error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
        {stop, normal, StateData};

    handle_info(_Info, StateName, StateData) ->
        %% gen_fsm has no noreply return value; stay in the current state.
        {next_state, StateName, StateData}.

    %%-------------------------------------------------------------------------
    %% Func: terminate/3
    %% Purpose: Shutdown the fsm
    %% Returns: any
    %% @private
    %%-------------------------------------------------------------------------
    terminate(_Reason, _StateName, #state{socket=Socket}) ->
        (catch gen_tcp:close(Socket)),
        ok.

    %%-------------------------------------------------------------------------
    %% Func: code_change/4
    %% Purpose: Convert process state when code is changed
    %% Returns: {ok, NewState, NewStateData}
    %% @private
    %%-------------------------------------------------------------------------
    code_change(_OldVsn, StateName, StateData, _Extra) ->
        {ok, StateName, StateData}.


Application Description


Another necessary part of an OTP application is the application resource file, which contains the application name, version, start module, and environment variables.

Application File (tcp_server.app):
    {application, tcp_server,
     [
      {description, "Demo TCP server"},
      {vsn, "1.0"},
      {id, "tcp_server"},
      {modules,      [tcp_server_app, tcp_listener, tcp_echo_fsm]},
      {registered,   [tcp_server_app, tcp_client_sup, tcp_listener]},
      {applications, [kernel, stdlib]},
      %%
      %% mod: Specify the module name to start the application, plus args
      %%
      {mod, {tcp_server_app, []}},
      {env, []}
     ]
    }.


Compilation


Create the following directory structure for this application:

    ./tcp_server
    ./tcp_server/ebin/
    ./tcp_server/ebin/tcp_server.app
    ./tcp_server/src/tcp_server_app.erl
    ./tcp_server/src/tcp_listener.erl
    ./tcp_server/src/tcp_echo_fsm.erl

Then compile the sources:

    $ cd tcp_server/src
    $ for f in tcp*.erl ; do erlc +debug_info -o ../ebin $f ; done

Launch


We are going to run an Erlang shell with SASL support so that we can see the status of processes and error reports for our application. In addition, we are going to launch the appmon application in order to visually examine the hierarchy of supervisors.

    $ cd ../ebin
    $ erl -boot start_sasl
    ...
    1> appmon:start().
    {ok,<0.44.0>}
    2> application:start(tcp_server).
    ok


Now click on the tcp_server button in the appmon application to display the hierarchy of application supervisors.

    3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]).
    {ok,#Port<0.150>}


We have just opened a new client connection to the echo server.

    4> gen_tcp:send(S,<<"hello">>).
    ok
    5> f(M), receive M -> M end.
    {tcp,#Port<0.150>,"hello"}


We have verified that the echo server works as expected. Now let's kill the client connection process on the server and look at the error report that is generated.

    6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup).
    [{undefined,<0.64.0>,worker,[]}]
    7> exit(Pid,kill).
    true

    =SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 ===
         Supervisor: {local,tcp_client_sup}
         Context:    child_terminated
         Reason:     killed
         Offender:   [{pid,<0.77.0>},
                      {name,undefined},
                      {mfa,{tcp_echo_fsm,start_link,[]}},
                      {restart_type,temporary},
                      {shutdown,2000},
                      {child_type,worker}]


Note that if you put the server under load with a large number of connections, the listening process may stop accepting new connections once a limit set by the operating system is reached. In this case, you will see an error message:

 "too many open files" 


Conclusion


OTP provides building blocks for building non-blocking TCP servers. This guide shows how to create a simple server using standard OTP behavior.

Source: https://habr.com/ru/post/111268/

