📜 ⬆️ ⬇️

C parallel network computing

Good day, habrazhiteli! Recently, I had to write a program for parallel computation of a definite integral. Naturally, after finishing work, some experience was acquired, and I would like to share this experience with you. I’ll say at once that within the framework of this article I will not consider parsing an integrable function, but I’ll focus on the interaction between threads and computers.

Formulation of the problem


Of course, before you start writing code you need to set yourself a task correctly, the requirements for the program were as follows:


That is, it is necessary to write a program that would run on several computers (apparently on workstations) and waited for instructions to calculate the integral, then read it and send the result back to the one who asked to count.

Since we have to deal with the network, we choose a probabilistic method for calculating the integral, we will use the Monte Carlo method . It is easily parallelized, two ways are obvious:

I will choose the second method, without particularly citing anything, I simply will not have to transfer the beginning and end of their segments to the calculators.
So, we need a server (calculator) that will wait for the task, read it and return the result. And we also need a client with whom the user will interact.
How to find a server for computing? Knowing all IPs by heart or writing them down somewhere is not the best option, considering that the IP addresses of the servers are dynamic, plus they may consider another task at the moment or simply be offline. The solution is simple; we will use a broadcast request to find the currently available servers.
')
The second problem is which transport to use to exchange information? For the broadcast request, obviously we will use UDP. But for the interaction between the client and the server, you can use both UDP and TCP. But there will be fewer problems with TCP, since we will not have to check the status of the connection. If the socket closes on the other hand, the OS itself will detect this and let us know.

The final interaction is as follows: the server waits for a connection on a TCP socket and simultaneously responds to broadcast requests, indicating its presence to clients. As soon as the client has connected and has given the task - we suspend the answers to the broadcast requests, we calculate, we answer the client, we start the cycle again. The client is the same: sends a request, forms a list of servers, shares a task between them, gets the results and displays them to the user.

Server


Let's write the server first. First of all, we will agree on which port we will wait for requests from clients, let it be the port - number 38199. Then we will declare the structure for the client to send the job to the server.
#define RCVPORT 38199
#define FUNC(x) x*x // x^2
//
typedef struct {
int limits;
int numoftry;
} task_data_t;


* This source code was highlighted with Source Code Highlighter .


As can be seen from the code above, the client will send the server the upper limit of integration and the number of attempts that he must make.

It is clear that we will do the calculations in several streams, so we will create an argument and a structure for the calculator threads:
//
typedef struct {
int limits; //
long numoftry; //
long double *results; //
} thread_args_t;

//
void *calculate( void *arg) {
// thread_args_t
thread_args_t *tinfo = (thread_args_t*) arg;
long double result = 0;
int xlim = tinfo->limits;
int trys = tinfo->numoftry;
unsigned a = xlim;
for ( int i = 0; i < trys; ++i) {
int div = rand_r(&a);
int div2 = rand_r(&a);
double x = div % xlim + (div2/(div2*1.0 + div*1.0)) ;
result += FUNC(x);
}
*(tinfo->results) = result;
return NULL;
}


* This source code was highlighted with Source Code Highlighter .


I draw attention to the fact that the rand_r function is used (unsigned int * seedp) . Because this function uses a local variable to store the intermediate value between calls. We cannot allow the use of a global variable for all threads, as will be the case when using the rand () function, as this will cause their mutual blocking.

After running all computational threads, the main thread will hang on the pthread_join () function and will not be able to do anything if the client dies during the calculations to avoid empty calculations, run another thread that will check the client status.
//
typedef struct {
int sock; //
pthread_t *calcthreads; //
int threadnum; //
} checker_args_t;

// SIGUSR1
void thread_cancel( int signo) {
pthread_exit(PTHREAD_CANCELED);
}

//
void *client_check( void *arg) {
// checker_args_t
checker_args_t *args = (checker_args_t*) arg;
char a[10];
recv(args->sock, &a, 10, 0); // TCP,
// , recv -1
int st;
for ( int i = 0; i < args->threadnum; ++i)
st = pthread_kill(args->calcthreads[i], SIGUSR1); // SIGUSR1
return NULL;
}


* This source code was highlighted with Source Code Highlighter .


For compulsory completion of computing threads, I decided to use signals, since no memory is allocated as a result of their work, there is nothing to fear. Although using the pthread_cancel () function and the pthread_cleanup_push () and pthread_cleanup_pop () macros would be more appropriate. Of course, the thread_cancel () function is written which will be executed when the signal is received. And remember that at the beginning of the program, before starting threads, you need to set the correct mask for signal processing, otherwise we risk just leaving the program.

