// Sets up the module-wide IOCP state: three TProtoStore registries, Winsock 2.2,
// the shared completion port, CPUCount * 2 worker threads, three WSA event
// objects and one TSocketEventThread per event/store pair.
//
// Raises IOCPClientException when WSAStartup, CreateIoCompletionPort or
// WSACreateEvent fails. Nothing created by earlier steps is torn down here
// when a later step raises.
procedure Init;
var
  WSAData: TWsaData;
  i: Integer;
begin
  gClients := TProtoStore.Create;
  gListeners := TProtoStore.Create;
  gServerClients := TProtoStore.Create;

  if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
    raise IOCPClientException.Create(sErrorInit_WSAtartup);

  gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2);
  // CreateIoCompletionPort signals failure with a 0 (NULL) handle, not with
  // INVALID_HANDLE_VALUE, so 0 is the value that has to be tested.
  if gIOCP = 0 then
    raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort);

  // One worker per concurrency slot requested from the completion port.
  for i := 1 to CPUCount * 2 do
  begin
    SetLength(gWorkers, Length(gWorkers) + 1);
    gWorkers[High(gWorkers)] := TWorkerThread.Create();
  end;

  gListenerAcceptEvent := WSACreateEvent;
  if gListenerAcceptEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);

  gServerClientsCloseEvent := WSACreateEvent;
  if gServerClientsCloseEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);

  gClisentsConnectAndCloseEvents := WSACreateEvent;
  if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);

  // Each event thread is handed an event, a store and the ET_EVENT_SIGNALED key.
  gClientSocketEventThread := TSocketEventThread.Create(
    gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED);
  gClientSocketEventThread.Start;

  gServerClientsSocketEventThread := TSocketEventThread.Create(
    gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED);
  gServerClientsSocketEventThread.Start;

  gServerSocketEventThread := TSocketEventThread.Create(
    gListenerAcceptEvent, gListeners, ET_EVENT_SIGNALED);
  gServerSocketEventThread.Start;
end;
// Waits up to 100 ms on fEvent. When the event is signaled it is reset and
// fStore.Post(fKey) is called; a timeout (or any other wait result) does nothing.
//
// Raises IOCPClientException when the wait itself or the event reset fails.
procedure TSocketEventThread.WaitForClientsEvents;
const
  WaitLimit: DWORD = 100;
var
  Outcome: DWORD;
begin
  Outcome := WSAWaitForMultipleEvents(1, @fEvent, FALSE, WaitLimit, FALSE);

  if Outcome = WSA_WAIT_FAILED then
    raise IOCPClientException.Create
      (sErrorWaitForClientsEvents_WSAWaitForMultipleEvents);

  // Anything other than "event 0 signaled" needs no further work.
  if Outcome <> WSA_WAIT_EVENT_0 then
    Exit;

  if not WSAResetEvent(fEvent) then
    raise IOCPClientException.Create
      (sErrorWaitForClientsEvents_WSAResetEvent);

  fStore.Post(fKey);
end;
// Queues one completion packet on gIOCP for every entry of ProtoArray, using
// CompletionKey as the key and the entry itself (cast to POverlapped) as the
// payload. Each entry gets an _AddRef before its packet is posted.
//
// Raises IOCPClientException on the first failed post, after undoing that
// entry's _AddRef; entries after it are not posted. fLock is held throughout.
procedure TProtoStore.Post(CompletionKey: DWORD);
var
  Idx, Count: Integer;
  Queued: Boolean;
begin
  fLock.Enter;
  try
    Count := Length(ProtoArray);
    for Idx := 0 to Count - 1 do
    begin
      ProtoArray[Idx]._AddRef;
      Queued := PostQueuedCompletionStatus(gIOCP, 0, CompletionKey,
        POverlapped(ProtoArray[Idx]));
      if Queued then
        Continue;

      // The packet never reached the port, so drop the reference taken for it.
      ProtoArray[Idx]._Release;
      raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus);
    end;
  finally
    fLock.Leave;
  end;
