菜鸟光系列浅谈SWOOLE协程篇

6次阅读

共计 8923 个字符,预计需要花费 23 分钟才能阅读完成。

阅读本文需要以下知识点

  • 了解进程、线程相关基础
  • 熟练 php 的 hello world 输出
  • 会 swoole 单词拼写

协程的介绍

协程是什么?

A coroutine is a function that can suspend its execution (yield) until the given given YieldInstruction finishes.

简单的说协程是寄宿在线程下程序员实现的一种跟更轻量的并发的协作轻量线程

随着程序员人群的增大, 大佬也不断的爆发式增长, 当然就开始有人觉得线程不好用了, 那怎么办呢? 当然是基于线程的理念上再去实现一套更加轻量、更好骗 star 的一套轻量线程(事实上协程不能完全被认为线程, 因为一个线程可以有多个协程)

协程和线程的区别

本质

线程 内核态
协程 用户态

调度方式

线程的调度方式为 系统调度 , 常用的调度策略有 分时调度 抢占调度。说白就是线程的调度完全不受自己控制

协程的调度方式为 协作式调度 不受内核控制由自由策略调度切换

等等

协作式调度?

上述说了协程是用户态的, 所以所谓的协作式调度直接可以理解为是程序员写的调度方式, 也就是我想怎么调度就怎么调度, 而不用通过系统内核被调度。

深。。。。浅入理解 swoole 的协程

既然打算浅入理解的 swoole 的协程,我们必须要知道 swoole 的协程模型。
swoole 的协程是基于单线程。可以理解为协程的切换是串行的,再同一个时间点只运行一个协程.

说到这里,肯定就有人问了。go 呢,go 的协程的是基于多线程。当然各有各的好处, 具体可以自行使用搜索引擎了解

我们可以直接 copy & paste 下面代码,再本地的环境进行的 demo run

<?php

$func = function ($index, $isCorotunine = true) {$isCorotunine && \Swoole\Coroutine::sleep(2);
    echo "index:" . $index . PHP_EOL;
    echo "is corotunine:" . intval($isCorotunine) . PHP_EOL;
};

$func(1, false);
go($func, 2, true);
go($func, 3, true);
go($func, 4, true);
go($func, 5, true);
go($func, 6, true);
$func(7, false);    

会得到以下结果

index:1
is corotunine:0
index:7
is corotunine:0
index:2
is corotunine:1
index:6
is corotunine:1
index:5
is corotunine:1
index:4
is corotunine:1
index:3
is corotunine:1

肯定有人会想, 哇塞,尽然 2 秒都执行完了,一点都不堵塞啊!!

好了,事实上关于 2 秒执行完的事情可以回过头再去看下协程的概念。
我们可以关注的是执行顺序,1 和 7 是非协程的执行能立马返回结果符合预期。
关于协程的调度顺序
为什么是 26543 不是 65432 或者 23456 有序的返回呢

为了找到我们的答案, 我们只能通过源码进行知晓一些东西

分析源码

图来自 https://segmentfault.com/a/11…

如果没有较强的基础还有啃烂的 apue 的前提下(当然我也没有!T_T)
我们需要关心的是以下两个
yield 切换协程
resume 恢复协程

协程的创建

<?php
go (function(){echo "swoole 太棒了";});

调用的 swoole 封装给 PHPgo函数为创建一个协程

我们根据拓展源码中的

大部分的 PHP 扩展函数以及扩展方法的参数声明放在 swoole_*.ccswoole.cc 里面。

PHP_FALIAS(go, swoole_coroutine_create, arginfo_swoole_coroutine_create)

可以知道 go->swoole_coroutine_create

在 swoole_coroutine.cc 文件里找到

PHP_FUNCTION(swoole_coroutine_create)
{
    ....
    // 划重点 要考
    long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
    ....
}

