📜 ⬆️ ⬇️

Websocket in production

10 months ago I started making a browser toy. The choice fell on cocos js as graphics and websocket as communication with the server. I really liked the technology and on it I organized all the communication between the game and the server. Used for this article . But, unfortunately, the code shown in that article cannot be used in production. As it turned out, the level of the problem is not even critical, but blocking. Everything is so bad that I had to rewrite all communication with the server from the websockets to longpooling. In the end, I left the option "if we do not have a safari browser, then use websocket, otherwise longpolling" and a little more branching on this topic.

So, the experience of using a website in the production has accumulated decent. And just recently an event happened that prompted me to write the first article on Habré.

After the toy was published on the social network, I corrected all the found critical / blocking bugs and began to put everything in order in a quiet mode. I want to draw attention to the fact that this is an example - it is generally the only online guide that contains server code that you can insert into your code and use it. Well, type in the search engine "php websocket server" - try to find something that you can put yourself.

Suddenly, I reread the above article and at the very beginning I find links to phpdaemon and ratchet. Well, I think, let's look at the code there in a quiet mode. In PhpDeamon, in the depths of processing a WebSocket connection, there is a small, but insanely important, branch to the WebSocket protocols. And there it is directly written for one case "Safari5 and many non-browser clients". To say that I am ofigel is to say nothing. Several hundred hours swept before my eyes, tons of hassle and suffering, which called into question the project at all. I did not believe, I decided to check.
')
Within ~ 15 hours I pulled out the minimum code associated with WebSocket from PhpDeamon (which works in all browsers of the latest version, and the server code itself can work under high load) and will try to publish it with explanations. So that other people would not experience the torment that I had to go through. Yes, a piece of code turned out not small, but I'm sorry: WebSocket is very simple on the client side, and on the server side everything is quite voluminous (let's say a special “thank you” to Safari developers). Also, due to the fact that the scope of WebSocket is primarily a game, the issue of non-blocking use of a server socket is important - this is a bonus complexity that is not considered here , although it is very important.

I wanted to write a test application without objects to make it clearer. But, unfortunately, such an approach in this example will produce a lot of repeating code, so I had to add 1 class and 3 of his successor. The rest is all without objects.

To start the client part
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> <title>WebSocket test page</title> </head> <body onload="create();"> <script type="text/javascript"> function create() { // Example ws = new WebSocket('ws://'+document.domain+':8081/'); ws.onopen = function () {document.getElementById('log').innerHTML += 'WebSocket opened <br/>';} ws.onmessage = function (e) {document.getElementById('log').innerHTML += 'WebSocket message: '+e.data+' <br/>';} ws.onclose = function () {document.getElementById('log').innerHTML += 'WebSocket closed <br/>';} } </script> <button onclick="create();">Create WebSocket</button> <button onclick="ws.send('ping');">Send ping</button> <button onclick="ws.close();">Close WebSocket</button> <div id="log" style="width:300px; height: 300px; border: 1px solid #999999; overflow:auto;"></div> </body> </html> 


In my game I had to use 3 server sockets. For websocket, for workers and for longpooling. There is a lot of math in the game, so you had to do the turners and give them tasks for calculations. So this is what it is for. That stream_select should be common for all of them, otherwise there will be lags or insane processor usage. This knowledge was also obtained in return for a heap of expended nerves.

Main service cycle
 $master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr); if (!$master) die("$errstr ($errno)\n"); $sockets = array($master); stream_set_blocking($master, false); //      ,         ,     "stream_socket_accept". ,         - ,      -  . while (true) { $read = $sockets; $write = $except = array(); if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) { var_dump('stream_select error'); break; //    ,   "die",                "/etc/init.d/game restart"  100%   case,     "pcntl"       . } foreach ($read as $socket) { $index_socket = array_search($socket, $sockets); if ($index_socket == 0) { //   continue; } //      } } 


Connecting with new clients is quite a standard code, but due to the fact that our sockets are in non-blocking mode, you need to write a bunch of code that will collect all the incoming data in pieces and, when there is enough data, process them, understand what protocol is needed use and switch to using this protocol. This task alone is already a mountain of code, and in PhpDeamon they have built up a lot of code, which has nothing to do with the WebSocket (they are able to raise 8 different servers there). We managed to cut off a lot and simplify this topic. Left only that relates to WebSocket.

