📜 ⬆️ ⬇️

PHP process manager

image

Hello!

On Habré there were many articles about how to write demons for PHP and other fork-nuty things. I want to share with you my work on a similar, but still somewhat different topic - managing several PHP processes .

')
To begin with a small glossary of terms used in the article.


The goal of the task is to be able to influence the already running and running processes and receive information on the progress of their implementation.

To launch new processes, I use the proc_open function, which allows you to redefine the I / O handles for the new process. To manage a separate process, a Job class has been developed. The work is characterized by the name and team performed.

class Job { protected $_pid = 0; protected $_name; protected $_cmd = ''; protected $_stderr = '/dev/null'; private $_resource = NULL; private $_pipes = array(); private $_waitpid = TRUE; public function __construct($cmd, $name = 'job') { $this->_cmd = $cmd; $this->_name = $name; } public function __destruct() { //    if ($this->_resource) { if ($this->_waitpid && $this->isRunning()) { echo "Waiting for job to complete "; $status = NULL; pcntl_waitpid($this->_pid, $status); /*while ($this->isRunning()) { echo '.'; sleep(1); }*/ echo "\n"; } } //   if (isset($this->_pipes) && is_array($this->_pipes)) { foreach (array_keys($this->_pipes) as $index ) { if (is_resource($this->_pipes[$index])) { fflush($this->_pipes[$index]); fclose($this->_pipes[$index]); unset($this->_pipes[$index]); } } } //    if ($this->_resource) { proc_close($this->_resource); unset($this->_resource); } } public function pid() { return $this->_pid; } public function name() { return $this->_name; } //    "". $nohup      private function readPipe($index, $nohup = FALSE) { if (!isset($this->_pipes[$index])) return FALSE; if (!is_resource($this->_pipes[$index]) || feof($this->_pipes[$index])) return FALSE; if ($nohup) { $data = ''; while ($line = fgets($this->_pipes[$index])) { $data .= $line; } return $data; } while ($data = fgets($this->_pipes[$index])) { echo $data; } } public function pipeline($nohup = FALSE) { return $this->readPipe(1, $nohup); } public function stderr($nohup = FALSE) { return $this->readPipe(2, $nohup); } //      public function execute() { //         $descriptorspec = array( 0 => array('pipe', 'r'), // stdin 1 => array('pipe', 'w'), // stdout 2 => array('pipe', 'w') // stderr ); $this->_resource = proc_open('exec '.$this->_cmd, $descriptorspec, $this->_pipes); //      stream_set_blocking($this->_pipes[0], 0); stream_set_blocking($this->_pipes[1], 0); stream_set_blocking($this->_pipes[2], 0); if (!is_resource($this->_resource)) return FALSE; $proc_status = proc_get_status($this->_resource); $this->_pid = isset($proc_status['pid']) ? $proc_status['pid'] : 0; } public function getPipe() { return $this->_pipes[1]; } public function getStderr() { return $this->_pipes[2]; } public function isRunning() { if (!is_resource($this->_resource)) return FALSE; $proc_status = proc_get_status($this->_resource); return isset($proc_status['running']) && $proc_status['running']; } //    public function signal($sig) { if (!$this->isRunning()) return FALSE; posix_kill($this->_pid, $sig); } //    STDIN  public function message($msg) { if (!$this->isRunning()) return FALSE; fwrite($this->_pipes[0], $msg); } } 


To manage the work created class Job_Manager, which in fact is the key throughout the scheme.

