摘要:鸿蒙轻内核的工作排序链表,用于工作提早到期/超时唤醒等业务场景,是一个十分重要、十分根底的数据结构。

本文会持续给读者介绍鸿蒙轻内核源码中重要的数据结构:工作排序链表TaskSortLinkAttr。鸿蒙轻内核的工作排序链表,用于工作提早到期/超时唤醒等业务场景,是一个十分重要、十分根底的数据结构。本文中所波及的源码,以OpenHarmony LiteOS-M内核为例,均能够在开源站点https://gitee.com/openharmony... 获取。

1 工作排序链表

咱们先看下工作排序链接的数据结构。工作排序链表是一个环状的双向链表数组,工作排序链表属性构造体TaskSortLinkAttr作为双向链表的头结点,指向双向链表数组的第一个元素,还保护游标信息,记录以后的地位信息。咱们先看下排序链表属性的构造体的定义。

1.1 工作排序链表属性构造体定义

在kernel\include\los_task.h头文件中定义了排序链表属性的构造体TaskSortLinkAttr。该构造体定义了排序链表的头节点LOS_DL_LIST *sortLink,游标UINT16 cursor,还有一个保留字段,临时没有应用。

源码如下:

 typedef struct {        LOS_DL_LIST *sortLink;        UINT16      cursor;        UINT16      reserved;    } TaskSortLinkAttr;

在文件kernel\src\los_task.c中定义了排序链表属性构造体TaskSortLinkAttr类型的全局变量g_taskSortLink,该全局变量的成员变量sortLink作为排序链表的头结点,指向一个长度为32的环状的双向链表数组,成员变量cursor作为游标记录环状数组的以后游标位置。源代码如下。

    LITE_OS_SEC_BSS  TaskSortLinkAttr                    g_taskSortLink;

咱们应用示意图来讲述一下。工作排序链表是环状双向链表数组,长度为32,每一个元素是一个双向链表,挂载工作LosTaskCB的链表节点timerList。工作LosTaskCB的成员变量idxRollNum记录数组的索引和滚动数。全局变量g_taskSortLink的成员变量cursor记录以后游标位置,每过一个Tick,游标指向下一个地位,转一轮须要32 ticks。当运行到的数组地位,双向链表不为空,则把第一个节点保护的滚动数减1。这样的数据结构相似钟表表盘,也称为工夫轮。

咱们举个例子来阐明,基于工夫轮实现的工作排序链表是如何治理工作提早超时的。如果以后游标cursor为1,当一个工作须要延时72 ticks,72=2*32+8,示意排序索引sortIndex为8,滚动数rollNum为2。会把工作插入数组索引为sortIndex+cursor=9的双向链表地位,索要9处的双向链表保护节点的滚动为2。随着Tick工夫的进行,从以后游标位置运行到数组索引地位9,历时8 ticks。运行到9时,如果滚动数大于0,则把滚动数减1。等再运行2轮,共须要72 ticks,工作就会提早到期,能够从排序链表移除。每个数组元素对应的双向链表的第一个链表节点的滚动数示意须要转多少轮,节点工作才到期。第二个链表节点的滚动数须要加上第一个节点的滚动数,示意第二个节点须要转的轮数。顺次类推。

示意图如下:

1.2 工作排序链表宏定义

在OS_TSK_SORTLINK_LEN头文件中定义了一些和工作排序链表相干的宏定义。提早工作双向链表数组的长度定义为32,高阶bit位位数为5,低阶bit位位数为27。对于工作的超时工夫,取其高27位作为滚动数,低5位作为数组索引。

源码如下:

  /**        * 提早工作双向链表数组的数量(桶的数量):32        */        #define OS_TSK_SORTLINK_LEN                         32             /**        * 高阶bit位数目:5        */        #define OS_TSK_HIGH_BITS                            5U             /**        * 低阶bit位数目:27        */        #define OS_TSK_LOW_BITS                             (32U - OS_TSK_HIGH_BITS)             /**        * 滚动数最大值:0xFFFF FFDF,1111 0111 1111 1111 1111 1111 1101 1111        */        #define OS_TSK_MAX_ROLLNUM                          (0xFFFFFFFFU - OS_TSK_SORTLINK_LEN)             /**        * 工作延迟时间数的位宽:5        */        #define OS_TSK_SORTLINK_LOGLEN                      5             /**        * 提早工作的桶编号的掩码:31、0001 1111        */        #define OS_TSK_SORTLINK_MASK                        (OS_TSK_SORTLINK_LEN - 1U)             /**        * 滚动数的高阶掩码:1111 1000 0000 0000 0000 0000 0000 0000        */        #define OS_TSK_HIGH_BITS_MASK                       (OS_TSK_SORTLINK_MASK << OS_TSK_LOW_BITS)             /**        * 滚动数的低阶掩码:0000 0111 1111 1111 1111 1111 1111 1111        */        #define OS_TSK_LOW_BITS_MASK                        (~OS_TSK_HIGH_BITS_MASK)

