文章目录
- 补充
- 任务通知
- 发送
- 处理
- ulTaskGenericNotifyTake
- xTaskGenericNotifyWait
- 清除
- xTaskGenericNotifyStateClear
- ulTaskGenericNotifyValueClear
- 结构体
- StreamBufferHandle_t
- StreamBufferCallbackFunction_t
- 创建
- xStreamBufferGenericCreate
- stream buffer的类型
- 删除
- vStreamBufferDelete
- 发送
- prvWriteBytesToBuffer
- prvWriteMessageToBuffer
- prvBytesInBuffer
- 接收
- xStreamBufferReceive
补充
任务通知
任务通知(Task Notifications)是一种轻量级的、快速的机制,用于在任务之间或中断与任务之间传递简单的信号或少量数据。相比传统的队列、信号量等同步机制,任务通知具有更低的资源消耗和更高的效率。
还记得吗,每个任务控制块都有这么两个字段
//状态
#define taskNOT_WAITING_NOTIFICATION ( ( uint8_t ) 0 ) /* Must be zero as it is the initialised value. */
#define taskWAITING_NOTIFICATION ( ( uint8_t ) 1 )
#define taskNOTIFICATION_RECEIVED ( ( uint8_t ) 2 )//字段
#if ( configUSE_TASK_NOTIFICATIONS == 1 )volatile uint32_t ulNotifiedValue[ configTASK_NOTIFICATION_ARRAY_ENTRIES ]; //通知值volatile uint8_t ucNotifyState[ configTASK_NOTIFICATION_ARRAY_ENTRIES ]; //通知状态
#endif
通过这些,可以实现多种同步模式:
- 简单通知:仅设置通知状态。
- 带值的通知:除了设置状态外,还可以传递一个32位的数值。
- 带掩码的通知:使用特定的位掩码来更新通知值。
大体流程如下
发送
xTaskGenericNotify
是所有任务通知 API(如 xTaskNotify
, xTaskNotifyIndexed
, xTaskNotifyAndQuery
, xTaskNotifyAndQueryIndexed
等)的底层通用函数,用于向一个任务发送通知,并根据不同的操作类型更新该任务的通知值。
BaseType_t xTaskGenericNotify( TaskHandle_t xTaskToNotify,//要通知的任务句柄UBaseType_t uxIndexToNotify,//通知索引号,表示使用哪个通知槽位uint32_t ulValue,//要传递给接收任务的通知值eNotifyAction eAction,//通知操作方式(设置、增加、覆盖写等)uint32_t * pulPreviousNotificationValue )//可选输出参数,返回该任务在此索引上的旧通知值
{TCB_t * pxTCB;BaseType_t xReturn = pdPASS;uint8_t ucOriginalNotifyState;configASSERT( uxIndexToNotify < configTASK_NOTIFICATION_ARRAY_ENTRIES );configASSERT( xTaskToNotify );pxTCB = xTaskToNotify;taskENTER_CRITICAL();{if( pulPreviousNotificationValue != NULL ){ //保存旧的通知值(如果需要)*pulPreviousNotificationValue = pxTCB->ulNotifiedValue[ uxIndexToNotify ];}ucOriginalNotifyState = pxTCB->ucNotifyState[ uxIndexToNotify ];//记录原始通知状态并标记为“已收到”pxTCB->ucNotifyState[ uxIndexToNotify ] = taskNOTIFICATION_RECEIVED;switch( eAction )//根据通知操作类型处理通知值{case eSetBits://将通知值与 ulValue 做按位或操作(常用于标志位)pxTCB->ulNotifiedValue[ uxIndexToNotify ] |= ulValue;break;case eIncrement://通知值自增 1( pxTCB->ulNotifiedValue[ uxIndexToNotify ] )++;break;case eSetValueWithOverwrite://直接设置通知值,不管之前有没有被读取过pxTCB->ulNotifiedValue[ uxIndexToNotify ] = ulValue;break;case eSetValueWithoutOverwrite://如果上次通知还没被读取,则不更新值,返回 pdFAILif( ucOriginalNotifyState != taskNOTIFICATION_RECEIVED ){ //目标任务在此次通知发送之前 没有处于“已收到通知”状态(即上次的通知已经被读取或清除)pxTCB->ulNotifiedValue[ uxIndexToNotify ] = ulValue;}else{xReturn = pdFAIL;}break;case eNoAction://不改变通知值break;default:configASSERT( xTickCount == ( TickType_t ) 0 );break;}if( ucOriginalNotifyState == taskWAITING_NOTIFICATION )//如果目标任务正在等待通知,将其唤醒{listREMOVE_ITEM( &( pxTCB->xStateListItem ) );prvAddTaskToReadyList( pxTCB );configASSERT( listLIST_ITEM_CONTAINER( &( pxTCB->xEventListItem ) ) == NULL );#if ( configUSE_TICKLESS_IDLE != 0 ){prvResetNextTaskUnblockTime();}#endiftaskYIELD_ANY_CORE_IF_USING_PREEMPTION( pxTCB );}}taskEXIT_CRITICAL();return xReturn;
}
vTaskGenericNotifyGiveFromISR()
和 xTaskGenericNotifyFromISR()
都是用于从ISR 中向任务发送通知的函数,vTaskGenericNotifyGiveFromISR()
专门用于递增通知值并可能唤醒任务,而 xTaskGenericNotifyFromISR()
则提供了更多种操作类型来更新通知值,适合需要传递更多控制信息或执行复杂同步逻辑的应用场景。
处理
ulTaskGenericNotifyTake
处理任务通知的通用函数之一,它允许一个任务等待直到其接收到的通知计数值不为零
uint32_t ulTaskGenericNotifyTake( UBaseType_t uxIndexToWaitOn,BaseType_t xClearCountOnExit,//如果设置为非零(pdTRUE),当函数返回时将清除通知计数;如果设置为零(pdFALSE),则将通知计数减一TickType_t xTicksToWait )
{uint32_t ulReturn;BaseType_t xAlreadyYielded, xShouldBlock = pdFALSE;configASSERT( uxIndexToWaitOn < configTASK_NOTIFICATION_ARRAY_ENTRIES );vTaskSuspendAll();//挂起调度器{taskENTER_CRITICAL();{if( pxCurrentTCB->ulNotifiedValue[ uxIndexToWaitOn ] == 0U ){ //当前任务的通知计数若为零,则将任务的状态设为taskWAITING_NOTIFICATION并根据超时时间决定是否需要阻塞(xShouldBlock)。pxCurrentTCB->ucNotifyState[ uxIndexToWaitOn ] = taskWAITING_NOTIFICATION;if( xTicksToWait > ( TickType_t ) 0 ){xShouldBlock = pdTRUE;}}}taskEXIT_CRITICAL();if( xShouldBlock == pdTRUE ){ //如果需要阻塞,则将当前任务添加到延迟列表中等待超时或被通知唤醒。prvAddCurrentTaskToDelayedList( xTicksToWait, pdTRUE );}}xAlreadyYielded = xTaskResumeAll();//尝试恢复调度器,并判断是否发生了上下文切换(xAlreadyYielded)if( ( xShouldBlock == pdTRUE ) && ( xAlreadyYielded == pdFALSE ) ){ //如果需要阻塞且没有发生上下文切换,则触发一次任务切换taskYIELD_WITHIN_API();}taskENTER_CRITICAL();{ //获取当前通知值,并根据xClearCountOnExit决定如何更新通知计数ulReturn = pxCurrentTCB->ulNotifiedValue[ uxIndexToWaitOn ];if( ulReturn != 0U ){if( xClearCountOnExit != pdFALSE ){pxCurrentTCB->ulNotifiedValue[ uxIndexToWaitOn ] = ( uint32_t ) 0U;}else{pxCurrentTCB->ulNotifiedValue[ uxIndexToWaitOn ] = ulReturn - ( uint32_t ) 1;}}//将任务状态重置为taskNOT_WAITING_NOTIFICATIONpxCurrentTCB->ucNotifyState[ uxIndexToWaitOn ] = taskNOT_WAITING_NOTIFICATION;}taskEXIT_CRITICAL();return ulReturn;
}
xTaskGenericNotifyWait
等待某个任务的通知被触发(即收到通知),并且支持在进入和退出时对通知值进行位清除操作
BaseType_t xTaskGenericNotifyWait( UBaseType_t uxIndexToWaitOn,uint32_t ulBitsToClearOnEntry,uint32_t ulBitsToClearOnExit,uint32_t * pulNotificationValue,TickType_t xTicksToWait )
{BaseType_t xReturn, xAlreadyYielded, xShouldBlock = pdFALSE;configASSERT( uxIndexToWaitOn < configTASK_NOTIFICATION_ARRAY_ENTRIES );vTaskSuspendAll();{taskENTER_CRITICAL();{ //检查是否已有通知到达if( pxCurrentTCB->ucNotifyState[ uxIndexToWaitOn ] != taskNOTIFICATION_RECEIVED ){ //如果目标任务 尚未收到通知(状态不是 taskNOTIFICATION_RECEIVED)pxCurrentTCB->ulNotifiedValue[ uxIndexToWaitOn ] &= ~ulBitsToClearOnEntry;//清除一些标志位(按位与非)pxCurrentTCB->ucNotifyState[ uxIndexToWaitOn ] = taskWAITING_NOTIFICATION;//设置状态为 taskWAITING_NOTIFICATION,表示开始等待if( xTicksToWait > ( TickType_t ) 0 ){ //如果设置了超时时间,则标记需要阻塞xShouldBlock = pdTRUE;}}}taskEXIT_CRITICAL();if( xShouldBlock == pdTRUE ){ //如果需要阻塞,将任务加入延迟队列prvAddCurrentTaskToDelayedList( xTicksToWait, pdTRUE );}}xAlreadyYielded = xTaskResumeAll();//恢复调度器并判断是否需要调度if( ( xShouldBlock == pdTRUE ) && ( xAlreadyYielded == pdFALSE ) ){ //如果任务被阻塞且没有发生调度,则手动触发一次调度taskYIELD_WITHIN_API();}taskENTER_CRITICAL();{ //再次进入临界区处理通知结果if( pulNotificationValue != NULL ){*pulNotificationValue = pxCurrentTCB->ulNotifiedValue[ uxIndexToWaitOn ];}if( pxCurrentTCB->ucNotifyState[ uxIndexToWaitOn ] != taskNOTIFICATION_RECEIVED ){ //如果仍然没有收到通知, 返回 pdFALSExReturn = pdFALSE;}else{ //如果收到了通知, 根据 ulBitsToClearOnExit 清除指定的标志位pxCurrentTCB->ulNotifiedValue[ uxIndexToWaitOn ] &= ~ulBitsToClearOnExit;xReturn = pdTRUE;}//将通知状态重置为 taskNOT_WAITING_NOTIFICATIONpxCurrentTCB->ucNotifyState[ uxIndexToWaitOn ] = taskNOT_WAITING_NOTIFICATION;}taskEXIT_CRITICAL();return xReturn;
}
清除
xTaskGenericNotifyStateClear
//xTask若是RECEIVED状态, 改为 WAITING 状态
BaseType_t xTaskGenericNotifyStateClear( TaskHandle_t xTask,UBaseType_t uxIndexToClear )
{TCB_t * pxTCB;BaseType_t xReturn;configASSERT( uxIndexToClear < configTASK_NOTIFICATION_ARRAY_ENTRIES );pxTCB = prvGetTCBFromHandle( xTask );taskENTER_CRITICAL();{if( pxTCB->ucNotifyState[ uxIndexToClear ] == taskNOTIFICATION_RECEIVED ){pxTCB->ucNotifyState[ uxIndexToClear ] = taskNOT_WAITING_NOTIFICATION;xReturn = pdPASS;}else{xReturn = pdFAIL;}}taskEXIT_CRITICAL();return xReturn;
}
ulTaskGenericNotifyValueClear
//清除xTask的uxIndexToClear的通知值的ulBitsToClear位(掩码)
uint32_t ulTaskGenericNotifyValueClear( TaskHandle_t xTask,UBaseType_t uxIndexToClear,uint32_t ulBitsToClear );
结构体
StreamBufferHandle_t
typedef struct StreamBufferDef_t * StreamBufferHandle_t;typedef struct StreamBufferDef_t
{volatile size_t xTail;//指向缓冲区中下一个要读取的位置volatile size_t xHead;//指向缓冲区中下一个可以写入的位置size_t xLength;//缓冲区的总大小size_t xTriggerLevelBytes;//触发接收任务唤醒的最小数据量volatile TaskHandle_t xTaskWaitingToReceive;//正在等待接收数据的任务句柄volatile TaskHandle_t xTaskWaitingToSend;//当前正在等待发送数据的任务句柄uint8_t * pucBuffer; //指向实际用于存储数据的缓冲区内存uint8_t ucFlags;//用于标识 Stream Buffer 的状态, 缓冲区类型,分配方式等#if ( configUSE_TRACE_FACILITY == 1 )UBaseType_t uxStreamBufferNumber;//调试用编号,用于跟踪每个 Stream Buffer#endif#if ( configUSE_SB_COMPLETED_CALLBACK == 1 )//发送/接收完成后的回调函数指针StreamBufferCallbackFunction_t pxSendCompletedCallback;StreamBufferCallbackFunction_t pxReceiveCompletedCallback;#endifUBaseType_t uxNotificationIndex;//用于内部通知机制的索引
} StreamBuffer_t;
StreamBufferCallbackFunction_t
typedef void (* StreamBufferCallbackFunction_t)( StreamBufferHandle_t xStreamBuffer,BaseType_t xIsInsideISR,BaseType_t * const pxHigherPriorityTaskWoken );
创建
xStreamBufferGenericCreate
初始化一个StreamBufferHandle_t并返回,设置字段,分配结构体和缓冲区的内存,具体代码就细说了。
StreamBufferHandle_t xStreamBufferGenericCreate( size_t xBufferSizeBytes,size_t xTriggerLevelBytes,BaseType_t xStreamBufferType,//缓冲区类型:普通流缓冲区、消息缓冲区等StreamBufferCallbackFunction_t pxSendCompletedCallback,StreamBufferCallbackFunction_t pxReceiveCompletedCallback );
stream buffer的类型
#define sbTYPE_STREAM_BUFFER ( ( BaseType_t ) 0 )
#define sbTYPE_MESSAGE_BUFFER ( ( BaseType_t ) 1 )
#define sbTYPE_STREAM_BATCHING_BUFFER ( ( BaseType_t ) 2 )
其他创建相关的宏或函数都是调用xStreamBufferGenericCreate
,区别在于stream buffer的类型以及带不带回调函数。3种类型×2种带不带回调函数,一共是六个宏(动态,如果加上静态分配的就是12个),关于message_buffer
的单独在message_buffer.h
里。
删除
vStreamBufferDelete
释放内存
动态创建的就调用vPortFree
,因为动态创建的时候结构体和数据区的内存是一起申请的,所以会一起释放。
静态创建的只会清除结构体,但数据区没释放。
void vStreamBufferDelete( StreamBufferHandle_t xStreamBuffer )
发送
size_t xStreamBufferSend( StreamBufferHandle_t xStreamBuffer,const void * pvTxData,size_t xDataLengthBytes,TickType_t xTicksToWait )
{StreamBuffer_t * const pxStreamBuffer = xStreamBuffer;size_t xReturn, xSpace = 0;size_t xRequiredSpace = xDataLengthBytes;TimeOut_t xTimeOut;size_t xMaxReportedSpace = 0;configASSERT( pvTxData );configASSERT( pxStreamBuffer );//计算所需空间,如果是MESSAGE_BUFFER还有加上消息长度占用的空间xMaxReportedSpace = pxStreamBuffer->xLength - ( size_t ) 1;if( ( pxStreamBuffer->ucFlags & sbFLAGS_IS_MESSAGE_BUFFER ) != ( uint8_t ) 0 ){xRequiredSpace += sbBYTES_TO_STORE_MESSAGE_LENGTH;configASSERT( xRequiredSpace > xDataLengthBytes );if( xRequiredSpace > xMaxReportedSpace ){ //直接设置不等待xTicksToWait = ( TickType_t ) 0;}}else{if( xRequiredSpace > xMaxReportedSpace ){ //限制请求的空间不超过最大值xRequiredSpace = xMaxReportedSpace;}}if( xTicksToWait != ( TickType_t ) 0 ){vTaskSetTimeOutState( &xTimeOut );//计时do{ //等待空间可用taskENTER_CRITICAL();{xSpace = xStreamBufferSpacesAvailable( pxStreamBuffer );//当前可用空间 xSpaceif( xSpace < xRequiredSpace ){ //清除旧通知( void ) xTaskNotifyStateClearIndexed( NULL, pxStreamBuffer->uxNotificationIndex );configASSERT( pxStreamBuffer->xTaskWaitingToSend == NULL );pxStreamBuffer->xTaskWaitingToSend = xTaskGetCurrentTaskHandle();}else{taskEXIT_CRITICAL();break;}}taskEXIT_CRITICAL();//等待通知( void ) xTaskNotifyWaitIndexed( pxStreamBuffer->uxNotificationIndex, ( uint32_t ) 0, ( uint32_t ) 0, NULL, xTicksToWait );pxStreamBuffer->xTaskWaitingToSend = NULL;} while( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE );}if( xSpace == ( size_t ) 0 ){ //重新检查空间(以防在等待期间有其他任务释放了空间)xSpace = xStreamBufferSpacesAvailable( pxStreamBuffer );}xReturn = prvWriteMessageToBuffer( pxStreamBuffer, pvTxData, xDataLengthBytes, xSpace, xRequiredSpace );if( xReturn > ( size_t ) 0 ){ //如果成功写入数据,并且缓冲区中的数据量达到了触发级别,则调用 prvSEND_COMPLETED() 通知等待接收的任务。if( prvBytesInBuffer( pxStreamBuffer ) >= pxStreamBuffer->xTriggerLevelBytes ){prvSEND_COMPLETED( pxStreamBuffer );}}return xReturn;
}
prvWriteBytesToBuffer
向StreamBuffer_t 的缓冲区(环形)写入数据,返回新的 head 位置。
static size_t prvWriteBytesToBuffer( StreamBuffer_t * const pxStreamBuffer,const uint8_t * pucData,size_t xCount,size_t xHead )
{size_t xFirstLength;configASSERT( xCount > ( size_t ) 0 );xFirstLength = configMIN( pxStreamBuffer->xLength - xHead, xCount );//确定从当前位置 xHead 到缓冲区末尾还有多少空间configASSERT( ( xHead + xFirstLength ) <= pxStreamBuffer->xLength );//把数据从 pucData 拷贝xFirstLength长度到 pxStreamBuffer->pucBuffer[xHead] 开始的位置( void ) memcpy( ( void * ) ( &( pxStreamBuffer->pucBuffer[ xHead ] ) ), ( const void * ) pucData, xFirstLength );if( xCount > xFirstLength )//判断是否还有剩余数据需要写入{configASSERT( ( xCount - xFirstLength ) <= pxStreamBuffer->xLength );//如果一次没写完(比如缓冲区末尾空间不足),则从缓冲区起始位置开始写剩下的部分。可以看出在维护一个环形缓冲区( void ) memcpy( ( void * ) pxStreamBuffer->pucBuffer, ( const void * ) &( pucData[ xFirstLength ] ), xCount - xFirstLength );}xHead += xCount; //更新并返回新的 head 位置if( xHead >= pxStreamBuffer->xLength ){ //如果超出缓冲区总长度,则用减法回绕xHead -= pxStreamBuffer->xLength;}return xHead;
}
prvWriteMessageToBuffer
将一条完整消息写入 Stream Buffer
static size_t prvWriteMessageToBuffer( StreamBuffer_t * const pxStreamBuffer,const void * pvTxData,//数据源地址size_t xDataLengthBytes,//数据长度size_t xSpace,//当前缓冲区可用空间大小size_t xRequiredSpace )//写入该消息所需的最小空间(包括消息头)
{size_t xNextHead = pxStreamBuffer->xHead;configMESSAGE_BUFFER_LENGTH_TYPE xMessageLength;if( ( pxStreamBuffer->ucFlags & sbFLAGS_IS_MESSAGE_BUFFER ) != ( uint8_t ) 0 )//判断是否为 Message Buffer{ //如果是,则每条消息都要先写一个长度字段xMessageLength = ( configMESSAGE_BUFFER_LENGTH_TYPE ) xDataLengthBytes;configASSERT( ( size_t ) xMessageLength == xDataLengthBytes );if( xSpace >= xRequiredSpace ){ //如果有足够空间,把消息长度写入缓冲区头部,更新 xNextHead 指针位置xNextHead = prvWriteBytesToBuffer( pxStreamBuffer, ( const uint8_t * ) &( xMessageLength ), sbBYTES_TO_STORE_MESSAGE_LENGTH, xNextHead );}else{ //直接返回 0,表示本次发送失败。xDataLengthBytes = 0;}}else //如果不是 Message Buffer{ //不需要写入长度字段xDataLengthBytes = configMIN( xDataLengthBytes, xSpace );}if( xDataLengthBytes != ( size_t ) 0 ){ //实际写入数据,更新 xHead 指针pxStreamBuffer->xHead = prvWriteBytesToBuffer( pxStreamBuffer, ( const uint8_t * ) pvTxData, xDataLengthBytes, xNextHead );}return xDataLengthBytes;
}
prvBytesInBuffer
返回当前缓冲区中的有效数据量
static size_t prvBytesInBuffer( const StreamBuffer_t * const pxStreamBuffer )
{size_t xCount;xCount = pxStreamBuffer->xLength + pxStreamBuffer->xHead;xCount -= pxStreamBuffer->xTail;if( xCount >= pxStreamBuffer->xLength ){xCount -= pxStreamBuffer->xLength;}return xCount;
}
接收
xStreamBufferReceive
size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer,void * pvRxData,size_t xBufferLengthBytes,TickType_t xTicksToWait )
{StreamBuffer_t * const pxStreamBuffer = xStreamBuffer;size_t xReceivedLength = 0, xBytesAvailable, xBytesToStoreMessageLength;configASSERT( pvRxData );configASSERT( pxStreamBuffer );//判断缓冲区类型并设置消息头长度if( ( pxStreamBuffer->ucFlags & sbFLAGS_IS_MESSAGE_BUFFER ) != ( uint8_t ) 0 ){xBytesToStoreMessageLength = sbBYTES_TO_STORE_MESSAGE_LENGTH;}else if( ( pxStreamBuffer->ucFlags & sbFLAGS_IS_BATCHING_BUFFER ) != ( uint8_t ) 0 ){xBytesToStoreMessageLength = pxStreamBuffer->xTriggerLevelBytes;}else{xBytesToStoreMessageLength = 0;}if( xTicksToWait != ( TickType_t ) 0 ){ //进入临界区检查是否有数据可读taskENTER_CRITICAL();{xBytesAvailable = prvBytesInBuffer( pxStreamBuffer );if( xBytesAvailable <= xBytesToStoreMessageLength ){( void ) xTaskNotifyStateClearIndexed( NULL, pxStreamBuffer->uxNotificationIndex );configASSERT( pxStreamBuffer->xTaskWaitingToReceive == NULL );pxStreamBuffer->xTaskWaitingToReceive = xTaskGetCurrentTaskHandle();}}taskEXIT_CRITICAL();if( xBytesAvailable <= xBytesToStoreMessageLength ){( void ) xTaskNotifyWaitIndexed( pxStreamBuffer->uxNotificationIndex, ( uint32_t ) 0, ( uint32_t ) 0, NULL, xTicksToWait );pxStreamBuffer->xTaskWaitingToReceive = NULL;xBytesAvailable = prvBytesInBuffer( pxStreamBuffer );}}else{ //非阻塞模式直接检查数据量xBytesAvailable = prvBytesInBuffer( pxStreamBuffer );}if( xBytesAvailable > xBytesToStoreMessageLength ){ //实际读取数据xReceivedLength = prvReadMessageFromBuffer( pxStreamBuffer, pvRxData, xBufferLengthBytes, xBytesAvailable );if( xReceivedLength != ( size_t ) 0 ){ //如果成功读取到数据,调用 prvRECEIVE_COMPLETED() 通知等待发送的任务可以继续发送了prvRECEIVE_COMPLETED( xStreamBuffer );}}return xReceivedLength;
}