📜 ⬆️ ⬇️

FreeRTOS: interprocess communication


Hello. In this article I will try to describe the method of interprocess data exchange and synchronization with the events.
Links to other parts:
FreeRTOS: introduction.
FreeRTOS: mutexes and critical sections.

Any multi-threaded OS will not be considered complete, without the appropriate means of supporting a multi-threaded environment. FreeRTOS has everything you need for this, namely:

The queues.


image
Programs written using FreeRTOS are a set of independent tasks, or mini-subprograms, that require an effective and thread-safe data exchange mechanism, in the case of FreeRTOS, these are queues.
A queue is a simple FIFO (although you can also write to the beginning of the queue, and not to the end) a buffer that can store a fixed number of elements of a known size. Writing to a queue is byte-wise copying data to a buffer, reading is copying data and deleting from a queue.
Queues are, in fact, independent objects that many writers and readers may have, without fear of reading / writing broken data. When reading data, optionally, we can specify the time during which the task should be waiting for new data to be received. When recording data, we can also specify this time, but already to wait for a place in the queue.

Let us consider in more detail the basic functions for working with queues in FreeRTOS.
Before using any queue, it must be created. RAM memory for the queue is allocated from the FreeRTOS heap, and its size is equal to the data size + the size of the queue structure. In the code, each queue is represented by its handle, such as xQueueHandle.
xQueueHandle xQueueCreate(unsigned portBASE_TYPE uxQueueLength, unsigned portBASE_TYPE uxItemSize); 

uxQueueLength - the maximum number of items the queue can store per unit of time.
uxItemSize - the size of each item in the queue.
return xQueueHandle, or NULL - if the queue is created, the corresponding handle will be returned, if not, i.e. not enough memory, then NULL will be returned.

For writing to the queue use special functions:
 portBASE_TYPE xQueueSendToFront(xQueueHandle xQueue, const void * pvItemToQueue, portTickType xTicksToWait); portBASE_TYPE xQueueSendToBack(xQueueHandle xQueue, const void * pvItemToQueue, portTickType xTicksToWait); //   portBASE_TYPE xQueueSend(xQueueHandle xQueue, const void * pvItemToQueue, portTickType xTicksToWait); 

xQueue is the handle of the queue in which we write the data.
pvItemToQueue - pointer to the item that will be placed in the queue.
xTicksToWait - the time during which the task must be in a locked state in order for a place in the queue to appear. You can specify portMAX_DELAY so that the task is in the locked state for an indefinite time, i.e. until a place in line appears.
return pdPASS, or errQUEUE_FULL - if the new item is successfully written to the queue, the function returns pdPASS, if there is not enough space, and xTicksToWait time is specified, the task will go to the blocked time to wait in the queue.
')
For reading data, 2 functions are used, the main difference is that xQueueReceive removes an item from the queue, but xQueuePeek does not.
 portBASE_TYPE xQueueReceive(xQueueHandle xQueue, const void * pvBuffer, portTickType xTicksToWait); portBASE_TYPE xQueuePeek(xQueueHandle xQueue, const void * pvBuffer, portTickType xTicksToWait); 

xQueue is the handle of the queue in which we write the data.
pvBuffer is a pointer to a memory buffer into which data from the queue will be read. Buffer type = type of queue items.
xTicksToWait - the time during which the task should be in a locked state for the data to appear in the queue. You can specify portMAX_DELAY so that the task is in the locked state for an indefinite time, i.e. until new data appears in the queue. Next, I will explain how this is used to create the Gatekeeper Task.
return pdPASS, or errQUEUE_EMPTY - if the new item is successfully read from the queue, the function returns pdPASS, if the queue is empty, and xTicksToWait is specified, the task will go to the blocked time to wait for new data in the queue.

To view the number of items in the queue, you can use the function
 unsigned portBASE_TYPE uxQueueMessagesWaiting( xQueueHandle xQueue ); 

Important: The above functions cannot be used in ISR (interrupts) and there are special versions for them with a special ISR suffix, the behavior of which is similar to the previous functions, except for the last parameter:
 portBASE_TYPE xQueueSendToFrontFromISR( xQueueHandle xQueue, void *pvItemToQueue, portBASE_TYPE *pxHigherPriorityTaskWoken ); portBASE_TYPE xQueueSendToBackFromISR(xQueueHandle xQueue, void *pvItemToQueue, portBASE_TYPE *pxHigherPriorityTaskWoken); portBASE_TYPE xQueueReceiveFromISR(xQueueHandle xQueue, const void *pvBuffer, portBASE_TYPE *pxHigherPriorityTaskWoken); 

pxHigherPriorityTaskWoken - since writing to a queue can lead to unblocking a task that waits for data and has a higher priority than the current task, we need to perform a forced context switch (to do this, call the taskYIELD () macro). If necessary, this parameter will be equal to pdTRUE .

A few words about the effective use of queues. For example, consider a UART — with a typical approach, each byte received is immediately written to a queue, which is not worth doing. it is terribly inefficient, already at fairly low frequencies. It is more efficient to conduct basic data processing in an ISR and then transfer them to a queue, but it is important to understand that the ISR code should be as short as possible.

