📜 ⬆️ ⬇️

RabbitMQ tutorials in C++

The tutorials section of rabbitmq.com provides example implementations in various languages, but C++ is not among them. Below the cut you will find links to the translated tutorials and related materials, with the code under spoilers.

If you prefer to browse the code through the GitHub interface, you can go straight to the repository.

This material uses the AMQP-CPP client implementation, with POCO C++ used for working with the socket.

“RabbitMQ tutorial 1 - Hello World”
receive.cpp
#include <iostream> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareQueue("hello"); channel.consume("hello", AMQP::noack).onReceived( [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] Received "<<message.message() << std::endl; }); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; } 


send.cpp
 #include <iostream> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.onReady([&]() { if(handler.connected()) { channel.publish("", "hello", "Hello World!"); std::cout << " [x] Sent 'Hello World!'" << std::endl; handler.quit(); } }); handler.loop(); return 0; } 


“RabbitMQ tutorial 2 - Queue of tasks”

worker.cpp
 #include <iostream> #include <algorithm> #include <thread> #include <chrono> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.setQos(1); channel.declareQueue("task_queue", AMQP::durable); channel.consume("task_queue").onReceived( [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { const auto body = message.message(); std::cout<<" [x] Received "<<body<<std::endl; size_t count = std::count(body.cbegin(), body.cend(), '.'); std::this_thread::sleep_for (std::chrono::seconds(count)); std::cout<<" [x] Done"<<std::endl; channel.ack(deliveryTag); }); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; } 


new_task.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string msg = argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { AMQP::Envelope env(msg); env.setDeliveryMode(2); channel.publish("", "task_queue", env); std::cout<<" [x] Sent '"<<msg<<"'\n"; handler.quit(); }; channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback); handler.loop(); return 0; } 


RabbitMQ tutorial 3 - Publish / Subscribe

receive_logs.cpp
 #include <iostream> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); auto receiveMessageCallback = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] "<<message.message() << std::endl; }; AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { channel.bindQueue("logs", name,""); channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback); }; AMQP::SuccessCallback success = [&]() { channel.declareQueue(AMQP::exclusive).onSuccess(callback); }; channel.declareExchange("logs", AMQP::fanout).onSuccess(success); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; } 


emit_log.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string msg = argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("logs", AMQP::fanout).onSuccess([&]() { channel.publish("logs", "", msg); std::cout << " [x] Sent "<<msg<< std::endl; handler.quit(); }); handler.loop(); return 0; } 


RabbitMQ tutorial 4 - Routing

receive_logs_direct.cpp
 #include <iostream> #include <algorithm> #include "SimplePocoHandler.h" int main(int argc, const char* argv[]) { if(argc==1) { std::cout<<"Usage: "<<argv[0]<<" [info] [warning] [error]"<<std::endl; return 1; } SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("direct_logs", AMQP::direct); auto receiveMessageCallback = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] " <<message.routingKey() <<":" <<message.message() << std::endl; }; AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { std::for_each(&argv[1], &argv[argc], [&](const char* severity) { channel.bindQueue("direct_logs","", severity); channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback); }); }; channel.declareQueue(AMQP::exclusive).onSuccess(callback); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; } 


emit_log_direct.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string severity = argc > 2 ? argv[1] : "info"; const std::string msg = argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]() { channel.publish("direct_logs", severity, msg); std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl; handler.quit(); }); handler.loop(); return 0; } 


"RabbitMQ tutorial 5 - Topics"

receive_logs_topic.cpp
 #include <iostream> #include <algorithm> #include "SimplePocoHandler.h" int main(int argc, const char* argv[]) { if(argc==1) { std::cout<<"Usage: "<<argv[0]<<" [binding_key]..."<<std::endl; return 1; } SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("topic_logs", AMQP::topic); auto receiveMessageCallback = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] " <<message.routingKey() <<":" <<message.message() << std::endl; }; AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { std::for_each(&argv[1], &argv[argc], [&](const char* bindingKeys) { std::cout<<bindingKeys<<std::endl; channel.bindQueue("topic_logs",name, bindingKeys); channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback); }); }; channel.declareQueue(AMQP::exclusive).onSuccess(callback); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; } 


emit_log_topic.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string msg = argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!"; const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]() { channel.publish("topic_logs", routing_key, msg); std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl; handler.quit(); }); handler.loop(); return 0; } 


“RabbitMQ tutorial 6 - Remote procedure call”

rpc_server.cpp
 #include <iostream> #include <algorithm> #include <thread> #include <chrono> #include "SimplePocoHandler.h" int fib(int n) { switch (n) { case 0: return 0; case 1: return 1; default: return fib(n - 1) + fib(n - 2); } } int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.setQos(1); channel.declareQueue("rpc_queue"); channel.consume("").onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { const auto body = message.message(); std::cout<<" [.] fib("<<body<<")"<<std::endl; AMQP::Envelope env(std::to_string(fib(std::stoi(body)))); env.setCorrelationID(message.correlationID()); channel.publish("", message.replyTo(), env); channel.ack(deliveryTag); }); std::cout << " [x] Awaiting RPC requests" << std::endl; handler.loop(); return 0; } 


rpc_client.cpp
 #include <iostream> #include "tools.h" #include "SimplePocoHandler.h" int main(int argc, const char* argv[]) { const std::string correlation(uuid()); SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { AMQP::Envelope env("30"); env.setCorrelationID(correlation); env.setReplyTo(name); channel.publish("","rpc_queue",env); std::cout<<" [x] Requesting fib(30)"<<std::endl; }; channel.declareQueue(AMQP::exclusive).onSuccess(callback); auto receiveCallback = [&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { if(message.correlationID() != correlation) return; std::cout<<" [.] Got "<<message.message()<<std::endl; handler.quit(); }; channel.consume("", AMQP::noack).onReceived(receiveCallback); handler.loop(); return 0; } 


UPD 2015.04.09: fixed the prefetch count setup; tutorial 2 now works; the code compiles under g++ 4.7.

Source: https://habr.com/ru/post/253317/


All Articles