📜 ⬆️ ⬇️

ZeroMQ. Chapter 2: Introduction to Sockets

Hello!
I continue the free translation of the book "ZeroMQ.Use ZeroMQ". I apologize in advance that I did not publish a sequel for so long, but as they say: “Laziness was born in front of us ...”. Well, the lyrics aside, continue.

Content


After we looked at the basic structures of ZeroMQ in the previous chapter, in this we will look at sockets, as follows:


Publish-subscribe pattern


First, let's introduce the classic pattern, the client-server pattern (publish-subscribe), which is one-sided by the nature of the distribution when the server sends messages to a specific list of clients. This model is one to many. The main idea of ​​this pattern is that the server sends a message and the connected clients receive this message, while the disconnected ones just skip it. The server is weakly connected with clients, it does not care at all whether there are any clients at all. This is similar to how television channels or radio stations work. TV channels always broadcast TV shows and only viewers decide whether or not to broadcast. If you miss the right time, you won’t be able to watch your favorite TV show (if you don’t have TiVo or something like that, but let's assume that our script happens in a world where the records were not invented). The advantage of the publish-subscribe pattern is that it provides a dynamic network topology.
The client-server model can be viewed from the following main parties:

')
Let's take an example to clarify the situation. Consider the scenario where we would like to create an exchange program. There are brokers, and they want to know what actions are taking place in the market. Our server will be the stock market, and brokers will be customers.
Instead of real prices from stock markets, we simply generate several numbers.
Before moving on to any code, first let's see what the client-server model looks like.



Below is the code of the creator (server):

/* * Stock Market Server * Binds PUB socket to tcp://*:4040 * Publishes random stock values of random companies */ #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); printf("Starting server...\n"); int conn = zmq_bind(publisher, "tcp://*:4040"); const char* companies[2] = {"Company1", "Company2"}; int count = 0; for(;;) { int price = count % 2; int which_company = count % 2; int index = strlen(companies[0]); char update[12]; snprintf(update, sizeof update, "%s", companies[which_company]); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; } 


We also give the client code below:

 /* * Stock Market Client * Connects SUB socket to tcp://localhost:4040 * Collects stock exchange values */ #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0); int i; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; } 


Signing with zmq_setsockopt () and subscribeis is required whenever you use a SUB socket, otherwise you will not receive any messages. This is a very common mistake.
A client can set multiple signatures to any of the messages he receives if the update matches any of the subscriptions. He can also unsubscribe from certain subscriptions. Subscriptions have a fixed length.
The client receives the message using zmq_msg_recv (). zmq_msg_recv () receives and saves the message. The previous message, if any, is unloaded.

 int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags); 


Option flag can take only one value - ZMQ_DONTWAIT . If the flag is ZMQ_DONTWAIT , then the operation is performed in non-blocking mode. If a message is received, zmq_msg_recv () returns the size of the message in bytes, otherwise -1 is returned and an error message flag is returned.
The client-server model is asynchronous and sending a message to the SUB socket causes an error. You could call zmq_msg_send () to send messages, but you should never call zmq_msg_recv () for a PUB socket.
The following is an example of client-side output:

 Company2 570 Company2 878 Company2 981 Company2 783 Company1 855 Company1 524 Company2 639 Company1 984 Company1 158 Company2 145 


The server will always send messages, even if there are no clients. You can try it out and see the result. If you do, you will see something like the following:

 Sending... Company2 36 Sending... Company2 215 Sending... Company2 712 Sending... Company2 924 Sending... Company2 721 Sending... Company1 668 Sending... Company2 83 Sending... Company2 209 Sending... Company1 450 Sending... Company1 940 Sending... Company1 57 Sending... Company2 3 Sending... Company1 100 Sending... Company2 947 


Let's say we want to get results for Company1, or another company whose name you specify as an argument. In this case, we would need to change our client program as follows:

 // // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); const char* filter; if(argc > 1) { filter = argv[1]; } else { filter = "Company1"; } printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); int i = 0; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; } 


The output will contain something like the following:

 Company1 575 Company1 504 Company1 513 Company1 584 Company1 444 Company1 1010 Company1 524 Company1 963 Company1 929 Company1 718 