long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{if (sw_unlikely(Coroutine::count() >= config.max_num))
    {php_swoole_fatal_error(E_WARNING, "exceed max number of coroutine %zu", (uintmax_t) Coroutine::count());
        return SW_CORO_ERR_LIMIT;
    }

    if (sw_unlikely(!active))
    {
        // 划重点 要考
        activate();}

    // 保存回调函数
    php_coro_args php_coro_args;
    // 函数信息
    php_coro_args.fci_cache = fci_cache;
    // 参数
    php_coro_args.argv = argv;
    php_coro_args.argc = argc;
    // 划重点 要考
    save_task(get_task());

    // 划重点 要考
    return Coroutine::create(main_func, (void*) &php_coro_args);
}
// 保存栈 
void PHPCoroutine::save_task(php_coro_task *task)
{save_vm_stack(task);
    save_og(task);
}
// 初始化 reactor 的事件
inline void PHPCoroutine::activate()
{if (sw_unlikely(active))
    {return;}

    /* init reactor and register event wait */
    php_swoole_check_reactor();

    /* replace interrupt function */
    orig_interrupt_function = zend_interrupt_function;
    zend_interrupt_function = coro_interrupt_function;
    
    /* replace the error function to save execute_data */
    orig_error_function = zend_error_cb;
    zend_error_cb = error;

    if (config.hook_flags)
    {enable_hook(config.hook_flags);
    }

    if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler)
    {
        /* create a thread to interrupt the coroutine that takes up too much time */
        interrupt_thread_start();}

    if (!coro_global_active)
    {if (zend_hash_str_find_ptr(&module_registry, ZEND_STRL("xdebug")))
        {php_swoole_fatal_error(E_WARNING, "Using Xdebug in coroutines is extremely dangerous, please notice that it may lead to coredump!");
        }

        /* replace functions that can not work correctly in coroutine */
        inject_function();

        coro_global_active = true;
    }
    /**
     * deactivate when reactor free.
     */
    swReactor_add_destroy_callback(SwooleG.main_reactor, deactivate, nullptr);
    active = true;
}

根据 Coroutine::create 继续往下跳转

    static inline long create(coroutine_func_t fn, void* args = nullptr)
    {return (new Coroutine(fn, args))->run();}

在创建完协程后立马执行
我们观察下构造方法

    Coroutine(coroutine_func_t fn, void *private_data) :
            ctx(stack_size, fn, private_data)
    {
        cid = ++last_cid;
        coroutines[cid] = this;
        if (sw_unlikely(count() > peak_num))
        {peak_num = count();
        }
    }

上述代码我可以发现还有一个 Context 的类 这个构造函数我们可以猜到做了 3 件事情

  1. 分配对应协程 id(每个协程都有自己的 id)
  2. 保存上下文
  3. 更新当前的协程的数量

swoole使用的协程库为 boost.context 可自行搜索
主要暴露的函数接口为jump_fcontextmake_fcontext
具体的作用 保存当前执行状态的上下文 暂停当前的执行状态 够跳转到其他位置继续执行

创建完协程立马执行

inline long run()
    {
        long cid = this->cid;
        origin = current;
        current = this;
        // 依赖 boost.context 切栈
        ctx.swap_in();
        // 判断是否执行结束
        check_end();
        return cid;
    }

判断是否结束

inline void check_end()
    {if (ctx.is_end())
        {close();
        }
        else if (sw_unlikely(on_bailout))
        {SW_ASSERT(current == nullptr);
            on_bailout();
            // expect that never here
            exit(1);
        }
    }

根据 ctx.is_end()的函数找到

    inline bool is_end()
    {return end_;}
bool Context::swap_in()
{jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
    return true;
}

我们可以总结下 swoole 在创建协程的时候主要做了哪些事情

  1. 检测环境
  2. 解析参数
  3. 保存上下文
  4. 切换 C 栈
  5. 执行协程

协程的 yield

上述的 demo 我们使用 \Swoole\Coroutine::sleep(2)
根据上述说函数申明的我们在 swoole_corotunine_system.cc 发现对应函数为 swoole_coroutine_systemsleep

PHP_METHOD(swoole_coroutine_system, sleep)
{
    double seconds;

    ZEND_PARSE_PARAMETERS_START(1, 1)
        Z_PARAM_DOUBLE(seconds)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    if (UNEXPECTED(seconds < SW_TIMER_MIN_SEC))
    {php_swoole_fatal_error(E_WARNING, "Timer must be greater than or equal to" ZEND_TOSTR(SW_TIMER_MIN_SEC));
        RETURN_FALSE;
    }
    System::sleep(seconds);
    RETURN_TRUE;
}

调用了 sleep 函数之后对当前的协程做了三件事
1. 增加了timer 定时器
2. 注册回掉函数再延迟之后resume 协程
3. 通过yield 让出调度

int System::sleep(double sec)
{
// 获取当前的协程
    Coroutine* co = Coroutine::get_current_safe();
   //swTimer_add 注册定时器 sleep_timeout 回调的函数
   if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL)
    {return -1;}
    // 让出当前 cpu
    co->yield();
    return 0;
}

