
A simple daemon system for Yii2

In this article I will try to cover the main nuances of implementing a daemon system in PHP and teach Yii2 console commands to run as daemons.

For the last three years I have been developing and maintaining a fairly large corporate portal for a group of companies. Like many others, I ran into the problem that some tasks the business needs do not fit into any timeout: build an Excel report with 300 thousand rows, send a mailing of 1,500 emails, and so on. Naturally, such tasks belong in the background: daemons and cron jobs. This article does not compare cron and daemons; we chose daemons for these problems. An important requirement for us was access to everything already written for the backend, so the daemons had to be an extension of the Yii2 framework. For the same reason, ready-made solutions such as phpDaemon did not suit us.

Under the cut is the ready-made solution for implementing daemons on Yii2 that I ended up with.

The topic of daemons in PHP comes up with enviable regularity (one, two, three, and the guys from Badoo even restart them without losing connections). Perhaps my own reinvented wheel, a quick way to run daemons on a popular framework, will be useful too.

Some basics


For a process to become a daemon, you need to:
  1. Detach the script from the console and from the standard input/output streams;
  2. Wrap the execution of the main code in an infinite loop;
  3. Implement process control mechanisms.

Detaching from the console

To begin, close the standard streams STDIN, STDOUT and STDERR. PHP cannot do without them, though, and will treat the first streams opened afterwards as the standard ones, so we immediately reopen them on /dev/null.

    // Close each standard stream and immediately reopen it on /dev/null
    if (is_resource(STDIN)) {
        fclose(STDIN);
        $stdIn = fopen('/dev/null', 'r');
    }
    if (is_resource(STDOUT)) {
        fclose(STDOUT);
        $stdOut = fopen('/dev/null', 'ab');
    }
    if (is_resource(STDERR)) {
        fclose(STDERR);
        $stdErr = fopen('/dev/null', 'ab');
    }

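Next, we fork the process and make the child the main process. The parent (donor) process then exits.

    $pid = pcntl_fork();
    if ($pid == -1) {
        // fork failed
        $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
    } elseif ($pid) {
        // parent process: exit and leave the child running
        $this->halt(self::EXIT_CODE_NORMAL);
    } else {
        // child process: become the leader of a new session
        posix_setsid();
    }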

The infinite loop and control

I think the loop itself needs no explanation. The control mechanisms, however, deserve a closer look.

Tracking running processes

It is simple: after starting, the daemon writes its PID to a file named after itself, and deletes this file when it finishes.
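
A minimal sketch of this PID-file bookkeeping, assuming a Linux system with /proc mounted. The helper names writePidFile(), isDaemonRunning() and removePidFile(), as well as the file location, are hypothetical and are not part of the yii2-daemon package:

```php
<?php

/** Store the current process ID in the given file. */
function writePidFile(string $path): bool
{
    return file_put_contents($path, (string) getmypid()) !== false;
}

/** Check whether the process recorded in the PID file is still alive. */
function isDaemonRunning(string $path): bool
{
    if (!is_file($path)) {
        return false;
    }
    $pid = (int) trim((string) file_get_contents($path));

    // Assumption: Linux, where every live process has a /proc/<pid> entry.
    return $pid > 0 && file_exists("/proc/$pid");
}

/** Remove the PID file, but only if it belongs to the current process. */
function removePidFile(string $path): void
{
    if (is_file($path) && (int) file_get_contents($path) === getmypid()) {
        unlink($path);
    }
}

$pidFile = sys_get_temp_dir() . '/example-daemon.pid'; // hypothetical location

if (isDaemonRunning($pidFile)) {
    exit("Daemon is already running\n");
}
writePidFile($pidFile);
// ... the main loop would run here ...
removePidFile($pidFile);
```

Checking the owner before deleting matters once the daemon forks: a child process that exits should not remove the PID file of its parent.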

Handling POSIX signals

The daemon must handle signals from the operating system correctly, i.e. on receiving SIGTERM it should shut down gracefully. This takes three things. First, we register the function that will handle incoming signals:

 pcntl_signal(SIGTERM, ['MyClassName', 'mySignalHandlerFunction']); 

Second, in the signal handler we set a static class property to true.

    static function signalHandler($signo, $pid = null, $status = null)
    {
        // ask the main loop to stop after the current iteration
        self::$stopFlag = true;
    }

And third, our infinite loop should now be not so infinite:

    while (!self::$stopFlag) {
        // ... one iteration of work ...
        // deliver pending signals to the registered handlers
        pcntl_signal_dispatch();
    }

Signal handling in different PHP versions
In PHP < 5.3.0, signals were dispatched using the special declare(ticks = N) directive, where a tick is an event that occurs every N low-level operations executed by the parser inside the declare block. Signals were dispatched according to this setting: too small a value hurt performance, and too large a value delayed signal handling.

In PHP >= 5.3.0 the pcntl_signal_dispatch() function appeared, which dispatches pending signals on demand; we call it after each iteration.
Finally, PHP 7.1 introduced asynchronous signal handling (enabled with pcntl_async_signals()), which makes it possible to receive signals almost instantly, without overhead and without manual function calls.
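
As a rough sketch of the PHP 7.1 approach, assuming the pcntl and posix extensions are loaded: with pcntl_async_signals(true) the handler runs without any pcntl_signal_dispatch() calls in the loop. The script sends SIGTERM to itself purely for demonstration, and the $stop variable and the iteration cap are illustrative only:

```php
<?php

// Assumption: PHP >= 7.1 with the pcntl and posix extensions available.
pcntl_async_signals(true);

$stop = false;

// The handler only raises a flag; the loop decides when to exit.
pcntl_signal(SIGTERM, function (int $signo) use (&$stop): void {
    $stop = true;
});

// For demonstration only: deliver SIGTERM to the current process.
posix_kill(posix_getpid(), SIGTERM);

$iterations = 0;
// The cap keeps the example from running forever if the signal were lost.
while (!$stop && $iterations < 1000) {
    usleep(1000); // stands in for one unit of real work
    $iterations++;
}

echo $stop ? "Stopped by signal\n" : "Stopped by iteration cap\n";
```

The package itself calls pcntl_signal_dispatch() manually, which also works on older PHP versions.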

Now, on receiving the signal from the operating system, the script calmly finishes the current iteration and exits the loop.

Memory leak control

Unfortunately, a daemon that runs for a long time without restarting starts to leak memory. How fast it leaks depends on the features you use. In our experience, the worst leaks come from daemons that work with remote SOAP services through the standard SoapClient class. So you need to monitor memory usage and restart daemons periodically. Let's add leak control to our loop:

    while (!self::$stopFlag) {
        // leave the loop once memory usage exceeds the configured limit
        if (memory_get_usage() > $this->memoryLimit) {
            break;
        }
        pcntl_signal_dispatch();
    }

Where is the code for Yii?


The sources are published on GitHub (yii2-daemon); the package can also be installed via Composer.

The package consists of just two abstract classes: the base class DaemonController and the WatcherDaemonController class.

DaemonController
    <?php

    namespace vyants\daemon;

    use yii\base\NotSupportedException;
    use yii\console\Controller;
    use yii\helpers\Console;

    /**
     * Class DaemonController
     *
     * @author Vladimir Yants <vladimir.yants@gmail.com>
     */
    abstract class DaemonController extends Controller
    {
        const EVENT_BEFORE_JOB = "beforeJob";
        const EVENT_AFTER_JOB = "afterJob";
        const EVENT_BEFORE_ITERATION = "beforeIteration";
        const EVENT_AFTER_ITERATION = "afterIteration";

        /**
         * @var $demonize boolean Run controller as Daemon
         * @default false
         */
        public $demonize = false;

        /**
         * @var $isMultiInstance boolean allow daemon create a few instances
         * @see $maxChildProcesses
         * @default false
         */
        public $isMultiInstance = false;

        /**
         * @var $parentPID int main process pid
         */
        protected $parentPID;

        /**
         * @var $maxChildProcesses int max daemon instances
         * @default 10
         */
        public $maxChildProcesses = 10;

        /**
         * @var $currentJobs [] array of running instances
         */
        protected static $currentJobs = [];

        /**
         * @var int Memory limit for daemon, must be less than php memory_limit
         * @default 256M
         */
        protected $memoryLimit = 268435456;

        /**
         * @var boolean used for soft daemon stop, set 1 to stop
         */
        private static $stopFlag = false;

        /**
         * @var int Delay between task list checking
         * @default 5sec
         */
        protected $sleep = 5;

        protected $pidDir = "@runtime/daemons/pids";
        protected $logDir = "@runtime/daemons/logs";

        private $stdIn;
        private $stdOut;
        private $stdErr;

        /**
         * Init function
         */
        public function init()
        {
            parent::init();
            //set PCNTL signal handlers
            pcntl_signal(SIGTERM, ['vyants\daemon\DaemonController', 'signalHandler']);
            pcntl_signal(SIGINT, ['vyants\daemon\DaemonController', 'signalHandler']);
            pcntl_signal(SIGHUP, ['vyants\daemon\DaemonController', 'signalHandler']);
            pcntl_signal(SIGUSR1, ['vyants\daemon\DaemonController', 'signalHandler']);
            pcntl_signal(SIGCHLD, ['vyants\daemon\DaemonController', 'signalHandler']);
        }

        function __destruct()
        {
            $this->deletePid();
        }

        /**
         * Adjusting logger. You can override it.
         */
        protected function initLogger()
        {
            $targets = \Yii::$app->getLog()->targets;
            foreach ($targets as $name => $target) {
                $target->enabled = false;
            }
            $config = [
                'levels' => ['error', 'warning', 'trace', 'info'],
                'logFile' => \Yii::getAlias($this->logDir) . DIRECTORY_SEPARATOR . $this->getProcessName() . '.log',
                'logVars' => [],
                'except' => [
                    'yii\db\*', // Don't include messages from db
                ],
            ];
            $targets['daemon'] = new \yii\log\FileTarget($config);
            \Yii::$app->getLog()->targets = $targets;
            \Yii::$app->getLog()->init();
        }

        /**
         * Daemon worker body
         *
         * @param $job
         *
         * @return boolean
         */
        abstract protected function doJob($job);

        /**
         * Base action, you can't override or create another actions
         * @return bool
         * @throws NotSupportedException
         */
        final public function actionIndex()
        {
            if ($this->demonize) {
                $pid = pcntl_fork();
                if ($pid == -1) {
                    $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
                } elseif ($pid) {
                    $this->cleanLog();
                    $this->halt(self::EXIT_CODE_NORMAL);
                } else {
                    posix_setsid();
                    $this->closeStdStreams();
                }
            }
            $this->changeProcessName();

            //run loop
            return $this->loop();
        }

        /**
         * Set new process name
         */
        protected function changeProcessName()
        {
            //rename process
            if (version_compare(PHP_VERSION, '5.5.0') >= 0) {
                cli_set_process_title($this->getProcessName());
            } else {
                if (function_exists('setproctitle')) {
                    setproctitle($this->getProcessName());
                } else {
                    \Yii::error('Can\'t find cli_set_process_title or setproctitle function');
                }
            }
        }

        /**
         * Close std streams and open to /dev/null
         * need some class properties
         */
        protected function closeStdStreams()
        {
            if (is_resource(STDIN)) {
                fclose(STDIN);
                $this->stdIn = fopen('/dev/null', 'r');
            }
            if (is_resource(STDOUT)) {
                fclose(STDOUT);
                $this->stdOut = fopen('/dev/null', 'ab');
            }
            if (is_resource(STDERR)) {
                fclose(STDERR);
                $this->stdErr = fopen('/dev/null', 'ab');
            }
        }

        /**
         * Prevent non index action running
         *
         * @param \yii\base\Action $action
         *
         * @return bool
         * @throws NotSupportedException
         */
        public function beforeAction($action)
        {
            if (parent::beforeAction($action)) {
                $this->initLogger();
                if ($action->id != "index") {
                    throw new NotSupportedException(
                        "Only index action allowed in daemons. So, don't create and call another"
                    );
                }

                return true;
            } else {
                return false;
            }
        }

        /**
         * Names of the available command-line options
         *
         * @param string $actionID
         *
         * @return array
         */
        public function options($actionID)
        {
            return [
                'demonize',
                'taskLimit',
                'isMultiInstance',
                'maxChildProcesses',
            ];
        }

        /**
         * Extract current unprocessed jobs
         * You can extract jobs from DB (DataProvider will be great), queue managers (ZMQ, RabbiMQ etc), redis and so on
         *
         * @return array with jobs
         */
        abstract protected function defineJobs();

        /**
         * Fetch one task from array of tasks
         *
         * @param Array
         *
         * @return mixed one task
         */
        protected function defineJobExtractor(&$jobs)
        {
            return array_shift($jobs);
        }

        /**
         * Main Loop
         *
         * @return boolean 0|1
         */
        final private function loop()
        {
            if (file_put_contents($this->getPidPath(), getmypid())) {
                $this->parentPID = getmypid();
                \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' started.');
                while (!self::$stopFlag) {
                    if (memory_get_usage() > $this->memoryLimit) {
                        \Yii::trace(
                            'Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' used ' . memory_get_usage()
                            . ' bytes on ' . $this->memoryLimit . ' bytes allowed by memory limit'
                        );
                        break;
                    }
                    $this->trigger(self::EVENT_BEFORE_ITERATION);
                    $this->renewConnections();
                    $jobs = $this->defineJobs();
                    if ($jobs && !empty($jobs)) {
                        while (($job = $this->defineJobExtractor($jobs)) !== null) {
                            //if no free workers, wait
                            if ($this->isMultiInstance && (count(static::$currentJobs) >= $this->maxChildProcesses)) {
                                \Yii::trace('Reached maximum number of child processes. Waiting...');
                                while (count(static::$currentJobs) >= $this->maxChildProcesses) {
                                    sleep(1);
                                    pcntl_signal_dispatch();
                                }
                                \Yii::trace(
                                    'Free workers found: '
                                    . ($this->maxChildProcesses - count(static::$currentJobs))
                                    . ' worker(s). Delegate tasks.'
                                );
                            }
                            pcntl_signal_dispatch();
                            $this->runDaemon($job);
                        }
                    } else {
                        sleep($this->sleep);
                    }
                    pcntl_signal_dispatch();
                    $this->trigger(self::EVENT_AFTER_ITERATION);
                }
                \Yii::info('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' is stopped.');

                return self::EXIT_CODE_NORMAL;
            }
            $this->halt(self::EXIT_CODE_ERROR, 'Can\'t create pid file ' . $this->getPidPath());
        }

        /**
         * Delete pid file
         */
        protected function deletePid()
        {
            $pid = $this->getPidPath();
            if (file_exists($pid)) {
                if (file_get_contents($pid) == getmypid()) {
                    unlink($this->getPidPath());
                }
            } else {
                \Yii::error('Can\'t unlink pid file ' . $this->getPidPath());
            }
        }

        /**
         * PCNTL signals handler
         *
         * @param $signo
         * @param null $pid
         * @param null $status
         */
        final static function signalHandler($signo, $pid = null, $status = null)
        {
            switch ($signo) {
                case SIGINT:
                case SIGTERM:
                    //shutdown
                    self::$stopFlag = true;
                    break;
                case SIGHUP:
                    //restart, not implemented
                    break;
                case SIGUSR1:
                    //user signal, not implemented
                    break;
                case SIGCHLD:
                    if (!$pid) {
                        $pid = pcntl_waitpid(-1, $status, WNOHANG);
                    }
                    while ($pid > 0) {
                        if ($pid && isset(static::$currentJobs[$pid])) {
                            unset(static::$currentJobs[$pid]);
                        }
                        $pid = pcntl_waitpid(-1, $status, WNOHANG);
                    }
                    break;
            }
        }

        /**
         * Tasks runner
         *
         * @param string $job
         *
         * @return boolean
         */
        final public function runDaemon($job)
        {
            if ($this->isMultiInstance) {
                $this->flushLog();
                $pid = pcntl_fork();
                if ($pid == -1) {
                    return false;
                } elseif ($pid !== 0) {
                    static::$currentJobs[$pid] = true;

                    return true;
                } else {
                    $this->cleanLog();
                    $this->renewConnections();
                    //child process must die
                    $this->trigger(self::EVENT_BEFORE_JOB);
                    $status = $this->doJob($job);
                    $this->trigger(self::EVENT_AFTER_JOB);
                    if ($status) {
                        $this->halt(self::EXIT_CODE_NORMAL);
                    } else {
                        $this->halt(self::EXIT_CODE_ERROR, 'Child process #' . $pid . ' return error.');
                    }
                }
            } else {
                $this->trigger(self::EVENT_BEFORE_JOB);
                $status = $this->doJob($job);
                $this->trigger(self::EVENT_AFTER_JOB);

                return $status;
            }
        }

        /**
         * Stop process and show or write message
         *
         * @param $code int -1|0|1
         * @param $message string
         */
        protected function halt($code, $message = null)
        {
            if ($message !== null) {
                if ($code == self::EXIT_CODE_ERROR) {
                    \Yii::error($message);
                    if (!$this->demonize) {
                        $message = Console::ansiFormat($message, [Console::FG_RED]);
                    }
                } else {
                    \Yii::trace($message);
                }
                if (!$this->demonize) {
                    $this->writeConsole($message);
                }
            }
            if ($code !== -1) {
                \Yii::$app->end($code);
            }
        }

        /**
         * Renew connections
         * @throws \yii\base\InvalidConfigException
         * @throws \yii\db\Exception
         */
        protected function renewConnections()
        {
            if (isset(\Yii::$app->db)) {
                \Yii::$app->db->close();
                \Yii::$app->db->open();
            }
        }

        /**
         * Show message in console
         *
         * @param $message
         */
        private function writeConsole($message)
        {
            $out = Console::ansiFormat('[' . date('dmY H:i:s') . '] ', [Console::BOLD]);
            $this->stdout($out . $message . "\n");
        }

        /**
         * @param string $daemon
         *
         * @return string
         */
        public function getPidPath($daemon = null)
        {
            $dir = \Yii::getAlias($this->pidDir);
            if (!file_exists($dir)) {
                mkdir($dir, 0744, true);
            }
            $daemon = $this->getProcessName($daemon);

            return $dir . DIRECTORY_SEPARATOR . $daemon;
        }

        /**
         * @return string
         */
        public function getProcessName($route = null)
        {
            if (is_null($route)) {
                $route = \Yii::$app->requestedRoute;
            }

            return str_replace(['/index', '/'], ['', '.'], $route);
        }

        /**
         * If in daemon mode - no write to console
         *
         * @param string $string
         *
         * @return bool|int
         */
        public function stdout($string)
        {
            if (!$this->demonize && is_resource(STDOUT)) {
                return parent::stdout($string);
            } else {
                return false;
            }
        }

        /**
         * If in daemon mode - no write to console
         *
         * @param string $string
         *
         * @return int
         */
        public function stderr($string)
        {
            if (!$this->demonize && is_resource(\STDERR)) {
                return parent::stderr($string);
            } else {
                return false;
            }
        }

        /**
         * Empty log queue
         */
        protected function cleanLog()
        {
            \Yii::$app->log->logger->messages = [];
        }

        /**
         * Empty log queue
         */
        protected function flushLog($final = false)
        {
            \Yii::$app->log->logger->flush($final);
        }
    }


WatcherDaemonController
    <?php

    namespace vyants\daemon\controllers;

    use vyants\daemon\DaemonController;

    /**
     * watcher-daemon - check another daemons and run it if need
     *
     * @author Vladimir Yants <vladimir.yants@gmail.com>
     */
    abstract class WatcherDaemonController extends DaemonController
    {
        /**
         * @var string subfolder in console/controllers
         */
        public $daemonFolder = 'daemons';

        /**
         * @var boolean flag for first iteration
         */
        protected $firstIteration = true;

        /**
         * Prevent double start
         */
        public function init()
        {
            $pid_file = $this->getPidPath();
            if (file_exists($pid_file) && ($pid = file_get_contents($pid_file)) && file_exists("/proc/$pid")) {
                $this->halt(self::EXIT_CODE_ERROR, 'Another Watcher is already running.');
            }
            parent::init();
        }

        /**
         * Job processing body
         *
         * @param $job array
         *
         * @return boolean
         */
        protected function doJob($job)
        {
            $pid_file = $this->getPidPath($job['daemon']);
            \Yii::trace('Check daemon ' . $job['daemon']);
            if (file_exists($pid_file)) {
                $pid = file_get_contents($pid_file);
                if ($this->isProcessRunning($pid)) {
                    if ($job['enabled']) {
                        \Yii::trace('Daemon ' . $job['daemon'] . ' running and working fine');

                        return true;
                    } else {
                        \Yii::warning('Daemon ' . $job['daemon'] . ' running, but disabled in config. Send SIGTERM signal.');
                        if (isset($job['hardKill']) && $job['hardKill']) {
                            posix_kill($pid, SIGKILL);
                        } else {
                            posix_kill($pid, SIGTERM);
                        }

                        return true;
                    }
                }
            }
            \Yii::error('Daemon pid not found.');
            if ($job['enabled']) {
                \Yii::trace('Try to run daemon ' . $job['daemon'] . '.');
                $command_name = $job['daemon'] . DIRECTORY_SEPARATOR . 'index';
                //flush log before fork
                $this->flushLog(true);
                //run daemon
                $pid = pcntl_fork();
                if ($pid === -1) {
                    $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() returned error');
                } elseif ($pid === 0) {
                    $this->cleanLog();
                    \Yii::$app->requestedRoute = $command_name;
                    \Yii::$app->runAction("$command_name", ['demonize' => 1]);
                    $this->halt(0);
                } else {
                    $this->initLogger();
                    \Yii::trace('Daemon ' . $job['daemon'] . ' is running with pid ' . $pid);
                }
            }
            \Yii::trace('Daemon ' . $job['daemon'] . ' is checked.');

            return true;
        }

        /**
         * @return array
         */
        protected function defineJobs()
        {
            if ($this->firstIteration) {
                $this->firstIteration = false;
            } else {
                sleep($this->sleep);
            }

            return $this->getDaemonsList();
        }

        /**
         * Daemons for check. Better way - get it from database
         * [
         *     ['daemon' => 'one-daemon', 'enabled' => true]
         *     ...
         *     ['daemon' => 'another-daemon', 'enabled' => false]
         * ]
         * @return array
         */
        abstract protected function getDaemonsList();

        /**
         * @param $pid
         *
         * @return bool
         */
        public function isProcessRunning($pid)
        {
            return file_exists("/proc/$pid");
        }
    }


DaemonController

This is the parent class for all daemons. Here is a minimal daemon example:

    <?php

    namespace console\controllers\daemons;

    use vyants\daemon\DaemonController;

    class TestController extends DaemonController
    {
        /**
         * @param $job
         *
         * @return boolean
         */
        protected function doJob($job)
        {
            //do some job
            return true;
        }

        /**
         * @return array
         */
        protected function defineJobs()
        {
            return [];
        }
    }

The defineJobs() function must return the set of tasks to perform. By default it is expected to return an array. If you want to return, say, a MongoCursor, you will need to override defineJobExtractor(). The doJob() function receives a single task as input, performs the necessary operations on it, and should mark the task as completed at the source so that it is not picked up a second time.
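
As an illustration of what such an override could do, here is a sketch of an extractor for any Iterator. The function name extractNextJob() is hypothetical and an ArrayIterator stands in for a database cursor; in a real daemon this logic would go into an overridden defineJobExtractor(&$jobs), which must return one job per call and null when nothing is left:

```php
<?php

/**
 * Take the next job from an Iterator, or null when it is exhausted.
 * Follows the same contract as defineJobExtractor(): one job per call, null at the end.
 */
function extractNextJob(Iterator $jobs)
{
    if (!$jobs->valid()) {
        return null;
    }
    $job = $jobs->current();
    $jobs->next();

    return $job;
}

// ArrayIterator stands in for a database cursor here.
$jobs = new ArrayIterator([['id' => 1], ['id' => 2]]);

$processed = [];
while (($job = extractNextJob($jobs)) !== null) {
    $processed[] = $job['id'];
}
```

Note that a job whose value is itself null would end the loop early, so the source should not yield null entries.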

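Possible parameters and settings (as declared in the DaemonController source above):
  - demonize (default false): run the controller as a daemon;
  - isMultiInstance (default false): allow the daemon to create several instances, see maxChildProcesses;
  - maxChildProcesses (default 10): the maximum number of daemon instances;
  - memoryLimit (default 268435456 bytes): the memory limit for the daemon, which must be less than PHP's memory_limit;
  - sleep (default 5 seconds): the delay between checks of the task list;
  - pidDir (default @runtime/daemons/pids) and logDir (default @runtime/daemons/logs): the directories for PID files and logs.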


The connection loss problem

After a fork() the connections established in the parent process stop working in the child processes. To avoid this problem, renewConnections() is called after every fork. By default this function only reconnects Yii::$app->db, but you can override it and add any other sources whose connections need to be maintained in child processes.

Logging

Daemons reconfigure the standard Yii logger for themselves. If the default behavior does not suit you, override the initLogger() function.

WatcherDaemonController

This is an almost ready-made watcher daemon. Its job is to monitor other daemons, starting and stopping them as needed. It cannot be started twice, so you can safely put its launch in the crontab. To start using it, create a daemons folder in console/controllers and add a class like this:

    <?php

    namespace console\controllers\daemons;

    use vyants\daemon\controllers\WatcherDaemonController;

    /**
     * Class WatcherController
     */
    class WatcherController extends WatcherDaemonController
    {
        protected $sleep = 10;

        /**
         * @return array
         */
        protected function getDaemonsList()
        {
            return [
                ['daemon' => 'daemons/test', 'enabled' => true]
            ];
        }
    }

Only one function needs to be defined: getDaemonsList(), which returns the list of daemons to monitor. In its simplest form this is an array hard-coded in the class, but then you cannot change the list on the fly. Keep the list of daemons in the database or in a separate file and read it from there every time. In that case the watcher can turn a daemon on or off without being restarted itself.
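
One possible way to keep the list in a separate file is sketched below. The loadDaemonsList() helper and the JSON layout are assumptions made for this example and are not part of the package; a real getDaemonsList() could simply return the result of such a helper:

```php
<?php

/**
 * Read the list of daemons from a JSON file.
 * Returns an empty list if the file is missing or malformed.
 */
function loadDaemonsList(string $path): array
{
    if (!is_readable($path)) {
        return [];
    }
    $list = json_decode((string) file_get_contents($path), true);
    if (!is_array($list)) {
        return [];
    }

    // Keep only well-formed entries and normalise the "enabled" flag.
    $result = [];
    foreach ($list as $item) {
        if (is_array($item) && isset($item['daemon'])) {
            $result[] = [
                'daemon'  => (string) $item['daemon'],
                'enabled' => !empty($item['enabled']),
            ];
        }
    }

    return $result;
}

// Demonstration with a temporary file (the path and contents are made up).
$file = tempnam(sys_get_temp_dir(), 'daemons');
file_put_contents(
    $file,
    '[{"daemon":"daemons/test","enabled":true},{"daemon":"daemons/mail","enabled":false}]'
);
$daemons = loadDaemonsList($file);
unlink($file);
```

Because the file is read on every call, editing it is enough to enable or disable a daemon on the watcher's next iteration.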

Conclusion


At the moment we have more than 50 daemons performing all kinds of tasks, from sending emails to generating reports and synchronizing data between different systems.

The daemons work with various task sources: MySQL, RabbitMQ and even remote web services. So far everything runs smoothly.
Of course, PHP daemons cannot compete with equivalent daemons written in Go. But the high development speed, the ability to reuse existing code and not having to teach the team another language outweigh the disadvantages.

Source: https://habr.com/ru/post/307464/