2 工作排序链表操作

咱们剖析下工作排序链表的操作,蕴含初始化,插入,删除,滚动数更新,获取下一个到期工夫等。

2.1 初始化排序链表

在系零碎内核初始化启动阶段,在函数UINT32 OsTaskInit(VOID)中初始化工作排序链表。该函数的调用关系如下,main.c:main() --> kernel\src\los_init.c:LOS_KernelInit() --> kernel\src\los_task.c:OsTaskInit()。

初始化排序链表函数的源码如下:

 LITE_OS_SEC_TEXT_INIT UINT32 OsTaskInit(VOID)    {        UINT32 size;        UINT32 index;        LOS_DL_LIST *listObject = NULL;        ......    ⑴  size = sizeof(LOS_DL_LIST) * OS_TSK_SORTLINK_LEN;        listObject = (LOS_DL_LIST *)LOS_MemAlloc(m_aucSysMem0, size);    ⑵  if (listObject == NULL) {            (VOID)LOS_MemFree(m_aucSysMem0, g_taskCBArray);            return LOS_ERRNO_TSK_NO_MEMORY;        }         ⑶  (VOID)memset_s((VOID *)listObject, size, 0, size);    ⑷  g_taskSortLink.sortLink = listObject;        g_taskSortLink.cursor = 0;        for (index = 0; index < OS_TSK_SORTLINK_LEN; index++, listObject++) {    ⑸      LOS_ListInit(listObject);        }             return LOS_OK;    }     

⑴处代码计算须要申请的双向链表的内存大小,OS_TSK_SORTLINK_LEN为32,即须要为32个双向链表节点申请内存空间。而后申请内存,⑵处申请内存失败时返回相应错误码。⑶处初始化申请的内存区域为0等。⑷处把申请的双向链表节点赋值给g_taskSortLink的链表节点.sortLink,作为排序链表的头节点,游标.cursor初始化为0。而后⑸处的循环,调用LOS_ListInit()函数把双向链表数组每个元素都初始化为双向循环链表。

2.2 插入排序链表

插入排序链表的函数为OsTaskAdd2TimerList()。在工作期待互斥锁/信号量等资源时,都须要调用该函数将工作退出到对应的排序链表中。该函数蕴含两个入参,第一个参数LosTaskCB *taskCB用于指定要提早的工作,第二个参数UINT32 timeout指定超时等待时间。

源码如下:

 LITE_OS_SEC_TEXT VOID OsTaskAdd2TimerList(LosTaskCB *taskCB, UINT32 timeout)    {        LosTaskCB *taskDelay = NULL;        LOS_DL_LIST *listObject = NULL;        UINT32 sortIndex;        UINT32 rollNum;         ⑴  sortIndex = timeout & OS_TSK_SORTLINK_MASK;        rollNum = (timeout >> OS_TSK_SORTLINK_LOGLEN);    ⑵  (sortIndex > 0) ? 0 : (rollNum--);    ⑶  EVALUATE_L(taskCB->idxRollNum, rollNum);    ⑷  sortIndex = (sortIndex + g_taskSortLink.cursor);        sortIndex = sortIndex & OS_TSK_SORTLINK_MASK;    ⑸  EVALUATE_H(taskCB->idxRollNum, sortIndex);    ⑹  listObject = g_taskSortLink.sortLink + sortIndex;    ⑺  if (listObject->pstNext == listObject) {            LOS_ListTailInsert(listObject, &taskCB->timerList);        } else {    ⑻      taskDelay = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);            do {    ⑼          if (UWROLLNUM(taskDelay->idxRollNum) <= UWROLLNUM(taskCB->idxRollNum)) {                    UWROLLNUMSUB(taskCB->idxRollNum, taskDelay->idxRollNum);                } else {    ⑽              UWROLLNUMSUB(taskDelay->idxRollNum, taskCB->idxRollNum);                    break;                }         ⑾          taskDelay = LOS_DL_LIST_ENTRY(taskDelay->timerList.pstNext, LosTaskCB, timerList);            } while (&taskDelay->timerList != (listObject));         ⑿      LOS_ListTailInsert(&taskDelay->timerList, &taskCB->timerList);        }    }

⑴处代码计算等待时间timeout的低5位作为数组索引,高27位作为滚动数rollNum。这2行代码数学上的意义,就是把等待时间处于32失去的商作为滚动数,余数作为数组索引。⑵处代码,如果余数为0,能够整除时,滚动数减1。减1设计的起因是,在函数VOID OsTaskScan(VOID)中,每一个tick到来时,如果滚动数大于0,滚动数减1,并持续滚动一圈。后文会剖析该函数VOID OsTaskScan(VOID)。