Consider an example of the so-called gatekeeper task (I don’t know how to correctly translate this, if someone tells me, I will be grateful :)).
The gatekeeper task is a simple method that allows you to avoid the main problems of multi-threaded programming: inverting priorities, and getting the task to a deadlock.
The gatekeeper task is the only method that has direct access to the resource, all other tasks must access the resource through this task.
Consider a simple skeleton gatekeeper task. This is a purely contrived example, but which will help to understand the general principle. We assume that we need to safely send some data, for example, through the UART.
 // Gatekeeper    . void vGatekeeperTask( void *pvParameteres ) { char oneByte; //   -   ,      UART ,         . for( ;; ) { //    . //     portMAX_DELAY    ,      . //     ,     . xQueueReceive( xDataQueue, &oneByte, portMAX_DELAY); //    ,     . vSendByteToUART( oneByte ); //    . } } 

Thus, any task that wants to send a byte using the UART can use one of the service functions, for example, the following:
 void vUARTPutByte( char byte) { //      ,       ,    . xQueueSend( xDataQueue, &byte, 0 ); } 

It is also worth noting that you should not limit yourself, only char as the data type of the queue, but you can organize entire pipelines using structures.

Binary semaphores.


image
Binary semaphores, can be used to unlock a task whenever an event occurs (for example, pressing a button).
Typical work scenario: when a certain interruption occurs in the ISR, we give up the semaphore, as a result of a task, the waiting semaphore picks up the semaphore and goes out of the unlocked state to perform any operations. This mechanism is shown in the following figure.
image
To store all types of semaphores, the data type is xSemaphoreHandle .
Consider the functions for working with semaphores:
 void vSemaphoreCreateBinary( xSemaphoreHandle xSemaphore ); 

xSemaphore - this function is implemented using a macro, so it is necessary to transfer the value of the handle, and not a pointer to it. If the semaphore was successfully created, the xSemaphore value is not NULL.

In order to "take" a semaphore, a special function is used:
 portBASE_TYPE xSemaphoreTake( xSemaphoreHandle xSemaphore, portTickType xTicksToWait ); 

xSemaphore is the handle of the semaphore that is planned to be taken.
xTicksToWait - the time during which the task should be in a locked state, after which the semaphore becomes available. You can specify portMAX_DELAY so that the task is in a locked state for an indefinite time, i.e. until the semaphore is available.
return pdPASS, or pdFALSE - pdPASS - if the semaphore is received, pdFALSE - if the semaphore is unavailable.

In order to “give away” the semaphore, special functions are also used. I will consider the ISR version because most often semaphores are used in conjunction with ISR.
 portBASE_TYPE xSemaphoreGiveFromISR( xSemaphoreHandle xSemaphore, portBASE_TYPE *pxHigherPriorityTaskWoken ); 

xSemaphore - the handle of the semaphore, which is planned to "give."
pxHigherPriorityTaskWoken - since the “transfer” of a semaphore can lead to unblocking a task that is waiting for semaphore data that has a higher priority than the current task, we need to perform a forced context switch (to do this, call the taskYIELD () macro). If necessary, this parameter will be equal to pdTRUE .
return pdPASS, or pdFALSE - pdPASS - if the semaphore is given, pdFALSE - if the semaphore is already available, but not processed.

As an example, I will give a short program code, for the ISR, I wrote pseudocode:
 xSemaphoreHandle xButtonSemaphore; void vButtonHandlerTask( void *pvParameteres ) { for( ;; ) { xSemaphoreTake( xButtonSemaphore, portMAX_DELAY ); //       . } } void main() { //   vInitSystem(); vSemaphoreCreateBinary( xButtonSemaphore ); if( xButtonSemaphore != NULL ) { //  .      ,     ! xTaskCreate( &vButtonHandlerTask, (signed char *)"GreenBlink", configMINIMAL_STACK_SIZE, NULL, 1, NULL ); //  , ..   . vTaskStartScheduler(); } //      ,      ,     .       . for( ;; ) { } } ISR_FUNCTION processButton() { portBASE_TYPE xTaskWoken; if( buttonOnPressed ) { xSemaphoreGiveFromISR( xButtonSemaphore, &xTaskWoken ); if( xTaskWoken == pdTRUE) { taskYIELD(); } } } 

Counting semaphores.


Consider a typical situation that exists when using semaphores:
1. An event has occurred that caused an interrupt.
2. The ISR "gives" the semaphore, i.e. unlocks the waiting semaphore task.
3. Waiting task “takes” semaphore.
4. After the execution of the necessary code, the task again goes into a locked state, waiting for new events.
This algorithm works fine, but not at high frequencies. At high frequencies it is necessary to use countable semaphores, which, as a rule, are used in 2 cases:

As mentioned above, the xSemaphoreHandle data type is used to store all types of semaphores, and since it must be created before using a semaphore, it is necessary to use a special function to create counting semaphores:
 xSemaphoreHandle xSemaphoreCreateCounting( unsigned portBASE_TYPE uxMaxCount, unsigned portBASE_TYPE uxInitialCount ); 

uxMaxCount - the maximum number of semaphores that a counter can store. By analogy with the queue - is the length of the queue.
uxInitialCount - the value of the counter after creating the semaphore.
return is not NULL - the function returns a non-NULL value if the semaphore was created.
Otherwise, functions similar to the previous ones are used to work with counting semaphores.

Source: https://habr.com/ru/post/129180/


All Articles