
The amqpcpp library. Part 2 - Queues

The article "Lib amqpcpp wrapper for librabbitmq" gave an overview of publishing messages over the AMQP protocol. This article continues it and describes how to work with queues.

While an exchange is intended for publishing messages, a queue is intended for receiving them. A link (a bind, or binding) is established between the exchange and the queue through a routing key (routing_key).
A queue, like an exchange, must be declared:
AMQP amqp;
auto_ptr<AMQPQueue> qu(amqp.createQueue("q2"));
qu->Declare();
// or
auto_ptr<AMQPQueue> qu2(amqp.createQueue());
qu2->Declare("q2");

Queues can be:

If no parameter is specified, the queue is declared as auto-delete (AMQP_AUTODELETE) by default. In the example below, the queue is declared as both auto-delete and durable.
AMQP amqp;
auto_ptr<AMQPQueue> qu3(amqp.createQueue());
qu3->Declare("q2", AMQP_AUTODELETE | AMQP_DURABLE); // durable and autodelete mode
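The parameters above are combined with a bitwise OR, which works because each option occupies its own bit. The following is a minimal standalone sketch of that idea, not amqpcpp code: the names FLAG_AUTODELETE, FLAG_DURABLE, hasFlag and describe, as well as the numeric values, are assumptions made for illustration, and the real constants in the library headers may differ.

```cpp
#include <cassert>
#include <string>

// Hypothetical flag values for illustration only; the real AMQP_* constants
// are defined in the amqpcpp headers and may have different values.
enum QueueFlag : unsigned {
    FLAG_AUTODELETE = 1u << 0,
    FLAG_DURABLE    = 1u << 1
};

// Returns true when the bit of `flag` is set in `params`.
bool hasFlag(unsigned params, QueueFlag flag) {
    assert(flag != 0);  // a zero flag would trivially match every value
    return (params & flag) == static_cast<unsigned>(flag);
}

// Renders a flag set as text, for example for logging.
std::string describe(unsigned params) {
    std::string out;
    if (hasFlag(params, FLAG_AUTODELETE)) out += "autodelete";
    if (hasFlag(params, FLAG_DURABLE)) {
        if (!out.empty()) out += " | ";
        out += "durable";
    }
    return out.empty() ? "none" : out;
}
```

Because every flag is a distinct bit, FLAG_AUTODELETE | FLAG_DURABLE keeps both options and each one can be tested independently.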
The queue can be:

A queue can be bound to an exchange (Bind) or unbound from it (unBind). Binding is done through the routing key. The key can be simple or composite, and composite keys use patterns. For example, to subscribe to all news published to the realty exchange, the key can be "*.news"; to subscribe to all messages that relate to St. Petersburg, it can be "spb.*". As mentioned in part 1, exchanges come in three types: direct, topic, and fanout. Patterns can be used with topic exchanges; direct exchanges accept only simple keys; fanout exchanges do not use keys at all, so the key is passed only as a formality, as an empty string.
AMQP amqp;
auto_ptr<AMQPQueue> qu3(amqp.createQueue());
// the queue and the exchange were declared earlier
qu3->Bind("ex", "news"); // bind the queue to exchange "ex" with the key "news"
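To make the pattern rules for topic keys concrete, here is a minimal standalone sketch of how a binding pattern can be matched against a routing key. It follows the general AMQP topic convention (words separated by dots, "*" stands for exactly one word, "#" for zero or more words) and is not the broker's or the library's implementation; splitWords, matchFrom and topicMatches are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Splits "spb.news" into {"spb", "news"}.
std::vector<std::string> splitWords(const std::string& key) {
    std::vector<std::string> words;
    std::stringstream stream(key);
    std::string word;
    while (std::getline(stream, word, '.')) words.push_back(word);
    return words;
}

// Recursive matcher: p and k index into the pattern and key word lists.
bool matchFrom(const std::vector<std::string>& pat, std::size_t p,
               const std::vector<std::string>& key, std::size_t k) {
    if (p == pat.size()) return k == key.size();
    if (pat[p] == "#") {
        // '#' may swallow zero or more words of the key.
        for (std::size_t next = k; next <= key.size(); ++next)
            if (matchFrom(pat, p + 1, key, next)) return true;
        return false;
    }
    if (k == key.size()) return false;
    if (pat[p] == "*" || pat[p] == key[k]) return matchFrom(pat, p + 1, key, k + 1);
    return false;
}

bool topicMatches(const std::string& pattern, const std::string& routingKey) {
    return matchFrom(splitWords(pattern), 0, splitWords(routingKey), 0);
}

// Usage with the keys from the text above.
void topicExamples() {
    assert(topicMatches("*.news", "spb.news"));
    assert(topicMatches("spb.*", "spb.news"));
    assert(!topicMatches("spb.*", "msk.news"));
}
```

A direct exchange corresponds to the case where the pattern contains no wildcards, so the comparison reduces to plain equality of the keys.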
If a queue is bound to an exchange and messages whose routing key matches the binding key are published to that exchange, those messages are routed to the bound queue. There are two ways to read messages from a queue: AMQPQueue::Get() and AMQPQueue::Consume().
The AMQPQueue::Get() method reads one message from the queue. Along with the message, the header frame reports how many messages are still left in the queue. An example:
AMQP amqp;
auto_ptr<AMQPQueue> qu2(amqp.createQueue());

while (1) {
    qu2->Get();
    auto_ptr<AMQPMessage> m(qu2->getMessage());

    cout << "count: " << m->getMessageCount() << endl;
    if (m->getMessageCount() > -1) {
        cout << "message\n" << m->getMessage() << "\nmessage key: " << m->getRoutingKey() << endl;
        cout << "exchange: " << m->getExchange() << endl;
    } else
        break;
}
The AMQPQueue::Get() method accepts an AMQP_NOACK parameter, which “tells” the broker not to mark the message as “read”. It is used together with the AMQPQueue::Ack() method, which confirms that the message has been delivered. All information about a message is encapsulated in an AMQPMessage object. Its accessor methods have self-explanatory names: getMessage(), getExchange(), getRoutingKey(), getMessageCount(). The getConsumerTag() and getDeliveryTag() methods deserve special attention.
The consumer tag (consumer_tag) is a unique string that identifies a subscriber, something like a session name. It is either set by the client when subscribing or assigned automatically by the broker. You can unsubscribe by sending the Cancel command with the consumer tag as its parameter, for example AMQPQueue::Cancel(m->getConsumerTag()).

The delivery tag is a number equal to the count of messages delivered in this session: 1 for the first message, 2 for the second, 3 for the third, and so on. To confirm receipt of a message, call AMQPQueue::Ack(delivery_tag), where delivery_tag holds the delivery tag of that message.
AMQP amqp;
auto_ptr<AMQPQueue> qu2(amqp.createQueue());

while (1) {
    qu2->Get(AMQP_NOACK);
    auto_ptr<AMQPMessage> m(qu2->getMessage());

    if (m->getMessageCount() > -1) {
        qu2->Ack(m->getDeliveryTag()); // confirm this delivery by its tag
    } else
        break;
}
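The interplay between delivery tags and acknowledgements can be modelled without a broker. The sketch below is a toy in-memory stand-in, not amqpcpp and not a real AMQP channel: ToyChannel, Delivery and drainAndAck are hypothetical names. It only illustrates that each delivery receives the next counter value as its tag and stays pending until that tag is acknowledged.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>

// One delivered message as seen by the reader (toy type, not AMQPMessage).
struct Delivery {
    std::uint64_t deliveryTag;  // 1 for the first delivery, 2 for the second, ...
    std::string body;
    int remaining;              // messages still waiting in the queue
};

class ToyChannel {
public:
    void publish(const std::string& body) { ready_.push_back(body); }

    // Hands out the next message and remembers it as unacknowledged.
    // Returns false when the queue is empty.
    bool get(Delivery& out) {
        if (ready_.empty()) return false;
        out.deliveryTag = ++lastTag_;
        out.body = ready_.front();
        ready_.pop_front();
        out.remaining = static_cast<int>(ready_.size());
        unacked_[out.deliveryTag] = out.body;
        return true;
    }

    // Confirms one delivery; returns false for an unknown or already acked tag.
    bool ack(std::uint64_t deliveryTag) { return unacked_.erase(deliveryTag) == 1; }

    std::size_t unackedCount() const { return unacked_.size(); }

private:
    std::deque<std::string> ready_;
    std::map<std::uint64_t, std::string> unacked_;
    std::uint64_t lastTag_ = 0;
};

// Usage: read until the queue is empty, acknowledging each delivery.
int drainAndAck(ToyChannel& channel) {
    int handled = 0;
    Delivery d;
    while (channel.get(d)) {
        bool acked = channel.ack(d.deliveryTag);
        assert(acked);  // a freshly delivered tag is always pending
        (void)acked;
        ++handled;
    }
    return handled;
}
```

Keeping unacknowledged deliveries in a map keyed by tag is one simple way to show why the tag, and not the message body, is what gets passed to the acknowledgement call.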
Unlike AMQPQueue::Get, the AMQPQueue::Consume method uses a synchronous reception scheme, so it relies on an event model. Before calling Consume (subscribe), you must add a handler for the AMQP_MESSAGE event. An event handler is a function that takes the message as its input parameter and returns a boolean-like value: stop receiving data or continue. An example makes this clearer:
static int i = 0;

// AMQP_MESSAGE handler: return 1 to stop receiving, 0 to continue
int onMessage(AMQPMessage * message) {
    char * data = message->getMessage();
    if (data)
        cout << data << endl;
    i++;

    cout << "#" << i << " tag = " << message->getDeliveryTag() << endl;
    if (i > 5)
        return 1;
    return 0;
}

AMQP amqp;
auto_ptr<AMQPQueue> qu2(amqp.createQueue("q2"));
qu2->Declare();
// The original listing breaks off here ("something was not printed ;)").
// Presumably the handler is then registered and the subscription started,
// roughly like this:
qu2->addEvent(AMQP_MESSAGE, onMessage);
qu2->Consume();
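The handler contract described above (the handler receives each message and returns a nonzero value to stop reception) can be sketched without a broker. This is a simplified standalone model, not the library's Consume implementation: ToyMessage, MessageHandler and consumeAll are hypothetical names, and a vector of strings stands in for the network.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical minimal message type; AMQPMessage carries more fields.
struct ToyMessage {
    std::string body;
    unsigned deliveryTag;
};

// Handler contract mirrored from the text: return nonzero to stop receiving.
using MessageHandler = std::function<int(const ToyMessage&)>;

// Feeds messages to the handler in order and returns how many were delivered.
// A real subscription would wait on the network instead of walking a vector.
int consumeAll(const std::vector<std::string>& incoming,
               const MessageHandler& onMessage) {
    assert(onMessage);  // an empty handler cannot be called
    int delivered = 0;
    unsigned tag = 0;
    for (const std::string& body : incoming) {
        ToyMessage message{body, ++tag};
        ++delivered;
        if (onMessage(message) != 0) break;  // the handler asked to stop
    }
    return delivered;
}
```

Letting the handler decide when to stop keeps the receive loop generic: the loop knows nothing about counters or message contents, only about the return value.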

To do
The library does not claim to be complete. I would like to develop the event logic and add more events, for example onCancel, onSignal, and onTimer.
I am also thinking about multithreaded subscription. It may be necessary to rewrite the network part of librabbitmq so that everything runs on non-blocking sockets.
Instead of a conclusion
The project is still in beta, but it works fairly stably. It is distributed under the MPL license. Anyone who wants to help the project is always welcome.
Please report bugs in the tracker.
Ideas for further development can be discussed here or on the mailing list.

P.S. Please don't judge my Russian too harshly; we have never been on good terms. If you find errors, send me a private message.

Source: https://habr.com/ru/post/91721/