⑶处代码把滚动数赋值给工作taskCB->idxRollNum的低27位。⑷处把数组索引加上游标,而后执行⑸赋值给工作taskCB->idxRollNum的高5位。⑹依据数组索引获取双向链表头结点,⑺如果此处双向链表为空,直接插入链表里。如果链表不为空,执行⑻获取第一个链表节点对应的工作taskDelay,而后遍历循环双向链表,把工作插入到适合的地位。⑼处如果待插入工作taskCB的滚动数大于等于以后链表节点对应工作的滚动数,则从待插入工作taskCB的滚动数中减去以后链表节点对应工作的滚动数,而后执行⑾获取下一个节点持续遍历。⑽处如果待插入工作taskCB的滚动数小于以后链表节点对应工作的滚动数,则从以后链表节点对应工作的滚动数中减去待插入工作taskCB的滚动数,而后跳出循环。执行⑿,实现工作插入。插入过程,能够联合上文的示意图进行了解。

2.3 从排序链表中删除

从排序链表中删除的函数为VOID OsTimerListDelete(LosTaskCB taskCB)。在工作复原/删除等场景中,须要调用该函数将工作从工作排序链表中删除。该函数蕴含一个参数LosTaskCB taskCB,用于指定要从排序链表中删除的工作。

源码如下:

 LITE_OS_SEC_TEXT VOID OsTimerListDelete(LosTaskCB *taskCB)    {        LOS_DL_LIST  *listObject = NULL;        LosTaskCB  *nextTask = NULL;        UINT32 sortIndex;         ⑴  sortIndex = UWSORTINDEX(taskCB->idxRollNum);    ⑵  listObject = g_taskSortLink.sortLink + sortIndex;         ⑶  if (listObject != taskCB->timerList.pstNext) {    ⑷      nextTask = LOS_DL_LIST_ENTRY(taskCB->timerList.pstNext, LosTaskCB, timerList);            UWROLLNUMADD(nextTask->idxRollNum, taskCB->idxRollNum);        }         ⑸  LOS_ListDelete(&taskCB->timerList);    }

⑴处代码获取待从排序链表中删除的工作对应的数字索引。⑵处代码获取排序链表的头节点listObject。⑶处代码判断待删除节点是否是最初一个节点,如果不是最初一个节点,执行执行⑷处代码获取待删除节点的下一个节点对应的工作nextTask,在下一个节点的滚动数中减少待删除节点的滚动数,而后执行⑸处代码执行删除操作。如果是最初一个节点,间接执行⑸处代码删除该节点即可。

2.4 获取下一个超时到期工夫

获取下一个超时到期工夫的函数为OsTaskNextSwitchTimeGet(),咱们剖析下其代码。

源码如下:

    UINT32 OsTaskNextSwitchTimeGet(VOID)    {        LosTaskCB *taskCB = NULL;        UINT32 taskSortLinkTick = LOS_WAIT_FOREVER;        LOS_DL_LIST *listObject = NULL;        UINT32 tempTicks;        UINT32 index;         ⑴  for (index = 0; index < OS_TSK_SORTLINK_LEN; index++) {    ⑵      listObject = g_taskSortLink.sortLink + ((g_taskSortLink.cursor + index) % OS_TSK_SORTLINK_LEN);    ⑶      if (!LOS_ListEmpty(listObject)) {    ⑷          taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);    ⑸          tempTicks = (index == 0) ? OS_TSK_SORTLINK_LEN : index;    ⑹          tempTicks += (UINT32)(UWROLLNUM((UINT32)taskCB->idxRollNum) * OS_TSK_SORTLINK_LEN);    ⑺          if (taskSortLinkTick > tempTicks) {                    taskSortLinkTick = tempTicks;                }            }        }        return taskSortLinkTick;    }

⑴处代码循环遍历双向链表数组,⑵处代码从以后游标位置开始获取排序链表的头节点listObject。⑶处代码判断排序链表是否为空,如果排序链表为空,则持续遍历下一个数组。如果链表不为空,⑷处代码获取排序链表的第一个链表节点对应的工作。⑸处如果遍历的数字索引为0,tick数目应用32,否则应用具体的数字索引。⑹处获取工作的滚动数,计算出须要的等待时间,加上⑸处计算出的有余滚动一圈的工夫。⑺处计算出须要期待的最小工夫,即下一个最快到期的工夫。

3 排序链表和Tick工夫的关系

工作退出到排序链表后,工夫一个tick一个tick的逝去,排序链表中的滚动数该如何更新呢?

