"The human fantasy is boundless on idiotic and ridiculous architecture ..."
<?php

require_once 'Zend/Queue.php';
require_once 'Sibirix/Queue/Smemory/Package.php';

/**
 * Sends and receives Sibirix_Queue_Smemory_Package messages through a
 * Zend_Queue 'Activemq' adapter and mirrors some state in System V shared
 * memory: the last issued message id plus the most recent chat and system
 * packages. Access to the shared memory segment is serialized with a
 * System V semaphore.
 */
class Sibirix_Queue_Smemory_Manager
{
    const ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE = "BROKEN_OPTIONS_FOR_INIT";
    const ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE = "UNKNOWN_IDENTIFIER_STORAGE";
    const ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE = "CAN_NOT_ACTIVATE_THE_SEMAPHORE";
    const ERROR_CAN_NOT_ATTACH_SHARED_MEMORY_MESSAGE = "CAN_NOT_ATTACH_SHARED_MEMORY";

    /** Shared memory variable keys. */
    const SHM_VAR_LAST_CHAT_POSTS = 1;
    const SHM_VAR_LAST_SYSTEM_COMMAND = 2;
    const SHM_VAR_QUEUE_LAST_ID = 3;

    /** Maximum number of chat packages kept in shared memory. */
    private $_count_chat_package_storage;

    /** Maximum number of system packages kept in shared memory. */
    private $_count_system_package_storage;

    /** Options handed to the Zend_Queue 'Activemq' adapter. */
    private $_activeMQoptions;

    /** Lazily created Zend_Queue instance (see connectTopic()). */
    private $_queue;

    /**
     * semaphore key
     */
    private $_sem_key;

    /**
     * Shared memory APP_KEY = one symbol
     */
    private $_app_key;

    private $_shared_memory_id = false;
    private $_shared_memory_handler = false;
    private $_shared_memory_size;
    private $_sem_id = false;

    /** Id of the last package read from the queue by this instance. */
    private $_last_queue_message_id = false;

    /**
     * Validates the options, then acquires the semaphore handle and attaches
     * the shared memory segment.
     *
     * Required option keys: sem_key, app_key (exactly one character, used as
     * the ftok() project id), activeMQ, shared_memory_size,
     * count_chat_package_storage, count_system_package_storage.
     *
     * @param array $options
     * @throws Exception BROKEN_OPTIONS_FOR_INIT when an option is missing or
     *                   malformed; CAN_NOT_ACTIVATE_THE_SEMAPHORE or
     *                   CAN_NOT_ATTACH_SHARED_MEMORY when the IPC resources
     *                   cannot be obtained.
     */
    public function __construct($options = array())
    {
        if (!is_array($options)) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        $required = array(
            'sem_key',
            'app_key',
            'activeMQ',
            'shared_memory_size',
            'count_chat_package_storage',
            'count_system_package_storage',
        );
        foreach ($required as $name) {
            if (!isset($options[$name])) {
                throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
            }
        }

        $options['app_key'] = (string)$options['app_key'];
        if (strlen($options['app_key']) != 1) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        $this->_sem_key = $options['sem_key'];
        $this->_app_key = $options['app_key'];
        $this->_activeMQoptions = $options['activeMQ'];
        $this->_shared_memory_size = $options['shared_memory_size'];
        $this->_count_chat_package_storage = $options['count_chat_package_storage'];
        $this->_count_system_package_storage = $options['count_system_package_storage'];

        // Fail early instead of carrying an unusable handle into later calls.
        $this->_sem_id = sem_get($this->_sem_key, 1);
        if ($this->_sem_id === false) {
            throw new Exception(self::ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE);
        }

        $this->_shared_memory_id = ftok(".", $this->_app_key);
        if ($this->_shared_memory_id == -1) {
            throw new Exception(self::ERROR_CAN_NOT_ATTACH_SHARED_MEMORY_MESSAGE);
        }

        $this->_shared_memory_handler = shm_attach($this->_shared_memory_id, $this->_shared_memory_size);
        if ($this->_shared_memory_handler === false) {
            throw new Exception(self::ERROR_CAN_NOT_ATTACH_SHARED_MEMORY_MESSAGE);
        }
    }

