📜 ⬆️ ⬇️

Asynchronous work with PostgreSQL in C

Today I wanted to write a small note about asynchronous work with PostgreSQL in C. The motives are simple: for a small utility, there was a need to implement such functionality, Google on the topic of clear and working examples was treacherously silent (there was only an example in pqxx for C ++ - there is an asynchronous connection method and pipeline is a class for queries), and the official documentation on this issue, although very detailed, but not very structured, and the algorithm for working with the libpq library in asynchronous mode has many pitfalls. Therefore, having understood the question I would like to share the results with the public, in case if it will be useful to someone.

So, we will assume that no one needs to tell what PostgreSQL is, and how the synchronous (blocking) mode of operation differs from asynchronous readers too. By the way, besides the first and obvious advantage of asynchronous calls (they do not block input-output and thread execution, which frees from the need to create additional threads, synchronize them, etc.), in the case of Postgre there is another plus: the usual PQexec method allows you to get the result of executing only one SQL query at a time, and the asynchronous libpq functions do not have this restriction.

As I said before, libpq has quite a few pitfalls in asynchronous mode. There are libraries where the asynchronous mode is implemented beautifully and completely (the developer calls absolutely any asynchronous method, assigning a callback to it, and after that it’s enough just to “rotate” the library's event loop (endlessly or by timer to call the method), and then the library will take care of processing commands in the right sequence, catching events and calling callbacks), then PostgreSQL has a different work model.

There are a large number of commands for asynchronous connections and requests that must be called in a strictly defined sequence depending on the current state and the result of the previous operation, plus you must manually check the readiness of the sockets. It is enough to make a mistake in some place and call the function at the wrong time or not to call it, or try to access the busy socket, and this can lead to blocking the flow (in the worst case, infinite, that is, freeze). And the library in the asynchronous mode has almost no control over the timeout of operations - everything will need to be taken care of by ourselves.
')
In the official documentation, most of the information on working in asynchronous mode is given in the following two sections: one and two .

Well, we get right to the point.

To establish a connection with the database in asynchronous mode, the procedure should be approximately as follows:

1. Allocate memory for the connection structure and start the connection using the PQconnectStart () method

2. Remember the current time so that you can further control the timeout of the operation.

3. Check the connection success by calling PQstatus (). If the result is CONNECTION_BAD, then the initialization was not successful (for example, an error in the connection string or the memory could not be allocated), otherwise you can continue

4. Check the current connection status using the PQconnectPoll () method.

Possible results:

PGRES_POLLING_WRITING -       PGRES_POLLING_READING -       PGRES_POLLING_FAILED -         PGRES_POLLING_OK -    

5. If the status is PGRES_POLLING_WRITING or PGRES_POLLING_READING, you need to get the connection socket used by the PQsocket () method and select () or poll () system functions to check its availability for writing or reading data until it is free, then repeat step 4 until you reach OK or FAILED, or until the timeout expires (do not forget, the timeout must be checked manually).

If the next PQconnectPoll () call will _do_ release the socket, the thread is blocked, and this must be borne in mind.

After all this, if everything went well, we get an established connection to the database. The procedure for executing SQL queries will look something like this:

1. Prepare a request for sending to the server using the PQsendQuery () command.

2. Set a non-blocking mode for sending a request using the PQsetnonblocking () method, because by default libPq only reads asynchronously, not writes to the socket.

3. Run PQflush () until it gives 0 (the request was sent successfully) or -1 (error).

4. Get the active socket and check it for readability through select () or poll () until it is ready for operation.

5. Run PQconsumeInput (). If the function returned 0, then an error occurred.

6. Run PQisBusy (). If the function returns 1, it means that the processing of the request or the reading of the server response has not yet been completed and the algorithm must be repeated from step 4 again.
Well, do not forget to control timeouts, of course.

After performing all the above operations, you can work with the results of the query as usual - PQgetResult (), PQgetvalue (), etc.

And now for the practice. The code is in C, but if you want to wrap it in a class for use in a C ++ program, then as you wish, everything is very simple.

 //   - : gcc pgtest4.c -I/usr/include/postgresql -lpq #include <libpq-fe.h> //<     PostgreSQL #include <sys/socket.h> //< setsockopt()    #include <sys/select.h> //< select() #include <sys/time.h> //< gettimeoftheday() #include <unistd.h> //< usleep()    #define SOCK_POLL_TIMEOUT 100 //     (      ?)   typedef enum { DISCONNECTED = 0, CONN_POLLING, CONN_READING, CONN_WRITING, READY, QUERY_SENT, QUERY_FLUSHING, QUERY_BUSY, QUERY_READING, CLOSING, ERROR } pq_state; typedef enum { NO_ERROR = 0, ALLOCATION_FAIL, POLLING_FAIL, READING_FAIL, WRITING_FAIL, TIMEOUT_FAIL } pq_error; struct pqconn_s{ pq_state state; //<   PGconn* conn; //<        unsigned long start; //<     ( ) long timeout; //<    pq_error error; //<   -  ,     }; /** * @brief    * @return    */ unsigned long time_ms(void) { struct timespec tp; // gettimeoftheday()   ,    clock_gettime(CLOCK_MONOTONIC, &tp); return (tp.tv_sec * 1000 + tp.tv_nsec / 1000000); } /** * @brief   ()   / * @param socket_fd -    * @param rw - 0    , 1    * @return   select(): -1 = , 0 -  (), 1 -  */ int try_socket(int socket_fd, int rw) { fd_set fset; struct timeval sock_timeout; sock_timeout.tv_sec = 0; sock_timeout.tv_usec = SOCK_POLL_TIMEOUT; FD_ZERO(&fset); FD_SET(socket_fd, &fset); setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&sock_timeout, sizeof(struct timeval)); //       SO_SNDTIMEO. . return select(socket_fd + 1, ((!rw) ? &fset : NULL), ((rw) ? &fset : NULL), NULL, &sock_timeout); } /** * @brief       * @param conninfo -     * @param s -    pqconn_s        * @param timeout -     * @return 0 -  (     s->error), 1 -  */ int pgsql_connection_start(const char* conninfo, struct pqconn_s* s, long timeout) { if (!s) return 0; if (!conninfo) { s->error = ALLOCATION_FAIL; return 0; } s->conn = PQconnectStart(conninfo); s->state = CONN_POLLING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; ConnStatusType status; status = PQstatus(s->conn); if (status == CONNECTION_BAD) { s->state = ERROR; s->error = POLLING_FAIL; return 0; } return 1; } /** * @brief          * @param command - SQL- * @param s -    pqconn_s        * @param timeout -     * @return 0 - , 1 -  */ int pgsql_send_query(struct pqconn_s* s, const char *command, long timeout) { if (s->state != READY) { return 0; } if (!PQsendQuery(s->conn, command)) { return 0; } PQsetnonblocking(s->conn, 0); s->state = QUERY_FLUSHING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; return 1; } /** * @brief  ,     * @param s -    pqconn_s        */ void pgsql_event_loop(struct pqconn_s* s) { if ((s->state == DISCONNECTED) || (s->state == READY)) return; if ((time_ms() - s->start) > s->timeout) { s->state = CLOSING; s->error = TIMEOUT_FAIL; } if (s->state == CONN_POLLING) { PostgresPollingStatusType poll_result; poll_result = PQconnectPoll(s->conn); if (poll_result == PGRES_POLLING_WRITING) s->state = CONN_WRITING; if (poll_result == PGRES_POLLING_READING) s->state = CONN_READING; if (poll_result == PGRES_POLLING_FAILED) { s->state = ERROR; s->error = POLLING_FAIL; } if (poll_result == PGRES_POLLING_OK) s->state = READY; } if (s->state == CONN_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = CONN_POLLING; } if (s->state == CONN_WRITING) { int sock_state = try_socket(PQsocket(s->conn), 1); if (sock_state == -1) { s->error = WRITING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = CONN_POLLING; } if (s->state == CLOSING) { PQfinish(s->conn); s->state = ERROR; } if (s->state == QUERY_FLUSHING) { int flush_res = PQflush(s->conn); if (0 == flush_res) s->state = QUERY_READING; if (-1 == flush_res) { s->error = WRITING_FAIL; s->state = CLOSING; } } if (s->state == QUERY_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = QUERY_BUSY; } if (s->state == QUERY_BUSY) { if (!PQconsumeInput(s->conn)) { s->error = READING_FAIL; s->state = CLOSING; } if (PQisBusy(s->conn)) s->state = QUERY_READING; else s->state = READY; } } 

At the beginning, we describe all the states we need and possible errors, and declare the structure in which the connection data and the action to be performed are stored - a pointer to the PGconn structures, the required library for working with the server, the state of the machine, the error code (if any) and the start time of the current operation (to control the timeout).

The two small functions time_ms () and try_socket () are wrappers over the functions of the standard library for getting the current time in milliseconds and checking the socket for busyness, respectively.

You can use all of this as something like this:

 int main(void) { struct pqconn_s s; pgsql_connection_start("dbname=db1 user=user1 password=password1 hostaddr=10.0.0.1 port=5432", &s, 15000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB connection failed \n"); return 1; } pgsql_send_query(&s, "SELECT * FROM history;", 50000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB query failed \n"); return 1; } PGresult *res; int rec_count; int row; int col; res = PQgetResult(s.conn); if (PQresultStatus(res) != PGRES_TUPLES_OK) { perror("We did not get any data!\n"); return 1; } rec_count = PQntuples(res); printf("Received %d records.\n", rec_count); for (row=0; row<rec_count; row++) { for (col=0; col<3; col++) { printf("%s\t", PQgetvalue(res, row, col)); } puts(""); } PQclear(res); } 

It is clear that the above example works after all in the blocking mode (because waiting for the installation of the state field of the structure of the structure to ERROR or READY occurs forcibly), but as you can guess, it remains for the small: you need to add instead to pgsql_event_loop () call callbacks in case of successful connection, data acquisition or an error, and twist the event loop along with the other actions in the main program loop or call it on a timer, and then the work with the database will go really asynchronously.

I sincerely hope that the above will be useful to someone.

Source: https://habr.com/ru/post/350140/


All Articles