
Windows Sockets, IOCP and Delphi

Prologue


Recently I needed to work with sockets efficiently in a Windows application. The task is typical for a heavily loaded server; the only thing that may seem atypical is the implementation language, Delphi.
I want to describe an approach to asynchronous work with a large number of sockets using I/O Completion Ports (IOCP). Microsoft recommends this technology in its own documentation. Many readers probably know it already, but just in case, here is a link to MSDN. The essence of the technology is that the system maintains a highly efficient event queue, and the program processes it from a thread pool whose size is chosen according to the number of CPU cores. This approach pays off when there are many simultaneous asynchronous I/O operations on different endpoints. The finished source code can be viewed here (arguably, it is better to look at it right away). Not everything in it is perfect, but it is good enough for experimentation.

Roadmap


In a sense, I will follow the Node.js philosophy in everything that concerns how objects and I/O operations are organized.
For the server side, the following needs to be implemented:

For the client, the first item on this list does not apply, but an asynchronous connection to the server has to be implemented instead. Both classes will allow reading and writing on the same endpoint at the same time.
All client and server socket instances will share one message queue and one thread pool. It must be possible to use both kinds of sockets in the same application efficiently.

Implementation


Let's get started. First of all, because the design is entirely asynchronous and event-driven, I will expose interfaces rather than classes. This is very convenient here, since it relieves the end programmer of responsibility for freeing the allocated memory; in general, tracking object lifetimes any other way would be either very costly or outright impossible in this design. A lot of work has to happen during unit initialization.

So the unit's initialization section calls the following procedure, which performs that work step by step.
procedure Init;
var
  WSAData: TWsaData;
  i: Integer;
begin
  gClients := TProtoStore.Create;
  gListeners := TProtoStore.Create;
  gServerClients := TProtoStore.Create;

  if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
    raise IOCPClientException.Create(sErrorInit_WSAtartup);

  // The shared completion port. On failure CreateIoCompletionPort
  // returns 0 (NULL), not INVALID_HANDLE_VALUE.
  gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2);
  if gIOCP = 0 then
    raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort);

  // Worker pool: two threads per core.
  SetLength(gWorkers, CPUCount * 2);
  for i := 0 to High(gWorkers) do
    gWorkers[i] := TWorkerThread.Create();

  // One event per socket category; sockets are bound to them later with WSAEventSelect.
  gListenerAcceptEvent := WSACreateEvent;
  if gListenerAcceptEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gServerClientsCloseEvent := WSACreateEvent;
  if gServerClientsCloseEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gClisentsConnectAndCloseEvents := WSACreateEvent;
  if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);

  // One event-watching thread per socket category.
  gClientSocketEventThread := TSocketEventThread.Create(gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED);
  gClientSocketEventThread.Start;
  gServerClientsSocketEventThread := TSocketEventThread.Create(gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED);
  gServerClientsSocketEventThread.Start;
  gServerSocketEventThread := TSocketEventThread.Create(gListenerAcceptEvent, gListeners, ET_EVENT_SIGNALED);
  gServerSocketEventThread.Start;
end;

Here CreateIoCompletionPort creates the shared message queue, that is, the completion port itself.
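To see the queue on its own, here is a minimal Windows-only sketch in Delphi. It assumes Delphi XE2 or later, where the completion key is a NativeUInt, as in the article's code. It creates an empty completion port, posts one custom packet with PostQueuedCompletionStatus and takes it back out with GetQueuedCompletionStatus; the byte count 7 and the key 42 are arbitrary illustration values. Note that CreateIoCompletionPort reports failure by returning 0 (NULL).

```pascal
program IocpRoundTrip;
{$APPTYPE CONSOLE}
// Windows-only sketch: one hand-made packet through a completion port.
uses
  Windows, SysUtils;

var
  Port: THandle;
  Bytes: DWORD;
  Key: NativeUInt;   // same type the article uses for the completion key
  Ov: POverlapped;
begin
  // INVALID_HANDLE_VALUE and no existing port: create a new, empty port.
  // 0 concurrent threads: the system allows one running thread per processor.
  Port := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  if Port = 0 then  // failure is reported as 0 (NULL)
    RaiseLastOSError;
  try
    // 7 and 42 are arbitrary illustration values; no OVERLAPPED is attached.
    if not PostQueuedCompletionStatus(Port, 7, 42, nil) then
      RaiseLastOSError;
    // Take the packet back out, waiting at most one second.
    if not GetQueuedCompletionStatus(Port, Bytes, Key, Ov, 1000) then
      RaiseLastOSError;
    WriteLn('bytes=', Bytes, ' key=', Key, ' overlapped is nil: ', Ov = nil);
  finally
    CloseHandle(Port);
  end;
end.
```

A real worker would block in GetQueuedCompletionStatus with INFINITE instead of a timeout, as TWorkerThread.ProcessIOCP does.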
Init also shows that the same thread class, TSocketEventThread, watches events on sockets with different roles. Each such thread runs a procedure that waits for socket events and, as soon as one occurs, queues a message for every socket of the type the thread serves.
procedure TSocketEventThread.WaitForClientsEvents;
var
  WaitResult: DWORD;
const
  TimeOut: DWORD = 100;
begin
  // Wait up to 100 ms for the shared event of this socket category.
  WaitResult := WSAWaitForMultipleEvents(1, @fEvent, FALSE, TimeOut, FALSE);
  if WaitResult = WSA_WAIT_FAILED then
    raise IOCPClientException.Create(sErrorWaitForClientsEvents_WSAWaitForMultipleEvents);
  if WaitResult = WSA_WAIT_EVENT_0 then
  begin
    if not WSAResetEvent(fEvent) then
      raise IOCPClientException.Create(sErrorWaitForClientsEvents_WSAResetEvent);
    // The shared event does not say which socket fired,
    // so every socket in the store gets a message.
    fStore.Post(fKey);
  end;
end;

The fStore.Post(fKey) method simply sends these messages to the queue.
procedure TProtoStore.Post(CompletionKey: DWORD);
var
  i: Integer;
begin
  fLock.Enter;
  try
    for i := 0 to Length(ProtoArray) - 1 do
    begin
      // Pin the object while its pointer sits in the queue.
      ProtoArray[i]._AddRef;
      // The object pointer travels in place of the OVERLAPPED pointer.
      if not PostQueuedCompletionStatus(gIOCP, 0, CompletionKey, POverlapped(ProtoArray[i])) then
      begin
        ProtoArray[i]._Release;
        raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus);
      end;
    end;
  finally
    fLock.Leave;
  end;
end;

Using objects through interfaces deserves special attention here.
The _AddRef call marks the object as being "in the queue", so it must not be destroyed; the matching _Release is called after the message has been processed. PostQueuedCompletionStatus is what actually puts the message into the queue.
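The pinning rule can be illustrated without any sockets. In the portable sketch below (Delphi, or Free Pascal in Delphi mode), a TList stands in for the completion port and the hypothetical class TProtoDemo for a socket object; its Pin and Unpin methods are hypothetical wrappers around _AddRef and _Release. The application drops its own interface reference while the raw pointer is still "queued", yet the object survives until the consumer releases it.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program QueuedRefCount;
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils, Classes;

type
  // Hypothetical stand-in for TIOCPSocketProto.
  TProtoDemo = class(TInterfacedObject)
  public
    procedure Pin;    // what TProtoStore.Post does with _AddRef
    procedure Unpin;  // what the worker does with _Release after processing
    destructor Destroy; override;
  end;

var
  Destroyed: Boolean = False;

procedure TProtoDemo.Pin;
begin
  _AddRef;
end;

procedure TProtoDemo.Unpin;
begin
  _Release;  // may free Self, so nothing may follow this call
end;

destructor TProtoDemo.Destroy;
begin
  Destroyed := True;
  inherited;
end;

var
  Queue: TList;        // stand-in for the completion port: raw, uncounted pointers
  Owner: IInterface;   // the application's own reference
  Obj: TProtoDemo;
begin
  Queue := TList.Create;
  try
    Obj := TProtoDemo.Create;
    Owner := Obj;      // reference count 1
    Obj.Pin;           // reference count 2: "in the queue"
    Queue.Add(Obj);    // only a plain pointer travels through the queue
    Owner := nil;      // the application lets go: count 1, object survives
    WriteLn('destroyed while queued: ', Destroyed);

    // Worker side: dequeue, process, unpin.
    Obj := TProtoDemo(Queue[0]);
    Queue.Delete(0);
    Obj.Unpin;         // count 0: Destroy runs here
    WriteLn('destroyed after processing: ', Destroyed);
  finally
    Queue.Free;
  end;
end.
```

Without the Pin call, the object would be destroyed as soon as Owner is set to nil, and the worker would then dereference a dangling pointer.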
The pool processes each message asynchronously.
To do so, every worker thread runs the following procedure.
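procedure TWorkerThread.ProcessIOCP;
var
  NumberOfBytes: DWORD;
  CompletionKey: NativeUInt;
  Overlapped: POverlapped;
  Proto: TIOCPSocketProto;
begin
  // Skip only the case where the call failed without dequeuing a packet
  // (Overlapped = nil); a failed I/O packet still carries its Overlapped.
  if not ((not GetQueuedCompletionStatus(gIOCP, NumberOfBytes, CompletionKey, Overlapped, INFINITE)) and (Overlapped = nil)) then
  begin
    if CompletionKey = ET_EVENT_SIGNALED then
    begin
      // Event message: the object pointer was passed in place of the OVERLAPPED.
      Proto := TIOCPSocketProto(Overlapped);
      Proto.IOCPProcessEventsProc();
      Proto._Release;  // balances _AddRef in TProtoStore.Post
    end
    else if CompletionKey <> 0 then
    begin
      // I/O completion: the key is the object registered with CreateIoCompletionPort.
      Proto := TIOCPSocketProto(CompletionKey);
      if Proto.IOCPProcessIOProc(NumberOfBytes, Overlapped) then
        Proto._Release;  // the operation is over; balances the _AddRef made when it started
    end;
  end;
end;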

GetQueuedCompletionStatus retrieves a message from the queue. The procedure then determines whether the message reports a completed I/O operation or an event that has occurred. Information can be passed through the queue in two ways, as the completion key or in place of the OVERLAPPED pointer; in both cases what travels here is a reference to a specific socket object.
Processing is the same for all kinds of sockets: they derive from a common ancestor that contains the shared handlers, and descendants can override them.
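The dispatch rule in ProcessIOCP boils down to a small decision on the completion key. The portable sketch below isolates it. The numeric value of ET_EVENT_SIGNALED is not shown in the article, so 1 is an assumption (any value that can never be a valid object address would do), and the TPacketKind type and ClassifyPacket function are hypothetical.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program KeyDispatch;
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

const
  ET_EVENT_SIGNALED = 1;  // assumed value; the article does not show it

type
  TPacketKind = (pkIgnore, pkSocketEvent, pkIoCompletion);

// Mirrors the branches of TWorkerThread.ProcessIOCP:
//  - key = ET_EVENT_SIGNALED: posted by TProtoStore.Post, object is in Overlapped;
//  - any other non-zero key: a real I/O completion, the key is the object that
//    was registered with CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(Self), 0);
//  - key = 0: the worker does nothing with the packet.
function ClassifyPacket(CompletionKey: NativeUInt): TPacketKind;
begin
  if CompletionKey = ET_EVENT_SIGNALED then
    Result := pkSocketEvent
  else if CompletionKey <> 0 then
    Result := pkIoCompletion
  else
    Result := pkIgnore;
end;

var
  Obj: TObject;
begin
  Obj := TObject.Create;  // any heap object: its address is far above 1
  try
    WriteLn(Ord(ClassifyPacket(ET_EVENT_SIGNALED)));  // pkSocketEvent
    WriteLn(Ord(ClassifyPacket(NativeUInt(Obj))));    // pkIoCompletion
    WriteLn(Ord(ClassifyPacket(0)));                  // pkIgnore
  finally
    Obj.Free;
  end;
end.
```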
Now let's look at how socket events are handled.
procedure TIOCPSocketProto.IOCPProcessEventsProc();
var
  WSAEvents: TWsaNetworkEvents;
  AcceptedSocket: TSocket;
  RemoteAddress: string;
begin
  if fStateLock <> CLI_SOCKET_LOCK_CLOSED then
  begin
    fClosingLock.BeginRead;  // the socket cannot be closed while we are inside
    try
      // Re-check under the lock: a close may have happened before BeginRead.
      if fStateLock <> CLI_SOCKET_LOCK_CLOSED then
        // 0 instead of an event handle: the shared event was already reset by the event thread.
        if WSAEnumNetworkEvents(fSocket, 0, WSAEvents) <> SOCKET_ERROR then
        begin
          if (WSAEvents.lNetworkEvents and FD_CONNECT) <> 0 then
          begin
            if WSAEvents.iErrorCode[FD_CONNECT_BIT] = 0 then
              InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED);
            CallOnConnect;  // called whether or not the connect succeeded
          end;
          if ((WSAEvents.lNetworkEvents and FD_CLOSE) <> 0) and (WSAEvents.iErrorCode[FD_CLOSE_BIT] = 0) then
            CallOnClose;
          if ((WSAEvents.lNetworkEvents and FD_ACCEPT) <> 0) and (WSAEvents.iErrorCode[FD_ACCEPT_BIT] = 0) then
          begin
            AcceptedSocket := DoAccept(RemoteAddress);
            if AcceptedSocket <> INVALID_SOCKET then
              fClientClass.Create(AcceptedSocket, fOnConnect, fOnClose, RemoteAddress).Prepare;
          end;
        end;
    finally
      fClosingLock.EndRead;
    end;
  end;
end;

The TMultiReadExclusiveWriteSynchronizer class is put to interesting use here. It prevents another thread of the pool from closing the socket and destroying the object while this procedure is working with it (fClosingLock.BeginRead). With respect to this synchronization object, all socket operations are performed as reads, except for creating and closing the socket: those are writes and can therefore be performed only with exclusive ownership of the resource.
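The locking discipline can be reduced to a few lines. In this portable sketch, the hypothetical TGuardedSocket class takes the shared (read) side of a TMultiReadExclusiveWriteSynchronizer for every ordinary operation and the exclusive (write) side for closing, so Close waits until no operation is in progress and operations started afterwards see the closed state. The demo runs on a single thread, so it shows only the state logic, not contention.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program CloseGuard;
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

type
  // Hypothetical stand-in for a socket object guarded like TIOCPSocketProto.
  TGuardedSocket = class
  private
    fClosingLock: TMultiReadExclusiveWriteSynchronizer;
    fClosed: Boolean;
  public
    constructor Create;
    destructor Destroy; override;
    function TryOperation: Boolean;  // stands in for Read/Write/event handling
    procedure Close;
  end;

constructor TGuardedSocket.Create;
begin
  inherited Create;
  fClosingLock := TMultiReadExclusiveWriteSynchronizer.Create;
end;

destructor TGuardedSocket.Destroy;
begin
  fClosingLock.Free;
  inherited;
end;

function TGuardedSocket.TryOperation: Boolean;
begin
  fClosingLock.BeginRead;   // many threads may hold the read side at once
  try
    Result := not fClosed;  // a real implementation would issue WSARecv/WSASend here
  finally
    fClosingLock.EndRead;
  end;
end;

procedure TGuardedSocket.Close;
begin
  fClosingLock.BeginWrite;  // waits until every reader has left
  try
    fClosed := True;        // closesocket(fSocket) would go here
  finally
    fClosingLock.EndWrite;
  end;
end;

var
  S: TGuardedSocket;
begin
  S := TGuardedSocket.Create;
  try
    WriteLn('before close: ', S.TryOperation);
    S.Close;
    WriteLn('after close: ', S.TryOperation);
  finally
    S.Free;
  end;
end.
```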
Otherwise, the socket handling in IOCPProcessEventsProc is entirely ordinary.
The only part of it worth a closer look is how a new client connection is accepted on the server, in the DoAccept method.
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket;
var
  addr: TSockAddr;
  addrlen: Integer;
  dwCallbackData: NativeUInt;
  RemoteAddrLen: DWORD;
begin
  dwCallbackData := NativeUInt(self);  // passed through to ServerAcceptCallBack
  addrlen := SizeOf(addr);
  // If the condition callback rejects the connection, INVALID_SOCKET is returned.
  Result := WSAAccept(fSocket, @addr, @addrlen, ServerAcceptCallBack, dwCallbackData);
  if Result <> INVALID_SOCKET then
  begin
    SetLength(RemoteAddress, 255);
    RemoteAddrLen := Length(RemoteAddress);
    if WSAAddressToString(addr, addrlen, nil, PChar(@RemoteAddress[1]), RemoteAddrLen) = SOCKET_ERROR then
    begin
      closesocket(Result);  // do not leak the accepted socket
      raise IOCPClientException.Create(sErrorAccept_WSAAddressToString);
    end;
    // On success RemoteAddrLen includes the terminating #0.
    SetLength(RemoteAddress, RemoteAddrLen - 1);
  end;
end;

The key point here is the use of WSAAccept. This function lets the server reject client connections in such a way that the client really does receive an FD_CONNECT event.
This is the preferred way to implement so-called blacklists.
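The blacklist decision itself is simple. The portable sketch below shows only the verdict a condition callback such as ServerAcceptCallBack could return; extracting the caller address from the lpCallerId buffer is left out, and the AcceptDecision function and the sample addresses are hypothetical. CF_ACCEPT and CF_REJECT are redeclared with their winsock2.h values so the sketch compiles without Winsock. When the callback returns CF_REJECT, WSAAccept fails with WSAECONNREFUSED and returns INVALID_SOCKET, which is the case DoAccept checks for.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program AcceptFilter;
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

uses
  SysUtils;

const
  // Return values of a WSAAccept condition function, as in winsock2.h.
  CF_ACCEPT = 0;
  CF_REJECT = 1;

// Hypothetical helper a condition callback could call once it has turned
// the caller's address into a string.
function AcceptDecision(const RemoteIP: string; const Blacklist: array of string): Integer;
var
  i: Integer;
begin
  for i := Low(Blacklist) to High(Blacklist) do
    if SameText(RemoteIP, Blacklist[i]) then
      Exit(CF_REJECT);
  Result := CF_ACCEPT;
end;

begin
  WriteLn(AcceptDecision('10.0.0.5', ['10.0.0.5', '192.168.1.7']));  // CF_REJECT
  WriteLn(AcceptDecision('10.0.0.6', ['10.0.0.5', '192.168.1.7']));  // CF_ACCEPT
end.
```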
Let's move on to input/output, using a read operation as the example.
procedure TIOCPSocketProto.Read(Length: DWORD; OnRead, OnReadProcess: TOnReadEvent);
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  fClosingLock.BeginRead;
  try
    if fStateLock = CLI_SOCKET_LOCK_CONNECTED then
    begin
      // Only one outstanding read per socket: IO_IDLE -> IO_PROCESS, atomically.
      if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE then
      begin
        fOnRead := OnRead;
        fOnReadProcess := OnReadProcess;
        fReaded := 0;
        fReadBufLength := Length;
        fReadBuffer := nil;
        GetMem(fReadBuffer, Length);  // one allocation per Read call
        if fReadBuffer <> nil then
        begin
          FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0);
          WsaBuf.buf := fReadBuffer;
          WsaBuf.len := fReadBufLength;
          Flags := 0;
          Bytes := 0;
          _AddRef;  // keep the object alive until the completion is processed
          if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil) = SOCKET_ERROR) and
             (WSAGetLastError <> WSA_IO_PENDING) then
          begin
            FreeMem(fReadBuffer, Length);
            InterlockedExchange(fReadLock, IO_IDLE);
            _Release;
            raise IOCPClientException.Create(sErrorRead_WSARecv);
          end;
        end
        else
        begin
          // Otherwise the socket would stay "busy" forever.
          InterlockedExchange(fReadLock, IO_IDLE);
          raise IOCPClientException.Create(sErrorRead_GetMem);
        end;
      end
      else
        raise IOCPClientException.Create(sErrorRead_InProcess);
    end
    else
      raise IOCPClientException.Create(sErrorRead_NotConnected);
  finally
    fClosingLock.EndRead;
  end;
end;

Here I used interlocked operations: they are very fast and are all that is needed to reject an attempt to start a second read while one is still in progress. The buffer is allocated once per operation. Then an asynchronous read from the socket is started. The object is also "pinned" with _AddRef so that it cannot be destroyed while it is in the queue. When a portion of data has been read, the completion message is queued automatically.
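The one-read-at-a-time rule can be tried in isolation. In this sketch the values of IO_IDLE and IO_PROCESS are assumptions (the article does not show them), and TryBeginRead and EndRead are hypothetical names. A single compare-exchange moves the flag from IO_IDLE to IO_PROCESS only if it was idle, so a second Read fails immediately instead of blocking. Under Free Pascal the interlocked functions come from the System unit; under Delphi, from the Windows unit.

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program ReadGuard;
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

{$IFNDEF FPC}
uses
  Windows;  // Delphi declares InterlockedCompareExchange/InterlockedExchange here
{$ENDIF}

const
  IO_IDLE = 0;     // assumed values
  IO_PROCESS = 1;

var
  fReadLock: Integer = IO_IDLE;

// Succeeds only for the caller that finds the flag idle; returns at once otherwise.
function TryBeginRead: Boolean;
begin
  // InterlockedCompareExchange returns the value the flag had before the call.
  Result := InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE;
end;

// Called when the read has completed (or failed to start).
procedure EndRead;
begin
  InterlockedExchange(fReadLock, IO_IDLE);
end;

begin
  WriteLn('first Read accepted:  ', TryBeginRead);
  WriteLn('second Read accepted: ', TryBeginRead);  // still in progress
  EndRead;
  WriteLn('third Read accepted:  ', TryBeginRead);
end.
```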
Now consider what happens when a completed I/O message is taken from the queue.
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD; Overlapped: POverlapped): Boolean;
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  // True means the operation is finished and the caller must call _Release.
  Result := FALSE;
  fClosingLock.BeginRead;
  try
    if Overlapped = @fOverlappedRead then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fReadLock = IO_PROCESS then
        begin
          inc(fReaded, NumberOfBytes);
          if fReaded < fReadBufLength then
          begin
            // Buffer not full yet: report progress and ask for the rest.
            CallOnReadProcess;
            WsaBuf.buf := fReadBuffer;
            inc(WsaBuf.buf, fReaded);
            WsaBuf.len := fReadBufLength;
            dec(WsaBuf.len, fReaded);
            Flags := 0;
            Bytes := 0;
            if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil) = SOCKET_ERROR) and
               (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnRead;
              Result := True;
            end;
          end
          else
          begin
            // Buffer full: the read is complete.
            CallOnReadProcess;
            CallOnRead;
            Result := True;
          end;
        end;
      end
      else
      begin
        // Zero bytes: the connection was closed or the operation failed.
        CallOnRead;
        Result := True;
      end;
    end
    else if Overlapped = @fOverlappedWrite then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fWriteLock = IO_PROCESS then
        begin
          inc(fWrited, NumberOfBytes);
          if fWrited < fWriteBufLength then
          begin
            // Not everything has been sent yet: report progress and send the rest.
            CallOnWriteProcess;
            WsaBuf.buf := fWriteBuffer;
            inc(WsaBuf.buf, fWrited);
            WsaBuf.len := fWriteBufLength;
            dec(WsaBuf.len, fWrited);
            Flags := 0;
            Bytes := 0;
            if (WSASend(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedWrite, nil) = SOCKET_ERROR) and
               (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnWrite;
              Result := True;
            end;
          end
          else
          begin
            // Whole buffer sent: the write is complete.
            CallOnWriteProcess;
            CallOnWrite;
            Result := True;
          end;
        end;
      end
      else
      begin
        CallOnWrite;
        Result := True;
      end;
    end;
  finally
    fClosingLock.EndRead;
  end;
end;

The essence of this procedure is that it keeps issuing reads (or writes) on the socket until the allocated buffer has been filled (or fully sent). An interesting point is how the type of operation is determined: by the address of the OVERLAPPED structure. The queue hands back this address, and it only needs to be compared with the fields of the object that hold the read and write structures.
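Both ideas can be checked without any real I/O. In this portable sketch, TFakeOverlapped and TConnDemo are hypothetical stand-ins: Identify compares the pointer from a completion packet with the addresses of the object's two OVERLAPPED fields, and NextRequestLength shows how much the follow-up WSARecv or WSASend would ask for after each partial completion (three completions of 4, 3 and 3 bytes filling a 10-byte buffer).

```pascal
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
program OverlappedDispatch;
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

type
  // Placeholder for TOverlapped: only the record's address matters here.
  TFakeOverlapped = record
    Dummy: NativeUInt;
  end;
  PFakeOverlapped = ^TFakeOverlapped;

  TOpKind = (opUnknown, opRead, opWrite);

  // Hypothetical stand-in for a socket object with one OVERLAPPED per direction.
  TConnDemo = class
  public
    fOverlappedRead: TFakeOverlapped;
    fOverlappedWrite: TFakeOverlapped;
    function Identify(Ov: PFakeOverlapped): TOpKind;
  end;

function TConnDemo.Identify(Ov: PFakeOverlapped): TOpKind;
begin
  // The completion packet returns the very pointer passed to WSARecv/WSASend.
  if Ov = @fOverlappedRead then
    Result := opRead
  else if Ov = @fOverlappedWrite then
    Result := opWrite
  else
    Result := opUnknown;
end;

// Length for the follow-up WSARecv/WSASend; 0 means the buffer is complete.
// The follow-up request would start at Buffer + Done.
function NextRequestLength(Total, Done: Cardinal): Cardinal;
begin
  if Done >= Total then
    Result := 0
  else
    Result := Total - Done;
end;

const
  BufferLength = 10;
  Completions: array[0..2] of Cardinal = (4, 3, 3);  // NumberOfBytes per packet

var
  Conn: TConnDemo;
  Done: Cardinal;
  i: Integer;
begin
  Conn := TConnDemo.Create;
  try
    WriteLn('write packet identified: ', Conn.Identify(@Conn.fOverlappedWrite) = opWrite);
  finally
    Conn.Free;
  end;

  Done := 0;
  for i := Low(Completions) to High(Completions) do
  begin
    Inc(Done, Completions[i]);
    WriteLn('done=', Done, ' next request=', NextRequestLength(BufferLength, Done));
  end;
end.
```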
Note also that even when a read or write completes immediately, its completion still goes through the queue; this behavior can be changed through the API.
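The setting in question is most likely the FILE_SKIP_COMPLETION_PORT_ON_SUCCESS flag of SetFileCompletionNotificationModes (Windows Vista and later). Below is a Windows-only Delphi sketch that enables it on a socket; the function is declared by hand in case the RTL in use does not declare it. With the flag set, a WSARecv or WSASend that returns 0 no longer produces a completion packet, so such results would have to be processed inline; IOCPProcessIOProc as written assumes every operation goes through the queue. Microsoft's documentation also warns that, for sockets, the flag should only be used when all installed layered service providers are IFS providers.

```pascal
program SkipOnSuccess;
{$APPTYPE CONSOLE}
// Windows-only sketch (Vista or later).
uses
  Windows, SysUtils, Winsock;

const
  // Value from winbase.h.
  FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = $1;

// Declared by hand in case the RTL in use does not declare it.
function SetFileCompletionNotificationModes(FileHandle: THandle; Flags: Byte): BOOL;
  stdcall; external 'kernel32.dll' name 'SetFileCompletionNotificationModes';

var
  WSAData: TWSAData;
  S: TSocket;
begin
  if WSAStartup($0202, WSAData) <> 0 then  // $0202 = Winsock 2.2
    raise Exception.Create('WSAStartup failed');
  try
    // Sockets created by socket() are overlapped-capable by default.
    S := socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if S = INVALID_SOCKET then
      raise Exception.Create('socket failed');
    try
      // From now on, a WSARecv/WSASend on S that completes at once
      // (returns 0) does not post a completion packet.
      if not SetFileCompletionNotificationModes(THandle(S), FILE_SKIP_COMPLETION_PORT_ON_SUCCESS) then
        RaiseLastOSError;
      WriteLn('FILE_SKIP_COMPLETION_PORT_ON_SUCCESS enabled');
    finally
      closesocket(S);
    end;
  finally
    WSACleanup;
  end;
end.
```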
It is also worth looking at how a socket object is created and attached to the queue.
constructor TIOCPClientSocket.Create(RemoteAddress: string; OnConnect, OnClose: TOnSimpleSocketEvenet);
var
  lRemoteAddress: TSockAddr;
  lRemoteAddressLength: Integer;
begin
  inherited Create();
  fStore := gClients;
  fOnConnect := OnConnect;
  fOnClose := OnClose;
  fStateLock := 0;
  fRemoteAddressStr := RemoteAddress;

  // An overlapped socket, so that it can be used with the completion port.
  fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  if fSocket = INVALID_SOCKET then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket);

  // Connect/close notifications go to the shared client event
  // (WSAEventSelect also puts the socket into non-blocking mode).
  if WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents, FD_CONNECT or FD_CLOSE) = SOCKET_ERROR then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect);

  // Bind the socket to the common completion port; the object itself is the completion key.
  if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_CreateIoCompletionPort);

  fStateLock := CLI_SOCKET_LOCK_CREATED;
  fStore.Add(self);

  // Start an asynchronous connect; its outcome arrives later as FD_CONNECT.
  lRemoteAddressLength := SizeOf(lRemoteAddress);
  lRemoteAddress.sa_family := AF_INET;
  if WSAStringToAddress(PChar(@fRemoteAddressStr[1]), AF_INET, nil, lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAStringToAddress);
  if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil, nil) = SOCKET_ERROR) and
     (WSAGetLastError <> WSAEWOULDBLOCK) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect);
end;

The client socket constructor creates the socket itself (WSASocket), binds it to the shared event (WSAEventSelect), registers it with the queue (CreateIoCompletionPort) and calls the asynchronous connect function (WSAConnect). The connection itself is then awaited in the thread discussed first (the thread that waits for socket events), which in turn puts the event into the queue.

Epilogue


This article briefly covers techniques that, in my opinion, work well for building classes for event-driven programming.
I managed to create a class for high-performance socket work in Delphi. This topic is generally poorly covered, and I plan to follow this publication with another two or three posts: on socket contexts when using interfaces, and on secure connections with IOCP (crypto providers and Winsock Secure Socket Extensions). The full sample code is here.


Source: https://habr.com/ru/post/145140/