    /**
     * Creates the Zend_Queue on first use and returns it.
     *
     * @return Zend_Queue
     */
    public function connectTopic()
    {
        if (!$this->_queue) {
            $this->_queue = new Zend_Queue('Activemq', $this->_activeMQoptions);
        }

        return $this->_queue;
    }

    /**
     * Requests one message from the queue.
     *
     * @return Sibirix_Queue_Smemory_Package|false The received package, or
     *         false when the queue returned nothing.
     */
    public function receive()
    {
        $this->connectTopic();

        $message = false;
        foreach ($this->_queue->receive(1) as $msg) {
            $message = new Sibirix_Queue_Smemory_Package($msg->body);
            $this->_last_queue_message_id = $message->id;
        }

        return $message;
    }

    /**
     * Reads messages one at a time until the queue returns an empty batch or
     * the id of the last package read has reached the id stored in shared
     * memory.
     *
     * @return Sibirix_Queue_Smemory_Package[]
     */
    public function receiveAll()
    {
        $this->connectTopic();

        $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();
        $messages = array();

        while (true) {
            $msg_receive = $this->_queue->receive(1);
            if (!$msg_receive->count()) {
                break;
            }

            foreach ($msg_receive as $message) {
                $pkg = new Sibirix_Queue_Smemory_Package($message->body);
                $this->_last_queue_message_id = $pkg->id;
                $messages[] = $pkg;
            }

            if ($this->_last_queue_message_id >= $last_queue_message_id_mem) {
                // The counter may have advanced while we were reading, so
                // re-read it before deciding that we have caught up.
                $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();
                if ($this->_last_queue_message_id >= $last_queue_message_id_mem) {
                    break;
                }
            }
        }

        return $messages;
    }

    /**
     * Builds a package, assigns it the next message id, sends it to the queue
     * and stores it in the chat or system storage. The id increment, the send
     * and the storage update all happen while the semaphore is held.
     *
     * @param string $action
     * @param mixed  $data
     * @param mixed  $recipient
     * @return int The id assigned to the package.
     * @throws Exception When the package is invalid, the semaphore cannot be
     *                   acquired, or sending fails.
     */
    public function send($action, $data, $recipient = false)
    {
        $this->connectTopic();
        $package = new Sibirix_Queue_Smemory_Package($action, $data, $recipient);

        $this->_lock();
        try {
            $last_queue_message_id_mem = $this->_getLastQueueMessageId();
            $last_queue_message_id_mem++;
            $this->_setLastQueueMessageId($last_queue_message_id_mem);

            $this->_queue->send($package->normalize($last_queue_message_id_mem));

            if ($package->isPost()) {
                $this->_savePackageToChatStorage($package);
            } else {
                $this->_savePackageToSystemStorage($package);
            }
        } catch (Exception $e) {
            // Never leave the semaphore held when the critical section fails.
            $this->_unlock();
            throw $e;
        }
        $this->_unlock();

        return $last_queue_message_id_mem;
    }

