
Push + ActiveMQ - ZendFramework = ... or the story of one exciting project


One fine morning a young man burst into our office with an ambitious idea and the "means to realize it" in his pocket. "You go to the site, and there is a TV. You can connect to it through your webcam. Only one person can broadcast at a time; the rest wait for their turn (but you can see screenshots from their webcams). Everyone's goal is to stay on the air as long as possible. If the audience likes the speaker, everyone clicks 'Cool!'; if the speaker disappoints, 'Go away!', and the person is replaced by the next in line. And you can write in the chat too."

A good idea makes for an exciting project. We drew a prototype and decided to update the chat, the user list, the rating and so on using push technology. This is when the connection between the client and the server is not closed after the page loads, but stays open so that the server can send events to the client.

Caution! This thing can kill your server! By the way, if you ever decide to write a high-load Scandinavian auction, the truth and some funny pictures are close by, under the cut.

A couple of words about push



The use of PUSH technology is justified where it is necessary to quickly update many clients with certain changes on the server side. In general, the client and server communication looks like this.


Obviously, each client needs its own constantly running server-side push process, called a comet. This is expensive in terms of server memory, but the alternative, constant polling of the server, is an order of magnitude worse: slower and heavier. True, there is another good option, working with sockets directly, but that is a separate topic.
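To make the idea of a long-lived comet request more concrete, here is a minimal sketch of a long-polling wait loop in PHP. It is only an illustration: the function name `waitForEvents`, the `$fetch` callback and the timing parameters are assumptions, not code from the project.

```php
<?php
/**
 * Hold a request open until events arrive or the timeout expires.
 *
 * $fetch     hypothetical callback that returns an array of pending events
 * $timeout   maximum number of seconds to keep the request open
 * $pollDelay pause between checks, in microseconds
 */
function waitForEvents(callable $fetch, float $timeout = 25.0, int $pollDelay = 100000): array
{
    $deadline = microtime(true) + $timeout;
    do {
        $events = $fetch();
        if (count($events) > 0) {
            return $events;      // answer at once; the client then reconnects
        }
        usleep($pollDelay);      // nothing yet: sleep briefly and check again
    } while (microtime(true) < $deadline);

    return array();              // timeout: empty answer; the client reconnects
}
```

In a real comet the returned events would be encoded (for example as JSON) and written to the response, after which the browser opens the next request.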

Work process

When a client performs an action (for example, writes to the chat or votes), it sends an ajax request to the server with the appropriate command. The result of this command must be distributed to all connected clients. To do this, the script that processes the ajax request must notify all the comets, which pass the update command on to their clients, and those in turn redraw the change.

Needless to say, quite a few update requests can arrive at the same time. To handle this, such events should be placed in a queue, from which the comets take them and send them on.

This is roughly what happens when someone writes something in the chat.


There are two key points in this diagram:
  1. The implementation of the comet (a good review is provided by the user balepc)
  2. The implementation of the queue.

Let's look at them in more detail.


Queue implementation



For our project it was necessary that:


In simple cases of interaction between two processes, you can use PHP's built-in queue functions or the Zend_Queue wrapper. However, they do not deliver an event to all listening processes (comets) at once.
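The difference between the two delivery models can be shown with a toy, in-memory sketch. It uses only `SplQueue` from the standard library; the `ToyTopic` class and the subscriber ids are made up for illustration and are not a real messaging API.

```php
<?php
/**
 * Toy in-memory topic: every subscriber gets its own copy of each message.
 * A plain queue, by contrast, hands each message to exactly one reader.
 * Hypothetical class, for illustration only.
 */
class ToyTopic
{
    /** @var SplQueue[] one inbox per subscriber, keyed by subscriber id */
    private $inboxes = array();

    public function subscribe(string $id): void
    {
        $this->inboxes[$id] = new SplQueue();
    }

    public function publish(string $message): void
    {
        foreach ($this->inboxes as $inbox) {
            $inbox->enqueue($message);   // a copy for every listener
        }
    }

    /** Assumes $id was subscribed earlier. */
    public function receive(string $id): ?string
    {
        $inbox = $this->inboxes[$id];
        return $inbox->isEmpty() ? null : $inbox->dequeue();
    }
}

// Plain queue: two readers compete for a single message.
$queue = new SplQueue();
$queue->enqueue('hello');
$firstReader  = $queue->isEmpty() ? null : $queue->dequeue();
$secondReader = $queue->isEmpty() ? null : $queue->dequeue();

// Topic: both comets see the same message.
$topic = new ToyTopic();
$topic->subscribe('comet-1');
$topic->subscribe('comet-2');
$topic->publish('hello');
$cometOne = $topic->receive('comet-1');
$cometTwo = $topic->receive('comet-2');
```

With the plain queue the second reader is left with nothing, which is exactly the limitation described above; with the topic each comet receives its own copy.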

Experimenting with queues


One possible solution is a queue manager.
Method of implementation:


Like this:


// When we drew this picture on a piece of paper we were already scared, but a brave reinventor of the wheel can try to carry the implementation through to the end.

Even if we build such a manager, the comets will wake up only when new events arrive, and that is clearly not enough for us (remember that we need to know who is alive right now?).

Unfortunately, the standard PHP functions make a queue with timeouts impossible. The idea came up of a separate server process that would periodically "wake" the comets by throwing events into the queue. The same process could also remove dead queue IDs from shared memory. Here is what we get in the end:


Looking at this picture, one involuntarily recalls Lugovsky's remark:
"Human imagination is boundless when it comes to idiotic and absurd architecture ..."

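For readers who want to see what the helper process described above would have to do, here is a rough sketch of one "tick": publish a wake-up event and drop comets that have not reported in for a while. All names (`heartbeatTick`, `reapDeadComets`, the `ping` action, the `$lastSeen` map) are assumptions made for illustration; the project did not publish such code.

```php
<?php
/**
 * Drop comets that have not reported in within $ttl seconds.
 *
 * $lastSeen maps a hypothetical comet id to the Unix time of its last heartbeat.
 * Returns the map with stale entries removed (keys are preserved).
 */
function reapDeadComets(array $lastSeen, float $now, float $ttl): array
{
    return array_filter($lastSeen, function ($seenAt) use ($now, $ttl) {
        return ($now - $seenAt) <= $ttl;
    });
}

/**
 * One tick of the helper process: publish a wake-up event, then clean the registry.
 * $publish stands in for whatever puts an event on the queue.
 */
function heartbeatTick(callable $publish, array $lastSeen, float $now, float $ttl): array
{
    $publish(array('action' => 'ping', 'time' => $now));
    return reapDeadComets($lastSeen, $now, $ttl);
}
```

Such a tick would be called in a loop with a sleep between iterations, which is one more moving part to keep alive and monitor.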

We began to analyze existing queuing solutions in terms of how well they fit our application.


Our choice fell on Apache ActiveMQ (in fact, we were choosing between it and RabbitMQ).
Pros:


Minus: it cannot return the number of messages in the queue.

A more detailed overview of queue managers by Habr user aleks_raiden can be found here:
http://habrahabr.ru/blogs/hi/44907/
http://habrahabr.ru/blogs/hi/45891/

The only thing that could not be implemented directly with the Topics mechanism was the recovery of events that occurred while a comet's connection was down, before it came back up (this could be done by abandoning Topics and using an individual queue for each process, but ...)

We decided to keep the latest N events in shared memory so that a comet could instantly restore its context. As a result, the architecture was the following:
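The "latest N events" buffer can be sketched in a few lines. This version works on a plain PHP array rather than on shared memory, and the function names `rememberEvent` and `eventsSince` are hypothetical; it only shows the bookkeeping a reconnecting comet relies on.

```php
<?php
/**
 * Append an event and keep only the newest $limit entries.
 * Each event is an array with a monotonically increasing 'id'.
 * Assumes $limit >= 1.
 */
function rememberEvent(array $recent, array $event, int $limit): array
{
    $recent[] = $event;
    // A negative offset keeps the tail of the list; shorter lists are returned whole.
    return array_slice($recent, -$limit);
}

/**
 * Events a reconnecting comet has missed, given the last id it delivered.
 */
function eventsSince(array $recent, int $lastSeenId): array
{
    return array_values(array_filter($recent, function ($event) use ($lastSeenId) {
        return $event['id'] > $lastSeenId;
    }));
}
```

A comet that comes back up asks for everything newer than the last id it sent to its client and then continues reading from the topic as usual.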


Benchmarks



To implement the application we used ZendFramework. Ajax requests came to the appropriate controllers, which processed them and put messages into the queue, from where the comets took the messages and sent them to clients. The comets themselves were also implemented as an Action inside a ZendFramework controller.

Having launched Apache Benchmark, we were horrified to find that the system was eating the processor heavily at just 25 simultaneous requests.

300 requests for 25 simultaneous threads

As it turned out, initializing ZendFramework and connecting to the database took 99% of the time. Oops. Well, this is old news, and the cure is known: avoid including ZendFramework classes except the ones that are really needed, and combine all the necessary classes into one file.

We concluded that we had to abandon the MVC framework for processing the voting and chat ajax requests, as well as for the comet itself (its startup speed is critical for browsers that support only long polling). We also got rid of all database queries in the ajax handlers; to do that, we moved all the necessary data into shared memory. Where needed, shared memory and the database were synchronized by a separate process.
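A framework-free handler of this kind can be very small. The sketch below is an assumption about what such a script might look like, not the project's code: `handleChatPost`, the `text` parameter and the `$publish` callback are invented for illustration, while the `pasteChatMessages` action name is the one used by the package class published in this article.

```php
<?php
/**
 * Handle a chat post without booting a framework or touching the database.
 *
 * $post    request parameters (in a real script this would be $_POST)
 * $publish hypothetical callback that puts the event on the queue
 */
function handleChatPost(array $post, callable $publish): array
{
    $text = isset($post['text']) ? trim((string) $post['text']) : '';
    if ($text === '') {
        return array('ok' => false, 'error' => 'EMPTY_MESSAGE');
    }
    $publish('pasteChatMessages', array('text' => $text));
    return array('ok' => true);
}
```

The entry script would call this function with `$_POST` and a callback that wraps the queue manager's `send()` method, then echo the result as JSON.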

Result



300 requests for 100 simultaneous threads

The reaction to events from the client became almost instantaneous. Hurray, victory! The lightweight comets use memory and the processor very sparingly. The bottleneck of the application is now the distribution of the video stream (but if you are building a Scandinavian auction, where a similar queue architecture can be used effectively, that is not your problem ;-)). We can write about it in following articles, if we survive the Habr effect.

Our queue manager is published below.
<?php
require_once 'Zend/Queue.php';
require_once 'Sibirix/Queue/Smemory/Package.php';

/**
 * Queue manager. Sends packages to an ActiveMQ topic through Zend_Queue and keeps
 * the latest chat posts and system commands in shared memory, guarded by a semaphore.
 */
class Sibirix_Queue_Smemory_Manager
{
    const ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE        = "BROKEN_OPTIONS_FOR_INIT";
    const ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE     = "UNKNOWN_IDENTIFIER_STORAGE";
    const ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE = "CAN_NOT_ACTIVATE_THE_SEMAPHORE";

    // Keys of the variables stored in shared memory
    const SHM_VAR_LAST_CHAT_POSTS     = 1;
    const SHM_VAR_LAST_SYSTEM_COMMAND = 2;
    const SHM_VAR_QUEUE_LAST_ID       = 3;

    private $_count_chat_package_storage;
    private $_count_system_package_storage;
    private $_activeMQoptions;
    private $_queue;

    /** semaphore key */
    private $_sem_key;

    /** Shared memory APP_KEY = one symbol */
    private $_app_key;

    private $_shared_memory_id = false;
    private $_shared_memory_handler = false;
    private $_shared_memory_size;
    private $_sem_id = false;
    private $_last_queue_message_id = false;

    public function __construct($options = array())
    {
        if (!is_array($options)) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        // Every option is mandatory
        $required = array(
            'sem_key', 'app_key', 'activeMQ', 'shared_memory_size',
            'count_chat_package_storage', 'count_system_package_storage'
        );
        foreach ($required as $key) {
            if (!isset($options[$key])) {
                throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
            }
        }

        // ftok() expects a one-character project identifier
        $options['app_key'] = (string)$options['app_key'];
        if (strlen($options['app_key']) != 1) {
            throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
        }

        $this->_sem_key = $options['sem_key'];
        $this->_app_key = $options['app_key'];
        $this->_activeMQoptions = $options['activeMQ'];
        $this->_shared_memory_size = $options['shared_memory_size'];
        $this->_count_chat_package_storage = $options['count_chat_package_storage'];
        $this->_count_system_package_storage = $options['count_system_package_storage'];

        $this->_sem_id = sem_get($this->_sem_key, 1);
        $this->_shared_memory_id = ftok(".", $this->_app_key);
        $this->_shared_memory_handler = shm_attach($this->_shared_memory_id, $this->_shared_memory_size);
    }

    /** Lazily connect to the ActiveMQ topic. */
    public function connectTopic()
    {
        if (!$this->_queue) {
            $this->_queue = new Zend_Queue('Activemq', $this->_activeMQoptions);
        }
        return $this->_queue;
    }

    /** Receive one package from the topic, or false if nothing arrived. */
    public function receive()
    {
        $this->connectTopic();
        $message = false;
        $msg_receive = $this->_queue->receive(1);
        foreach ($msg_receive as $msg) {
            $pkg = new Sibirix_Queue_Smemory_Package($msg->body);
            $this->_last_queue_message_id = $pkg->id;
            $message = $pkg;
        }
        return $message;
    }

    /** Receive packages until we catch up with the last id stored in shared memory. */
    public function receiveAll()
    {
        $this->connectTopic();
        $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();
        $messages = array();
        while (true) {
            $msg_receive = $this->_queue->receive(1);
            if (!$msg_receive->count()) {
                break;
            }
            foreach ($msg_receive as $message) {
                $pkg = new Sibirix_Queue_Smemory_Package($message->body);
                $this->_last_queue_message_id = $pkg->id;
                $messages[] = $pkg;
            }
            if ($this->_last_queue_message_id >= $last_queue_message_id_mem) {
                // Re-read the id: new packages may have been sent in the meantime
                $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();
                if ($this->_last_queue_message_id >= $last_queue_message_id_mem) {
                    break;
                }
            }
        }
        return $messages;
    }

    /** Assign the next id to a package, send it to the topic and store it in shared memory. */
    public function send($action, $data, $recipient = false)
    {
        $this->connectTopic();
        $package = new Sibirix_Queue_Smemory_Package($action, $data, $recipient);

        $this->_lock();
        $last_queue_message_id_mem = $this->_getLastQueueMessageId();
        $last_queue_message_id_mem++;
        $this->_setLastQueueMessageId($last_queue_message_id_mem);
        $this->_queue->send($package->normalize($last_queue_message_id_mem));
        if ($package->isPost()) {
            $this->_savePackageToChatStorage($package);
        } else {
            $this->_savePackageToSystemStorage($package);
        }
        $this->_unlock();

        return $last_queue_message_id_mem;
    }

    /** Read the last message id; the caller must hold the lock. */
    private function _getLastQueueMessageId()
    {
        $last_queue_message_id_mem = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID);
        if (!$last_queue_message_id_mem) {
            shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, 0);
            $last_queue_message_id_mem = 0;
        }
        return $last_queue_message_id_mem;
    }

    private function _setLastQueueMessageId($id)
    {
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, $id);
        return true;
    }

    /** Read the last message id under the semaphore. */
    private function _sem_getLastQueueMessageId()
    {
        $this->_lock();
        $last_queue_message_id_mem = $this->_getLastQueueMessageId();
        $this->_unlock();
        return $last_queue_message_id_mem;
    }

    private function _savePackageToChatStorage($package)
    {
        $last_chat_posts = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS);
        if (!$last_chat_posts) {
            $last_chat_posts = array();
        }
        // Make room so that the storage never exceeds the configured size
        while (count($last_chat_posts) >= $this->_count_chat_package_storage) {
            array_shift($last_chat_posts);
        }
        array_push($last_chat_posts, $package->normalize());
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, $last_chat_posts);
    }

    private function _savePackageToSystemStorage($package)
    {
        $last_sys_cmd = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND);
        if (!$last_sys_cmd) {
            $last_sys_cmd = array();
        }
        // Make room so that the storage never exceeds the configured size
        while (count($last_sys_cmd) >= $this->_count_system_package_storage) {
            array_shift($last_sys_cmd);
        }
        array_push($last_sys_cmd, $package->normalize());
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, $last_sys_cmd);
    }

    /** Return the packages kept in the given storage (chat posts or system commands). */
    public function getLastPackage($storage)
    {
        if ($storage != self::SHM_VAR_LAST_SYSTEM_COMMAND && $storage != self::SHM_VAR_LAST_CHAT_POSTS) {
            throw new Exception(self::ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE);
        }

        $this->_lock();
        $last_packages = shm_get_var($this->_shared_memory_handler, $storage);
        if (!$last_packages) {
            $last_packages = array();
        }
        $this->_unlock();

        $return_last_packages = array();
        foreach ($last_packages as $normalize_package) {
            $package = new Sibirix_Queue_Smemory_Package($normalize_package);
            $return_last_packages[] = $package;
        }
        return $return_last_packages;
    }

    private function _lock()
    {
        if (!sem_acquire($this->_sem_id)) {
            throw new Exception(self::ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE);
        }
    }

    private function _unlock()
    {
        sem_release($this->_sem_id);
    }

    /** Clear both storages and reset the message id counter. */
    public function resetMemory()
    {
        $this->_lock();
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, array());
        shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, array());
        $this->_setLastQueueMessageId(0);
        $this->_unlock(); // release the semaphore taken at the top of the method
    }

    public function getLastQueueMessageId()
    {
        $this->_lock();
        $last_queue_message_id_mem = $this->_getLastQueueMessageId();
        $this->_unlock();
        return $last_queue_message_id_mem;
    }
}
?>


<?php
/**
 * A package (event) that travels through the queue and is stored in shared memory.
 */
class Sibirix_Queue_Smemory_Package
{
    const ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE = 'ERROR_NOT_SUPPORTED_TYPE_ACTION';
    const ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE     = 'FOR_NORMALIZE_NEED_ID';

    const CHAT_ACTION = 'pasteChatMessages';

    private $_supported_actions = array(
        'updateRating',
        'setChannel',
        'pasteChatMessages',
        'updateTimer',
        'connectToServer',
        'statistics'
    );

    // Private properties that may be read from outside through __get()
    private $_read_private_property = array('id', 'action', 'time', 'data', 'recipient');

    private $_action;
    private $_data;
    private $_recipient;
    private $_time;
    private $_id;

    /**
     * One argument: a serialized package to restore.
     * Three arguments: action, data and recipient of a new package.
     */
    public function __construct()
    {
        $fg_args = func_get_args();
        if (count($fg_args) == 1) {
            list(
                $this->_id,
                $this->_action,
                $this->_time,
                $this->_data,
                $this->_recipient
            ) = unserialize($fg_args[0]);
        } else {
            list($action, $data, $recipient) = $fg_args;
            $this->_validation($action, $data, $recipient);
            $this->_action = $action;
            $this->_data = $data;
            $this->_time = $this->_getmicrotime();
            $this->_recipient = $recipient;
        }
    }

    /** Serialize the package; an id must be passed here or already be set. */
    public function normalize($id = false)
    {
        if ($id) {
            $this->_id = $id;
        } else {
            if ($this->_id === null) {
                throw new Exception(self::ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE);
            }
        }
        return serialize(array(
            0 => $this->_id,
            1 => $this->_action,
            2 => $this->_time,
            3 => $this->_data,
            4 => $this->_recipient
        ));
    }

    private function _validation($action, $data, $recipient)
    {
        if (!in_array($action, $this->_supported_actions)) {
            throw new Exception(self::ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE);
        }
        return true;
    }

    private function _getmicrotime()
    {
        list($usec, $sec) = explode(' ', microtime());
        return number_format(((float)$usec + (float)$sec), 2, '.', '');
    }

    public function __get($key)
    {
        if (in_array($key, $this->_read_private_property)) {
            return $this->{'_'.$key};
        }
        return null;
    }

    /** True if the package is a chat message. */
    public function isPost()
    {
        if ($this->_action == self::CHAT_ACTION) {
            return true;
        }
        return false;
    }
}
?>

Conclusions



Now, looking back at the path we have traveled, it is simply scary. I hope this article will help Habr readers draw the right conclusions and not repeat our mistakes in the future.

Thank you for your attention. I will be glad to hear your questions and comments and happy to answer them.

UPD. At readers' request, here is a link to the project: http://feelyoustar.com/

Source: https://habr.com/ru/post/119863/