end;
// Dequeues one packet from gIOCP (blocking indefinitely) and dispatches it.
//
// * Key = ET_EVENT_SIGNALED: the overlapped pointer is treated as a
//   TIOCPSocketProto; its event handler runs and one reference is released.
// * Any other non-zero key: the key is treated as a TIOCPSocketProto and the
//   packet is passed to IOCPProcessIOProc; a reference is released only when
//   that call returns True.
// * A failed dequeue with no overlapped pointer is ignored.
procedure TWorkerThread.ProcessIOCP;
var
  Transferred: DWORD;
  Key: NativeUInt;
  Ovl: POverlapped;
  Target: TIOCPSocketProto;
  Dequeued: Boolean;
begin
  Dequeued := GetQueuedCompletionStatus(gIOCP, Transferred, Key, Ovl, INFINITE);

  // Failure without an overlapped pointer: there is nothing to dispatch.
  if (not Dequeued) and (Ovl = nil) then
    Exit;

  if Key = ET_EVENT_SIGNALED then
  begin
    Target := TIOCPSocketProto(Ovl);
    Target.IOCPProcessEventsProc();
    Target._Release;
  end
  else if Key <> 0 then
  begin
    Target := TIOCPSocketProto(Key);
    if Target.IOCPProcessIOProc(Transferred, Ovl) then
      Target._Release;
  end;
end;
// Reads the pending network events of fSocket and reacts to them while holding
// fClosingLock for reading. Does nothing when the socket state is
// CLI_SOCKET_LOCK_CLOSED or when WSAEnumNetworkEvents fails.
//
// * FD_CONNECT: marks the state as connected when the event carries no error,
//   then calls CallOnConnect in either case.
// * FD_CLOSE without error: calls CallOnClose.
// * FD_ACCEPT without error: accepts the connection and, when a valid socket
//   comes back, creates an fClientClass instance for it and calls Prepare.
procedure TIOCPSocketProto.IOCPProcessEventsProc();
var
  NetEvents: TWsaNetworkEvents;
  NewSocket: TSocket;
  PeerAddress: string;
begin
  if fStateLock = CLI_SOCKET_LOCK_CLOSED then
    Exit;

  fClosingLock.BeginRead;
  try
    // Re-check under the lock; the state may have changed before it was taken.
    if fStateLock = CLI_SOCKET_LOCK_CLOSED then
      Exit;

    if WSAEnumNetworkEvents(fSocket, 0, NetEvents) = SOCKET_ERROR then
      Exit;

    if (NetEvents.lNetworkEvents and FD_CONNECT) <> 0 then
    begin
      if NetEvents.iErrorCode[FD_CONNECT_BIT] = 0 then
        InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED);
      // The callback runs whether or not the connect attempt succeeded.
      CallOnConnect;
    end;

    if ((NetEvents.lNetworkEvents and FD_CLOSE) <> 0) and
      (NetEvents.iErrorCode[FD_CLOSE_BIT] = 0) then
      CallOnClose;

    if ((NetEvents.lNetworkEvents and FD_ACCEPT) <> 0) and
      (NetEvents.iErrorCode[FD_ACCEPT_BIT] = 0) then
    begin
      NewSocket := DoAccept(PeerAddress);
      if NewSocket <> INVALID_SOCKET then
        fClientClass.Create(NewSocket, fOnConnect, fOnClose, PeerAddress).Prepare;
    end;
  finally
    fClosingLock.EndRead;
  end;
end;
// Accepts one pending connection on fSocket via WSAAccept, passing
// ServerAcceptCallBack as the condition callback and Self as its callback data.
//
// RemoteAddress: on success receives the textual form of the peer address as
//   produced by WSAAddressToString; left untouched when the accept fails.
// Returns the accepted socket, or INVALID_SOCKET when WSAAccept fails.
// Raises IOCPClientException when the peer address cannot be converted to
//   text; the accepted socket is closed first so it is not leaked.
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket;
var
  addr: TSockAddr;
  addrlen: Integer;
  dwCallbackData: NativeUInt;
  RemoteAddrLen: DWORD;