    /**
     * Reads the message id counter from shared memory without locking.
     * A falsy stored value is written back as 0.
     *
     * @return int
     */
    private function _getLastQueueMessageId()
    {
        $last_queue_message_id_mem = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID);
        if (!$last_queue_message_id_mem) {
            shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, 0);
            $last_queue_message_id_mem = 0;
        }

        return $last_queue_message_id_mem;
    }

    /**
     * Writes the message id counter to shared memory without locking.
     *
     * @param int $id
     * @return bool Always true.
     */
    private function _setLastQueueMessageId($id)
    {
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, $id);

        return true;
    }

    /**
     * Reads the message id counter while holding the semaphore.
     *
     * @return int
     */
    private function _sem_getLastQueueMessageId()
    {
        return $this->getLastQueueMessageId();
    }

    /**
     * Appends a serialized package to one shared memory storage, dropping the
     * oldest entries first so that at most $limit entries remain afterwards.
     * The caller is expected to hold the semaphore.
     *
     * @param int                           $storage Shared memory variable key.
     * @param int                           $limit   Configured storage size.
     * @param Sibirix_Queue_Smemory_Package $package
     */
    private function _appendToStorage($storage, $limit, $package)
    {
        $stored = shm_get_var($this->_shared_memory_handler, $storage);
        if (!$stored) {
            $stored = array();
        }

        // The emptiness guard matters: with a limit <= 0 the count check
        // alone stays true on an empty array and the loop would never end.
        // In that case only the newest package is kept.
        while ($stored && count($stored) >= $limit) {
            array_shift($stored);
        }

        array_push($stored, $package->normalize());
        shm_put_var($this->_shared_memory_handler, $storage, $stored);
    }

    /**
     * Stores a package in the chat storage.
     *
     * @param Sibirix_Queue_Smemory_Package $package
     */
    private function _savePackageToChatStorage($package)
    {
        $this->_appendToStorage(
            self::SHM_VAR_LAST_CHAT_POSTS,
            $this->_count_chat_package_storage,
            $package
        );
    }

    /**
     * Stores a package in the system command storage.
     *
     * @param Sibirix_Queue_Smemory_Package $package
     */
    private function _savePackageToSystemStorage($package)
    {
        $this->_appendToStorage(
            self::SHM_VAR_LAST_SYSTEM_COMMAND,
            $this->_count_system_package_storage,
            $package
        );
    }

    /**
     * Returns the packages currently held in one of the two storages.
     *
     * @param int $storage self::SHM_VAR_LAST_SYSTEM_COMMAND or
     *                     self::SHM_VAR_LAST_CHAT_POSTS.
     * @return Sibirix_Queue_Smemory_Package[]
     * @throws Exception UNKNOWN_IDENTIFIER_STORAGE for any other identifier.
     */
    public function getLastPackage($storage)
    {
        if ($storage != self::SHM_VAR_LAST_SYSTEM_COMMAND
            && $storage != self::SHM_VAR_LAST_CHAT_POSTS
        ) {
            throw new Exception(self::ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE);
        }

        $this->_lock();
        $last_packages = shm_get_var($this->_shared_memory_handler, $storage);
        if (!$last_packages) {
            $last_packages = array();
        }
        $this->_unlock();

        $return_last_packages = array();
        foreach ($last_packages as $normalize_package) {
            $return_last_packages[] = new Sibirix_Queue_Smemory_Package($normalize_package);
        }

        return $return_last_packages;
    }

    /**
     * Acquires the semaphore.
     *
     * @throws Exception CAN_NOT_ACTIVATE_THE_SEMAPHORE when acquisition fails.
     */
    private function _lock()
    {
        if (!sem_acquire($this->_sem_id)) {
            throw new Exception(self::ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE);
        }
    }

    /**
     * Releases the semaphore.
     */
    private function _unlock()
    {
        sem_release($this->_sem_id);
    }

    /**
     * Empties both package storages and resets the message id counter to 0,
     * all while holding the semaphore.
     *
     * @throws Exception When the semaphore cannot be acquired.
     */
    public function resetMemory()
    {
        $this->_lock();
        try {
            shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, array());
            shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, array());
            $this->_setLastQueueMessageId(0);
        } catch (Exception $e) {
            $this->_unlock();
            throw $e;
        }
        // The original acquired the semaphore a second time here instead of
        // releasing it, which left the lock held after the reset.
        $this->_unlock();
    }

    /**
     * Reads the message id counter while holding the semaphore.
     *
     * @return int
     * @throws Exception When the semaphore cannot be acquired.
     */
    public function getLastQueueMessageId()
    {
        $this->_lock();
        $last_queue_message_id_mem = $this->_getLastQueueMessageId();
        $this->_unlock();

        return $last_queue_message_id_mem;
    }
}
?>
<?php

/**
 * One queue message: an id, an action name, a creation timestamp, a payload
 * and a recipient.
 *
 * The constructor has two forms:
 *  - new Sibirix_Queue_Smemory_Package($serialized) rebuilds a package from
 *    the string produced by normalize();
 *  - new Sibirix_Queue_Smemory_Package($action, $data, $recipient) creates a
 *    fresh package without an id; the id is assigned through normalize($id).
 *
 * The fields id, action, time, data and recipient are readable as properties.
 */