Message Filtering


Our main stock exchange application sends messages to customers. It seems all the messages delivered, as expected, right? Unfortunately not.
Let's change our server code to the following:

 // // Stock Market Server // Binds PUB socket to tcp://*:4040 // Publishes random stock values of random companies // #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); int conn = zmq_bind(publisher, "tcp://*:4040"); const char* companies[3] = {"Company1", "Company10", "Company101"}; int count = 0; for(;;) { int price = count % 17; int which_company = count % 3; int index = strlen(companies[which_company]); char update[64]; snprintf(update, sizeof update, "%s", companies[which_company]); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; } 


Now let's change our client code to the following:

 // // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); const char* filter; if(argc > 1) { filter = argv[1]; } else { filter = "Company1"; } printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); int i = 0; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; } 


In this case, the output will be something similar to the following:

 Collecting stock information from the server. Company101 950 Company10 707 Company101 55 Company101 343 Company10 111 Company1 651 Company10 287 Company101 8 Company1 889 Company101 536 


Our client code clearly says that we want to see the result for Company1. However, the server again sends us the results for Company10 and Company101. This, of course, is not what we want. We have to solve this little problem.
We can do a little hack to get what we want, but using the delimiter is a simpler option.
We need to make some changes, both in the client code and in the server code, we will filter company names using a separator.
Below is the updated server code that fixes past problems. Pay attention to the highlighted lines, they show how we can use a separator to send a message to customers:

 // // Stock Market Server // Binds PUB socket to tcp://*:4040 // Publishes random stock values of random companies // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); int conn = zmq_bind(publisher, "tcp://*:4040"); conn = zmq_bind(publisher, "ipc://stock.ipc"); const char* companies[3] = {"Company1", "Company10", "Company101"}; for(;;) { int price = count % 17; int which_company = count % 3; int index = strlen(companies[which_company]); char update[64]; sprintf(update, "%s| %d", companies[which_company], price); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; } 


Take a look at the updated client code to filter the results:

 // // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); const char* filter; filter = "Company1|"; printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); int i = 0; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; } 


After the changes that were made to the client and server code, we can see exactly the results that were expected.

Socket options


Since we use the client-server model, we use the parameter named ZMQ_SUBSCRIBE .

 int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, option_value, strlen(option_value)); 


Socket options are set in the zmq_setsockopt () function. It takes four parameters:


This can be seen from the following line:

 int zmq_setsockopt (void *socket, int option_name, const void *option_ value, size_t option_len); 


Subscribe

ZMQ_SUBSCRIBE creates a new message in the ZMQ_SUB socket. If the option_value argument option_value not empty, then we subscribe to all messages that begin with option_value . You can configure multiple filters for one ZMQ_SUB socket.

Unsubscribe

ZMQ_UNSUBSCRIBE deletes the message from the ZMQ_SUB socket. It only deletes one message, even if several filters are configured.
The main thing that we need to learn about the client-server socket is that we will never know when the client starts to receive messages. In this case, it is a good idea to start the client first, and then the server. Because the client always perceives the first message as a connection to the server, which takes a lot of time, and the server can already send messages at this time.
However, we will talk about how to synchronize the server with the client, since we should not send messages if the client is not in touch.

Notes on the client-server model


In the client-server model, you should pay attention to the following points:

In Chapter 4, we will return to the client-server model, consider more complex examples and show how to deal with “slow” clients.

Pattern pipeline


Let's continue, consider the model pipeline. The pipeline pattern transfers data between the ordered nodes to the pipeline. Data is transmitted continuously and at each step the pipe is attached to one of several nodes. Between nodes, a cyclic data transfer strategy is used. This is a bit like a request-response model.

Strategy divide and conquer