begin
  dwCallbackData := NativeUInt(self);
  addrlen := SizeOf(addr);
  Result := WSAAccept(fSocket, @addr, @addrlen, ServerAcceptCallBack,
    dwCallbackData);
  if Result = INVALID_SOCKET then
    Exit;

  SetLength(RemoteAddress, 255);
  RemoteAddrLen := Length(RemoteAddress);
  if WSAAddressToString(addr, addrlen, nil, PChar(@RemoteAddress[1]),
    RemoteAddrLen) = SOCKET_ERROR then
  begin
    // The caller never sees the handle once we raise, so release it here.
    closesocket(Result);
    raise IOCPClientException.Create(sErrorAccept_WSAAddressToString);
  end;

  // The reported length includes the terminating null character.
  SetLength(RemoteAddress, RemoteAddrLen - 1);
end;
// Starts an overlapped receive of Length bytes on fSocket.
//
// Length:        number of bytes to read; also the size of the buffer allocated
//                into fReadBuffer.
// OnRead:        stored in fOnRead.
// OnReadProcess: stored in fOnReadProcess.
//
// Raises IOCPClientException when the socket is not in the connected state,
// when another read already holds fReadLock, when no buffer could be
// obtained, or when WSARecv fails with anything other than WSA_IO_PENDING.
//
// Every failure after fReadLock has been taken puts it back to IO_IDLE, so a
// failed call cannot block later reads, and fReadBuffer is never left
// pointing at freed memory.
procedure TIOCPSocketProto.Read(Length: DWORD; OnRead, OnReadProcess: TOnReadEvent);
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  fClosingLock.BeginRead;
  try
    if fStateLock <> CLI_SOCKET_LOCK_CONNECTED then
      raise IOCPClientException.Create(sErrorRead_NotConnected);

    // Only one read may be in flight: claim the lock or report the conflict.
    if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) <> IO_IDLE then
      raise IOCPClientException.Create(sErrorRead_InProcess);

    fOnRead := OnRead;
    fOnReadProcess := OnReadProcess;
    fReaded := 0;
    fReadBufLength := Length;
    fReadBuffer := nil;

    try
      GetMem(fReadBuffer, Length);
    except
      // The allocator raised: give the read lock back before propagating.
      fReadBuffer := nil;
      InterlockedExchange(fReadLock, IO_IDLE);
      raise;
    end;

    if fReadBuffer = nil then
    begin
      // No buffer (for example a zero-length request): release the lock too.
      InterlockedExchange(fReadLock, IO_IDLE);
      raise IOCPClientException.Create(sErrorRead_GetMem);
    end;

    FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0);
    WsaBuf.buf := fReadBuffer;
    WsaBuf.len := fReadBufLength;
    Flags := 0;
    Bytes := 0;

    // Reference taken for the pending operation; dropped below if it never starts.
    _AddRef;
    if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil) = SOCKET_ERROR)
      and (WSAGetLastError <> WSA_IO_PENDING) then
    begin
      FreeMem(fReadBuffer, Length);
      // Clear the pointer while the lock is still held so nothing can reuse it.
      fReadBuffer := nil;
      InterlockedExchange(fReadLock, IO_IDLE);
      _Release;
      raise IOCPClientException.Create(sErrorRead_WSARecv);
    end;
  finally
    fClosingLock.EndRead;
  end;
end;
// Handles one completed overlapped operation while holding fClosingLock for reading.
//
// NumberOfBytes: bytes transferred by the completed operation.
// Overlapped:    identifies the operation; compared against @fOverlappedRead
//                and @fOverlappedWrite. Any other pointer is ignored.
// Returns True when the final callback (CallOnRead / CallOnWrite) was invoked,
// False otherwise, including when a continuation of the transfer was queued.
//
// For each direction: a zero-byte completion invokes the final callback at
// once. A non-zero completion is processed only while the direction's lock is
// IO_PROCESS. The progress counter is advanced and the progress callback is
// invoked. Then either the remainder of the buffer is re-queued, or, when the
// buffer is complete or re-queuing fails, the final callback is invoked.
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD; Overlapped: POverlapped): Boolean;
var
  Transferred, IoFlags: DWORD;
  Rest: TWsaBuf;
  Incomplete, Finished: Boolean;