class Sibirix_Queue_Smemory_Package
{
    const ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE = 'ERROR_NOT_SUPPORTED_TYPE_ACTION';
    const ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE = 'FOR_NORMALIZE_NEED_ID';

    /** Action name that marks a package as a chat post. */
    const CHAT_ACTION = 'pasteChatMessages';

    /** Action names accepted when creating a new package. */
    private $_supported_actions = array(
        'updateRating',
        'setChannel',
        'pasteChatMessages',
        'updateTimer',
        'connectToServer',
        'statistics'
    );

    /** Private fields exposed read-only through __get(). */
    private $_read_private_property = array(
        'id', 'action', 'time', 'data', 'recipient'
    );

    private $_action;
    private $_data;
    private $_recipient;
    private $_time;
    private $_id;

    /**
     * @throws Exception ERROR_NOT_SUPPORTED_TYPE_ACTION when a new package is
     *                   created with an action that is not supported.
     */
    public function __construct()
    {
        $fg_args = func_get_args();

        if (count($fg_args) == 1) {
            // SECURITY NOTE(review): unserialize() can instantiate arbitrary
            // classes. This is only safe while every producer writing to the
            // queue and to shared memory is trusted; confirm before exposing
            // either to external input.
            $fields = unserialize($fg_args[0]);
            if (!is_array($fields)) {
                $fields = array();
            }
            // Fill missing slots with null so a failed or short payload does
            // not trigger undefined-offset errors.
            $fields += array_fill(0, 5, null);

            list(
                $this->_id,
                $this->_action,
                $this->_time,
                $this->_data,
                $this->_recipient
            ) = $fields;
        } else {
            // Allow the recipient (or more) to be omitted without an
            // undefined-offset error; omitted values are null.
            $fg_args += array(0 => null, 1 => null, 2 => null);
            list($action, $data, $recipient) = $fg_args;

            $this->_validation($action, $data, $recipient);

            $this->_action = $action;
            $this->_data = $data;
            $this->_time = $this->_getmicrotime();
            $this->_recipient = $recipient;
        }
    }

    /**
     * Serializes the package as the array (id, action, time, data, recipient).
     *
     * @param int|false $id When truthy, becomes the package id before
     *                      serializing.
     * @return string
     * @throws Exception FOR_NORMALIZE_NEED_ID when no id is given and the
     *                   package has none yet.
     */
    public function normalize($id = false)
    {
        if ($id) {
            $this->_id = $id;
        } elseif ($this->_id === null) {
            throw new Exception(self::ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE);
        }

        return serialize(array(
            0 => $this->_id,
            1 => $this->_action,
            2 => $this->_time,
            3 => $this->_data,
            4 => $this->_recipient
        ));
    }

    /**
     * Checks that the action is one of the supported action names.
     * $data and $recipient are accepted but not inspected.
     *
     * @return bool Always true when no exception is thrown.
     * @throws Exception ERROR_NOT_SUPPORTED_TYPE_ACTION
     */
    private function _validation($action, $data, $recipient)
    {
        // Strict comparison: a loose in_array() lets boolean true (and, on
        // PHP < 8, integer 0) match any action name.
        if (!in_array($action, $this->_supported_actions, true)) {
            throw new Exception(self::ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE);
        }

        return true;
    }

    /**
     * Current Unix time formatted with two decimal places.
     *
     * @return string
     */
    private function _getmicrotime()
    {
        return number_format(microtime(true), 2, '.', '');
    }

    /**
     * Read-only access to id, action, time, data and recipient.
     *
     * @param string $key
     * @return mixed The field value, or null for any other name.
     */
    public function __get($key)
    {
        if (in_array($key, $this->_read_private_property, true)) {
            return $this->{'_' . $key};
        }

        return null;
    }

    /**
     * Tells whether this package is a chat post.
     *
     * @return bool
     */
    public function isPost()
    {
        // Strict comparison so a non-string action restored by unserialize()
        // (e.g. boolean true) is not mistaken for a chat post.
        return $this->_action === self::CHAT_ACTION;
    }
}
?>
Source: https://habr.com/ru/post/119863/
All Articles