
Tricks with logging in single-threaded non-blocking servers.

I want to share another result of my research into optimizing the performance of web servers.
This time we will discuss optimizing complex logging in a single-threaded, non-blocking web server.

Introduction


Everyone has heard more than once how fast and resource-friendly lighttpd (hereinafter "lighty") is. The latest prerelease, version 1.5, brings many innovations: in particular, improved performance when serving large files on very busy servers (more than 2,000 concurrent connections at a total rate of 50-200 MB/s). As the administrator of a very large web file storage, I could not pass up such an opportunity. A test machine with a fast RAID array and two Gigabit Ethernet adapters bonded into one load-balancing interface was set up, running Debian Linux. After a couple of hours of trial and error, lighty + php_fcgi was compiled and tuned for optimal performance. Files were indeed served very quickly, and 2,000 concurrent connections posed no difficulty for lighty. But then I ran into a serious problem.
The fact is that one of the requirements for the web server on this file storage is the ability to send log lines through a pipe to a specialized program that processes them in a particular way: it extracts the connection details (what the user downloaded, how many bytes were actually transferred over the connection, and so on) and stores them in a database. Lighty can redirect log data through a pipe to a logging program, but the problem lies in its single-threaded, non-blocking architecture. Because lighty runs in a single thread (process) and writes to sockets only in non-blocking mode, its main thread ends up waiting for the pipe to become writable again until the helper program has processed the previous log entry. And that program can stall, since a database connection is not a reliable thing: a query may hang on a table or row lock, or the connection itself may be lost for unknown reasons and have to be re-established. The result is that with every such delay lighty stops sending data to its sockets, and the server appears to work in jerks: it runs at full speed, then suddenly freezes.
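For reference, lighttpd's mod_accesslog can pipe log lines to an external program: when the configured filename begins with "|", the server spawns that program and feeds it one log line per request. A configuration along these lines (the path is illustrative) would wire the server to the aggregator described in this article:

```conf
# mod_accesslog writes to a pipe when the filename starts with "|":
# lighttpd spawns the program and sends it one log line per request.
server.modules += ( "mod_accesslog" )
accesslog.filename = "|/path/to/aggregator.php"
```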

Idea


This problem can be solved by buffering the log entries and parallelizing their processing. I implemented it as follows, splitting the logging subsystem into two parts: an aggregator and a handler. The aggregator is responsible for instantly reading log lines from the pipe (so that the server never idles), writing them to a FIFO queue, spawning child handlers (described below), and distributing log lines from the queue to those handlers. The handler is simply the final consumer of log lines: it reads them from its stdin, writes them to the database, and then signals the aggregator that it is free and ready for the next piece of data.

Implementation


I will begin with the implementation of the simplest part, the handler. All it has to do is open stdin in blocking mode and read lines from it in a loop, sending the character '1' at the end of each iteration. It should finish its work upon detecting EOF on stdin (the channel is closed and there is no more data). Here is the code for this program:
processor.php:

#!/usr/bin/php
<?php
// Final log-line consumer: reads lines from stdin in blocking mode,
// stores each one in the database, and reports readiness with '1'.
$in = fopen('php://stdin', 'r');
$db = NULL;
while ($in_str = fgets($in)) {        // blocks until a full line arrives
    if (@mysql_ping($db) !== true) {  // reconnect if the DB link is gone
        @mysql_close($db);
        $db = mysql_connect();
    }
    mysql_query($in_str);             // store the line (real code would build a proper INSERT here)
    echo '1';                         // tell the aggregator we are free again
}
mysql_close($db);
fclose($in);
?>

The aggregator is somewhat more complicated. First, I will describe a few standard PHP features that were used.
When reading from streams, non-blocking calls were used. To switch a stream into non-blocking mode, it is enough to call stream_set_blocking() with the stream as the first argument and zero as the second. After that, any read from or write to the stream completes immediately, without waiting for the actual I/O; the result of the operation depends on how many bytes could actually be read or written at that moment.
The availability of data on the streams was monitored with the stream_select() function, a full description of which can be found in the PHP manual. Briefly, it works as follows: it takes three arrays of stream descriptors, the first watched for reading, the second for writing, the third for exceptional conditions. The call returns when something interesting happens on one of the passed streams (in general, when the corresponding operation would no longer block, i.e. one of the read descriptors can be read without blocking, and so on). The function can also return on a timeout, passed as the fourth and fifth parameters: seconds and microseconds respectively.
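A minimal sketch of the two calls described above, stripped of the aggregator's bookkeeping (the stream and timeout values here are illustrative):

```php
<?php
$in = fopen('php://stdin', 'r');

// Switch the stream to non-blocking mode: reads return immediately
// with whatever bytes are available, possibly an empty string.
stream_set_blocking($in, 0);

// Wait until $in becomes readable, or give up after 1 second, 0 microseconds.
$read   = array($in);
$write  = NULL;
$except = NULL;
$ready  = stream_select($read, $write, $except, 1, 0);

if ($ready > 0) {
    $line = fgets($in);   // will not block, even if only part of a line arrived
} elseif ($ready === 0) {
    // timeout expired: nothing to read yet
}
fclose($in);
?>
```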
Now, the algorithm of its work:
  1. First of all, the aggregator creates the child handler processes and sets up channels for communicating with them.
  2. Then the aggregator reads data from standard input in non-blocking mode and appends it to the end of the queue.
  3. Checking for a '1' on the read channels from the handlers (again in non-blocking mode), it sets the corresponding flags in the handler-readiness array.
  4. If there is unprocessed data in the queue and a free handler, it sends the data to that handler and clears its readiness flag in the array.
  5. It collects all descriptors of interest into an array and waits on them with stream_select, with a one-second timeout.
  6. It checks the queue for records and standard input for end of data. If the queue is not empty or there is new data, go to step 2.
Here is the source code of the aggregator:

aggregator.php:
#!/usr/bin/php
<?php
// number of handlers
define('PROCESSORS_TO_SPAWN', 5);
// full path to the handler
define('PROCESSOR_PATH', '/path/to/processor.php');
$in = fopen('php://stdin', 'r');
// switch the standard input to non-blocking mode
stream_set_blocking($in, 0);
// list of handler readiness flags
$processors_states = array_fill(0, PROCESSORS_TO_SPAWN, true);
$processors = array();
$proc_signal_pipes = array();
$proc_data_pipes = array();
$descriptorspec = array(
    0 => array("pipe", "r"),
    1 => array("pipe", "w"),
    2 => array("file", "/dev/null", "a")
);
$buffer = array();
// spawn the handlers and switch their signal channels to non-blocking mode
while (count($processors) < PROCESSORS_TO_SPAWN) {
    $processors[] = proc_open(PROCESSOR_PATH, $descriptorspec, $pipes, NULL, NULL);
    stream_set_blocking($pipes[1], 0);
    $proc_data_pipes[] = $pipes[0];
    $proc_signal_pipes[] = $pipes[1];
}
while (true) {
    $in_str = fgets($in);
    if ($in_str !== false) {
        // here you can check the validity of the log line
        if (true) {
            $buffer[] = $in_str;
        }
    }
    foreach ($processors_states as $proc_num => $processor_state) {
        // in non-blocking mode, check whether the handler reported readiness
        if (fgets($proc_signal_pipes[$proc_num]) == '1') {
            $processors_states[$proc_num] = true;
        }
    }
    // while there are free handlers and the queue is not empty, feed them data
    while (count($buffer) > 0 and
           ($selected_proc = array_search(true, $processors_states)) !== false) {
        $item = array_shift($buffer);
        fwrite($proc_data_pipes[$selected_proc], $item);
        $processors_states[$selected_proc] = false;
    }
    // if the standard input is closed and the queue is empty, finish the work
    if (feof($in) and count($buffer) == 0) {
        break;
    }
    $check_list = $proc_signal_pipes;
    $check_list[] = $in;
    // wait for readable data on standard input or from one of the handlers
    stream_select($check_list, $w = NULL, $e = NULL, 1);
}
// close the handlers and tidy up
foreach ($processors as $proc_num => $proc) {
    fclose($proc_data_pipes[$proc_num]);
    fclose($proc_signal_pipes[$proc_num]);
    proc_close($proc);
}
fclose($in);
?>


Afterword


As a result, we have an excellent tool for organizing arbitrarily complex logging for single-threaded servers, one that lets us enjoy all of the server's advantages without it ever stalling on saving log records.

Source: https://habr.com/ru/post/54140/