There is no salvation from this strategy of divide and conquer when you program. Remember, when you first started learning programming, and your teacher used it almost in merge sorting, and after a week half of the group stopped attending classes. I am sure that everyone remembers this very well. And here again, divide and conquer!
Let's write something parallel on ZeroMQ. Consider a scenario where we have a generator that generates random numbers. We have workers who find the square root of these numbers by the Newton method. We also have a collector who collects the results from the workers.
Below is the server code:

 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/time.h> #include <time.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); // This is the socket that we send messages. void* socket = zmq_socket(context, ZMQ_PUSH); zmq_bind(socket, "tcp://*:4040"); // This is the socket that we send batch message. void* connector = zmq_socket(context, ZMQ_PUSH); zmq_connect(connector, "tcp://localhost:5050"); printf("Please press enter when workers are ready..."); getchar(); printf("Sending tasks to workers...\n"); // The first message. It's also the signal start of batch. int length = strlen("-1"); zmq_msg_t message; zmq_msg_init_size(&message, length); memcpy(zmq_msg_data(&message), "-1", length); zmq_msg_send(&message, connector, 0); zmq_msg_close(&message); // Generate some random numbers. srandom((unsigned) time(NULL)); // Send the tasks. int count; int msec = 0; for(count = 0; count < 100; count++) { int load = (int) ((double) (100) * random () / RAND_MAX); msec += load; char string[10]; sprintf(string, "%d", load); } printf("Total: %d msec\n", msec); sleep(1); zmq_close(connector); zmq_close(socket); zmq_ctx_destroy(context); return 0; } 


Just take a look at the employee's code, where we do some calculations for calculating the square root of a number by the Newton method:

 #include <stdlib.h> #include <string.h> #include <unistd.h> #include "zmq.h" double square(double x) { return x * x; } double average(double x, double y) { return (x + y) / 2.0; } double good_enough(double guess, double x) { return abs(square(guess) - x) < 0.000001; } double improve(double guess, double x) { return average(guess, x / guess); } double sqrt_inner(double guess, double x) { if(good_enough(guess, x)) return guess; else return sqrt_inner(improve(guess, x), x); } double newton_sqrt(double x) { return sqrt_inner(1.0, x); } int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); // Let's initialize a socket to receive messages. void* receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:4040"); // Let's initialize a socket to send the messages. void* sender = zmq_socket(context, ZMQ_PUSH); zmq_connect(sender, "tcp://localhost:5050"); for(;;) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, receiver, 0); int length = zmq_msg_size(&reply); char* msg = malloc(length + 1); memcpy(msg, zmq_msg_data(&reply), length); zmq_msg_close(&reply); fflush(stdout); double val = atof(msg); printf("%.1f: %.1f\n", val, newton_sqrt(val)); sleep(1); free(msg); zmq_msg_t message; char* ssend = "T"; int t_length = strlen(ssend); zmq_msg_init_size(&message, t_length); memcpy(zmq_msg_data(&message), ssend, t_length); zmq_msg_send(&message, receiver, 0); zmq_msg_close(&message); } zmq_close(receiver); zmq_close(sender); zmq_ctx_destroy(context); return 0; } 


We give the code and collector:

 #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* receiver = zmq_socket(context, ZMQ_PULL); zmq_bind(receiver, "tcp://*:5050"); // We receive the first message and discard it since it's the // signal start of batch which is -1. zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, receiver, 0); int length = zmq_msg_size(&reply); char* msg = malloc(length + 1); memcpy(msg, zmq_msg_data(&reply), length); zmq_msg_close(&reply); free(msg); int count; for(count = 0; count < 100; count++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, receiver, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); free(value); if(count / 10 == 0) printf("10 Tasks have been processed."); fflush(stdout); } zmq_close(receiver); zmq_ctx_destroy(context); return 0; } 


The following diagram represents the code written above:



What do we have:

We mentioned that workers are connected to both the server and the collector. Let's look at these links in more detail.
Let's look at the following lines from our employee’s code:

 // Let's initialize a socket to receive messages. void* receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:4040"); 


ZMQ_PULL socket

When we want to get data from the input to the nodes, we use ZMQ_PULL . The socket type ZMQ_PULL used to receive messages from the upstream nodes in the pipeline. As we said earlier, this process is carried out through planning a fair queue.

ZMQ_PUSH socket

When we want to communicate with the lower nodes, we use ZMQ_PUSH . The socket type ZMQ_PUSH used to send messages to the following nodes in the pipeline.
ZMQ_PUSH never discards messages. If the upstream node is ready to send a message to the downstream node, but the latter is not ready to receive the message and process it, then all messages sent with zmq_send () are blocked until at least one node is available to receive the message.