工夫每走过一个tick,零碎就会调用Tick中断的处理函数OsTickHandler(),该函数在kernel\src\los_tick.c文件中实现。上面是该函数的代码片段,⑴处代码别离工作的超时到期状况。

   LITE_OS_SEC_TEXT VOID OsTickHandler(VOID)    {    #if (LOSCFG_BASE_CORE_TICK_HW_TIME == 1)        platform_tick_handler();    #endif             g_ullTickCount++;         #if (LOSCFG_BASE_CORE_TIMESLICE == 1)        OsTimesliceCheck();    #endif         ⑴  OsTaskScan();  // task timeout scan         #if (LOSCFG_BASE_CORE_SWTMR == 1)        (VOID)OsSwtmrScan();    #endif    }

详细分析下函数OsTaskScan(),来理解排序链表和tick工夫的关系。函数在kernel\base\los_task.c文件中实现,代码片段如下:

    LITE_OS_SEC_TEXT VOID OsTaskScan(VOID)    {        LosTaskCB *taskCB = NULL;        BOOL needSchedule = FALSE;        LOS_DL_LIST *listObject = NULL;        UINT16 tempStatus;        UINTPTR intSave;        intSave = LOS_IntLock();         ⑴  g_taskSortLink.cursor = (g_taskSortLink.cursor + 1) % OS_TSK_SORTLINK_LEN;        listObject = g_taskSortLink.sortLink + g_taskSortLink.cursor;    ⑵  if (listObject->pstNext == listObject) {            LOS_IntRestore(intSave);            return;        }         ⑶  for (taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);             &taskCB->timerList != (listObject);) {            tempStatus = taskCB->taskStatus;    ⑷      if (UWROLLNUM(taskCB->idxRollNum) > 0) {                UWROLLNUMDEC(taskCB->idxRollNum);                break;            }         ⑸      LOS_ListDelete(&taskCB->timerList);    ⑹      if (tempStatus & OS_TASK_STATUS_PEND) {                taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND);                LOS_ListDelete(&taskCB->pendList);                taskCB->taskSem = NULL;                taskCB->taskMux = NULL;            }    ⑺      else if (tempStatus & OS_TASK_STATUS_EVENT) {                taskCB->taskStatus &= ~(OS_TASK_STATUS_EVENT);            }    ⑻      else if (tempStatus & OS_TASK_STATUS_PEND_QUEUE) {                LOS_ListDelete(&taskCB->pendList);                taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND_QUEUE);            } else {                taskCB->taskStatus &= ~(OS_TASK_STATUS_DELAY);            }         ⑼      if (!(tempStatus & OS_TASK_STATUS_SUSPEND)) {                taskCB->taskStatus |= OS_TASK_STATUS_READY;                OsHookCall(LOS_HOOK_TYPE_MOVEDTASKTOREADYSTATE, taskCB);                OsPriqueueEnqueue(&taskCB->pendList, taskCB->priority);                needSchedule = TRUE;            }                 if (listObject->pstNext == listObject) {                break;            }                 taskCB = LOS_DL_LIST_ENTRY(listObject->pstNext, LosTaskCB, timerList);        }             LOS_IntRestore(intSave);         ⑽  if (needSchedule) {            LOS_Schedule();        }    }

⑴处代码更新全局变量g_taskSortLink的游标,指向双向链表数组下一个地位,而后获取该地位的双向链表头结点listObject。⑵如果链表为空,则返回。如果双向链表不为空,则执行⑶循环遍历每一个链表节点。⑷处如果链表节点的滚动数大于0,则滚动数减1,阐明工作还须要持续期待一轮。如果链表节点的滚动数等于0,阐明工作超时到期,执行⑸从排序链表中删除。接下来须要依据工作状态别离解决,⑹处如果代码是阻塞状态,勾销阻塞状态,并从阻塞链表中删除。⑺处如果工作阻塞在事件中,勾销阻塞状态。⑻如果工作阻塞在队列,从阻塞链表中删除,勾销阻塞状态,如果不是上述状态,勾销提早状态OS_TASK_STATUS_DELAY。⑼处如果代码是挂起状态,设置工作为就绪状态,退出工作就绪队列,设置须要从新调度标记。⑽如果设置须要从新调度,调用调度函数触发任务调度。

小结

把握鸿蒙轻内核的排序链表TaskSortLinkAttr这一重要的数据结构,会给进一步学习、剖析鸿蒙轻内核源代码打下了根底,让后续的学习更加容易。后续也会陆续推出更多的分享文章,敬请期待,也欢送大家分享学习、应用鸿蒙轻内核的心得,有任何问题、倡议,都能够留言给咱们: https://gitee.com/openharmony... 。为了更容易找到鸿蒙轻内核代码仓,倡议拜访 https://gitee.com/openharmony... ,关注Watch、点赞Star、并Fork到本人账户下,谢谢。

点击关注,第一工夫理解华为云陈腐技术~