/* * Stock Market Server * Binds PUB socket to tcp://*:4040 * Publishes random stock values of random companies */ #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); printf("Starting server...\n"); int conn = zmq_bind(publisher, "tcp://*:4040"); const char* companies[2] = {"Company1", "Company2"}; int count = 0; for(;;) { int price = count % 2; int which_company = count % 2; int index = strlen(companies[0]); char update[12]; snprintf(update, sizeof update, "%s", companies[which_company]); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; }
/* * Stock Market Client * Connects SUB socket to tcp://localhost:4040 * Collects stock exchange values */ #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0); int i; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
zmq_setsockopt ()
and subscribeis
is required whenever you use a SUB socket, otherwise you will not receive any messages. This is a very common mistake.zmq_msg_recv
(). zmq_msg_recv
() receives and saves the message. The previous message, if any, is unloaded. int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
ZMQ_DONTWAIT
. If the flag is ZMQ_DONTWAIT
, then the operation is performed in non-blocking mode. If a message is received, zmq_msg_recv
() returns the size of the message in bytes, otherwise -1 is returned and an error message flag is returned.SUB
socket causes an error. You could call zmq_msg_send
() to send messages, but you should never call zmq_msg_recv
() for a PUB socket. Company2 570 Company2 878 Company2 981 Company2 783 Company1 855 Company1 524 Company2 639 Company1 984 Company1 158 Company2 145
Sending... Company2 36 Sending... Company2 215 Sending... Company2 712 Sending... Company2 924 Sending... Company2 721 Sending... Company1 668 Sending... Company2 83 Sending... Company2 209 Sending... Company1 450 Sending... Company1 940 Sending... Company1 57 Sending... Company2 3 Sending... Company1 100 Sending... Company2 947
// // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); const char* filter; if(argc > 1) { filter = argv[1]; } else { filter = "Company1"; } printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); int i = 0; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
Company1 575 Company1 504 Company1 513 Company1 584 Company1 444 Company1 1010 Company1 524 Company1 963 Company1 929 Company1 718
// // Stock Market Server // Binds PUB socket to tcp://*:4040 // Publishes random stock values of random companies // #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); int conn = zmq_bind(publisher, "tcp://*:4040"); const char* companies[3] = {"Company1", "Company10", "Company101"}; int count = 0; for(;;) { int price = count % 17; int which_company = count % 3; int index = strlen(companies[which_company]); char update[64]; snprintf(update, sizeof update, "%s", companies[which_company]); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; }
// // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); const char* filter; if(argc > 1) { filter = argv[1]; } else { filter = "Company1"; } printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); int i = 0; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
Collecting stock information from the server. Company101 950 Company10 707 Company101 55 Company101 343 Company10 111 Company1 651 Company10 287 Company101 8 Company1 889 Company101 536
// // Stock Market Server // Binds PUB socket to tcp://*:4040 // Publishes random stock values of random companies // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); int conn = zmq_bind(publisher, "tcp://*:4040"); conn = zmq_bind(publisher, "ipc://stock.ipc"); const char* companies[3] = {"Company1", "Company10", "Company101"}; for(;;) { int price = count % 17; int which_company = count % 3; int index = strlen(companies[which_company]); char update[64]; sprintf(update, "%s| %d", companies[which_company], price); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; }
// // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); const char* filter; filter = "Company1|"; printf("Collecting stock information from the server.\n"); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); int i = 0; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s\n", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
ZMQ_SUBSCRIBE
. int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, option_value, strlen(option_value));
int zmq_setsockopt (void *socket, int option_name, const void *option_ value, size_t option_len);
ZMQ_SUBSCRIBE
creates a new message in the ZMQ_SUB
socket. If the option_value
argument option_value
not empty, then we subscribe to all messages that begin with option_value
. You can configure multiple filters for one ZMQ_SUB
socket.ZMQ_UNSUBSCRIBE
deletes the message from the ZMQ_SUB
socket. It only deletes one message, even if several filters are configured. #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/time.h> #include <time.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); // This is the socket that we send messages. void* socket = zmq_socket(context, ZMQ_PUSH); zmq_bind(socket, "tcp://*:4040"); // This is the socket that we send batch message. void* connector = zmq_socket(context, ZMQ_PUSH); zmq_connect(connector, "tcp://localhost:5050"); printf("Please press enter when workers are ready..."); getchar(); printf("Sending tasks to workers...\n"); // The first message. It's also the signal start of batch. int length = strlen("-1"); zmq_msg_t message; zmq_msg_init_size(&message, length); memcpy(zmq_msg_data(&message), "-1", length); zmq_msg_send(&message, connector, 0); zmq_msg_close(&message); // Generate some random numbers. srandom((unsigned) time(NULL)); // Send the tasks. int count; int msec = 0; for(count = 0; count < 100; count++) { int load = (int) ((double) (100) * random () / RAND_MAX); msec += load; char string[10]; sprintf(string, "%d", load); } printf("Total: %d msec\n", msec); sleep(1); zmq_close(connector); zmq_close(socket); zmq_ctx_destroy(context); return 0; }
#include <stdlib.h> #include <string.h> #include <unistd.h> #include "zmq.h" double square(double x) { return x * x; } double average(double x, double y) { return (x + y) / 2.0; } double good_enough(double guess, double x) { return abs(square(guess) - x) < 0.000001; } double improve(double guess, double x) { return average(guess, x / guess); } double sqrt_inner(double guess, double x) { if(good_enough(guess, x)) return guess; else return sqrt_inner(improve(guess, x), x); } double newton_sqrt(double x) { return sqrt_inner(1.0, x); } int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); // Let's initialize a socket to receive messages. void* receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:4040"); // Let's initialize a socket to send the messages. void* sender = zmq_socket(context, ZMQ_PUSH); zmq_connect(sender, "tcp://localhost:5050"); for(;;) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, receiver, 0); int length = zmq_msg_size(&reply); char* msg = malloc(length + 1); memcpy(msg, zmq_msg_data(&reply), length); zmq_msg_close(&reply); fflush(stdout); double val = atof(msg); printf("%.1f: %.1f\n", val, newton_sqrt(val)); sleep(1); free(msg); zmq_msg_t message; char* ssend = "T"; int t_length = strlen(ssend); zmq_msg_init_size(&message, t_length); memcpy(zmq_msg_data(&message), ssend, t_length); zmq_msg_send(&message, receiver, 0); zmq_msg_close(&message); } zmq_close(receiver); zmq_close(sender); zmq_ctx_destroy(context); return 0; }
#include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* receiver = zmq_socket(context, ZMQ_PULL); zmq_bind(receiver, "tcp://*:5050"); // We receive the first message and discard it since it's the // signal start of batch which is -1. zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, receiver, 0); int length = zmq_msg_size(&reply); char* msg = malloc(length + 1); memcpy(msg, zmq_msg_data(&reply), length); zmq_msg_close(&reply); free(msg); int count; for(count = 0; count < 100; count++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, receiver, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); free(value); if(count / 10 == 0) printf("10 Tasks have been processed."); fflush(stdout); } zmq_close(receiver); zmq_ctx_destroy(context); return 0; }
// Let's initialize a socket to receive messages. void* receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:4040");
ZMQ_PULL
. The socket type ZMQ_PULL
used to receive messages from the upstream nodes in the pipeline. As we said earlier, this process is carried out through planning a fair queue.ZMQ_PUSH
. The socket type ZMQ_PUSH
used to send messages to the following nodes in the pipeline.ZMQ_PUSH
never discards messages. If the upstream node is ready to send a message to the downstream node, but the latter is not ready to receive the message and process it, then all messages sent with zmq_send
() are blocked until at least one node is available to receive the message.zmq_ctx_destroy
(). After calling zmq_ctx_destroy
(), all processes return an error code ( ETERM
), zmq_ctx_destroy
() blocks calls to open sockets and closes them, calling zmq_close
().zmq_ctx_destroy
(). However, if there are open sockets, then zmq_ctx_destroy
can wait to close them indefinitely. Thus, you must first close all sockets and then call zmq_ctx_destroy
() to destroy the context.zmq_ctx_destroy
() will wait indefinitely if there is an open connection or there are messages in the queue to send.zmq_msg_close
(), otherwise your application will have memory leaks. #include <stdio.h> #include <stdlib.h> int main(int argc, char const *argv[]) { char* a = malloc(4); int b; printf("b = %d\n", b); return 0; }
gcc –g –o test test.c
Now it's time to run Valgrind to check for memory leaks. Let's run the following command: valgrind --leak-check=full --show-reachable=yes test
==98190== Conditional jump or move depends on uninitialised value(s) ==98190== at 0x2D923: __vfprintf ==98190== by 0x4AC5A: vfprintf_l ==98190== by 0x952BE: printf ==98190== by 0x1F5E: main (test.c:8) ==98190== 4 bytes in 1 blocks are definitely lost in loss record 1 of 5 ==98190== at 0xF656: malloc (vg_replace_malloc.c:195) ==98190== by 0x1F46: main (test.c:6) ==98190== LEAK SUMMARY: ==98190== definitely lost: 4 bytes in 1 blocks ==98190== indirectly lost: 0 bytes in 0 blocks ==98190== possibly lost: 0 bytes in 0 blocks
Conditional jump or move depends on uninitialised value(s)
means that the initialization in our code was successfuldefinitely lost
means there is a memory leak and we need to fix it.indirectly lost
,possibly lost
,$PREFIX/lib/valgrind/default.supp
. , ZeroMQ, : { <socketcall_sendto> Memcheck:Param socketcall.sendto(msg) fun:send ... } { <socketcall_sendto> Memcheck:Param socketcall.send(msg) fun:send ... }
valgrind --leak-check=full --show-reachable=yes --suppressions=zeromq.supp server
Source: https://habr.com/ru/post/216957/
All Articles