begin
  Result := FALSE;
  fClosingLock.BeginRead;
  try
    if Overlapped = @fOverlappedRead then
    begin
      if NumberOfBytes = 0 then
      begin
        CallOnRead;
        Result := True;
      end
      else if fReadLock = IO_PROCESS then
      begin
        inc(fReaded, NumberOfBytes);
        // Decide before the progress callback runs, as the counters are read first.
        Incomplete := fReaded < fReadBufLength;
        CallOnReadProcess;

        Finished := not Incomplete;
        if Incomplete then
        begin
          // Continue receiving into the unfilled tail of the buffer.
          Rest.buf := fReadBuffer;
          inc(Rest.buf, fReaded);
          Rest.len := fReadBufLength;
          dec(Rest.len, fReaded);
          IoFlags := 0;
          Transferred := 0;
          Finished := (WSARecv(fSocket, @Rest, 1, Transferred, IoFlags,
            @fOverlappedRead, nil) = SOCKET_ERROR) and
            (WSAGetLastError <> WSA_IO_PENDING);
        end;

        if Finished then
        begin
          CallOnRead;
          Result := True;
        end;
      end;
    end
    else if Overlapped = @fOverlappedWrite then
    begin
      if NumberOfBytes = 0 then
      begin
        CallOnWrite;
        Result := True;
      end
      else if fWriteLock = IO_PROCESS then
      begin
        inc(fWrited, NumberOfBytes);
        Incomplete := fWrited < fWriteBufLength;
        CallOnWriteProcess;

        Finished := not Incomplete;
        if Incomplete then
        begin
          // Continue sending from the unsent tail of the buffer.
          Rest.buf := fWriteBuffer;
          inc(Rest.buf, fWrited);
          Rest.len := fWriteBufLength;
          dec(Rest.len, fWrited);
          IoFlags := 0;
          Transferred := 0;
          Finished := (WSASend(fSocket, @Rest, 1, Transferred, IoFlags,
            @fOverlappedWrite, nil) = SOCKET_ERROR) and
            (WSAGetLastError <> WSA_IO_PENDING);
        end;

        if Finished then
        begin
          CallOnWrite;
          Result := True;
        end;
      end;
    end;
  finally
    fClosingLock.EndRead;
  end;
end;
// Creates an overlapped TCP/IPv4 client socket and starts connecting it to
// RemoteAddress.
//
// RemoteAddress: textual address handed to WSAStringToAddress with AF_INET.
// OnConnect:     stored in fOnConnect.
// OnClose:       stored in fOnClose.
//
// The socket is subscribed to FD_CONNECT / FD_CLOSE notifications on
// gClisentsConnectAndCloseEvents, associated with gIOCP using Self as the
// completion key, and registered in gClients before the connect is issued.
//
// Raises IOCPClientException when any of the Winsock / IOCP calls fails. A
// WSAConnect result of WSAEWOULDBLOCK is not treated as a failure.
constructor TIOCPClientSocket.Create(RemoteAddress: string; OnConnect, OnClose: TOnSimpleSocketEvenet);
var
  lRemoteAddress: TSockAddr;
  lRemoteAddressLength: Integer;
begin
  inherited Create();
  fStore := gClients;
  fOnConnect := OnConnect;
  fOnClose := OnClose;
  fStateLock := 0;
  fRemoteAddressStr := RemoteAddress;

  fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  if fSocket = INVALID_SOCKET then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket);

  if WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents,
    FD_CONNECT or FD_CLOSE) = SOCKET_ERROR then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect);

  if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_CreateIoCompletionPort);

  fStateLock := CLI_SOCKET_LOCK_CREATED;
  fStore.Add(self);

  lRemoteAddressLength := SizeOf(lRemoteAddress);
  lRemoteAddress.sa_family := AF_INET;
  // PChar(string) is valid for an empty string too (it yields a pointer to a
  // null terminator), whereas @s[1] indexes out of range when s is empty. An
  // empty address therefore reaches the WSAStringToAddress error path below.
  if WSAStringToAddress(PChar(fRemoteAddressStr), AF_INET, nil,
    lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_WSAStringToAddress);

  if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil,
    nil) = SOCKET_ERROR) and (WSAGetLastError <> WSAEWOULDBLOCK) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect);
end;
Source: https://habr.com/ru/post/145140/
All Articles