 class Job_Manager { private $_pool_size = 20; private $_pool = array(); private $_streams = array(); private $_stderr = array(); private $_is_terminated = FALSE; protected $_dispatch_function = NULL; public function __construct() { // init pool // } public function __destruct() { // destroy pool foreach (array_keys($this->_pool) as $index) { $this->stopJob($index); } } //     private function checkJobs() { $running_jobs = 0; foreach ($this->_pool as $index => $job) { if (!$job->isRunning()) { echo "Stopping job ".$this->_pool[$index]->name()." ($index)" . PHP_EOL; $this->stopJob($index); } else { $running_jobs++; } } return $running_jobs; } private function getFreeIndex() { foreach ($this->_pool as $index => $job) { if (!isset($job)) return $index; } return count($this->_pool) < $this->_pool_size ? count($this->_pool) : -1; } //    public function startJob($cmd, $name = 'job') { // broadcast existing jobs $this->checkJobs(); $free_pool_slots = $this->_pool_size - count($this->_pool); if ($free_pool_slots <= 0) { // output error "no free slots in the pool" return -1; } $free_slot_index = $this->getFreeIndex(); if ($free_slot_index < 0) { return -1; } echo "Starting job $name ($free_slot_index)" . PHP_EOL; $this->_pool[$free_slot_index] = new Job($cmd, $name); $this->_pool[$free_slot_index]->execute(); $this->_streams[$free_slot_index] = $this->_pool[$free_slot_index]->getPipe(); $this->_stderr[$free_slot_index] = $this->_pool[$free_slot_index]->getStderr(); return $free_slot_index; } public function stopJob($index) { if (!isset($this->_pool[$index])) return FALSE; unset($this->_streams[$index]); unset($this->_stderr[$index]); unset($this->_pool[$index]); } public function name($index) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->name(); } public function pipeline($index, $nohup = FALSE) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->pipeline($nohup); } public function stderr($index, $nohup = FALSE) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->stderr($nohup); } private function broadcastMessage($msg) { // sends selected signal to all child processes foreach ($this->_pool as $pool_index => $job) { $job->message($msg); } } private function broadcastSignal($sig) { // sends selected signal to all child processes foreach ($this->_pool as $pool_index => $job) { $job->signal($sig); } } //       -   protected function dispatch($cmd) { if (is_callable($this->_dispatch_function)) { call_user_func($this->_dispatch_function, $cmd); } } //      public function registerDispatch($callable) { if (is_callable($callable)) { $this->_dispatch_function = $callable; } else { trigger_error("$callable is not callable func", E_USER_WARNING); } } //    private function dispatchMain($cmd) { $parts = explode(' ', $cmd); $arg = isset($parts[0]) ? $parts[0] : ''; $val = isset($parts[1]) ? $parts[1] : ''; switch ($arg) { case "exit": $this->broadcastSignal(SIGTERM); $this->_is_terminated = TRUE; break; case "test": echo 'sending test' . PHP_EOL; $this->broadcastMessage('test'); $this->broadcastSignal(SIGUSR1); break; case 'kill': $pool_index = $val !== '' && (int)$val >= 0 ? (int)$val : -1; if ($pool_index >= 0 && isset($this->_pool[$pool_index])) { $this->_pool[$pool_index]->signal(SIGKILL); } break; default: $this->dispatch($cmd); break; } return FALSE; } public function process() { stream_set_blocking(STDIN, 0); $write = NULL; $except = NULL; while (!$this->_is_terminated) { /* -   stream_select        */ $read = $this->_streams; $except = $this->_stderr; $read[$this->_pool_size] = STDIN; if (is_array($read) && count($read) > 0) { if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) { // oops } elseif ($num_changed_streams > 0) { //    if (is_array($read) && count($read) > 0) { $cmp_array = $this->_streams; $cmp_array[$this->_pool_size] = STDIN; foreach ($read as $resource) { $pool_index = array_search($resource, $cmp_array, TRUE); if ($pool_index === FALSE) continue; if ($pool_index == $this->_pool_size) { // stdin $content = ''; while ($cmd = fgets(STDIN)) { if (!$cmd) break; $content .= $cmd; } $content = trim($content); if ($content) { //  Process Manager    -  -      $this->dispatchMain($content); } //echo "stdin> " . $cmd; } else { //    $pool_content = $this->pipeline($pool_index, TRUE); $job_name = $this->name($pool_index); if ($pool_content) { echo $job_name ." ($pool_index)" . ': ' . $pool_content; } $pool_content = $this->stderr($pool_index, TRUE); if ($pool_content) { echo $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_content; } } } } } } $this->checkJobs(); } } } 


We have already learned how to manage some abstract tasks, it remains to implement a class for the processes themselves.

 class Executable { protected $_is_terminated = FALSE; protected $_cleanup_function = NULL; public function __construct() { //    pcntl_signal(SIGTERM, array('Executable', 'signalHandler')); pcntl_signal(SIGHUP, array('Executable', 'signalHandler')); pcntl_signal(SIGINT, array('Executable', 'signalHandler')); pcntl_signal(SIGUSR1, array('Executable', 'signalHandler')); pcntl_signal(SIGUSR2, array('Executable', 'signalHandler')); stream_set_blocking(STDIN, 0); stream_set_blocking(STDOUT, 0); stream_set_blocking(STDERR, 0); } public function __destruct() { //echo "destructor called in " . get_class($this) . PHP_EOL; if (!$this->_is_terminated) { $this->_is_terminated = TRUE; $this->isTerminated(); } } //   -    private function cleanup() { if (is_callable($this->_cleanup_function)) { call_user_func($this->_cleanup_function); } } protected function registerCleanup($callable) { if (is_callable($callable)) { $this->_cleanup_function = $callable; } else { trigger_error("$callable is not callable func", E_USER_WARNING); } } protected function isTerminated() { pcntl_signal_dispatch(); if ($this->_is_terminated) { $this->cleanup(); } return $this->_is_terminated; } protected function dispatch($cmd) { //      /* switch ($cmd) { } */ } protected function checkStdin() { $read = array(STDIN); $write = NULL; $except = NULL; if (is_array($read) && count($read) > 0) { if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) { // oops } elseif ($num_changed_streams > 0) { if (is_array($read) && count($read) > 0) { // stdin $content = ''; while ($cmd = fgets(STDIN)) { if (!$cmd) break; $content .= $cmd; } $this->dispatch($content); echo "recieved $content"; //echo "stdin> " . $cmd; } } } } //   protected function signalHandler ($signo) { switch ($signo) { case SIGTERM: case SIGHUP: case SIGINT: $this->_is_terminated = TRUE; //echo "exiting in ".get_class($this)."...\n"; break; case SIGUSR1: //echo "SIGUSR1 recieved\n"; $this->checkStdin(); break; case SIGUSR2: $this->_is_terminated = TRUE; echo "[SHUTDOWN] in " . get_class($this) . PHP_EOL; flush(); exit(1); break; default: // handle all other signals break; } } } 


As an example of using the process manager, we implement a “sleeping” process - a script that will sleep and unsubscribe about this in STDOUT

sleep.php
 class SleeperTest extends Executable { public function sleep() { for($i = 0; !$this->isTerminated() && $i < 10; $i++) { ob_start(); echo $i . "\n"; ob_end_flush(); sleep(5); } } } $s = new SleeperTest; $s->sleep(); 


pm.php
 $pm = new Job_Manager; $pm->startJob('php sleep.php', 'sleeper1'); $pm->startJob('php sleep.php', 'sleeper2'); // $pm->process(); 


The non-blocking descriptors used in the implementation and the stream_select function make it possible to avoid a problem typical of various kinds of daemons — high CPU utilization in the idle cycle. The proposed method does not have this drawback, everything works smoothly and calmly.

UPDATE . Laid out the source of classes on github https://github.com/xzag/php-pm

Source: https://habr.com/ru/post/148596/


All Articles