浅析State-Thread

61次阅读

共计 7283 个字符,预计需要花费 19 分钟才能阅读完成。

State-Thread(以下简称 st),是一个由 C 语言编写的小巧、简洁却高效的开源协程库。这个库基于单线程运作、不强制占用用户线程,给予了开发者最大程度的轻量级和较低的侵入性。本篇文章中,网易云信音视频研发大神将为大家简要分析 State-Thread,欢迎大家积极留言,和我们共同讨论。在开始这个话题之前,我们先来聊一聊协程。什么是协程?协程是一种程序组件。通常我们把协程理解为是一种程序自己实现调度、用于提高运行效率、降低开发复杂度的东西。提高运行效率很好理解,因为在程序层自己完成了部分的调度,降低了对系统调度的依赖,减少了大量的中断和换页操作。而降低了开发复杂度,则是指对于开发者而言,可以使用同步的方式去进行代码开发(不需要考虑异步模型的诸多回调),也不需要考虑多线程模型的线程调度和诸多的临界资源问题。很多语言都拥有协程,例如 python 或者 golang。而对于 c /c++ 而言,通常实现协程的常见方式,通常是依赖于 glibc 提供的 setjump&longjump 或者基于汇编语言,当然还有基于语义实现(protothread)。linux 上使用协程库的方式,通常也会分为替换函数和更为暴力的替换 so 来实现。当然而各种方式有各自的优劣。而 st 选用的汇编语言实现 setjump&longjump 和要求用户调用 st_打头的函数来嵌入程序。所以 st 具备了跨平台的能力,以及让开发者们更开心的“与允许调用者自行选择切换时机”的能力。st 究竟是如何实现了这一切?首先我们先看看 st 的整体工作流程:
在宏观的来看,ST 的结构主要分成:vp_schedule。主要是负责了一个调度的能力。有点类似于 linux 内核当中的 schedule() 函数。每次当这个函数被调用的时候,都会完成一次线程的切换。各种 Queue。用于保存各种状态下等待被调度协程(st_thread)Timer。用于记录各种超时和 sleep。poll。用于监听各种 io 事件,会根据系统能力不同而进行切换(kqueue、epoll、poll、select)。st_thread。用于保存各种协程的信息。其中比较重要的是 schedule 模块和 thread 模块两者。这两者实现了一个完整的协程切换和调度。属于 st 的核心。而 schedule 部分通常是开发者们最需要关心的部分。接下来我们会深入到代码层,看一下具体在这个过程里做了些什么。通常对于 st 而言,所有暴露给用户的除了 init 函数,就是一系列的 st_xxx 函数了。那么先看看 init 函数。int st_init(void){_st_thread_t *thread;
if (_st_active_count) {/ Already initialized / return 0;}
/ We can ignore return value here / st_set_eventsys(ST_EVENTSYS_DEFAULT);
if (_st_io_init() < 0) return -1;
memset(&_st_this_vp, 0, sizeof(_st_vp_t));
ST_INIT_CLIST(&_ST_RUNQ); ST_INIT_CLIST(&_ST_IOQ); ST_INIT_CLIST(&_ST_ZOMBIEQ);
if ((*_st_eventsys->init)() < 0) return -1;
_st_this_vp.pagesize = getpagesize(); _st_this_vp.last_clock = st_utime();
/*
Create idle thread
*/ _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0); if (!_st_this_vp.idle_thread) return -1; _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; _st_active_count–; _ST_DEL_RUNQ(_st_this_vp.idle_thread);
/*
Initialize primordial thread
*/ thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX sizeof(void))); if (!thread) return -1; thread->private_data = (void **) (thread + 1); thread->state = _ST_ST_RUNNING; thread->flags = _ST_FL_PRIMORDIAL; _ST_SET_CURRENT_THREAD(thread); _st_active_count++;
return 0;} 这段函数一共做了 3 事情,创建了一个 idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、_ST_ZOMBIEQ 三个队列,把当前调用者初始化成原始函数(通常 st_init 会在 main 里面调用, 所以这个原始的 thread 相当于是主线程)。idle_thread 函数,其实就是整个 IO 和定时器相关的本体函数了。st 会在每一次_ST_RUNQ 运行完成后,调用 idle_thread 来获取可读写的 io 和定时器。这个我们后续再说。那么,st_xxx 一般会分成 io 类和延迟类(sleep)。两者入口其实是同一个,只不过在 io 类的会多调用一层。我们这里选择 st_send 为代表。int st_sendmsg(_st_netfd_t fd, const struct msghdr msg, int flags, st_utime_t timeout){int n;
while ((n = sendmsg(fd->osfd, msg, flags)) < 0) {if (errno == EINTR) continue; if (!_IO_NOT_READY_ERROR) return -1; / Wait until the socket becomes writable / if (st_netfd_poll(fd, POLLOUT, timeout) < 0) return -1; }
return n;} 本质上所有的 st 函数都是以异步接口 + st_netfd_poll 来实现的。在 st_netfd_poll 以内,会去调用 st_poll,而 st_poll 本质上会调用并且切换线程。int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout){struct pollfd pd; int n;
pd.fd = fd->osfd; pd.events = (short) how; pd.revents = 0;
if ((n = st_poll(&pd, 1, timeout)) < 0) return -1; if (n == 0) {/ Timed out / errno = ETIME; return -1;} if (pd.revents & POLLNVAL) {errno = EBADF; return -1;}
return 0;}
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout){struct pollfd *pd; struct pollfd *epd = pds + npds; _st_pollq_t pq; _st_thread_t *me = _ST_CURRENT_THREAD(); int n;
if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT; errno = EINTR; return -1;}
if ((*_st_eventsys->pollset_add)(pds, npds) < 0) return -1;
pq.pds = pds; pq.npds = npds; pq.thread = me; pq.on_ioq = 1; _ST_ADD_IOQ(pq); if (timeout != ST_UTIME_NO_TIMEOUT) _ST_ADD_SLEEPQ(me, timeout); me->state = _ST_ST_IO_WAIT;
_ST_SWITCH_CONTEXT(me);
n = 0; if (pq.on_ioq) {/ If we timed out, the pollq might still be on the ioq. Remove it / _ST_DEL_IOQ(pq); (*_st_eventsys->pollset_del)(pds, npds); } else {/ Count the number of ready descriptors / for (pd = pds; pd < epd; pd++) {if (pd->revents) n++; } }
if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT; errno = EINTR; return -1;}
return n;} 那么到此为止,st_poll 中就出现了我们最关心的调度部分了。当一个线程进行调度的时候一般都是 poll_add(如果是 io 操作),add_queue, _ST_SWITCH_CONTEXT 完成一次调度。根据不同的类型,会 add 到不同的 queue。例如需要超时,则会 add 到 IOQ 和 SLEEPQ。而_ST_SWITCH_CONTEXT,则是最关键的切换线程操作了。_ST_SWITCH_CONTEXT 其实是一个宏,它的本质是调用了 MD_SETJMP 和_st_vp_schedule().
define _ST_SWITCH_CONTEXT(_thread) \
ST_BEGIN_MACRO \ ST_SWITCH_OUT_CB(_thread); \ if (!MD_SETJMP((_thread)->context)) {\ _st_vp_schedule(); \ } \ ST_DEBUG_ITERATE_THREADS(); \ ST_SWITCH_IN_CB(_thread); \ ST_END_MACRO 这个函数其实就是一个完成的线程切换了。在 st 里线程的切换会使用 MD_SETJMP->_st_vp_schedule->MD_LONGJMP。MD_SETJMP 和 MD_LONGJMP 其实就是 st 使用汇编自己写的 setjmp 和 longjmp 函数(glibc),效果也是几乎等效的。(因为 st 本身会做平台适配,所以我们以 x86-64 的汇编为例)
elif defined(__amd64__) || defined(__x86_64__)
/*
Internal __jmp_buf layout
*/
define JB_RBX 0
define JB_RBP 1
define JB_R12 2
define JB_R13 3
define JB_R14 4
define JB_R15 5
define JB_RSP 6
define JB_PC 7
.file “md.S” .text
/ _st_md_cxt_save(__jmp_buf env) /.globl _st_md_cxt_save .type _st_md_cxt_save, @function .align 16_st_md_cxt_save: /*
Save registers.
*/ movq %rbx, (JB_RBX*8)(%rdi) movq %rbp, (JB_RBP*8)(%rdi) movq %r12, (JB_R12*8)(%rdi) movq %r13, (JB_R13*8)(%rdi) movq %r14, (JB_R14*8)(%rdi) movq %r15, (JB_R15*8)(%rdi) / Save SP / leaq 8(%rsp), %rdx movq %rdx, (JB_RSP*8)(%rdi) / Save PC we are returning to / movq (%rsp), %rax movq %rax, (JB_PC*8)(%rdi) xorq %rax, %rax ret .size _st_md_cxt_save, .-_st_md_cxt_save
//
/ _st_md_cxt_restore(__jmp_buf env, int val) /.globl _st_md_cxt_restore .type _st_md_cxt_restore, @function .align 16_st_md_cxt_restore: /*
Restore registers.
*/ movq (JB_RBX*8)(%rdi), %rbx movq (JB_RBP*8)(%rdi), %rbp movq (JB_R12*8)(%rdi), %r12 movq (JB_R13*8)(%rdi), %r13 movq (JB_R14*8)(%rdi), %r14 movq (JB_R15*8)(%rdi), %r15 / Set return value / test %esi, %esi mov $01, %eax cmove %eax, %esi mov %esi, %eax movq (JB_PC*8)(%rdi), %rdx movq (JB_RSP*8)(%rdi), %rsp / Jump to saved PC / jmpq *%rdx .size _st_md_cxt_restore, .-_st_md_cxt_restore
//MD_SETJMP 的时候,会使用汇编把所有寄存器的信息保留下来,而 MD_LONGJMP 则会把所有的寄存器信息重新加载出来。两者配合使用的时候,可以完成一次函数间的跳转。那么我们已经看到了 MD_SETJMP 的调用,MD_LONGJMP 调用在哪儿呢?让我们继续看下去,在最一开始,我们就提及过_st_vp_schedule() 这个核心函数。void _st_vp_schedule(void){_st_thread_t *thread;
if (_ST_RUNQ.next != &_ST_RUNQ) {/ Pull thread off of the run queue / thread = _ST_THREAD_PTR(_ST_RUNQ.next); _ST_DEL_RUNQ(thread); } else {/ If there are no threads to run, switch to the idle thread / thread = _st_this_vp.idle_thread;} ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
/ Resume the thread / thread->state = _ST_ST_RUNNING; _ST_RESTORE_CONTEXT(thread);} 这个函数其实非常简单,基本工作原理可以认为是执行以下几步:1. 查看当前 RUNQ 是否有可以调用的,如果有,则 RUNQ pop 一个 thread。2. 如果没有,则运行 idle_thread。3. 调用_ST_RESTORE_CONTEXT。那么_ST_RESTORE_CONTEXT 做了什么呢?
define _ST_RESTORE_CONTEXT(_thread) \
ST_BEGIN_MACRO \ _ST_SET_CURRENT_THREAD(_thread); \ MD_LONGJMP((_thread)->context, 1); \ ST_END_MACRO 简单来说,_ST_RESTORE_CONTEXT 就是调用了我们之前所没有看到的 MD_LONGJMP。所以,我们可以简单地认为,在携程需要 schedule 的时候,会先把自身当前的栈通过 MD_SETJMP 保存起来,当线程被 schedule 再次调度出来的时候,则会使用 MD_SETJMP 来还原栈,完成一次协程切换。然后我们来看看 idle_thread 做了什么。虽然这个协程名字叫做 idle,但是其实做了很多的事情。void _st_idle_thread_start(void arg){_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {/ Idle vp till I/O is ready or the smallest timeout expired / _ST_VP_IDLE();
/ Check sleep queue for expired threads / _st_vp_check_clock();
me->state = _ST_ST_RUNNABLE; _ST_SWITCH_CONTEXT(me); }
/ No more threads / exit(0);
/ NOTREACHED / return NULL;} 总的来说,idle_thread 做了两件事情。1. _ST_VP_IDLE() 2. _st_vp_check_clock()。_st_vp_check_clock 很好理解,就是检查定时器是否超时,如果超时了,则设置超时标记之后,放回 RUNQ。而_ST_VP_IDLE,其实就是查看 io 是否已经 ready 了。例如 linux 的话,则会调用 epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,_st_epoll_data->evtlist_size, timeout) 去查看是否有可响应的 io。timeout 值会根据当前空闲情况进行变化,通常来说会是一个极小的值。那么看到这里,整体的线程调度已经全部走完了。(详见前面最一开始的流程图)总体流程总结来说基本上是 func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。所以对于 st 而言,所以的调度,是基于用户调用。那么如果用户一直不调用 st_xxx()(例如计算密集性服务),st 也就无法进行协程切换,那么其他协程也就产生极大的阻塞了。这也是为什么 st 并不太合适计算密集型的原因(其实单线程框架大多都不合适计算密集型)
想要阅读更多技术干货文章,欢迎关注网易云信博客。了解网易云信,来自网易核心架构的通信与视频云服务。

正文完
 0