Get ZeroMQ context


Most likely, you noticed that all the examples that were previously given, began with zmq_ctx_new (). ZeroMQ applications always start by creating a context. All sockets are created within one process using a context that participates in the process of creating sockets, as they are the fastest way to connect threads in one process. The ZeroMQ context is thread-safe, so it can be easily transferred between threads.
If the ZeroMQ context cannot be created, then NULL is returned.
Although it is possible to create several contexts that will be considered as separate ZeroMQ applications, the best idea would be to create one context and transfer it to other threads.

Context destructor ZeroMQ


At the end of each application, you need to destroy the context that you created by calling zmq_ctx_destroy (). After calling zmq_ctx_destroy (), all processes return an error code ( ETERM ), zmq_ctx_destroy () blocks calls to open sockets and closes them, calling zmq_close ().

Cleaning


When you program in programming languages ​​like Pyhton or Java, you don’t need to worry about memory management, as these languages ​​have built-in garbage collectors.
For example, Pyhton uses reference counting, when the counter becomes zero, then the memory is freed automatically. Thus, you should explicitly close the connection when writing a ZeroMQ application on Pyhton, since it will be automatically closed as soon as the object’s reference count is zero. However, it should be noted that this will not work in Jython, PyPy, or IronPython. Anyway, you can find enough information in the Python documentation. Let's go back to our main task.
When you write in C, memory management is entirely your responsibility. Otherwise, you will have an unstable application that will have memory leaks.
You must take care of closing the sockets, deleting messages and the ZeroMQ context. There are several things to consider in order to successfully complete an application:

You may be surprised to see what happens with your application if it is written incorrectly, especially if it is multi-threaded. In this case, it will be extremely difficult to catch the error.

Memory leak detection


An application written in C or C ++ needs a well-written memory manager, since memory management completely falls on the programmer’s shoulders. For this we will use a wonderful Linux tool called Valgrind. Among many other useful features for source code analysis, this tool can be used to detect memory leaks.
The next section is a small tutorial on Valgrind, in which we take a closer look at how to use Valgrind when writing applications on ZeroMQ.

Introduction to Valgrind

You can compile your application using the –g option to display debug information. In this case, the error messages will contain the exact line numbers.
Consider the following example:

 #include <stdio.h> #include <stdlib.h> int main(int argc, char const *argv[]) { char* a = malloc(4); int b; printf("b = %d\n", b); return 0; } 


Let's compile in gcc by typing gcc –g –o test test.c Now it's time to run Valgrind to check for memory leaks. Let's run the following command:

 valgrind --leak-check=full --show-reachable=yes test 


After we entered the previous command, Valgrind will start checking the code for memory errors using the memcheck tool. You can call it separately by running tool = memcheck, but that would be meaningless, since memcheck is the default tool. The output will be similar to the following:

 ==98190== Conditional jump or move depends on uninitialised value(s) ==98190== at 0x2D923: __vfprintf ==98190== by 0x4AC5A: vfprintf_l ==98190== by 0x952BE: printf ==98190== by 0x1F5E: main (test.c:8) ==98190== 4 bytes in 1 blocks are definitely lost in loss record 1 of 5 ==98190== at 0xF656: malloc (vg_replace_malloc.c:195) ==98190== by 0x1F46: main (test.c:6) ==98190== LEAK SUMMARY: ==98190== definitely lost: 4 bytes in 1 blocks ==98190== indirectly lost: 0 bytes in 0 blocks ==98190== possibly lost: 0 bytes in 0 blocks 


Now let's describe a little the previous output:

Valgrind $PREFIX/lib/valgrind/default.supp . , ZeroMQ, :

 { <socketcall_sendto> Memcheck:Param socketcall.sendto(msg) fun:send ... } { <socketcall_sendto> Memcheck:Param socketcall.send(msg) fun:send ... } 


Valgrind :

 valgrind --leak-check=full --show-reachable=yes --suppressions=zeromq.supp server 


Conclusion


, - pipeline. , . Valgrind.

, .

Source: https://habr.com/ru/post/216957/


All Articles