File truncated <ws.php>
 class ws { const MAX_BUFFER_SIZE = 1024 * 1024; protected $socket; /** * @var array _SERVER */ public $server = []; protected $headers = []; protected $closed = false; protected $unparsed_data = ''; private $current_header; private $unread_lines = array(); protected $extensions = []; protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS'; /** * @var integer Current state */ protected $state = 0; // stream state of the connection (application protocol level) /** * Alias of STATE_STANDBY */ const STATE_ROOT = 0; /** * Standby state (default state) */ const STATE_STANDBY = 0; /** * State: first line */ const STATE_FIRSTLINE = 1; /** * State: headers */ const STATE_HEADERS = 2; /** * State: content */ const STATE_CONTENT = 3; /** * State: prehandshake */ const STATE_PREHANDSHAKE = 5; /** * State: handshaked */ const STATE_HANDSHAKED = 6; public function get_state() { return $this->state; } public function closed() { return $this->closed; } protected function close() { if ($this->closed) return; var_dump('self close'); fclose($this->socket); $this->closed = true; } public function __construct($socket) { stream_set_blocking($socket, false); $this->socket = $socket; } private function read_line() { $lines = explode(PHP_EOL, $this->unparsed_data); $last_line = $lines[count($lines)-1]; unset($lines[count($lines) - 1]); foreach ($lines as $line) { $this->unread_lines[] = $line; } $this->unparsed_data = $last_line; if (count($this->unread_lines) != 0) { return array_shift($this->unread_lines); } else { return null; } } public function on_receive_data() { if ($this->closed) return; $data = stream_socket_recvfrom($this->socket, MAX_BUFFER_SIZE); if (is_string($data)) { $this->unparsed_data .= $data; } } /** * Called when new data received. * @return void */ public function on_read() { if ($this->closed) return; if ($this->state === self::STATE_STANDBY) { $this->state = self::STATE_FIRSTLINE; } if ($this->state === self::STATE_FIRSTLINE) { if (!$this->http_read_first_line()) { return; } $this->state = self::STATE_HEADERS; } if ($this->state === self::STATE_HEADERS) { if (!$this->http_read_headers()) { return; } if (!$this->http_process_headers()) { $this->close(); return; } $this->state = self::STATE_CONTENT; } if ($this->state === self::STATE_CONTENT) { $this->state = self::STATE_PREHANDSHAKE; } } /** * Read first line of HTTP request * @return boolean|null Success */ protected function http_read_first_line() { if (($l = $this->read_line()) === null) { return null; } $e = explode(' ', $l); $u = isset($e[1]) ? parse_url($e[1]) : false; if ($u === false) { $this->bad_request(); return false; } if (!isset($u['path'])) { $u['path'] = null; } if (isset($u['host'])) { $this->server['HTTP_HOST'] = $u['host']; } $srv = & $this->server; $srv['REQUEST_METHOD'] = $e[0]; $srv['REQUEST_TIME'] = time(); $srv['REQUEST_TIME_FLOAT'] = microtime(true); $srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : ''); $srv['DOCUMENT_URI'] = $u['path']; $srv['PHP_SELF'] = $u['path']; $srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null; $srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/'; $srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1'; $srv['REMOTE_ADDR'] = null; $srv['REMOTE_PORT'] = null; return true; } /** * Read headers line-by-line * @return boolean|null Success */ protected function http_read_headers() { while (($l = $this->read_line()) !== null) { if ($l === '') { return true; } $e = explode(': ', $l); if (isset($e[1])) { $this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_'])); $this->server[$this->current_header] = $e[1]; } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) { // multiline header continued $this->server[$this->current_header] .= $e[0]; } else { // whatever client speaks is not HTTP anymore $this->bad_request(); return false; } } } /** * Process headers * @return bool */ protected function http_process_headers() { $this->state = self::STATE_PREHANDSHAKE; if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) { $str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']); $str = preg_replace($this->extensionsCleanRegex, '', $str); $this->extensions = explode(', ', $str); } if (!isset($this->server['HTTP_CONNECTION']) || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade") || !isset($this->server['HTTP_UPGRADE']) || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important ) { $this->close(); return false; } if (isset($this->server['HTTP_COOKIE'])) { self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie); } if (isset($this->server['QUERY_STRING'])) { self::parse_str($this->server['QUERY_STRING'], $this->get); } // ---------------------------------------------------------- // Protocol discovery, based on HTTP headers... // ---------------------------------------------------------- if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14) $this->switch_to_protocol('v13'); } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol $this->switch_to_protocol('v13'); } else { error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr $this->close(); return false; } } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) { $this->switch_to_protocol('ve'); } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...) $this->switch_to_protocol('v0'); } // ---------------------------------------------------------- // End of protocol discovery // ---------------------------------------------------------- return true; } private function switch_to_protocol($protocol) { $class = 'ws_'.$protocol; $this->new_instance = new $class($this->socket); $this->new_instance->state = $this->state; $this->new_instance->unparsed_data = $this->unparsed_data; $this->new_instance->server = $this->server; } /** * Send Bad request * @return void */ public function bad_request() { $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>"); $this->close(); } /** * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX * @param string $s String to parse * @param array &$var Reference to the resulting array * @param boolean $header Header-style string * @return void */ public static function parse_str($s, &$var, $header = false) { static $cb; if ($cb === null) { $cb = function ($m) { return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8')); }; } if ($header) { $s = strtr($s, self::$hvaltr); } if ( (stripos($s, '%u') !== false) && preg_match('~(%u[af\d]{4}|%[cf][af\d](?!%[89a-f][af\d]))~is', $s, $m) ) { $s = preg_replace_callback('~%(u[af\d]{4}|[af\d]{2})~i', $cb, $s); } parse_str($s, $var); } /** * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop * @param string $data Data to send * @return boolean Success */ public function write($data) { if ($this->closed) return false; return stream_socket_sendto($this->socket, $data) == 0; } } 


The meaning of this class in such an abridged form is to set the non-blocking mode in the designer to connect with the client. Then, in the main loop, every time data arrives, immediately read and put it (add) to the "unparsed_data" variable (this is the on_receive_data method). It is important to understand that if we go beyond the dimensions of MAX_BUFFER_SIZE, then nothing terrible will happen at all. It is possible in the final example, what will happen here, put its value, say, "5" and make sure that everything is still working. Simply, the data from the buffer will be ignored at the first step - they will be incomplete, and from the second or fifth or hundredth approach they will be collected, finally, all received data will be processed. In this case, the stream_select in the main loop will not even wait microseconds until all data has been retrieved. The constant must be chosen such that 95% of the expected data is read entirely.
Then in the main loop (after receiving the next piece of data) we try to process the accumulated data (this is the on_read method). In the ws class, the on_read method consists essentially of three steps: “read the first line and prepare environment variables”, “read all the headers”, “process all the headers”. The first 2 do not need to be explained, but they are written quite volumetrically because we are in non-blocking mode and we must be prepared for the data to be broken in any place. Header processing first checks whether the request is valid or not, and then defines the protocol by which it will communicate with the client. As a result, they should pull the switch_to_protocol method. This method inside itself will form an instance of the class “ws_ <protocol>” and prepare it for return to the main loop.

In the main loop, then you need to actually check: is it not necessary to replace the object (if someone can offer the implementation of this place better, you are welcome).

Next, in the main loop, you need to put a check: is the socket closed? If it is closed, then clear the memory (about this far in the next block).

Now the full version of the file <deamon.php>
 require('ws.php'); require('ws_v0.php'); require('ws_v13.php'); require('ws_ve.php'); $master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr); if (!$master) die("$errstr ($errno)\n"); $sockets = array($master); /** * @var ws[] $connections */ $connections = array(); stream_set_blocking($master, false); /** * @param ws $connection * @param $data * @param $type */ $my_callback = function($connection, $data, $type) { var_dump('my ws data: ['.$data.'/'.$type.']'); $connection->send_frame('test '.time()); }; while (true) { $read = $sockets; $write = $except = array(); if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) { var_dump('stream_select error'); break; } foreach ($read as $socket) { $index_socket = array_search($socket, $sockets); if ($index_socket == 0) { //   if ($socket_new = stream_socket_accept($master, -1)) { $connection = new ws($socket_new, $my_callback); $sockets[] = $socket_new; $index_new_socket = array_search($socket_new, $sockets); $connections[$index_new_socket] = &$connection; $index_socket = $index_new_socket; } else { //            error_log('stream_socket_accept'); var_dump('error stream_socket_accept'); continue; } } $connection = &$connections[$index_socket]; $connection->on_receive_data(); $connection->on_read(); if ($connection->get_state() == ws::STATE_PREHANDSHAKE) { $connection = $connection->get_new_instance(); $connections[$index_socket] = &$connection; $connection->on_read(); } if ($connection->closed()) { unset($sockets[$index_socket]); unset($connections[$index_socket]); unset($connection); var_dump('close '.$index_socket); } } } 


Here "$ my_callback" is added - this is our custom message handler from the client. Of course, in production, you can wrap it all in all sorts of objects, and here to make it clearer just a variable-function. More about her later.

The processing of the new connection is implemented and the main body of the cycle, about which I wrote above, is implemented.

I want to draw attention to the server code here . That if the read data from the socket is an empty string (yes, of course, I saw an update check for an empty string there), then the socket must be closed. Oh, I do not even know how much this momet drank to me and how many users I lost. Suddenly, Safari sends an empty string and considers it the norm, and this code takes and closes the connection to the user. Yandex browser sometimes behaves the same way. I don’t know why, but in this case, the WebSocket for Safari remains frozen, that is, it does not close, does not open - it just hangs and that’s it. Have you already noticed that I am not indifferent to this magic browser? I remember how I installed it under IE6 - about the same feelings.

Now, about why I use array_search and synchronize the $ sockets array and the $ connections array. The fact is that stream_select is vital for a clean array of $ sockets and nothing else. But somehow it’s necessary to bind a specific socket from the $ sockets array to the “ws” object. I tried a bunch of options - in the end I stopped at such an option that there are 2 arrays that are constantly synchronized by keys. In one array, there are required clean sockets for stream_select, and in the second, instances of the class “ws” or its heirs If someone can offer this place better - offer.

We should also separately note the case when stream_socket_accept is filed. As I understand it, theoretically, this can be only if the master socket is in non-blocking mode, and not enough data has arrived to connect the client. Therefore, we simply do nothing.

Full version of the file <ws.php>
 class ws { private static $hvaltr = ['; ' => '&', ';' => '&', ' ' => '%20']; const maxAllowedPacket = 1024 * 1024 * 1024; const MAX_BUFFER_SIZE = 1024 * 1024; protected $socket; /** * @var array _SERVER */ public $server = []; protected $on_frame_user = null; protected $handshaked = false; protected $headers = []; protected $headers_sent = false; protected $closed = false; protected $unparsed_data = ''; private $current_header; private $unread_lines = array(); /** * @var ws|null */ private $new_instance = null; protected $extensions = []; protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS'; /** * @var integer Current state */ protected $state = 0; // stream state of the connection (application protocol level) /** * Alias of STATE_STANDBY */ const STATE_ROOT = 0; /** * Standby state (default state) */ const STATE_STANDBY = 0; /** * State: first line */ const STATE_FIRSTLINE = 1; /** * State: headers */ const STATE_HEADERS = 2; /** * State: content */ const STATE_CONTENT = 3; /** * State: prehandshake */ const STATE_PREHANDSHAKE = 5; /** * State: handshaked */ const STATE_HANDSHAKED = 6; public function get_state() { return $this->state; } public function get_new_instance() { return $this->new_instance; } public function closed() { return $this->closed; } protected function close() { if ($this->closed) return; var_dump('self close'); fclose($this->socket); $this->closed = true; } public function __construct($socket, $on_frame_user = null) { stream_set_blocking($socket, false); $this->socket = $socket; $this->on_frame_user = $on_frame_user; } private function read_line() { $lines = explode(PHP_EOL, $this->unparsed_data); $last_line = $lines[count($lines)-1]; unset($lines[count($lines) - 1]); foreach ($lines as $line) { $this->unread_lines[] = $line; } $this->unparsed_data = $last_line; if (count($this->unread_lines) != 0) { return array_shift($this->unread_lines); } else { return null; } } public function on_receive_data() { if ($this->closed) return; $data = stream_socket_recvfrom($this->socket, self::MAX_BUFFER_SIZE); if (is_string($data)) { $this->unparsed_data .= $data; } } /** * Called when new data received. * @return void */ public function on_read() { if ($this->closed) return; if ($this->state === self::STATE_STANDBY) { $this->state = self::STATE_FIRSTLINE; } if ($this->state === self::STATE_FIRSTLINE) { if (!$this->http_read_first_line()) { return; } $this->state = self::STATE_HEADERS; } if ($this->state === self::STATE_HEADERS) { if (!$this->http_read_headers()) { return; } if (!$this->http_process_headers()) { $this->close(); return; } $this->state = self::STATE_CONTENT; } if ($this->state === self::STATE_CONTENT) { $this->state = self::STATE_PREHANDSHAKE; } } /** * Read first line of HTTP request * @return boolean|null Success */ protected function http_read_first_line() { if (($l = $this->read_line()) === null) { return null; } $e = explode(' ', $l); $u = isset($e[1]) ? parse_url($e[1]) : false; if ($u === false) { $this->bad_request(); return false; } if (!isset($u['path'])) { $u['path'] = null; } if (isset($u['host'])) { $this->server['HTTP_HOST'] = $u['host']; } $address = explode(':', stream_socket_get_name($this->socket, true)); //   $srv = & $this->server; $srv['REQUEST_METHOD'] = $e[0]; $srv['REQUEST_TIME'] = time(); $srv['REQUEST_TIME_FLOAT'] = microtime(true); $srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : ''); $srv['DOCUMENT_URI'] = $u['path']; $srv['PHP_SELF'] = $u['path']; $srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null; $srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/'; $srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1'; $srv['REMOTE_ADDR'] = $address[0]; $srv['REMOTE_PORT'] = $address[1]; return true; } /** * Read headers line-by-line * @return boolean|null Success */ protected function http_read_headers() { while (($l = $this->read_line()) !== null) { if ($l === '') { return true; } $e = explode(': ', $l); if (isset($e[1])) { $this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_'])); $this->server[$this->current_header] = $e[1]; } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) { // multiline header continued $this->server[$this->current_header] .= $e[0]; } else { // whatever client speaks is not HTTP anymore $this->bad_request(); return false; } } } /** * Process headers * @return bool */ protected function http_process_headers() { $this->state = self::STATE_PREHANDSHAKE; if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) { $str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']); $str = preg_replace($this->extensionsCleanRegex, '', $str); $this->extensions = explode(', ', $str); } if (!isset($this->server['HTTP_CONNECTION']) || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade") || !isset($this->server['HTTP_UPGRADE']) || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important ) { $this->close(); return false; } /* if (isset($this->server['HTTP_COOKIE'])) { self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie); } if (isset($this->server['QUERY_STRING'])) { self::parse_str($this->server['QUERY_STRING'], $this->get); } */ // ---------------------------------------------------------- // Protocol discovery, based on HTTP headers... // ---------------------------------------------------------- if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14) $this->switch_to_protocol('v13'); } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol $this->switch_to_protocol('v13'); } else { error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr $this->close(); return false; } } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) { $this->switch_to_protocol('ve'); } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...) $this->switch_to_protocol('v0'); } // ---------------------------------------------------------- // End of protocol discovery // ---------------------------------------------------------- return true; } private function switch_to_protocol($protocol) { $class = 'ws_'.$protocol; $this->new_instance = new $class($this->socket); $this->new_instance->state = $this->state; $this->new_instance->unparsed_data = $this->unparsed_data; $this->new_instance->server = $this->server; $this->new_instance->on_frame_user = $this->on_frame_user; } /** * Send Bad request * @return void */ public function bad_request() { $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>"); $this->close(); } /** * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX * @param string $s String to parse * @param array &$var Reference to the resulting array * @param boolean $header Header-style string * @return void */ public static function parse_str($s, &$var, $header = false) { static $cb; if ($cb === null) { $cb = function ($m) { return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8')); }; } if ($header) { $s = strtr($s, self::$hvaltr); } if ( (stripos($s, '%u') !== false) && preg_match('~(%u[af\d]{4}|%[cf][af\d](?!%[89a-f][af\d]))~is', $s, $m) ) { $s = preg_replace_callback('~%(u[af\d]{4}|[af\d]{2})~i', $cb, $s); } parse_str($s, $var); } /** * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop * @param string $data Data to send * @return boolean Success */ public function write($data) { if ($this->closed) return false; return stream_socket_sendto($this->socket, $data) == 0; } /** *         * @return bool */ protected function send_handshake_reply() { return false; } /** * Called when we're going to handshake. * @return boolean Handshake status */ public function handshake() { $extra_headers = ''; foreach ($this->headers as $k => $line) { if ($k !== 'STATUS') { $extra_headers .= $line . "\r\n"; } } if (!$this->send_handshake_reply($extra_headers)) { error_log(get_class($this) . '::' . __METHOD__ . ' : Handshake protocol failure for client ""'); // $this->addr $this->close(); return false; } $this->handshaked = true; $this->headers_sent = true; $this->state = static::STATE_HANDSHAKED; return true; } /** * Read from buffer without draining * @param integer $n Number of bytes to read * @param integer $o Offset * @return string|false */ public function look($n, $o = 0) { if (strlen($this->unparsed_data) <= $o) { return ''; } return substr($this->unparsed_data, $o, $n); } /** * Convert bytes into integer * @param string $str Bytes * @param boolean $l Little endian? Default is false * @return integer */ public static function bytes2int($str, $l = false) { if ($l) { $str = strrev($str); } $dec = 0; $len = strlen($str); for ($i = 0; $i < $len; ++$i) { $dec += ord(substr($str, $i, 1)) * pow(0x100, $len - $i - 1); } return $dec; } /** * Drains buffer * @param integer $n Numbers of bytes to drain * @return boolean Success */ public function drain($n) { $ret = substr($this->unparsed_data, 0, $n); $this->unparsed_data = substr($this->unparsed_data, $n); return $ret; } /** * Read data from the connection's buffer * @param integer $n Max. number of bytes to read * @return string|false Readed data */ public function read($n) { if ($n <= 0) { return ''; } $read = $this->drain($n); if ($read === '') { return false; } return $read; } /** * Reads all data from the connection's buffer * @return string Readed data */ public function read_unlimited() { $ret = $this->unparsed_data; $this->unparsed_data = ''; return $ret; } /** * Searches first occurence of the string in input buffer * @param string $what Needle * @param integer $start Offset start * @param integer $end Offset end * @return integer Position */ public function search($what, $start = 0, $end = -1) { return strpos($this->unparsed_data, $what, $start); } /** * Called when new frame received. * @param string $data Frame's data. * @param string $type Frame's type ("STRING" OR "BINARY"). * @return boolean Success. */ public function on_frame($data, $type) { if (is_callable($this->on_frame_user)) { call_user_func($this->on_frame_user, $this, $data, $type); } return true; } public function send_frame($data, $type = null, $cb = null) { return false; } /** * Get real frame type identificator * @param $type * @return integer */ public function get_frame_type($type) { if (is_int($type)) { return $type; } if ($type === null) { $type = 'STRING'; } $frametype = @constant(get_class($this) . '::' . $type); if ($frametype === null) { error_log(__METHOD__ . ' : Undefined frametype "' . $type . '"'); } return $frametype; } } 


In fact, there are 3 things added: “connection with the client at the web socket level”, “receiving a message from the client”, “sending a message to the client”.

First, a little theory and terminology. “Handshake” is, from the web sockets point of view, a connection establishment procedure over http. You have to solve a lot of questions: how to break through the thick of the proxy and cache, how to protect yourself from evil hackers. And the term "frame" is a piece of data in decrypted form, this is a message from a client or a message to a client. Perhaps it was worth writing about this at the beginning of the article, but because of these “frames”, it’s pointless to make a socket server in blocking sockets mode. The way this moment is made right here - it deprived me of sleep for more than one night. The article does not consider the option that the frame did not arrive completely or two arrived at once. And this and that, by the way, is quite a typical situation, as the logs of the game showed.

Now to the details.

— , (, ws_v0) «on_read» «handshake», . «handshake» . «send_handshake_reply», . «send_handshake_reply» , , « », — , — .

. , , , . «unparsed_data» . «on_read» frame, , frame , frame . - ( , frame , frame). «on_read», (- ), «on_frame», «ws», , , custom callback ( $my_callback, ). $my_callback .

. «send_frame», . . -.

3 «v13», «v0», «ve»:

<ws_v13.php>
 class ws_v13 extends ws { const CONTINUATION = 0; const STRING = 0x1; const BINARY = 0x2; const CONNCLOSE = 0x8; const PING = 0x9; const PONG = 0xA; protected static $opcodes = [ 0 => 'CONTINUATION', 0x1 => 'STRING', 0x2 => 'BINARY', 0x8 => 'CONNCLOSE', 0x9 => 'PING', 0xA => 'PONG', ]; protected $outgoingCompression = 0; protected $framebuf = ''; /** * Apply mask * @param $data * @param string|false $mask * @return mixed */ public function mask($data, $mask) { for ($i = 0, $l = strlen($data), $ml = strlen($mask); $i < $l; $i++) { $data[$i] = $data[$i] ^ $mask[$i % $ml]; } return $data; } /** * Sends a frame. * @param string $data Frame's data. * @param string $type Frame's type. ("STRING" OR "BINARY") * @param callable $cb Optional. Callback called when the frame is received by client. * @callback $cb ( ) * @return boolean Success. */ public function send_frame($data, $type = null, $cb = null) { if (!$this->handshaked) { return false; } if ($this->closed && $type !== 'CONNCLOSE') { return false; } /*if (in_array($type, ['STRING', 'BINARY']) && ($this->outgoingCompression > 0) && in_array('deflate-frame', $this->extensions)) { //$data = gzcompress($data, $this->outgoingCompression); //$rsv1 = 1; }*/ $fin = 1; $rsv1 = 0; $rsv2 = 0; $rsv3 = 0; $this->write(chr(bindec($fin . $rsv1 . $rsv2 . $rsv3 . str_pad(decbin($this->get_frame_type($type)), 4, '0', STR_PAD_LEFT)))); $dataLength = strlen($data); $isMasked = false; $isMaskedInt = $isMasked ? 128 : 0; if ($dataLength <= 125) { $this->write(chr($dataLength + $isMaskedInt)); } elseif ($dataLength <= 65535) { $this->write(chr(126 + $isMaskedInt) . // 126 + 128 chr($dataLength >> 8) . chr($dataLength & 0xFF)); } else { $this->write(chr(127 + $isMaskedInt) . // 127 + 128 chr($dataLength >> 56) . chr($dataLength >> 48) . chr($dataLength >> 40) . chr($dataLength >> 32) . chr($dataLength >> 24) . chr($dataLength >> 16) . chr($dataLength >> 8) . chr($dataLength & 0xFF)); } if ($isMasked) { $mask = chr(mt_rand(0, 0xFF)) . chr(mt_rand(0, 0xFF)) . chr(mt_rand(0, 0xFF)) . chr(mt_rand(0, 0xFF)); $this->write($mask . $this->mask($data, $mask)); } else { $this->write($data); } if ($cb !== null) { $cb(); } return true; } /** * Sends a handshake message reply * @param string Received data (no use in this class) * @return boolean OK? */ public function send_handshake_reply($extraHeaders = '') { if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY']) || !isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { return false; } if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '13' && $this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '8') { return false; } if (isset($this->server['HTTP_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = $this->server['HTTP_ORIGIN']; } if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ''; } $this->write("HTTP/1.1 101 Switching Protocols\r\n" . "Upgrade: WebSocket\r\n" . "Connection: Upgrade\r\n" . "Date: " . date('r') . "\r\n" . "Sec-WebSocket-Origin: " . $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] . "\r\n" . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n" . "Sec-WebSocket-Accept: " . base64_encode(sha1(trim($this->server['HTTP_SEC_WEBSOCKET_KEY']) . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)) . "\r\n" ); if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) { $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n"); } $this->write($extraHeaders."\r\n"); return true; } /** * Called when new data received * @see http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10#page-16 * @return void */ public function on_read() { if ($this->closed) return; if ($this->state === self::STATE_PREHANDSHAKE) { if (!$this->handshake()) { return; } } if ($this->state === self::STATE_HANDSHAKED) { while (($buflen = strlen($this->unparsed_data)) >= 2) { $first = ord($this->look(1)); // first byte integer (fin, opcode) $firstBits = decbin($first); $opcode = (int)bindec(substr($firstBits, 4, 4)); if ($opcode === 0x8) { // CLOSE $this->close(); return; } $opcodeName = isset(static::$opcodes[$opcode]) ? static::$opcodes[$opcode] : false; if (!$opcodeName) { error_log(get_class($this) . ': Undefined opcode ' . $opcode); $this->close(); return; } $second = ord($this->look(1, 1)); // second byte integer (masked, payload length) $fin = (bool)($first >> 7); $isMasked = (bool)($second >> 7); $dataLength = $second & 0x7f; $p = 2; if ($dataLength === 0x7e) { // 2 bytes-length if ($buflen < $p + 2) { return; // not enough data yet } $dataLength = self::bytes2int($this->look(2, $p), false); $p += 2; } elseif ($dataLength === 0x7f) { // 8 bytes-length if ($buflen < $p + 8) { return; // not enough data yet } $dataLength = self::bytes2int($this->look(8, $p)); $p += 8; } if (self::maxAllowedPacket <= $dataLength) { // Too big packet $this->close(); return; } if ($isMasked) { if ($buflen < $p + 4) { return; // not enough data yet } $mask = $this->look(4, $p); $p += 4; } if ($buflen < $p + $dataLength) { return; // not enough data yet } $this->drain($p); $data = $this->read($dataLength); if ($isMasked) { $data = $this->mask($data, $mask); } //Daemon::log(Debug::dump(array('ext' => $this->extensions, 'rsv1' => $firstBits[1], 'data' => Debug::exportBytes($data)))); /*if ($firstBits[1] && in_array('deflate-frame', $this->extensions)) { // deflate frame $data = gzuncompress($data, $this->pool->maxAllowedPacket); }*/ if (!$fin) { $this->framebuf .= $data; } else { $this->on_frame($this->framebuf . $data, $opcodeName); $this->framebuf = ''; } } } } } 


<ws_v0.php>
 class ws_v0 extends ws { const STRING = 0x00; const BINARY = 0x80; protected $key; /** * Sends a handshake message reply * @param string Received data (no use in this class) * @return boolean OK? */ public function send_handshake_reply($extraHeaders = '') { if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) { return false; } $final_key = $this->_computeFinalKey($this->server['HTTP_SEC_WEBSOCKET_KEY1'], $this->server['HTTP_SEC_WEBSOCKET_KEY2'], $this->key); $this->key = null; if (!$final_key) { return false; } if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ''; } $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: WebSocket\r\n" . "Connection: Upgrade\r\n" . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n" . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"); if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) { $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n"); } $this->write($extraHeaders . "\r\n" . $final_key); return true; } /** * Computes final key for Sec-WebSocket. * @param string Key1 * @param string Key2 * @param string Data * @return string Result */ protected function _computeFinalKey($key1, $key2, $data) { if (strlen($data) < 8) { error_log(get_class($this) . '::' . __METHOD__ . ' : Invalid handshake data for client ""'); // $this->addr return false; } return md5($this->_computeKey($key1) . $this->_computeKey($key2) . substr($data, 0, 8), true); } /** * Computes key for Sec-WebSocket. * @param string Key * @return string Result */ protected function _computeKey($key) { $spaces = 0; $digits = ''; for ($i = 0, $s = strlen($key); $i < $s; ++$i) { $c = substr($key, $i, 1); if ($c === "\x20") { ++$spaces; } elseif (ctype_digit($c)) { $digits .= $c; } } if ($spaces > 0) { $result = (float)floor($digits / $spaces); } else { $result = (float)$digits; } return pack('N', $result); } /** * Sends a frame. * @param string $data Frame's data. * @param string $type Frame's type. ("STRING" OR "BINARY") * @param callable $cb Optional. Callback called when the frame is received by client. * @callback $cb ( ) * @return boolean Success. */ public function send_frame($data, $type = null, $cb = null) { if (!$this->handshaked) { return false; } if ($this->closed && $type !== 'CONNCLOSE') { return false; } if ($type === 'CONNCLOSE') { if ($cb !== null) { $cb($this); return true; } } $type = $this->get_frame_type($type); // Binary if (($type & self::BINARY) === self::BINARY) { $n = strlen($data); $len = ''; $pos = 0; char: ++$pos; $c = $n >> 0 & 0x7F; $n >>= 7; if ($pos !== 1) { $c += 0x80; } if ($c !== 0x80) { $len = chr($c) . $len; goto char; }; $this->write(chr(self::BINARY) . $len . $data); } // String else { $this->write(chr(self::STRING) . $data . "\xFF"); } if ($cb !== null) { $cb(); } return true; } /** * Called when new data received * @return void */ public function on_read() { if ($this->state === self::STATE_PREHANDSHAKE) { if (strlen($this->unparsed_data) < 8) { return; } $this->key = $this->read_unlimited(); $this->handshake(); } if ($this->state === self::STATE_HANDSHAKED) { while (($buflen = strlen($this->unparsed_data)) >= 2) { $hdr = $this->look(10); $frametype = ord(substr($hdr, 0, 1)); if (($frametype & 0x80) === 0x80) { $len = 0; $i = 0; do { if ($buflen < $i + 1) { // not enough data yet return; } $b = ord(substr($hdr, ++$i, 1)); $n = $b & 0x7F; $len *= 0x80; $len += $n; } while ($b > 0x80); if (self::maxAllowedPacket <= $len) { // Too big packet $this->close(); return; } if ($buflen < $len + $i + 1) { // not enough data yet return; } $this->drain($i + 1); $this->on_frame($this->read($len), 'BINARY'); } else { if (($p = $this->search("\xFF")) !== false) { if (self::maxAllowedPacket <= $p - 1) { // Too big packet $this->close(); return; } $this->drain(1); $data = $this->read($p); $this->drain(1); $this->on_frame($data, 'STRING'); } else { if (self::maxAllowedPacket < $buflen - 1) { // Too big packet $this->close(); return; } // not enough data yet return; } } } } } } 


<ws_ve.php>
 class ws_ve extends ws { const STRING = 0x00; const BINARY = 0x80; /** * Sends a handshake message reply * @param string Received data (no use in this class) * @return boolean OK? */ public function send_handshake_reply($extraHeaders = '') { if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ''; } $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: WebSocket\r\n" . "Connection: Upgrade\r\n" . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n" . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n" ); if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) { $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n"); } $this->write($extraHeaders."\r\n"); return true; } /** * Computes key for Sec-WebSocket. * @param string Key * @return string Result */ protected function _computeKey($key) { $spaces = 0; $digits = ''; for ($i = 0, $s = strlen($key); $i < $s; ++$i) { $c = substr($key, $i, 1); if ($c === "\x20") { ++$spaces; } elseif (ctype_digit($c)) { $digits .= $c; } } if ($spaces > 0) { $result = (float)floor($digits / $spaces); } else { $result = (float)$digits; } return pack('N', $result); } /** * Sends a frame. * @param string $data Frame's data. * @param string $type Frame's type. ("STRING" OR "BINARY") * @param callable $cb Optional. Callback called when the frame is received by client. * @callback $cb ( ) * @return boolean Success. */ public function send_frame($data, $type = null, $cb = null) { if (!$this->handshaked) { return false; } if ($this->closed && $type !== 'CONNCLOSE') { return false; } if ($type === 'CONNCLOSE') { if ($cb !== null) { $cb($this); return true; } } // Binary $type = $this->get_frame_type($type); if (($type & self::BINARY) === self::BINARY) { $n = strlen($data); $len = ''; $pos = 0; char: ++$pos; $c = $n >> 0 & 0x7F; $n >>= 7; if ($pos !== 1) { $c += 0x80; } if ($c !== 0x80) { $len = chr($c) . $len; goto char; }; $this->write(chr(self::BINARY) . $len . $data); } // String else { $this->write(chr(self::STRING) . $data . "\xFF"); } if ($cb !== null) { $cb(); } return true; } /** * Called when new data received * @return void */ public function on_read() { while (($buflen = strlen($this->unparsed_data)) >= 2) { $hdr = $this->look(10); $frametype = ord(substr($hdr, 0, 1)); if (($frametype & 0x80) === 0x80) { $len = 0; $i = 0; do { if ($buflen < $i + 1) { return; } $b = ord(substr($hdr, ++$i, 1)); $n = $b & 0x7F; $len *= 0x80; $len += $n; } while ($b > 0x80); if (self::maxAllowedPacket <= $len) { // Too big packet $this->close(); return; } if ($buflen < $len + $i + 1) { // not enough data yet return; } $this->drain($i + 1); $this->on_frame($this->read($len), $frametype); } else { if (($p = $this->search("\xFF")) !== false) { if (self::maxAllowedPacket <= $p - 1) { // Too big packet $this->close(); return; } $this->drain(1); $data = $this->read($p); $this->drain(1); $this->on_frame($data, 'STRING'); } else { if (self::maxAllowedPacket < $buflen - 1) { // Too big packet $this->close(); return; } } } } } } 


, VE — . PhpDeamon.

V13 (FireFox, Opera, Chrome, ). IE (, IE6 — IE «», IE , « , »). V0 «».

Instead of conclusion


, (, , . callback frame -). , - « Anlide PhpDeamon». , , . .

:

Source: https://habr.com/ru/post/301822/


All Articles