Now let's write a thread that will respond to broadcast requests. So that the main thread could easily hang, waiting for the client, and our additional thread would respond to requests at that moment.
void *listen_broadcast( void *arg) {
int *isbusy = arg;
// broadcast
int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
if (sockbrcast == -1) {
perror( "Create broadcast socket failed" );
exit(EXIT_FAILURE);
}

// broadcast
int port_rcv = RCVPORT;
struct sockaddr_in addrbrcast_rcv;
bzero(&addrbrcast_rcv, sizeof (addrbrcast_rcv));
addrbrcast_rcv.sin_family = AF_INET;
addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
addrbrcast_rcv.sin_port = htons(port_rcv);
//
if (bind(sockbrcast, ( struct sockaddr *) &addrbrcast_rcv,
sizeof (addrbrcast_rcv)) < 0) {
perror( "Bind broadcast socket failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}

int msgsize = sizeof ( char ) * 18;
char hellomesg[18];
bzero(hellomesg, msgsize);
// broadcast'
fcntl(sockbrcast, F_SETFL, O_NONBLOCK);

//
fd_set readset;
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);

//
struct timeval timeout;
timeout.tv_sec = 3;
timeout.tv_usec = 0;

struct sockaddr_in client;;
bzero(&client, sizeof (client));
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
char helloanswer[18];
bzero(helloanswer, msgsize);
strcpy(helloanswer, "Hello Client" );
int sockst = 1;
while (sockst > 0) {
sockst = select (sockbrcast + 1, &readset, NULL, &readset, NULL);
if (sockst == -1) {
perror( "Broblems on broadcast socket" );
exit(EXIT_FAILURE);
}
int rdbyte = recvfrom(sockbrcast, ( void *) hellomesg, msgsize,MSG_TRUNC,
( struct sockaddr*) &client,
&servaddrlen);
if (rdbyte == msgsize && strcmp(hellomesg, "Hello Integral" ) == 0 &&
*isbusy == 0) {
if (sendto(sockbrcast, helloanswer, msgsize, 0,
( struct sockaddr*) &client, sizeof ( struct sockaddr_in)) < 0) {
perror( "Sending answer" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
}
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
}
return NULL;
}


* This source code was highlighted with Source Code Highlighter .


Everything is simple, create a socket and wait for requests. As a request received - respond to it. And one complication, whether to respond to a request or not, is decided by the value of the isbusy variable.

Finally got to the main'a:
int main( int argc, char ** argv) {
// - -
if (argc > 2) {
fprintf(stderr, "Usage: %s [numofcpus]\n" , argv[0]);
exit(EXIT_FAILURE);
}

int numofthread;

if (argc == 2) {
numofthread = atoi(argv[1]);
if (numofthread < 1) {
fprintf(stderr, "Incorrect num of threads!\n" );
exit(EXIT_FAILURE);
}
fprintf(stdout, "Num of threads forced to %d\n" , numofthread);
} else {
// , -
numofthread = sysconf(_SC_NPROCESSORS_ONLN);
if (numofthread < 1) {
fprintf(stderr, "Can't detect num of processors\n"
"Continue in two threads\n" );
numofthread = 2;
}
fprintf(stdout, "Num of threads detected automatically it's %d\n\n" ,
numofthread);
}


* This source code was highlighted with Source Code Highlighter .


I think the verification of the arguments can not be explained ...

Set a mask for signals, and run the thread to listen to requests:
struct sigaction cancel_act;
memset(&cancel_act, 0, sizeof (cancel_act));
cancel_act.sa_handler = thread_cancel;
sigfillset(&cancel_act.sa_mask);
sigaction(SIGUSR1, &cancel_act, NULL);

// broadcast'
pthread_t broadcast_thread;
int isbusy = 1; //(int*) malloc(sizeof(int));
// broadcast
// 0 - , 1-
isbusy = 1;
if (pthread_create(&broadcast_thread, NULL, listen_broadcast, &isbusy)) {
fprintf(stderr, "Can't create broadcast listen thread" );
perror( "Detail:" );
exit(EXIT_FAILURE);
}


* This source code was highlighted with Source Code Highlighter .


Now we create a socket with which the clients will connect:
int listener;
struct sockaddr_in addr;
listener = socket(PF_INET, SOCK_STREAM, 0);
if (listener < 0) {
perror( "Can't create listen socket" );
exit(EXIT_FAILURE);
}

addr.sin_family = AF_INET;
addr.sin_port = htons(RCVPORT);
addr.sin_addr.s_addr = INADDR_ANY;
int a = 1;
// SO_REUSEADDR
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &a, sizeof (a)) < 0) {
perror( "Set listener socket options" );
exit(EXIT_FAILURE);
}

//
if (bind(listener, ( struct sockaddr*) &addr, sizeof (addr)) < 0) {
perror( "Can't bind listen socket" );
exit(EXIT_FAILURE);
}

//
if (listen(listener, 1) < 0) {
perror( "Eror listen socket" );
exit(EXIT_FAILURE);
}


* This source code was highlighted with Source Code Highlighter .


We start the computational cycle:
//
int needexit = 0;
while (needexit == 0) {
fprintf(stdout, "\nWait new connection...\n\n" );
int client;
isbusy = 0; //
struct sockaddr_in addrclient;
socklen_t addrclientsize = sizeof (addrclient);
client = accept(listener, ( struct sockaddr*)&addrclient,
&addrclientsize);
if (client < 0) {
perror( "Client accepting" );
}


* This source code was highlighted with Source Code Highlighter .


We quietly hang on accept'e, since a separate thread deals with responses to broadcast requests.
After the client has connected to us, we check the data from him, and we start to calculate:
isbusy = 1; //
task_data_t data;
int read_bytes = recv(client, &data, sizeof (data), 0);
if (read_bytes != sizeof (data) || data.limits < 1 || data.numoftry < 1) {
fprintf(stderr, "Invalid data from %s on port %d, reset peer\n" ,
inet_ntoa(addrclient.sin_addr), ntohs(addrclient.sin_port));
close(client);
isbusy = 0;
} else {
fprintf(stdout, "New task from %s on port %d\nlimits: %d\n"
"numoftrys: %d\n" , inet_ntoa(addrclient.sin_addr),
ntohs(addrclient.sin_port), data.limits, data.numoftry);
thread_args_t *tinfo;
pthread_t *calc_threads =
(pthread_t*) malloc( sizeof (pthread_t) * numofthread);
int threads_trys = data.numoftry % numofthread;
long double *results =
( long double *) malloc( sizeof ( long double ) * numofthread);
tinfo = (thread_args_t*) malloc( sizeof (thread_args_t) *
numofthread);
//
int numofthreadtry = data.numoftry / numofthread + 1;
for ( int i = 0; i < numofthread; ++i) {
tinfo[i].limits = data.limits;
tinfo[i].numoftry = numofthreadtry;
tinfo[i].results = &results[i];
if (pthread_create(&calc_threads[i], NULL, calculate, &tinfo[i])
!= 0) {
fprintf(stderr, "Can't create thread by num %d" , i);
perror( "Detail:" );
exit(EXIT_FAILURE);
}
}

//
checker_args_t checker_arg;
checker_arg.calcthreads = calc_threads;
checker_arg.threadnum = numofthread;
checker_arg.sock = client;
pthread_t checker_thread;
if (pthread_create(&checker_thread, NULL, client_check,
&checker_arg) != 0) {
fprintf(stderr, "Can't create checker thread" );
perror( "Detail:" );
exit(EXIT_FAILURE);
}
int iscanceled = 0; // ?
int *exitstat;
for ( int i = 0; i < numofthread; ++i) {
pthread_join(calc_threads[i], ( void *) &exitstat);
if (exitstat == PTHREAD_CANCELED)
iscanceled = 1; //
}
if (iscanceled != 1) {
long double *res = ( long double *) malloc( sizeof ( long double ));
bzero(res, sizeof ( long double ));
*res = 0.0;
for ( int i = 0; i < numofthread; ++i)
*res += results[i];
pthread_kill(checker_thread, SIGUSR1);
if (send(client, res, sizeof ( long double ), 0) < 0) {
perror( "Sending error" );
}
close(client);
free(res);
//free(checker_arg);
free(results);
free(calc_threads);
free(tinfo);
isbusy = 0;
fprintf(stdout, "Calculate and send finish!\n" );
} else {
fprintf(stderr, "Client die!\n" );
close(client);
//free(checker_arg);
free(results);
free(calc_threads);
free(tinfo);
}


}

}

return (EXIT_SUCCESS);
}


* This source code was highlighted with Source Code Highlighter .


The rest of the code is simple:
  1. Run computational threads
  2. Run a thread checking state
  3. We wait until everything is calculated
  4. We respond to the client
  5. We start the computational cycle again

I draw your attention to the fact that it is not necessary to complete the checking thread, because it completes itself when the socket is closed with the client.

All server is ready.

Customer


The client will be arranged as follows:
  1. We form the list of servers
  2. To work with each server create thread
  3. Waiting for results and exit

To work with each server, it is convenient to create a separate thread, this will allow working with them asynchronously.

Just like in the server, we will declare the structure for data exchange. As well as the structure of the argument for threads working with servers.
//
typedef struct {
int limits;
int numoftry;
} task_data_t;

//
typedef struct {
int limits; //
int numoftry; //
struct sockaddr_in *server; //
long double *results; //
} thread_args_t;


* This source code was highlighted with Source Code Highlighter .


In the client, it does not need to set the port to listen on, because the server responds to where the packet came from, so let's take the one that the OS assigns to us. So, the function (thread) working with the server:
//
void *send_thread( void *arg) {
thread_args_t *task_data = (thread_args_t*) arg;
int servsock = socket(PF_INET, SOCK_STREAM, 0);
if (servsock < 0) {
perror( "Create new socket to server" );
exit(EXIT_FAILURE);
}
struct sockaddr_in listenaddr;
listenaddr.sin_family = AF_INET;
listenaddr.sin_addr.s_addr = INADDR_ANY;
listenaddr.sin_port = 0;

if (bind(servsock, ( struct sockaddr*) &listenaddr, sizeof (listenaddr)) < 0) {
perror( "Can't create listen socket" );
exit(EXIT_FAILURE);
}
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
if (connect(servsock, ( struct sockaddr*)task_data->server,
servaddrlen) < 0) {
perror( "Connect to server failed!" );
exit(EXIT_FAILURE);
}
task_data_t senddata;
senddata.limits = task_data->limits;
senddata.numoftry = task_data->numoftry;

if (send(servsock, &senddata, sizeof (senddata), 0) < 0) {
perror( "Sending data to server failed" );
exit(EXIT_FAILURE);
}

int recv_byte = recv(servsock, task_data->results, sizeof ( long double ), 0);
if (recv_byte == 0) {
fprintf(stderr, "Server %s on port %d die!\nCancel calculate, on all" ,
inet_ntoa(task_data->server->sin_addr),
ntohs(task_data->server->sin_port));
exit(EXIT_FAILURE);
}
fprintf(stdout, "Server %s on port %d finish!\n" ,
inet_ntoa(task_data->server->sin_addr),
ntohs(task_data->server->sin_port));
return NULL;
}


* This source code was highlighted with Source Code Highlighter .


main is very similar to the main server, plus commented out in some detail, I will not discuss it too much.
int main( int argc, char ** argv) {
if (argc < 3) {
fprintf(stderr, "Usage: %s limits numoftry [maxserv]\n" , argv[0]);
exit(EXIT_FAILURE);
}

int numoftry = atoi(argv[2]);
if (numoftry == 0) {
fprintf(stderr, "Num of try is invalid\n" );
exit(EXIT_FAILURE);
}
int maxservu = 1000000;
if (argc == 4) {
maxservu = atoi(argv[3]);
if (maxservu < 1) {
fprintf(stderr, "Error number of max servers\n" );
exit(EXIT_FAILURE);
}
}
int limits = atoi(argv[1]);
if (limits == 0) {
fprintf(stderr, "Limits is invalid\n" );
exit(EXIT_FAILURE);
}

// broadcast
int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
if (sockbrcast == -1) {
perror( "Create broadcast socket failed" );
exit(EXIT_FAILURE);
}


// broadcast
int port_rcv = 0;
struct sockaddr_in addrbrcast_rcv;
bzero(&addrbrcast_rcv, sizeof (addrbrcast_rcv));
addrbrcast_rcv.sin_family = AF_INET;
addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
addrbrcast_rcv.sin_port = 0; //htons(port_rcv);
//
if (bind(sockbrcast, ( struct sockaddr *) &addrbrcast_rcv,
sizeof (addrbrcast_rcv)) < 0) {
perror( "Bind broadcast socket failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}

// broadcast
int port_snd = 38199;
struct sockaddr_in addrbrcast_snd;
bzero(&addrbrcast_snd, sizeof (addrbrcast_snd));
addrbrcast_snd.sin_family = AF_INET;
addrbrcast_snd.sin_port = htons(port_snd);
addrbrcast_snd.sin_addr.s_addr = htonl(0xffffffff);

// broadcast
int access = 1;
if (setsockopt(sockbrcast, SOL_SOCKET, SO_BROADCAST,
( const void *) &access, sizeof (access)) < 0) {
perror( "Can't accept broadcast option at socket to send" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
int msgsize = sizeof ( char ) * 18;
void *hellomesg = malloc(msgsize);
bzero(hellomesg, msgsize);
strcpy(hellomesg, "Hello Integral" );
// broadcast
if (sendto(sockbrcast, hellomesg, msgsize, 0,
( struct sockaddr*) &addrbrcast_snd, sizeof (addrbrcast_snd)) < 0) {
perror( "Sending broadcast" );
close(sockbrcast);
exit(EXIT_FAILURE);
}

// broadcast'
fcntl(sockbrcast, F_SETFL, O_NONBLOCK);

//
fd_set readset;
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);

//
struct timeval timeout;
timeout.tv_sec = 3;
timeout.tv_usec = 0;

struct sockaddr_in *servers =
( struct sockaddr_in*) malloc( sizeof ( struct sockaddr_in));
bzero(servers, sizeof ( struct sockaddr_in));
int servcount = 0;
int maxserv = 1;
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
// servers
while ( select (sockbrcast + 1, &readset, NULL, &readset, &timeout) > 0) {
int rdbyte = recvfrom(sockbrcast, ( void *) hellomesg, msgsize,MSG_TRUNC,
( struct sockaddr*) &servers[servcount],
&servaddrlen);
if (rdbyte == msgsize && strcmp(hellomesg, "Hello Client" ) == 0) {
servcount++;

if (servcount >= maxserv) {
servers = realloc(servers,
sizeof ( struct sockaddr_in) * (maxserv + 1));
if (servers == NULL) {
perror( "Realloc failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
bzero(&servers[servcount], servaddrlen);
maxserv++;
}
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
}
}
int i;
if (servcount < 1) {
fprintf(stderr, "No servers found!\n" );
exit(EXIT_FAILURE);
}
if (argc > 3 && maxservu <= servcount)
servcount = maxservu;
for (i = 0; i < servcount; ++i) {
printf( "Server answer from %s on port %d\n" ,
inet_ntoa(servers[i].sin_addr), ntohs(servers[i].sin_port));
}
printf( "\n" );
free(hellomesg);

long double *results =
( long double *) malloc( sizeof ( long double ) * servcount);
//
pthread_t *tid = (pthread_t*) malloc( sizeof (pthread_t) * servcount);
for (i = 0; i < servcount; ++i) {
thread_args_t *args = (thread_args_t*) malloc ( sizeof (thread_args_t));
args->limits = limits;
args->numoftry = numoftry / servcount + 1;
args->results = &results[i];
args->server = &servers[i];

if (pthread_create(&tid[i], NULL, send_thread, args) != 0) {
perror( "Create send thread failed" );
exit(EXIT_FAILURE);
}
}
long double res = 0;
//
for (i = 0; i < servcount; ++i)
pthread_join(tid[i], NULL);

//
for (i = 0; i < servcount; ++i)
res += results[i];
res /= numoftry;
res *= limits;


free(servers);
printf( "\nResult: %Lf\n" , res);
return (EXIT_SUCCESS);
}


* This source code was highlighted with Source Code Highlighter .


Performance testing


Performance depending on the number of threads

I ran the program on the same computer, with the same input data, with a different number of threads.
Client run with parameters 2 1000000000; Server with parameters 1, 2, 4, 8.
Results respectively: 0m37.063s, 0m20.576s, 0m20.329s, 0m21.029s. It should be borne in mind that the results that showed time need to take 4 seconds that we wait for the servers.
The processor on the machine Core (TM) 2 Duo CPU T5470 . As expected, the processor has two cores and, accordingly, it makes no sense to run more than 2 threads, and so: doubling the number of threads gives
double acceleration.

We also tested the Asus 1215p , the result has the same dependence on the number of threads as above

Performance depending on the number of machines

Now I started the server on 1, 2, 4, 8, 16 computers, the number of threads is equal to the number of cores, that is, two threads.
Here are the results: 0m10.268s, 0m5.122s, 0m2.487s, 0m1.265, 0m0.766s.
Again we get the expected result, with each new computer the calculations will be accelerated by 2 times.

Total


As a result, we get a program that simultaneously computes the integral and is resistant to all sorts of troubles that may occur during operation. Unfortunately, due to the large size of the article, it was not possible to consider everything in somewhat greater detail. But I hope that my example will help beginners to avoid some of the mistakes that occurred to me. Source codes of programs with make files can be found here .

Thanks for attention!

Source: https://habr.com/ru/post/122754/


All Articles