关于iot:鸿蒙轻内核M核源码分析数据结构之任务排序链表

8次阅读

共计 9134 个字符,预计需要花费 23 分钟才能阅读完成。

摘要: 鸿蒙轻内核的工作排序链表,用于工作提早到期 / 超时唤醒等业务场景,是一个十分重要、十分根底的数据结构。

本文会持续给读者介绍鸿蒙轻内核源码中重要的数据结构:工作排序链表 TaskSortLinkAttr。鸿蒙轻内核的工作排序链表,用于工作提早到期 / 超时唤醒等业务场景,是一个十分重要、十分根底的数据结构。本文中所波及的源码,以 OpenHarmony LiteOS- M 内核为例,均能够在开源站点 https://gitee.com/openharmony… 获取。

1 工作排序链表

咱们先看下工作排序链接的数据结构。工作排序链表是一个环状的双向链表数组,工作排序链表属性构造体 TaskSortLinkAttr 作为双向链表的头结点,指向双向链表数组的第一个元素,还保护游标信息,记录以后的地位信息。咱们先看下排序链表属性的构造体的定义。

1.1 工作排序链表属性构造体定义

在 kernel\include\los_task.h 头文件中定义了排序链表属性的构造体 TaskSortLinkAttr。该构造体定义了排序链表的头节点 LOS_DL_LIST *sortLink,游标 UINT16 cursor,还有一个保留字段,临时没有应用。

源码如下:

 typedef struct {
        LOS_DL_LIST *sortLink;
        UINT16      cursor;
        UINT16      reserved;
    } TaskSortLinkAttr;

在文件 kernel\src\los_task.c 中定义了排序链表属性构造体 TaskSortLinkAttr 类型的全局变量 g_taskSortLink,该全局变量的成员变量 sortLink 作为排序链表的头结点,指向一个长度为 32 的环状的双向链表数组,成员变量 cursor 作为游标记录环状数组的以后游标位置。源代码如下。

​​​​​​​    LITE_OS_SEC_BSS  TaskSortLinkAttr                    g_taskSortLink;

咱们应用示意图来讲述一下。工作排序链表是环状双向链表数组,长度为 32,每一个元素是一个双向链表,挂载工作 LosTaskCB 的链表节点 timerList。工作 LosTaskCB 的成员变量 idxRollNum 记录数组的索引和滚动数。全局变量 g_taskSortLink 的成员变量 cursor 记录以后游标位置,每过一个 Tick,游标指向下一个地位,转一轮须要 32 ticks。当运行到的数组地位,双向链表不为空,则把第一个节点保护的滚动数减 1。这样的数据结构相似钟表表盘,也称为工夫轮。

咱们举个例子来阐明,基于工夫轮实现的工作排序链表是如何治理工作提早超时的。如果以后游标 cursor 为 1,当一个工作须要延时 72 ticks,72=2*32+8,示意排序索引 sortIndex 为 8,滚动数 rollNum 为 2。会把工作插入数组索引为 sortIndex+cursor= 9 的双向链表地位,索要 9 处的双向链表保护节点的滚动为 2。随着 Tick 工夫的进行,从以后游标位置运行到数组索引地位 9,历时 8 ticks。运行到 9 时,如果滚动数大于 0,则把滚动数减 1。等再运行 2 轮,共须要 72 ticks,工作就会提早到期,能够从排序链表移除。每个数组元素对应的双向链表的第一个链表节点的滚动数示意须要转多少轮,节点工作才到期。第二个链表节点的滚动数须要加上第一个节点的滚动数,示意第二个节点须要转的轮数。顺次类推。

示意图如下:

1.2 工作排序链表宏定义

在 OS_TSK_SORTLINK_LEN 头文件中定义了一些和工作排序链表相干的宏定义。提早工作双向链表数组的长度定义为 32,高阶 bit 位位数为 5,低阶 bit 位位数为 27。对于工作的超时工夫,取其高 27 位作为滚动数,低 5 位作为数组索引。

源码如下:

​​​​​​​

  /**
        * 提早工作双向链表数组的数量(桶的数量):32
        */
        #define OS_TSK_SORTLINK_LEN                         32
     
        /**
        * 高阶 bit 位数目:5
        */
        #define OS_TSK_HIGH_BITS                            5U
     
        /**
        * 低阶 bit 位数目:27
        */
        #define OS_TSK_LOW_BITS                             (32U - OS_TSK_HIGH_BITS)
     
        /**
        * 滚动数最大值:0xFFFF FFDF,1111 0111 1111 1111 1111 1111 1101 1111
        */
        #define OS_TSK_MAX_ROLLNUM                          (0xFFFFFFFFU - OS_TSK_SORTLINK_LEN)
     
        /**
        * 工作延迟时间数的位宽:5
        */
        #define OS_TSK_SORTLINK_LOGLEN                      5
     
        /**
        * 提早工作的桶编号的掩码:31、0001 1111
        */
        #define OS_TSK_SORTLINK_MASK                        (OS_TSK_SORTLINK_LEN - 1U)
     
        /**
        * 滚动数的高阶掩码:1111 1000 0000 0000 0000 0000 0000 0000
        */
        #define OS_TSK_HIGH_BITS_MASK                       (OS_TSK_SORTLINK_MASK << OS_TSK_LOW_BITS)
     
        /**
        * 滚动数的低阶掩码:0000 0111 1111 1111 1111 1111 1111 1111
        */
        #define OS_TSK_LOW_BITS_MASK                        (~OS_TSK_HIGH_BITS_MASK)

2 工作排序链表操作

咱们剖析下工作排序链表的操作,蕴含初始化,插入,删除,滚动数更新,获取下一个到期工夫等。

2.1 初始化排序链表

在系零碎内核初始化启动阶段,在函数 UINT32 OsTaskInit(VOID) 中初始化工作排序链表。该函数的调用关系如下,main.c:main() –> kernel\src\los_init.c:LOS_KernelInit() –> kernel\src\los_task.c:OsTaskInit()。

初始化排序链表函数的源码如下:

​​​​​​​

 LITE_OS_SEC_TEXT_INIT UINT32 OsTaskInit(VOID)
    {
        UINT32 size;
        UINT32 index;
        LOS_DL_LIST *listObject = NULL;
        ......
    ⑴  size = sizeof(LOS_DL_LIST) * OS_TSK_SORTLINK_LEN;
        listObject = (LOS_DL_LIST *)LOS_MemAlloc(m_aucSysMem0, size);
    ⑵  if (listObject == NULL) {(VOID)LOS_MemFree(m_aucSysMem0, g_taskCBArray);
            return LOS_ERRNO_TSK_NO_MEMORY;
        }
     
    ⑶  (VOID)memset_s((VOID *)listObject, size, 0, size);
    ⑷  g_taskSortLink.sortLink = listObject;
        g_taskSortLink.cursor = 0;
        for (index = 0; index < OS_TSK_SORTLINK_LEN; index++, listObject++) {⑸      LOS_ListInit(listObject);
        }
     
        return LOS_OK;
    }
     

⑴处代码计算须要申请的双向链表的内存大小,OS_TSK_SORTLINK_LEN 为 32,即须要为 32 个双向链表节点申请内存空间。而后申请内存,⑵处申请内存失败时返回相应错误码。⑶处初始化申请的内存区域为 0 等。⑷处把申请的双向链表节点赋值给 g_taskSortLink 的链表节点.sortLink,作为排序链表的头节点,游标.cursor 初始化为 0。而后⑸处的循环,调用 LOS_ListInit() 函数把双向链表数组每个元素都初始化为双向循环链表。

2.2 插入排序链表

插入排序链表的函数为 OsTaskAdd2TimerList()。在工作期待互斥锁 / 信号量等资源时,都须要调用该函数将工作退出到对应的排序链表中。该函数蕴含两个入参,第一个参数 LosTaskCB *taskCB 用于指定要提早的工作,第二个参数 UINT32 timeout 指定超时等待时间。

源码如下:

​​​​​​​

 LITE_OS_SEC_TEXT VOID OsTaskAdd2TimerList(LosTaskCB *taskCB, UINT32 timeout)
    {
        LosTaskCB *taskDelay = NULL;
        LOS_DL_LIST *listObject = NULL;
        UINT32 sortIndex;
        UINT32 rollNum;
     
    ⑴  sortIndex = timeout & OS_TSK_SORTLINK_MASK;
        rollNum = (timeout >> OS_TSK_SORTLINK_LOGLEN);
    ⑵  (sortIndex > 0) ? 0 : (rollNum--);
    ⑶  EVALUATE_L(taskCB->idxRollNum, rollNum);
    ⑷  sortIndex = (sortIndex + g_taskSortLink.cursor);
        sortIndex = sortIndex & OS_TSK_SORTLINK_MASK;
    ⑸  EVALUATE_H(taskCB->idxRollNum, sortIndex);
    ⑹  listObject = g_taskSortLink.sortLink + sortIndex;
    ⑺  if (listObject->pstNext == listObject) {LOS_ListTailInsert(listObject, &taskCB->timerList);
        } else {⑻      taskDelay = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);
            do {⑼          if (UWROLLNUM(taskDelay->idxRollNum) <= UWROLLNUM(taskCB->idxRollNum)) {UWROLLNUMSUB(taskCB->idxRollNum, taskDelay->idxRollNum);
                } else {⑽              UWROLLNUMSUB(taskDelay->idxRollNum, taskCB->idxRollNum);
                    break;
                }
     
    ⑾          taskDelay = LOS_DL_LIST_ENTRY(taskDelay->timerList.pstNext, LosTaskCB, timerList);
            } while (&taskDelay->timerList != (listObject));
     
    ⑿      LOS_ListTailInsert(&taskDelay->timerList, &taskCB->timerList);
        }
    }

⑴处代码计算等待时间 timeout 的低 5 位作为数组索引,高 27 位作为滚动数 rollNum。这 2 行代码数学上的意义,就是把等待时间处于 32 失去的商作为滚动数,余数作为数组索引。⑵处代码,如果余数为 0,能够整除时,滚动数减 1。减 1 设计的起因是,在函数 VOID OsTaskScan(VOID) 中,每一个 tick 到来时,如果滚动数大于 0,滚动数减 1,并持续滚动一圈。后文会剖析该函数 VOID OsTaskScan(VOID)。

⑶处代码把滚动数赋值给工作 taskCB->idxRollNum 的低 27 位。⑷处把数组索引加上游标,而后执行⑸赋值给工作 taskCB->idxRollNum 的高 5 位。⑹依据数组索引获取双向链表头结点,⑺如果此处双向链表为空,直接插入链表里。如果链表不为空,执行⑻获取第一个链表节点对应的工作 taskDelay,而后遍历循环双向链表,把工作插入到适合的地位。⑼处如果待插入工作 taskCB 的滚动数大于等于以后链表节点对应工作的滚动数,则从待插入工作 taskCB 的滚动数中减去以后链表节点对应工作的滚动数,而后执行⑾获取下一个节点持续遍历。⑽处如果待插入工作 taskCB 的滚动数小于以后链表节点对应工作的滚动数,则从以后链表节点对应工作的滚动数中减去待插入工作 taskCB 的滚动数,而后跳出循环。执行⑿,实现工作插入。插入过程,能够联合上文的示意图进行了解。

2.3 从排序链表中删除

从排序链表中删除的函数为 VOID OsTimerListDelete(LosTaskCB taskCB)。在工作复原 / 删除等场景中,须要调用该函数将工作从工作排序链表中删除。该函数蕴含一个参数 LosTaskCB taskCB,用于指定要从排序链表中删除的工作。

源码如下:

​​​​​​​

 LITE_OS_SEC_TEXT VOID OsTimerListDelete(LosTaskCB *taskCB)
    {
        LOS_DL_LIST  *listObject = NULL;
        LosTaskCB  *nextTask = NULL;
        UINT32 sortIndex;
     
    ⑴  sortIndex = UWSORTINDEX(taskCB->idxRollNum);
    ⑵  listObject = g_taskSortLink.sortLink + sortIndex;
     
    ⑶  if (listObject != taskCB->timerList.pstNext) {⑷      nextTask = LOS_DL_LIST_ENTRY(taskCB->timerList.pstNext, LosTaskCB, timerList);
            UWROLLNUMADD(nextTask->idxRollNum, taskCB->idxRollNum);
        }
     
    ⑸  LOS_ListDelete(&taskCB->timerList);
    }

⑴处代码获取待从排序链表中删除的工作对应的数字索引。⑵处代码获取排序链表的头节点 listObject。⑶处代码判断待删除节点是否是最初一个节点,如果不是最初一个节点,执行执行⑷处代码获取待删除节点的下一个节点对应的工作 nextTask,在下一个节点的滚动数中减少待删除节点的滚动数,而后执行⑸处代码执行删除操作。如果是最初一个节点,间接执行⑸处代码删除该节点即可。

2.4 获取下一个超时到期工夫

获取下一个超时到期工夫的函数为 OsTaskNextSwitchTimeGet(),咱们剖析下其代码。

源码如下:

​​​​​​​

    UINT32 OsTaskNextSwitchTimeGet(VOID)
    {
        LosTaskCB *taskCB = NULL;
        UINT32 taskSortLinkTick = LOS_WAIT_FOREVER;
        LOS_DL_LIST *listObject = NULL;
        UINT32 tempTicks;
        UINT32 index;
     
    ⑴  for (index = 0; index < OS_TSK_SORTLINK_LEN; index++) {⑵      listObject = g_taskSortLink.sortLink + ((g_taskSortLink.cursor + index) % OS_TSK_SORTLINK_LEN);
    ⑶      if (!LOS_ListEmpty(listObject)) {⑷          taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);
    ⑸          tempTicks = (index == 0) ? OS_TSK_SORTLINK_LEN : index;
    ⑹          tempTicks += (UINT32)(UWROLLNUM((UINT32)taskCB->idxRollNum) * OS_TSK_SORTLINK_LEN);
    ⑺          if (taskSortLinkTick > tempTicks) {taskSortLinkTick = tempTicks;}
            }
        }
        return taskSortLinkTick;
    }

⑴处代码循环遍历双向链表数组,⑵处代码从以后游标位置开始获取排序链表的头节点 listObject。⑶处代码判断排序链表是否为空,如果排序链表为空,则持续遍历下一个数组。如果链表不为空,⑷处代码获取排序链表的第一个链表节点对应的工作。⑸处如果遍历的数字索引为 0,tick 数目应用 32,否则应用具体的数字索引。⑹处获取工作的滚动数,计算出须要的等待时间,加上⑸处计算出的有余滚动一圈的工夫。⑺处计算出须要期待的最小工夫,即下一个最快到期的工夫。

3 排序链表和 Tick 工夫的关系

工作退出到排序链表后,工夫一个 tick 一个 tick 的逝去,排序链表中的滚动数该如何更新呢?

工夫每走过一个 tick,零碎就会调用 Tick 中断的处理函数 OsTickHandler(),该函数在 kernel\src\los_tick.c 文件中实现。上面是该函数的代码片段,⑴处代码别离工作的超时到期状况。

​​​​​​​

   LITE_OS_SEC_TEXT VOID OsTickHandler(VOID)
    {#if (LOSCFG_BASE_CORE_TICK_HW_TIME == 1)
        platform_tick_handler();
    #endif
     
        g_ullTickCount++;
     
    #if (LOSCFG_BASE_CORE_TIMESLICE == 1)
        OsTimesliceCheck();
    #endif
     
    ⑴  OsTaskScan();  // task timeout scan
     
    #if (LOSCFG_BASE_CORE_SWTMR == 1)
        (VOID)OsSwtmrScan();
    #endif
    }

详细分析下函数 OsTaskScan(),来理解排序链表和 tick 工夫的关系。函数在 kernel\base\los_task.c 文件中实现,代码片段如下:

