We have talked about our cloud for CLI scripts at various conferences on several occasions (video of the talk, slides). The cloud runs various PHP scripts on a schedule or on request through an API. As a rule, these scripts process queues, and the load is spread over approximately 100 servers. Previously, we focused on the control logic, which is responsible for distributing the load evenly across that many servers and for generating scheduled tasks. Beyond that, however, we also needed to write a daemon that could run our PHP scripts in the CLI and monitor their execution status.
diff --git a/ext/standard/php_fopen_wrapper.c b/ext/standard/php_fopen_wrapper.c index f8d7bda..fee964c 100644 --- a/ext/standard/php_fopen_wrapper.c +++ b/ext/standard/php_fopen_wrapper.c @@ -24,6 +24,7 @@ #if HAVE_UNISTD_H #include <unistd.h> #endif +#include <fcntl.h> #include "php.h" #include "php_globals.h" @@ -296,11 +297,11 @@ php_stream * php_stream_url_wrap_php(php_stream_wrapper *wrapper, char *path, ch "The file descriptors must be non-negative numbers smaller than %d", dtablesize); return NULL; } - - fd = dup(fildes_ori); - if (fd == -1) { + + fd = fildes_ori; + if (fcntl(fildes_ori, F_GETFD) == -1) { php_stream_wrapper_log_error(wrapper, options TSRMLS_CC, - "Error duping file descriptor %ld; possibly it doesn't exist: " + "File descriptor %ld invalid: " "[%d]: %s", fildes_ori, errno, strerror(errno)); return NULL; } diff --git a/ext/standard/streamsfuncs.c b/ext/standard/streamsfuncs.c index 0610ecf..14fd3b0 100644 --- a/ext/standard/streamsfuncs.c +++ b/ext/standard/streamsfuncs.c @@ -24,6 +24,7 @@ #include "ext/standard/flock_compat.h" #include "ext/standard/file.h" #include "ext/standard/php_filestat.h" +#include "ext/standard/php_fopen_wrappers.h" #include "php_open_temporary_file.h" #include "ext/standard/basic_functions.h" #include "php_ini.h" @@ -484,6 +485,7 @@ PHP_FUNCTION(stream_get_meta_data) zval *arg1; php_stream *stream; zval *newval; + int
tmp_fd; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg1) == FAILURE) { return; @@ -502,6 +504,9 @@ PHP_FUNCTION(stream_get_meta_data) add_assoc_string(return_value, "wrapper_type", (char *)stream->wrapper->wops->label, 1); } add_assoc_string(return_value, "stream_type", (char *)stream->ops->label, 1); + if (SUCCESS == php_stream_cast(stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&tmp_fd, 1) && tmp_fd != -1) { + add_assoc_long(return_value, "fd", tmp_fd); + } add_assoc_string(return_value, "mode", stream->mode, 1); $ telnet localhost 31337 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. {"hash":1} # "Request had 1 keys" {"hash":1,"cnt":2} # "Request had 2 keys"
<?php

/**
 * Minimal single-process TCP server built on stream_select().
 *
 * Clients connect to PORT and send newline-terminated JSON documents. For
 * every complete line the server answers with one JSON-encoded string,
 * either "Request had N keys" or "Invalid JSON". All sockets are
 * non-blocking; outgoing data is buffered per client and flushed when
 * stream_select() reports the socket as writable.
 */
class Simple
{
    const PORT = 31337;
    const SERVER_KEY = 'SERVER';

    /** @var resource[] (client_id => stream) */
    private $streams = [];
    /** @var string[] (client_id => read buffer) */
    private $read_buf = [];
    /** @var string[] (client_id => write buffer) */
    private $write_buf = [];
    /** @var resource[] (client_id => stream from which to read) */
    private $read = [];
    /** @var resource[] (client_id => stream where to write) */
    private $write = [];
    /** @var int Total connection count */
    private $conn_count = 0;

