#define RCVPORT 38199
#define FUNC(x) x*x // x^2
//
typedef struct {
int limits;
int numoftry;
} task_data_t;
* This source code was highlighted with Source Code Highlighter .
//
typedef struct {
int limits; //
long numoftry; //
long double *results; //
} thread_args_t;
//
void *calculate( void *arg) {
// thread_args_t
thread_args_t *tinfo = (thread_args_t*) arg;
long double result = 0;
int xlim = tinfo->limits;
int trys = tinfo->numoftry;
unsigned a = xlim;
for ( int i = 0; i < trys; ++i) {
int div = rand_r(&a);
int div2 = rand_r(&a);
double x = div % xlim + (div2/(div2*1.0 + div*1.0)) ;
result += FUNC(x);
}
*(tinfo->results) = result;
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
//
typedef struct {
int sock; //
pthread_t *calcthreads; //
int threadnum; //
} checker_args_t;
// SIGUSR1
void thread_cancel( int signo) {
pthread_exit(PTHREAD_CANCELED);
}
//
void *client_check( void *arg) {
// checker_args_t
checker_args_t *args = (checker_args_t*) arg;
char a[10];
recv(args->sock, &a, 10, 0); // TCP,
// , recv -1
int st;
for ( int i = 0; i < args->threadnum; ++i)
st = pthread_kill(args->calcthreads[i], SIGUSR1); // SIGUSR1
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
void *listen_broadcast( void *arg) {
int *isbusy = arg;
// broadcast
int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
if (sockbrcast == -1) {
perror( "Create broadcast socket failed" );
exit(EXIT_FAILURE);
}
// broadcast
int port_rcv = RCVPORT;
struct sockaddr_in addrbrcast_rcv;
bzero(&addrbrcast_rcv, sizeof (addrbrcast_rcv));
addrbrcast_rcv.sin_family = AF_INET;
addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
addrbrcast_rcv.sin_port = htons(port_rcv);
//
if (bind(sockbrcast, ( struct sockaddr *) &addrbrcast_rcv,
sizeof (addrbrcast_rcv)) < 0) {
perror( "Bind broadcast socket failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
int msgsize = sizeof ( char ) * 18;
char hellomesg[18];
bzero(hellomesg, msgsize);
// broadcast'
fcntl(sockbrcast, F_SETFL, O_NONBLOCK);
//
fd_set readset;
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
//
struct timeval timeout;
timeout.tv_sec = 3;
timeout.tv_usec = 0;
struct sockaddr_in client;;
bzero(&client, sizeof (client));
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
char helloanswer[18];
bzero(helloanswer, msgsize);
strcpy(helloanswer, "Hello Client" );
int sockst = 1;
while (sockst > 0) {
sockst = select (sockbrcast + 1, &readset, NULL, &readset, NULL);
if (sockst == -1) {
perror( "Broblems on broadcast socket" );
exit(EXIT_FAILURE);
}
int rdbyte = recvfrom(sockbrcast, ( void *) hellomesg, msgsize,MSG_TRUNC,
( struct sockaddr*) &client,
&servaddrlen);
if (rdbyte == msgsize && strcmp(hellomesg, "Hello Integral" ) == 0 &&
*isbusy == 0) {
if (sendto(sockbrcast, helloanswer, msgsize, 0,
( struct sockaddr*) &client, sizeof ( struct sockaddr_in)) < 0) {
perror( "Sending answer" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
}
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
}
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
int main( int argc, char ** argv) {
// - -
if (argc > 2) {
fprintf(stderr, "Usage: %s [numofcpus]\n" , argv[0]);
exit(EXIT_FAILURE);
}
int numofthread;
if (argc == 2) {
numofthread = atoi(argv[1]);
if (numofthread < 1) {
fprintf(stderr, "Incorrect num of threads!\n" );
exit(EXIT_FAILURE);
}
fprintf(stdout, "Num of threads forced to %d\n" , numofthread);
} else {
// , -
numofthread = sysconf(_SC_NPROCESSORS_ONLN);
if (numofthread < 1) {
fprintf(stderr, "Can't detect num of processors\n"
"Continue in two threads\n" );
numofthread = 2;
}
fprintf(stdout, "Num of threads detected automatically it's %d\n\n" ,
numofthread);
}
* This source code was highlighted with Source Code Highlighter .
struct sigaction cancel_act;
memset(&cancel_act, 0, sizeof (cancel_act));
cancel_act.sa_handler = thread_cancel;
sigfillset(&cancel_act.sa_mask);
sigaction(SIGUSR1, &cancel_act, NULL);
// broadcast'
pthread_t broadcast_thread;
int isbusy = 1; //(int*) malloc(sizeof(int));
// broadcast
// 0 - , 1-
isbusy = 1;
if (pthread_create(&broadcast_thread, NULL, listen_broadcast, &isbusy)) {
fprintf(stderr, "Can't create broadcast listen thread" );
perror( "Detail:" );
exit(EXIT_FAILURE);
}
* This source code was highlighted with Source Code Highlighter .
int listener;
struct sockaddr_in addr;
listener = socket(PF_INET, SOCK_STREAM, 0);
if (listener < 0) {
perror( "Can't create listen socket" );
exit(EXIT_FAILURE);
}
addr.sin_family = AF_INET;
addr.sin_port = htons(RCVPORT);
addr.sin_addr.s_addr = INADDR_ANY;
int a = 1;
// SO_REUSEADDR
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &a, sizeof (a)) < 0) {
perror( "Set listener socket options" );
exit(EXIT_FAILURE);
}
//
if (bind(listener, ( struct sockaddr*) &addr, sizeof (addr)) < 0) {
perror( "Can't bind listen socket" );
exit(EXIT_FAILURE);
}
//
if (listen(listener, 1) < 0) {
perror( "Eror listen socket" );
exit(EXIT_FAILURE);
}
* This source code was highlighted with Source Code Highlighter .
//
int needexit = 0;
while (needexit == 0) {
fprintf(stdout, "\nWait new connection...\n\n" );
int client;
isbusy = 0; //
struct sockaddr_in addrclient;
socklen_t addrclientsize = sizeof (addrclient);
client = accept(listener, ( struct sockaddr*)&addrclient,
&addrclientsize);
if (client < 0) {
perror( "Client accepting" );
}
* This source code was highlighted with Source Code Highlighter .
isbusy = 1; //
task_data_t data;
int read_bytes = recv(client, &data, sizeof (data), 0);
if (read_bytes != sizeof (data) || data.limits < 1 || data.numoftry < 1) {
fprintf(stderr, "Invalid data from %s on port %d, reset peer\n" ,
inet_ntoa(addrclient.sin_addr), ntohs(addrclient.sin_port));
close(client);
isbusy = 0;
} else {
fprintf(stdout, "New task from %s on port %d\nlimits: %d\n"
"numoftrys: %d\n" , inet_ntoa(addrclient.sin_addr),
ntohs(addrclient.sin_port), data.limits, data.numoftry);
thread_args_t *tinfo;
pthread_t *calc_threads =
(pthread_t*) malloc( sizeof (pthread_t) * numofthread);
int threads_trys = data.numoftry % numofthread;
long double *results =
( long double *) malloc( sizeof ( long double ) * numofthread);
tinfo = (thread_args_t*) malloc( sizeof (thread_args_t) *
numofthread);
//
int numofthreadtry = data.numoftry / numofthread + 1;
for ( int i = 0; i < numofthread; ++i) {
tinfo[i].limits = data.limits;
tinfo[i].numoftry = numofthreadtry;
tinfo[i].results = &results[i];
if (pthread_create(&calc_threads[i], NULL, calculate, &tinfo[i])
!= 0) {
fprintf(stderr, "Can't create thread by num %d" , i);
perror( "Detail:" );
exit(EXIT_FAILURE);
}
}
//
checker_args_t checker_arg;
checker_arg.calcthreads = calc_threads;
checker_arg.threadnum = numofthread;
checker_arg.sock = client;
pthread_t checker_thread;
if (pthread_create(&checker_thread, NULL, client_check,
&checker_arg) != 0) {
fprintf(stderr, "Can't create checker thread" );
perror( "Detail:" );
exit(EXIT_FAILURE);
}
int iscanceled = 0; // ?
int *exitstat;
for ( int i = 0; i < numofthread; ++i) {
pthread_join(calc_threads[i], ( void *) &exitstat);
if (exitstat == PTHREAD_CANCELED)
iscanceled = 1; //
}
if (iscanceled != 1) {
long double *res = ( long double *) malloc( sizeof ( long double ));
bzero(res, sizeof ( long double ));
*res = 0.0;
for ( int i = 0; i < numofthread; ++i)
*res += results[i];
pthread_kill(checker_thread, SIGUSR1);
if (send(client, res, sizeof ( long double ), 0) < 0) {
perror( "Sending error" );
}
close(client);
free(res);
//free(checker_arg);
free(results);
free(calc_threads);
free(tinfo);
isbusy = 0;
fprintf(stdout, "Calculate and send finish!\n" );
} else {
fprintf(stderr, "Client die!\n" );
close(client);
//free(checker_arg);
free(results);
free(calc_threads);
free(tinfo);
}
}
}
return (EXIT_SUCCESS);
}
* This source code was highlighted with Source Code Highlighter .
//
typedef struct {
int limits;
int numoftry;
} task_data_t;
//
typedef struct {
int limits; //
int numoftry; //
struct sockaddr_in *server; //
long double *results; //
} thread_args_t;
* This source code was highlighted with Source Code Highlighter .
//
void *send_thread( void *arg) {
thread_args_t *task_data = (thread_args_t*) arg;
int servsock = socket(PF_INET, SOCK_STREAM, 0);
if (servsock < 0) {
perror( "Create new socket to server" );
exit(EXIT_FAILURE);
}
struct sockaddr_in listenaddr;
listenaddr.sin_family = AF_INET;
listenaddr.sin_addr.s_addr = INADDR_ANY;
listenaddr.sin_port = 0;
if (bind(servsock, ( struct sockaddr*) &listenaddr, sizeof (listenaddr)) < 0) {
perror( "Can't create listen socket" );
exit(EXIT_FAILURE);
}
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
if (connect(servsock, ( struct sockaddr*)task_data->server,
servaddrlen) < 0) {
perror( "Connect to server failed!" );
exit(EXIT_FAILURE);
}
task_data_t senddata;
senddata.limits = task_data->limits;
senddata.numoftry = task_data->numoftry;
if (send(servsock, &senddata, sizeof (senddata), 0) < 0) {
perror( "Sending data to server failed" );
exit(EXIT_FAILURE);
}
int recv_byte = recv(servsock, task_data->results, sizeof ( long double ), 0);
if (recv_byte == 0) {
fprintf(stderr, "Server %s on port %d die!\nCancel calculate, on all" ,
inet_ntoa(task_data->server->sin_addr),
ntohs(task_data->server->sin_port));
exit(EXIT_FAILURE);
}
fprintf(stdout, "Server %s on port %d finish!\n" ,
inet_ntoa(task_data->server->sin_addr),
ntohs(task_data->server->sin_port));
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
int main( int argc, char ** argv) {
if (argc < 3) {
fprintf(stderr, "Usage: %s limits numoftry [maxserv]\n" , argv[0]);
exit(EXIT_FAILURE);
}
int numoftry = atoi(argv[2]);
if (numoftry == 0) {
fprintf(stderr, "Num of try is invalid\n" );
exit(EXIT_FAILURE);
}
int maxservu = 1000000;
if (argc == 4) {
maxservu = atoi(argv[3]);
if (maxservu < 1) {
fprintf(stderr, "Error number of max servers\n" );
exit(EXIT_FAILURE);
}
}
int limits = atoi(argv[1]);
if (limits == 0) {
fprintf(stderr, "Limits is invalid\n" );
exit(EXIT_FAILURE);
}
// broadcast
int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
if (sockbrcast == -1) {
perror( "Create broadcast socket failed" );
exit(EXIT_FAILURE);
}
// broadcast
int port_rcv = 0;
struct sockaddr_in addrbrcast_rcv;
bzero(&addrbrcast_rcv, sizeof (addrbrcast_rcv));
addrbrcast_rcv.sin_family = AF_INET;
addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
addrbrcast_rcv.sin_port = 0; //htons(port_rcv);
//
if (bind(sockbrcast, ( struct sockaddr *) &addrbrcast_rcv,
sizeof (addrbrcast_rcv)) < 0) {
perror( "Bind broadcast socket failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
// broadcast
int port_snd = 38199;
struct sockaddr_in addrbrcast_snd;
bzero(&addrbrcast_snd, sizeof (addrbrcast_snd));
addrbrcast_snd.sin_family = AF_INET;
addrbrcast_snd.sin_port = htons(port_snd);
addrbrcast_snd.sin_addr.s_addr = htonl(0xffffffff);
// broadcast
int access = 1;
if (setsockopt(sockbrcast, SOL_SOCKET, SO_BROADCAST,
( const void *) &access, sizeof (access)) < 0) {
perror( "Can't accept broadcast option at socket to send" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
int msgsize = sizeof ( char ) * 18;
void *hellomesg = malloc(msgsize);
bzero(hellomesg, msgsize);
strcpy(hellomesg, "Hello Integral" );
// broadcast
if (sendto(sockbrcast, hellomesg, msgsize, 0,
( struct sockaddr*) &addrbrcast_snd, sizeof (addrbrcast_snd)) < 0) {
perror( "Sending broadcast" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
// broadcast'
fcntl(sockbrcast, F_SETFL, O_NONBLOCK);
//
fd_set readset;
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
//
struct timeval timeout;
timeout.tv_sec = 3;
timeout.tv_usec = 0;
struct sockaddr_in *servers =
( struct sockaddr_in*) malloc( sizeof ( struct sockaddr_in));
bzero(servers, sizeof ( struct sockaddr_in));
int servcount = 0;
int maxserv = 1;
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
// servers
while ( select (sockbrcast + 1, &readset, NULL, &readset, &timeout) > 0) {
int rdbyte = recvfrom(sockbrcast, ( void *) hellomesg, msgsize,MSG_TRUNC,
( struct sockaddr*) &servers[servcount],
&servaddrlen);
if (rdbyte == msgsize && strcmp(hellomesg, "Hello Client" ) == 0) {
servcount++;
if (servcount >= maxserv) {
servers = realloc(servers,
sizeof ( struct sockaddr_in) * (maxserv + 1));
if (servers == NULL) {
perror( "Realloc failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
bzero(&servers[servcount], servaddrlen);
maxserv++;
}
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
}
}
int i;
if (servcount < 1) {
fprintf(stderr, "No servers found!\n" );
exit(EXIT_FAILURE);
}
if (argc > 3 && maxservu <= servcount)
servcount = maxservu;
for (i = 0; i < servcount; ++i) {
printf( "Server answer from %s on port %d\n" ,
inet_ntoa(servers[i].sin_addr), ntohs(servers[i].sin_port));
}
printf( "\n" );
free(hellomesg);
long double *results =
( long double *) malloc( sizeof ( long double ) * servcount);
//
pthread_t *tid = (pthread_t*) malloc( sizeof (pthread_t) * servcount);
for (i = 0; i < servcount; ++i) {
thread_args_t *args = (thread_args_t*) malloc ( sizeof (thread_args_t));
args->limits = limits;
args->numoftry = numoftry / servcount + 1;
args->results = &results[i];
args->server = &servers[i];
if (pthread_create(&tid[i], NULL, send_thread, args) != 0) {
perror( "Create send thread failed" );
exit(EXIT_FAILURE);
}
}
long double res = 0;
//
for (i = 0; i < servcount; ++i)
pthread_join(tid[i], NULL);
//
for (i = 0; i < servcount; ++i)
res += results[i];
res /= numoftry;
res *= limits;
free(servers);
printf( "\nResult: %Lf\n" , res);
return (EXIT_SUCCESS);
}
* This source code was highlighted with Source Code Highlighter .
Source: https://habr.com/ru/post/122754/
All Articles