​​​​​​​

    LITE_OS_SEC_TEXT VOID OsTaskScan(VOID)
    {
        LosTaskCB *taskCB = NULL;
        BOOL needSchedule = FALSE;
        LOS_DL_LIST *listObject = NULL;
        UINT16 tempStatus;
        UINTPTR intSave;
        intSave = LOS_IntLock();
     
    ⑴  g_taskSortLink.cursor = (g_taskSortLink.cursor + 1) % OS_TSK_SORTLINK_LEN;
        listObject = g_taskSortLink.sortLink + g_taskSortLink.cursor;
    ⑵  if (listObject->pstNext == listObject) {LOS_IntRestore(intSave);
            return;
        }
     
    ⑶  for (taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);
             &taskCB->timerList != (listObject);) {
            tempStatus = taskCB->taskStatus;
    ⑷      if (UWROLLNUM(taskCB->idxRollNum) > 0) {UWROLLNUMDEC(taskCB->idxRollNum);
                break;
            }
     
    ⑸      LOS_ListDelete(&taskCB->timerList);
    ⑹      if (tempStatus & OS_TASK_STATUS_PEND) {taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND);
                LOS_ListDelete(&taskCB->pendList);
                taskCB->taskSem = NULL;
                taskCB->taskMux = NULL;
            }
    ⑺      else if (tempStatus & OS_TASK_STATUS_EVENT) {taskCB->taskStatus &= ~(OS_TASK_STATUS_EVENT);
            }
    ⑻      else if (tempStatus & OS_TASK_STATUS_PEND_QUEUE) {LOS_ListDelete(&taskCB->pendList);
                taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND_QUEUE);
            } else {taskCB->taskStatus &= ~(OS_TASK_STATUS_DELAY);
            }
     
    ⑼      if (!(tempStatus & OS_TASK_STATUS_SUSPEND)) {
                taskCB->taskStatus |= OS_TASK_STATUS_READY;
                OsHookCall(LOS_HOOK_TYPE_MOVEDTASKTOREADYSTATE, taskCB);
                OsPriqueueEnqueue(&taskCB->pendList, taskCB->priority);
                needSchedule = TRUE;
            }
     
            if (listObject->pstNext == listObject) {break;}
     
            taskCB = LOS_DL_LIST_ENTRY(listObject->pstNext, LosTaskCB, timerList);
        }
     
        LOS_IntRestore(intSave);
     
    ⑽  if (needSchedule) {LOS_Schedule();
        }
    }

⑴处代码更新全局变量 g_taskSortLink 的游标,指向双向链表数组下一个地位,而后获取该地位的双向链表头结点 listObject。⑵如果链表为空,则返回。如果双向链表不为空,则执行⑶循环遍历每一个链表节点。⑷处如果链表节点的滚动数大于 0,则滚动数减 1,阐明工作还须要持续期待一轮。如果链表节点的滚动数等于 0,阐明工作超时到期,执行⑸从排序链表中删除。接下来须要依据工作状态别离解决,⑹处如果代码是阻塞状态,勾销阻塞状态,并从阻塞链表中删除。⑺处如果工作阻塞在事件中,勾销阻塞状态。⑻如果工作阻塞在队列,从阻塞链表中删除,勾销阻塞状态,如果不是上述状态,勾销提早状态 OS_TASK_STATUS_DELAY。⑼处如果代码是挂起状态,设置工作为就绪状态,退出工作就绪队列,设置须要从新调度标记。⑽如果设置须要从新调度,调用调度函数触发任务调度。

小结

把握鸿蒙轻内核的排序链表 TaskSortLinkAttr 这一重要的数据结构,会给进一步学习、剖析鸿蒙轻内核源代码打下了根底,让后续的学习更加容易。后续也会陆续推出更多的分享文章,敬请期待,也欢送大家分享学习、应用鸿蒙轻内核的心得,有任何问题、倡议,都能够留言给咱们:https://gitee.com/openharmony…。为了更容易找到鸿蒙轻内核代码仓,倡议拜访 https://gitee.com/openharmony…,关注 Watch、点赞 Star、并 Fork 到本人账户下,谢谢。

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0