    /**
     * Opens the listening socket and enters the event loop (never returns).
     */
    public function run()
    {
        $this->listen();
        echo "Entering main loop\n";
        $this->mainLoop();
    }

    /**
     * Binds the listening socket on 0.0.0.0:PORT and registers it in the
     * read set under SERVER_KEY. Terminates the process with exit code 1
     * when the socket cannot be created.
     */
    protected function listen()
    {
        $port = self::PORT;
        $ip_port = "0.0.0.0:$port";
        $address = "tcp://$ip_port";

        $server = stream_socket_server($address, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
        if (!$server) {
            fwrite(STDERR, "stream_socket_server failed: $errno $errstr\n");
            exit(1);
        }

        $this->read[self::SERVER_KEY] = $server;
        echo "Listening on $address\n";
    }

    /**
     * JSON-encodes $response, logs it to stdout and queues it, followed by
     * a newline, for delivery to the client.
     *
     * @param int|string $stream_id client id
     * @param mixed      $response  any value json_encode() accepts
     */
    public function response($stream_id, $response)
    {
        $json_resp = json_encode($response);
        echo "stream$stream_id " . $json_resp . "\n";
        $this->write($stream_id, $json_resp . "\n");
    }

    /**
     * Appends $buf to the client's write buffer and makes sure the client
     * socket is part of the write set watched by the main loop.
     *
     * @param int|string $stream_id client id
     * @param string     $buf       raw bytes to send
     */
    public function write($stream_id, $buf)
    {
        $this->write_buf[$stream_id] .= $buf;

        if (!isset($this->write[$stream_id])) {
            $this->write[$stream_id] = $this->streams[$stream_id];
        }
    }

    /**
     * Accepts one pending connection on $server, switches it to unbuffered
     * non-blocking mode and registers it for reading.
     *
     * @param resource $server listening socket
     */
    public function accept($server)
    {
        echo "Accepting new connection\n";

        $client = stream_socket_accept($server, 1, $peername);
        if (!$client) {
            fwrite(STDERR, "Accept failed\n");
            return;
        }

        // Allocate the id only after a successful accept so that failed
        // attempts are not counted as connections.
        $stream_id = $this->conn_count++;

        stream_set_read_buffer($client, 0);
        stream_set_write_buffer($client, 0);
        stream_set_blocking($client, 0);
        stream_set_timeout($client, 1);

        $this->read_buf[$stream_id] = '';
        $this->write_buf[$stream_id] = '';
        $this->read[$stream_id] = $this->streams[$stream_id] = $client;

        echo "Connected stream$stream_id: $peername\n";
    }

    /**
     * Closes the client socket and drops every piece of per-client state.
     *
     * @param int|string $stream_id client id
     */
    private function disconnect($stream_id)
    {
        echo "Disconnect stream$stream_id\n";

        // Close explicitly instead of waiting for the last reference (for
        // example the local copies made in mainLoop()) to go away.
        if (isset($this->streams[$stream_id]) && is_resource($this->streams[$stream_id])) {
            fclose($this->streams[$stream_id]);
        }

        unset($this->read_buf[$stream_id], $this->write_buf[$stream_id]);
        unset($this->streams[$stream_id]);
        unset($this->write[$stream_id], $this->read[$stream_id]);
    }

    /**
     * Reads up to 8192 bytes from the client. An empty or failed read is
     * treated as EOF: the client is disconnected right away when nothing is
     * left to send, otherwise it is only removed from the read set so the
     * pending output can still be flushed by handleWrite().
     *
     * @param int|string $stream_id client id
     */
    private function handleRead($stream_id)
    {
        $buf = fread($this->streams[$stream_id], 8192);

        if ($buf === false || $buf === '') {
            echo "got EOF from stream$stream_id\n";
            if (empty($this->write_buf[$stream_id])) {
                $this->disconnect($stream_id);
            } else {
                unset($this->read[$stream_id]);
            }
            return;
        }

        $this->read_buf[$stream_id] .= $buf;
        $this->processJSONRequests($stream_id);
    }

    /**
     * Splits the client's read buffer into newline-terminated requests and
     * answers each of them; a trailing incomplete line stays in the buffer.
     *
     * A request is valid when it decodes to an array (JSON object or list);
     * anything else, including malformed JSON, gets "Invalid JSON".
     *
     * @param int|string $stream_id client id
     */
    private function processJSONRequests($stream_id)
    {
        // Strict comparison is required: strpos() returns 0 when the buffer
        // starts with "\n", and a loose check would then never process it.
        if (strpos($this->read_buf[$stream_id], "\n") === false) {
            return;
        }

        $requests = explode("\n", $this->read_buf[$stream_id]);
        // The last element is whatever follows the final "\n" (possibly '').
        $this->read_buf[$stream_id] = array_pop($requests);

        foreach ($requests as $req) {
            // rtrim() also strips the "\r" that telnet sends before "\n".
            $res = json_decode(rtrim($req), true);

            // json_decode() reports malformed input with null, not false,
            // and count() is only meaningful for arrays.
            if (is_array($res)) {
                $this->response($stream_id, "Request had " . count($res) . " keys");
            } else {
                $this->response($stream_id, "Invalid JSON");
            }
        }
    }

    /**
     * Flushes up to 65536 bytes of the client's write buffer. When the
     * buffer is drained the client leaves the write set, and it is
     * disconnected if it is no longer being read from (EOF already seen).
     *
     * @param int|string $stream_id client id
     */
    private function handleWrite($stream_id)
    {
        // The client may have been disconnected earlier in this iteration.
        if (!isset($this->write_buf[$stream_id])) {
            return;
        }

        $wrote = fwrite($this->streams[$stream_id], substr($this->write_buf[$stream_id], 0, 65536));
        if ($wrote === false) {
            fwrite(STDERR, "write failed into stream #$stream_id\n");
            $this->disconnect($stream_id);
            return;
        }

        if ($wrote === strlen($this->write_buf[$stream_id])) {
            $this->write_buf[$stream_id] = '';
            unset($this->write[$stream_id]);
            if (empty($this->read[$stream_id])) {
                $this->disconnect($stream_id);
            }
        } else {
            // Partial write: keep the unsent tail for the next round.
            $this->write_buf[$stream_id] = substr($this->write_buf[$stream_id], $wrote);
        }
    }

    /**
     * Event loop: waits in stream_select() without a timeout and dispatches
     * ready sockets to accept(), handleRead() and handleWrite().
     */
    public function mainLoop()
    {
        while (true) {
            // stream_select() modifies its arguments, so work on copies.
            $read = $this->read;
            $write = $this->write;
            $except = null;

            echo "Selecting for " . count($read) . " reads, " . count($write) . " writes\n";

            $n = stream_select($read, $write, $except, NULL);
            if ($n === false) {
                fwrite(STDERR, "Could not stream_select()\n");
                // On failure the arrays do not describe ready streams, so
                // dispatching on them would act on sockets that are not
                // ready; retry instead.
                continue;
            }

            if (count($read)) {
                echo "Can read from " . count($read) . " streams\n";
            }
            if (count($write)) {
                echo "Can write to " . count($write) . " streams\n";
            }

            if (isset($read[self::SERVER_KEY])) {
                $this->accept($read[self::SERVER_KEY]);
                unset($read[self::SERVER_KEY]);
            }

            foreach ($read as $stream_id => $_) {
                $this->handleRead($stream_id);
            }

            foreach ($write as $stream_id => $_) {
                $this->handleWrite($stream_id);
            }
        }
    }
}

$instance = new Simple();
$instance->run();

/*
 * Telnet session transcript that followed this listing in the source text,
 * kept verbatim. NOTE(review): it uses restart/run/check/free commands that
 * Simple does not implement, so it presumably belongs to a fuller version of
 * the daemon - confirm against the original article.
 *
 * $ telnet localhost 31337 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. # restart # "Restarted successfully" # run {"hash":1,"params":[1,2,3],"class":"TestClass1"} # {"error_text":"OK"} # ( child TestClass1 ) restart "Restarted successfully" # : check {"hash":1} {"error_text":"Still running"} # 5 : TestClass1 check {"hash":1} {"retcode":0} # , free check {"hash":1} {"retcode":0} free {"hash":1} {"error_text":"OK"} restart "Restarted successfully" # , restart restart {"error_text":"Restarted successfully"} bye Connection closed by foreign host.
 */
echo "Creating restart file...\n"; if (!$res = $this->getFdRestartData()) { fwrite(STDERR, "Could not get restart FD data, exiting, graceful restart is not supported\n"); exit(0); } /* Close all extra file descriptors that we do not know of, including opendir() descriptor :) */ $dh = opendir("/proc/self/fd"); $fds = []; while (false !== ($file = readdir($dh))) { if ($file[0] === '.') continue; $fds[] = $file; } foreach ($fds as $fd) { if (!isset($this->known_fds[$fd])) { fclose(fopen("php://fd/" . $fd, 'r+')); } } $contents = serialize($res); if (file_put_contents(self::RESTART_DIR . self::RESTART_FILENAME, $contents) !== strlen($contents)) { fwrite(STDERR, "Could not fully write restart file\n"); unlink(self::RESTART_DIR . self::RESTART_FILENAME); } $res = []; foreach (self::$restart_fd_resources as $prop) { $res[$prop] = []; foreach ($this->$prop as $k => $v) { $meta = stream_get_meta_data($v); if (!isset($meta['fd'])) { fwrite(STDERR, "No fd in stream metadata for resource $v (key $k in $prop), got " . var_export($meta, true) . "\n"); return false; } $res[$prop][$k] = $meta['fd']; $this->known_fds[$meta['fd']] = true; } } foreach (self::$restart_fd_props as $prop) { $res[$prop] = $this->$prop; } return $res; if (!file_exists(self::RESTART_DIR . self::RESTART_FILENAME)) { return; } echo "Restart file found, trying to adopt it\n"; $contents = file_get_contents(self::RESTART_DIR . self::RESTART_FILENAME); unlink(self::RESTART_DIR . 
self::RESTART_FILENAME); if ($contents === false) { fwrite(STDERR, "Could not read restart file\n"); return; } $res = unserialize($contents); if (!$res) { fwrite(STDERR, "Could not unserialize restart file contents"); return; } foreach (self::$restart_props as $prop) { if (!array_key_exists($prop, $res)) { fwrite(STDERR, "No property $prop in restart file\n"); continue; } $this->$prop = $res[$prop]; } $this->loadFdRestartData($res); $fd_resources = []; foreach (self::$restart_fd_resources as $prop) { if (!isset($res[$prop])) { fwrite(STDERR, "Property '$prop' is not present in restart fd resources\n"); continue; } $pp = []; foreach ($res[$prop] as $k => $v) { if (isset($fd_resources[$v])) { $pp[$k] = $fd_resources[$v]; } else { $fp = fopen("php://fd/" . $v, 'r+'); if (!$fp) { fwrite(STDERR, "Failed to open fd = $v, exiting\n"); exit(1); } stream_set_read_buffer($fp, 0); stream_set_write_buffer($fp, 0); stream_set_blocking($fp, 0); stream_set_timeout($fp, self::CONN_TIMEOUT); $fd_resources[$v] = $fp; $pp[$k] = $fp; } } $this->$prop = $pp; } foreach (self::$restart_fd_props as $prop) { if (!isset($res[$prop])) { fwrite(STDERR, "Property '$prop' is not present in restart fd properties\n"); continue; } $this->$prop = $res[$prop]; } Source: https://habr.com/ru/post/252809/
All Articles