The article
RabbitMQ AMQP for PHP ,
New Ideas for PHP , published a draft of the PHP-AMQP API
In continuation of previously published ideas, I present their implementation, which is more than the first version.
extension code can be found
here the description of the project and svn until the old version (1.0), translated into English
Brief description of version 2.0:
')
Class AMQPConnection - opening a logical connection, including a channel connection.
Constructor:
APMQConection :: APMQConection ([array params])
Parameters (all optional):
- host = [localhost]
- port = [5672]
- login = [guest]
- psw = [guest]
- vhost = [/]
Exception - no logical or physical connection.
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1', 'vhost'=>'v1' ) );
AMQPExchange Exchange Class
constructor:
creating exchange, if the name is specified otherwise class initialization
AMQPExchange :: AMQPExchange (APMQConection cnn, string [name])
name - the name of the exchange
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
Ad exchange:
proto bool AMQPEexchange :: declare ([string name], [string type = direct], [bit params]);
name - the name of the exchange
type - type of exchange, allowed types: direct, topic & fanout
params - parameters:
- AMQP_PASSIVE
- AMQP_DURABLE
- AMQP_AUTODELETE
- AMQP_INTERNAL
returns the result of the operation
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn);
$exchange->declare('ex_name', 'topic',AMQP_DURABLE );
Removing exchange:
proto bool AMQPExchange :: delete ([string name], [bit params]);
name - the name of the exchange
params - parameters:
returns the result of the operation
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->delete();
// - , .
Binding exchange to the queue:
proto bool AMQPExchange :: bind (string queueName, string routingKey);
queueName - the name of the queue
key - routing-key, route key, string
returns the result of the operation
Example:
$msg = " , ...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$exchange->bind('mylogin','spb.news')
$res = $exchange->publish( $msg, 'spb.news');
Publication:
proto bool AMQPExchange :: publish (string msg, [string key], bit [parms]);
Posting a message with the key key for a topic type topic or direct
msg - message, string
key - routing-key, route key, string
params - parameters:
- AMQP_MANDATORY
- AMQP_IMMEDIATE
returns the result of the operation
Example:
$msg = " ...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->publish( $msg, 'spb.news');
Queue Class AMQPQueue
constructor - class initialization
AMQPQueue :: AMQPQueue (AMQPConnection cnn, string [name])
name - the name of the queue
Ad queue
proto int AMQPQueue :: declare (string [name], bit [params])
name - the name of the queue
params - parameters:
- AMQP_AUTODELETE (default)
- AMQP_DURABLE
- AMQP_PASSIVE
- AMQP_EXCLUSIVE
returns the number of items in the queue if the queue already exists.
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$queue = new AMQPQueue($cnn,'chat_12');
$queue->declare('chat_12', AMQP_AUTEDELETE | AMQP_DURABLE);
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$queue = new AMQPQueue($cnn,'chat_12');
$queue->declare('chat_12', AMQP_AUTEDELETE | AMQP_DURABLE);
Deleting a queue
proto bool AMQPQueue :: delete (string [name], bit [params])
name - the name of the queue
params - parameters:
- AMQP_IFUNUSED
- AMQP_IFEMPTY
returns the result of the operation
Example:
$queue = new AMQPQueue(new APMQConection(),'chat_12');
$queue->delete();
$queue = new AMQPQueue(new APMQConection(),'chat_12');
$queue->delete();
Binding queue to exchange
:
proto bool AMQPQueue :: bind (string exchangeName, string routingKey);
name - the name of the exchange
routingkey - route key
Example:
// 'mylogin' 'ex_estate' '*.spb'
$queue = new AMQPQueue(APMQConection(), 'mylogin');
$queue->declare();
$queue->bind('ex_estate','*.spb');
Subscribe
proto array AMQPQueue :: consume (int n);
get an array of n-messages from the queue (all others are reset)
n - number of received messages
params - parameters:
- AMQP_NOLOCAL
- AMQP_NOACK
- AMQP_EXCLUSIVE
Attention!The number of received messages cannot exceed the total number of messages in the queue, otherwise the API will wait for the broker to receive all messages.
If you specify the number of messages that are less than the current time in the queue, then all not selected messages will be marked as selected, that is, they will be lost when re-reading from the queue, if the AMQP_NOACK flag is not set.
Example:
$i=0;
$queue = new AMQPQueue(APMQConection());
$n = $queue->declare('mylogin');
$queueMessages = $queue->consume( $n );
foreach($queueMessages as $item){
$i++;
echo "$i.$item";
}
$i=0;
$queue = new AMQPQueue(APMQConection());
$n = $queue->declare('mylogin');
$queueMessages = $queue->consume( $n );
foreach($queueMessages as $item){
$i++;
echo "$i.$item";
}
Unsubscribe
proto bool AMQPQueue :: unbind (string exchangeName, string routingKey);
detaches the current queue from the exchangeName cheat for routingKey
name - the name of the exchange
routingkey - route key
returns the result of the operation
Queue reset
proto bool AMQPQueue :: purge (string [name])
All messages in the queue are reset, the queue itself remains.
name - the name of the queue
returns the result of the operation
Example:
$queue = new AMQPQueue(new APMQConection());
$queue->purge('chat_12');
$queue = new AMQPQueue(new APMQConection());
$queue->purge('chat_12');
Get queue item
proto array AMQPQueue :: get (string [name], bit [params])
name - the name of the queue
params - parameters:
returns an associative array:
- msg - current message
- count - the number of messages remaining in the queue if:
- count = 0 - this message is the last in the queue
- count = -1 queue is empty, the msg key is absent
If you compare speed, the consume () method is faster than get ()
but the get method is more reliable.