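PQconnectPoll() return values:
PGRES_POLLING_WRITING - wait until the socket is ready to write, then call PQconnectPoll() again.
PGRES_POLLING_READING - wait until the socket is ready to read, then call PQconnectPoll() again.
PGRES_POLLING_FAILED - the connection procedure has failed.
PGRES_POLLING_OK - the connection has been successfully made.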
// - : gcc pgtest4.c -I/usr/include/postgresql -lpq #include <libpq-fe.h> //< PostgreSQL #include <sys/socket.h> //< setsockopt() #include <sys/select.h> //< select() #include <sys/time.h> //< gettimeoftheday() #include <unistd.h> //< usleep() #define SOCK_POLL_TIMEOUT 100 // ( ?) typedef enum { DISCONNECTED = 0, CONN_POLLING, CONN_READING, CONN_WRITING, READY, QUERY_SENT, QUERY_FLUSHING, QUERY_BUSY, QUERY_READING, CLOSING, ERROR } pq_state; typedef enum { NO_ERROR = 0, ALLOCATION_FAIL, POLLING_FAIL, READING_FAIL, WRITING_FAIL, TIMEOUT_FAIL } pq_error; struct pqconn_s{ pq_state state; //< PGconn* conn; //< unsigned long start; //< ( ) long timeout; //< pq_error error; //< - , }; /** * @brief * @return */ unsigned long time_ms(void) { struct timespec tp; // gettimeoftheday() , clock_gettime(CLOCK_MONOTONIC, &tp); return (tp.tv_sec * 1000 + tp.tv_nsec / 1000000); } /** * @brief () / * @param socket_fd - * @param rw - 0 , 1 * @return select(): -1 = , 0 - (), 1 - */ int try_socket(int socket_fd, int rw) { fd_set fset; struct timeval sock_timeout; sock_timeout.tv_sec = 0; sock_timeout.tv_usec = SOCK_POLL_TIMEOUT; FD_ZERO(&fset); FD_SET(socket_fd, &fset); setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&sock_timeout, sizeof(struct timeval)); // SO_SNDTIMEO. . return select(socket_fd + 1, ((!rw) ? &fset : NULL), ((rw) ? 
&fset : NULL), NULL, &sock_timeout); } /** * @brief * @param conninfo - * @param s - pqconn_s * @param timeout - * @return 0 - ( s->error), 1 - */ int pgsql_connection_start(const char* conninfo, struct pqconn_s* s, long timeout) { if (!s) return 0; if (!conninfo) { s->error = ALLOCATION_FAIL; return 0; } s->conn = PQconnectStart(conninfo); s->state = CONN_POLLING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; ConnStatusType status; status = PQstatus(s->conn); if (status == CONNECTION_BAD) { s->state = ERROR; s->error = POLLING_FAIL; return 0; } return 1; } /** * @brief * @param command - SQL- * @param s - pqconn_s * @param timeout - * @return 0 - , 1 - */ int pgsql_send_query(struct pqconn_s* s, const char *command, long timeout) { if (s->state != READY) { return 0; } if (!PQsendQuery(s->conn, command)) { return 0; } PQsetnonblocking(s->conn, 0); s->state = QUERY_FLUSHING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; return 1; } /** * @brief , * @param s - pqconn_s */ void pgsql_event_loop(struct pqconn_s* s) { if ((s->state == DISCONNECTED) || (s->state == READY)) return; if ((time_ms() - s->start) > s->timeout) { s->state = CLOSING; s->error = TIMEOUT_FAIL; } if (s->state == CONN_POLLING) { PostgresPollingStatusType poll_result; poll_result = PQconnectPoll(s->conn); if (poll_result == PGRES_POLLING_WRITING) s->state = CONN_WRITING; if (poll_result == PGRES_POLLING_READING) s->state = CONN_READING; if (poll_result == PGRES_POLLING_FAILED) { s->state = ERROR; s->error = POLLING_FAIL; } if (poll_result == PGRES_POLLING_OK) s->state = READY; } if (s->state == CONN_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = CONN_POLLING; } if (s->state == CONN_WRITING) { int sock_state = try_socket(PQsocket(s->conn), 1); if (sock_state == -1) { s->error = WRITING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = 
CONN_POLLING; } if (s->state == CLOSING) { PQfinish(s->conn); s->state = ERROR; } if (s->state == QUERY_FLUSHING) { int flush_res = PQflush(s->conn); if (0 == flush_res) s->state = QUERY_READING; if (-1 == flush_res) { s->error = WRITING_FAIL; s->state = CLOSING; } } if (s->state == QUERY_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = QUERY_BUSY; } if (s->state == QUERY_BUSY) { if (!PQconsumeInput(s->conn)) { s->error = READING_FAIL; s->state = CLOSING; } if (PQisBusy(s->conn)) s->state = QUERY_READING; else s->state = READY; } }
int main(void) { struct pqconn_s s; pgsql_connection_start("dbname=db1 user=user1 password=password1 hostaddr=10.0.0.1 port=5432", &s, 15000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB connection failed \n"); return 1; } pgsql_send_query(&s, "SELECT * FROM history;", 50000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB query failed \n"); return 1; } PGresult *res; int rec_count; int row; int col; res = PQgetResult(s.conn); if (PQresultStatus(res) != PGRES_TUPLES_OK) { perror("We did not get any data!\n"); return 1; } rec_count = PQntuples(res); printf("Received %d records.\n", rec_count); for (row=0; row<rec_count; row++) { for (col=0; col<3; col++) { printf("%s\t", PQgetvalue(res, row, col)); } puts(""); } PQclear(res); }
Source: https://habr.com/ru/post/350140/
All Articles