// 回调函数
static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
   // 恢复调度
    ((Coroutine *) tnode->data)->resume();}
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
    ....
    // 保存当前上下文和对应过期时间
    tnode->data = data;
    tnode->type = SW_TIMER_TYPE_KERNEL;
    tnode->exec_msec = now_msec + _msec;
    tnode->interval = interval ? _msec : 0;
    tnode->removed = 0;
    tnode->callback = callback;
    tnode->round = timer->round;
    tnode->dtor = NULL;

    // _next_msec 保存最快过期的事件
    if (timer->_next_msec < 0 || timer->_next_msec > _msec)
    {timer->set(timer, _msec);
        timer->_next_msec = _msec;
    }

    tnode->id = timer->_next_id++;
    if (sw_unlikely(tnode->id < 0))
    {
        tnode->id = 1;
        timer->_next_id = 2;
    }

    tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode);
    ....
    timer->num++;
    return tnode;
}

协程的切换

我们

void Coroutine::resume()
{SW_ASSERT(current != this);
    if (sw_unlikely(on_bailout))
    {return;}
    state = SW_CORO_RUNNING;
    if (sw_likely(on_resume))
    {on_resume(task);
    }
    // 将当前的协程保存为 origin -> 理解程 previous
    origin = current;
    // 需要执行的协程 变成 current
    current = this;
    // 入栈执行
    ctx.swap_in();
    check_end();}

到这里时候 关于协程调用顺序的答案已经出来了

在创建协程的时候 (new Coroutine(fn, args))->run();sleep触发 yield 都在不断变更的 Corotuninecurrentorigin 再执切换的时候和 php 代码创建协程的时间发生穿插,而不是我们想象中的 队列 有序执行
比如当创建协程只有 2 个的时候

<?php

$func = function ($index, $isCorotunine = true) {$isCorotunine && \Swoole\Coroutine::sleep(2);
    echo "index:" . $index . PHP_EOL;
    echo "is corotunine:" . intval($isCorotunine) . PHP_EOL;
};

$func(1, false);

go($func, 2, true);
go($func, 3, true);

返回输出 因为连续创建协程的执行时间小没有被打乱

php swoole_go_demo1.php
index:1
is corotunine:0
index:2
is corotunine:1
index:3
is corotunine:1

当连续创建的时候 200 个协程的时候
返回就变得打乱的index 符合预计猜想

index:1,index:2,index:4,index:8,index:16,index:32,index:64,index:128,index:129,index:65,index:130,index:131,index:33,index:66,index:132,index:133,index:67,index:134,index:135,index:17,index:34,index:68,index:136,index:137,index:69,index:138,index:139,index:35,index:70,index:140,index:141,index:71,index:142,index:143,index:9,index:18,index:36,index:72,index:144,index:145,index:73,index:146,index:147,index:158,index:157,index:156,index:155,index:154,index:153,index:152,index:151,index:37,index:74,index:148,index:149,index:75,index:150,index:19,index:38,index:76,index:77,index:39,index:78,index:79,index:5,index:10,index:20,index:40,index:80,index:81,index:41,index:82,index:83,index:21,index:127,index:126,index:125,index:124,index:123,index:122,index:121,index:120,index:119,index:118,index:117,index:116,index:115,index:114,index:113,index:112,index:111,index:110,index:109,index:108,index:107,index:106,index:105,index:104,index:103,index:102,index:101,index:100,index:99,index:98,index:97,index:96,index:95,index:94,index:93,index:92,index:91,index:90,index:89,index:88,index:87,index:42,index:84,index:85,index:43,index:86,index:11,index:22,index:44,index:45,index:23,index:46,index:47,index:3,index:6,index:12,index:24,index:48,index:49,index:25,index:50,index:51,index:13,index:26,index:63,index:62,index:61,index:60,index:59,index:58,index:57,index:56,index:55,index:52,index:53,index:27,index:54,index:7,index:14,index:28,index:29,index:15,index:30,index:31,index:200,index:199,index:192,index:185,index:175,index:168,index:161,index:163,index:172,index:179,index:187,index:194,index:174,index:160,index:173,index:176,index:198,index:195,index:180,index:167,index:169,index:184,index:197,index:193,index:177,index:162,index:171,index:186,index:182,index:164,index:191,index:183,index:166,index:196,index:178,index:170,index:189,index:188,index:165,index:181,index:190,index:159

最后彩蛋

我们使用 GO 的协程的来实现上述的 demo

package main

import (
    "fmt"
    "time"
)

var count int = 0

func main() {output(false, 1)

    go output(true, 2)
    go output(true, 3)
    go output(true, 4)
    go output(true, 5)
    go output(true, 6)

    output(false, 7)

    time.Sleep(time.Second)
}

func output(isCorotunine bool, index int) {time.Sleep(time.Second)
    count = count + 1
    fmt.Println(count, isCorotunine, index)
}

猜猜返回结果是如何的 可以根据 go 的协程基于多线程的方式再去研究下

写给最后, 文章纯属自己根据代码和资料理解, 如果有错误麻烦提出来, 倍感万分, 如果因为一些错误的观点被误导我只能说

正文完
 0