
Atomic processing of data blocks without locks

Lock-free algorithms have always intimidated developers. It is hard to picture how to organize data access without locks so that two or more threads never process the same data block at the same time. Most developers go as far as using standard lock-free containers such as stacks or linked lists, and no further. In this article I would like to show how to organize access to data in a multi-threaded environment without locks.

The main idea of this method is that each thread has its own buffer: it copies the data from the main buffer into it, processes the copy, and then swaps the pointer to its buffer with the pointer to the main buffer. Consider the following code:

#include <stdio.h>
#include <pthread.h>

struct data_s {
    int a;
    int b;
};

struct data_s reader_tmp, writer_tmp, data;
struct data_s *reader_tmp_ptr, *writer_tmp_ptr;
struct data_s *volatile data_ptr;   /* pointer to the main buffer, shared by both threads */
volatile int done = 0;

void process(struct data_s *data)
{
    data->a++;
    data->b++;
}

void *writer(void *p)
{
    struct data_s *tmp_ptr;
    int i;

    for (i = 0; i < 1000000; i++) {
        do {
            tmp_ptr = data_ptr;
            /* copy the main buffer into the private one and process the copy */
            writer_tmp_ptr->a = tmp_ptr->a;
            writer_tmp_ptr->b = tmp_ptr->b;
            process(writer_tmp_ptr);
            /* publish the copy only if data_ptr has not changed in the meantime */
        } while (!__sync_bool_compare_and_swap(&data_ptr, tmp_ptr, writer_tmp_ptr));
        writer_tmp_ptr = tmp_ptr;   /* the old main buffer becomes the private one */
    }
    return NULL;
}

void *reader(void *p)
{
    struct data_s *tmp_ptr;
    int a, b;

    while (!done) {
        do {
            tmp_ptr = data_ptr;
            reader_tmp_ptr->a = tmp_ptr->a;
            reader_tmp_ptr->b = tmp_ptr->b;
            a = tmp_ptr->a;
            b = tmp_ptr->b;
        } while (!__sync_bool_compare_and_swap(&data_ptr, tmp_ptr, reader_tmp_ptr));
        reader_tmp_ptr = tmp_ptr;
        printf("data = {%d, %d}\n", a, b);
    }
    return NULL;
}

int main()
{
    pthread_t reader_thread, writer_thread;

    data.a = 0;
    data.b = 0;
    data_ptr = &data;
    writer_tmp_ptr = &writer_tmp;
    reader_tmp_ptr = &reader_tmp;

    pthread_create(&reader_thread, NULL, reader, NULL);
    pthread_create(&writer_thread, NULL, writer, NULL);
    pthread_join(writer_thread, NULL);
    done = 1;
    pthread_join(reader_thread, NULL);
    return 0;
}

In the code above, before processing, the data is copied from the buffer pointed to by data_ptr into the buffer pointed to by writer_tmp_ptr, and then the two pointers are swapped. The new value is written to data_ptr with the atomic compare-and-swap operation, which compares the contents of its first argument with the second and, if they match, writes the third argument to the first and returns true. Otherwise it returns false.

What is this for? Take the reader function as an example. Suppose the thread executing it is suspended right after the line a = tmp_ptr->a; at that moment tmp_ptr points to data. The thread executing writer then starts running. It completes its first iteration, swapping writer_tmp_ptr and data_ptr, begins the next one, and stops after the line data->b++; in process. Now writer_tmp_ptr points to data, and so does tmp_ptr in reader, so the same buffer is being read and modified at the same time. But because data_ptr and tmp_ptr no longer match, compare-and-swap detects the collision and the read is performed again. Why, then, does the assignment reader_tmp_ptr = tmp_ptr not have to go through this check?
It's simple. The variable reader_tmp_ptr belongs to the thread that uses it, so no other thread ever touches it. In this example I made it global, which is not quite right: with several reader threads, each one would need its own global variable, and the function would have to work out which thread it is running in to pick the right variable as its unique buffer pointer. A better option is thread-specific data; the pthread library, for example, provides pthread_getspecific / pthread_setspecific for this. The purpose of this code, however, is to show how the algorithm works, without optimizations that would only obscure the idea.
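As an illustration of the thread-specific approach mentioned above, here is a minimal sketch of keeping each thread's private buffer pointer under a pthread key. The names tmp_buf_key, get_tmp_buf, set_tmp_buf and record_tmp_buf are hypothetical and are not part of the original program. The sketch also registers no destructor for the key, on the assumption that buffer ownership is handled elsewhere, since in this scheme buffers migrate between threads.

```c
#include <pthread.h>
#include <stdlib.h>

struct data_s { int a; int b; };

/* Hypothetical key under which each thread keeps its private buffer pointer. */
static pthread_key_t tmp_buf_key;
static pthread_once_t tmp_buf_key_once = PTHREAD_ONCE_INIT;

static void create_tmp_buf_key(void)
{
    /* Assumption: no destructor, because buffers change owners after each
       successful swap and freeing them is a separate design question. */
    pthread_key_create(&tmp_buf_key, NULL);
}

/* Returns the calling thread's private buffer, allocating it on first use. */
struct data_s *get_tmp_buf(void)
{
    struct data_s *buf;

    pthread_once(&tmp_buf_key_once, create_tmp_buf_key);
    buf = pthread_getspecific(tmp_buf_key);
    if (buf == NULL) {
        buf = calloc(1, sizeof *buf);
        if (buf != NULL)
            pthread_setspecific(tmp_buf_key, buf);
    }
    return buf;
}

/* After a successful compare-and-swap the thread would take over the old
   main buffer; this stores it as the thread's new private buffer. */
void set_tmp_buf(struct data_s *buf)
{
    pthread_once(&tmp_buf_key_once, create_tmp_buf_key);
    pthread_setspecific(tmp_buf_key, buf);
}

/* Thread entry point for experiments: reports which buffer the thread got. */
void *record_tmp_buf(void *out)
{
    *(struct data_s **)out = get_tmp_buf();
    return NULL;
}
```

With helpers like these, reader and writer could call get_tmp_buf() in place of the global reader_tmp_ptr and writer_tmp_ptr, and set_tmp_buf(tmp_ptr) in place of the final assignment, so any number of threads could run the same function.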

It would seem that everything is fine and the program should always print a pair of identical values, but it is not that simple. Suppose again that the thread executing reader is suspended after the line a = tmp_ptr->a; the thread executing writer then completes two iterations and, in the third, stops after process returns. Then the reader thread resumes. Now the values of a and b do not match, yet compare-and-swap returns true, because data_ptr points to data again; in other words, data_ptr and tmp_ptr match once more. This is known as the ABA problem. One way to solve it is to attach a counter to the pointer and increment the counter every time a new value is assigned to the pointer. The following example does not have this problem.
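The ABA scenario described above can be replayed deterministically in a single thread. The sketch below is an illustration under assumptions, not the article's method: instead of a pointer with a 128-bit compare-and-swap, it uses a hypothetical 32-bit buffer index packed together with a 32-bit version counter into one 64-bit word, so the ordinary __sync_bool_compare_and_swap builtin is enough. The function names are invented for the example.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical encoding: low 32 bits = buffer index, high 32 bits = version. */
uint64_t pack_tagged(uint32_t index, uint32_t version)
{
    return ((uint64_t)version << 32) | index;
}

/* Compare-and-swap on the bare index: the A -> B -> A sequence goes unnoticed. */
bool aba_goes_unnoticed(void)
{
    uint32_t shared = 0;        /* index of the current main buffer (A) */
    uint32_t seen = shared;     /* the reader takes its snapshot: A */

    /* While the reader is paused, the writer publishes B and then A again. */
    __sync_bool_compare_and_swap(&shared, 0u, 1u);
    __sync_bool_compare_and_swap(&shared, 1u, 0u);

    /* The reader resumes: the index matches, so the stale snapshot is accepted. */
    return __sync_bool_compare_and_swap(&shared, seen, 2u);
}

/* The same sequence with a version counter: the final swap is rejected. */
bool aba_is_detected(void)
{
    uint64_t shared = pack_tagged(0, 0);
    uint64_t seen = shared;     /* snapshot: index 0, version 0 */

    __sync_bool_compare_and_swap(&shared, pack_tagged(0, 0), pack_tagged(1, 1));
    __sync_bool_compare_and_swap(&shared, pack_tagged(1, 1), pack_tagged(0, 2));

    /* The index is 0 again, but the version is now 2, so the words differ. */
    return !__sync_bool_compare_and_swap(&shared, seen, pack_tagged(2, 1));
}
```

Both functions return true: the first because the plain comparison cannot tell the old A from the new A, the second because the counter makes the two values distinguishable.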

#include <stdio.h>
#include <stdint.h>
#include <pthread.h>

struct data_s {
    int a;
    int b;
};

/* cmpxchg16b requires its memory operand to be aligned to 16 bytes */
struct data_pointer_s {
    union {
        uint64_t qw[2];
        struct {
            struct data_s *data_ptr;
            uint64_t aba_counter;
        };
    };
} __attribute__((aligned(16)));

static inline char cas128bit(volatile struct data_pointer_s *a, struct data_pointer_s b, struct data_pointer_s c)
{
    char result;
    /* rdx:rax holds the expected value and is overwritten when the comparison
       fails, so both registers are declared as in/out operands */
    __asm__ __volatile__(
        "lock cmpxchg16b %1\n\t"
        "setz %0\n"
        : "=q" (result), "+m" (a->qw), "+a" (b.data_ptr), "+d" (b.aba_counter)
        : "b" (c.data_ptr), "c" (c.aba_counter)
        : "cc", "memory"
    );
    return result;
}

struct data_s reader_tmp, writer_tmp, data;
struct data_pointer_s reader_tmp_ptr, writer_tmp_ptr, data_ptr;
volatile int done = 0;

void process(struct data_s *data)
{
    data->a++;
    data->b++;
}

void *writer(void *p)
{
    struct data_pointer_s tmp_ptr;
    int i;

    for (i = 0; i < 1000000; i++) {
        do {
            tmp_ptr = data_ptr;
            writer_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a;
            writer_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b;
            process(writer_tmp_ptr.data_ptr);
            /* every published value carries a counter one higher than the one it replaces */
            writer_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1;
        } while (!cas128bit(&data_ptr, tmp_ptr, writer_tmp_ptr));
        writer_tmp_ptr = tmp_ptr;
    }
    return NULL;
}

void *reader(void *p)
{
    struct data_pointer_s tmp_ptr;
    int a, b;

    while (!done) {
        do {
            tmp_ptr = data_ptr;
            reader_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a;
            reader_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b;
            a = tmp_ptr.data_ptr->a;
            b = tmp_ptr.data_ptr->b;
            reader_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1;
        } while (!cas128bit(&data_ptr, tmp_ptr, reader_tmp_ptr));
        reader_tmp_ptr = tmp_ptr;
        printf("data = {%d, %d}\n", a, b);
    }
    return NULL;
}

int main()
{
    pthread_t reader_thread, writer_thread;

    data.a = 0;
    data.b = 0;
    data_ptr.data_ptr = &data;
    data_ptr.aba_counter = 0;
    writer_tmp_ptr.data_ptr = &writer_tmp;
    writer_tmp_ptr.aba_counter = 0;
    reader_tmp_ptr.data_ptr = &reader_tmp;
    reader_tmp_ptr.aba_counter = 0;

    pthread_create(&reader_thread, NULL, reader, NULL);
    pthread_create(&writer_thread, NULL, writer, NULL);
    pthread_join(writer_thread, NULL);
    done = 1;
    pthread_join(reader_thread, NULL);
    return 0;
}

Note that the efficiency of this code depends on the amount of data copied and on the complexity of the process function. If data blocks of several tens of megabytes must be processed atomically, mutexes will be far more efficient. It is also worth considering a small delay (on the order of a few microseconds) each time compare-and-swap returns false, to give the other thread a chance to finish its operation. Again, whether a delay is needed, and how long it should be, depends on the specifics of the task.
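The delay after a failed compare-and-swap could look roughly like the sketch below. It is only an illustration on a plain counter rather than on the article's buffers. The helper names backoff_us and add_with_backoff are hypothetical, and nanosleep is just one possible way to wait: its real granularity depends on the system and may be much coarser than a few microseconds.

```c
#define _POSIX_C_SOURCE 200809L
#include <time.h>

/* Hypothetical helper: sleep for roughly usec microseconds (usec < 1000000). */
void backoff_us(long usec)
{
    struct timespec ts;

    ts.tv_sec = 0;
    ts.tv_nsec = usec * 1000L;
    nanosleep(&ts, NULL);
}

/* Adds delta to *counter with a compare-and-swap retry loop.
   After every failed attempt it waits delay_us microseconds before
   trying again. Returns the number of failed attempts. */
int add_with_backoff(volatile long *counter, long delta, long delay_us)
{
    int retries = 0;

    for (;;) {
        long seen = *counter;   /* snapshot of the shared value */

        if (__sync_bool_compare_and_swap(counter, seen, seen + delta))
            return retries;

        /* Another thread changed the value first: back off, then retry. */
        retries++;
        backoff_us(delay_us);
    }
}
```

In the article's loops the same call would go at the point where the do/while condition fails. Whether it helps depends on how long the copy and process steps take compared with the delay.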

Separately, I would like to thank vladvic for helping me understand how this algorithm works.

Source: https://habr.com/ru/post/271583/

