
PHP-AMQP version 2

The earlier articles "RabbitMQ AMQP for PHP" and "New Ideas for PHP" published a draft of the PHP-AMQP API.

Continuing those ideas, I present their implementation, which goes further than the first version.

The extension code can be found here; the project description and svn so far cover only the old version (1.0), translated into English.

Brief description of version 2.0:

Class AMQPConnection - opening a logical connection, including a channel connection.


Constructor:

AMQPConnection :: AMQPConnection ([array params])

Parameters (all optional):

Exception - thrown when the logical or physical connection cannot be established.

Example:
$cnn = new AMQPConnection(array('port' => 5673, 'login' => 'sector1', 'vhost' => 'v1'));

AMQPExchange Exchange Class


constructor:

Creates the exchange if a name is specified; otherwise only initializes the class.
AMQPExchange :: AMQPExchange (AMQPConnection cnn, [string name])
name - the name of the exchange

Example:

$cnn = new AMQPConnection(array('port' => 5673, 'login' => 'sector1'));
$exchange = new AMQPExchange($cnn, 'ex_name');


Declaring an exchange:

proto bool AMQPExchange :: declare ([string name], [string type = direct], [bit params]);

name - the name of the exchange
type - type of exchange, allowed types: direct, topic & fanout
params - parameters:

returns the result of the operation

Example:
$cnn = new AMQPConnection(array('port' => 5673, 'login' => 'sector1'));
$exchange = new AMQPExchange($cnn);
$exchange->declare('ex_name', 'topic',AMQP_DURABLE );


Removing exchange:

proto bool AMQPExchange :: delete ([string name], [bit params]);

name - the name of the exchange
params - parameters:

returns the result of the operation

Example:
$cnn = new AMQPConnection(array('port' => 5673, 'login' => 'sector1'));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->delete();


Binding exchange to the queue:

proto bool AMQPExchange :: bind (string queueName, string routingKey);

queueName - the name of the queue
routingKey - the routing key, string

returns the result of the operation

Example:
$msg = " , ...";
$cnn = new AMQPConnection(array('port' => 5673, 'login' => 'sector1'));
$exchange = new AMQPExchange($cnn, 'ex_name');
$exchange->bind('mylogin', 'spb.news');
$res = $exchange->publish( $msg, 'spb.news');


Publication:


proto bool AMQPExchange :: publish (string msg, [string key], [bit params]);

Publishes a message with the routing key key, for an exchange of type topic or direct.

msg - message, string
key - routing-key, route key, string
params - parameters:


returns the result of the operation

Example:
$msg = " ...";
$cnn = new AMQPConnection(array('port' => 5673, 'login' => 'sector1'));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->publish( $msg, 'spb.news');


Queue Class AMQPQueue


constructor - class initialization

AMQPQueue :: AMQPQueue (AMQPConnection cnn, string [name])

name - the name of the queue

Declaring a queue

proto int AMQPQueue :: declare (string [name], bit [params])

name - the name of the queue
params - parameters:

returns the number of items in the queue if the queue already exists.

Example:
$cnn = new AMQPConnection(array('port' => 5673, 'login' => 'sector1'));
$queue = new AMQPQueue($cnn, 'chat_12');
$queue->declare('chat_12', AMQP_AUTODELETE | AMQP_DURABLE);
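The params argument is a bit mask, so several flags are combined with the bitwise OR operator, as in the declare call above. The sketch below shows only the mechanics. The EXAMPLE_* constants and the hasFlag() helper are hypothetical and are not part of PHP-AMQP; the real flag values are defined by the extension.

```php
<?php
// Hypothetical flag values, for illustration only.
// The real constants and their values come from the extension.
const EXAMPLE_DURABLE    = 0b00010;
const EXAMPLE_AUTODELETE = 0b10000;

// Returns true when every bit of $flag is set in $params.
function hasFlag(int $params, int $flag): bool
{
    return ($params & $flag) === $flag;
}

// Combine two flags into a single bit mask.
$params = EXAMPLE_AUTODELETE | EXAMPLE_DURABLE;

var_dump(hasFlag($params, EXAMPLE_DURABLE));
var_dump(hasFlag(EXAMPLE_DURABLE, EXAMPLE_AUTODELETE));
```

Since each flag occupies its own bit, the order of the operands in the OR expression does not matter.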


Deleting a queue

proto bool AMQPQueue :: delete (string [name], bit [params])

name - the name of the queue
params - parameters:

returns the result of the operation

Example:
$queue = new AMQPQueue(new AMQPConnection(), 'chat_12');
$queue->delete();


Binding queue to exchange
proto bool AMQPQueue :: bind (string exchangeName, string routingKey);

exchangeName - the name of the exchange
routingKey - the routing key

Example:
// bind queue 'mylogin' to exchange 'ex_estate' with routing key '*.spb'
$queue = new AMQPQueue(new AMQPConnection(), 'mylogin');
$queue->declare();
$queue->bind('ex_estate','*.spb');
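A routing key such as '*.spb' is a topic pattern: in AMQP topic exchanges, '*' stands for exactly one dot-separated word and '#' for zero or more words. The broker performs this matching itself. The sketch below only illustrates the rule in plain PHP; topicMatches() and matchWords() are hypothetical helpers, not part of PHP-AMQP.

```php
<?php
// Illustrative only: the broker does this matching, not the client.
function topicMatches(string $pattern, string $routingKey): bool
{
    // explode() on an empty string yields [''], so handle that case explicitly.
    $p = $pattern === '' ? [] : explode('.', $pattern);
    $k = $routingKey === '' ? [] : explode('.', $routingKey);
    return matchWords($p, 0, $k, 0);
}

function matchWords(array $p, int $i, array $k, int $j): bool
{
    if ($i === count($p)) {
        // Pattern exhausted: match only if the key is exhausted too.
        return $j === count($k);
    }
    if ($p[$i] === '#') {
        // '#' may absorb zero or more words of the key.
        for ($skip = $j; $skip <= count($k); $skip++) {
            if (matchWords($p, $i + 1, $k, $skip)) {
                return true;
            }
        }
        return false;
    }
    if ($j === count($k)) {
        return false; // key exhausted, but the pattern still needs a word
    }
    if ($p[$i] === '*' || $p[$i] === $k[$j]) {
        return matchWords($p, $i + 1, $k, $j + 1);
    }
    return false;
}

var_dump(topicMatches('*.spb', 'news.spb'));
var_dump(topicMatches('*.spb', 'spb.news'));
```

With this rule, a queue bound with '*.spb' would receive a message published with the key 'news.spb', but not one published with 'spb.news'.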


Subscribe

proto array AMQPQueue :: consume (int n);
Gets an array of n messages from the queue (all remaining messages are discarded).
n - the number of messages to receive
params - parameters:


Attention!
The number of requested messages must not exceed the total number of messages in the queue; otherwise the API will block, waiting for the broker to deliver all of them.
If you request fewer messages than are currently in the queue, all messages that were not selected will be marked as selected, that is, they will be lost on the next read from the queue, if the AMQP_NOACK flag is not set.

Example:
$i = 0;
$queue = new AMQPQueue(new AMQPConnection());
// declare() returns the number of messages currently in the queue
$n = $queue->declare('mylogin');
$queueMessages = $queue->consume($n);
foreach ($queueMessages as $item) {
    $i++;
    echo "$i.$item";
}


Unsubscribe

proto bool AMQPQueue :: unbind (string exchangeName, string routingKey);
Detaches the current queue from the exchange exchangeName for the routing key routingKey.

exchangeName - the name of the exchange
routingKey - the routing key

returns the result of the operation

Queue reset

proto bool AMQPQueue :: purge (string [name])
All messages in the queue are discarded; the queue itself remains.

name - the name of the queue
returns the result of the operation

Example:
$queue = new AMQPQueue(new AMQPConnection());
$queue->purge('chat_12');


Get queue item

proto array AMQPQueue :: get (string [name], bit [params])

name - the name of the queue
params - parameters:


returns an associative array:

Source: https://habr.com/ru/post/73671/

