关于github:消息队列新实现Workflow-msgqueue代码详解

38次阅读

共计 6496 个字符,预计需要花费 17 分钟才能阅读完成。

PART – 0 一个小小的翻新

开源我的项目 Workflow 中有许多小翻新的根底模块,明天来介绍最罕用的传统数据结构:多生产者 - 多消费者 - 音讯队列

着重申明一下,这里说的音讯队列不是 kafka 这样的音讯队列服务,而是单机内的 传统数据结构,次要用于机器资源的协调调度,比方线程 / 协程调度、网络异步资源收发等,即能够协调执行资源,又能够给数据资源当长期 buffer 用。

前段时间介绍过 300 行代码线程池 、以及在线程池之上用作计算调度的200 行Executor,而明天介绍的 msgqueue,更简略、更罕用, 代码不到 200 行,极度治愈懒癌早期的你~msgqueue 模块同样十分独立,能够间接拿进去使(把)用(玩)。

👉 一句话概括:外部应用两个队列,别离拆开了生产者和消费者之间的争抢,晋升了吞吐的同时,仍然维持了比拟优良的长尾、且兼顾保序与代码极简的优良传统;而其中极度节约的链表实现形式,在缩小内存调配上也有可学习之处。

代码地位:https://github.com/sogou/workflow/blob/master/src/kernel/msgqueue.c

PART – 1 音讯队列常见实现

意识比拟早的小伙伴可能晓得,其实在我的项目开源以前,我就为音讯队列写过一系列的文章和比照了,以下按集体看过的一些给出常见的实现,也欢送大家评论里补充看过感觉很棒的代码:

  • 简略粗犷版:一把锁两个条件变量的根本实现
    因为太简略了,大家手写即可~
  • 双队列版:Workflow 的 msgqueue
    https://github.com/sogou/work…
  • 链表版:LinkedBlockingQueue
    https://developer.android.com…
    (这个 BlockingQueue 系列还蛮多实现的)
  • grpc 版:mpmc queue
    https://github.com/grpc/grpc/…
  • 无锁版:内核的单生产者单消费者 kfifo
    https://github.com/torvalds/l…
  • 不保序版:go 的调度 workstealing
    https://github.com/golang/go/…

以上这些代码不仅值得一读,闲来无事的时候也很值得学着本人实现一把~

PART – 2 msgqueue 的算法

Workflow 的 msgqueue 很简略,两张图足以说分明内部结构和流程:

几个特点:

  1. 外部有 两个 list生产者 把音讯放到 生产队列 消费者 生产队列 取音讯;
  2. 应用了 两把锁 别离治理两个队列;
  3. 应用了 两个条件变量 别离治理生产者和消费者的期待唤醒;
  4. 队列能够有 block / nonblock 两种状态;
  5. 如果为 block,则 生产者队列最大长度为 maxlen,如果为 nonblock,不限度最大长度;

算法很简略步骤:

  1. 当 get_list(也就是消费者队列)不为空,消费者能够拿到一个音讯;
  2. 否则消费者会期待,直到 put_list(也就是生产者队列)不为空,而后替换两个队列;

这对于队列 十分忙碌 、且 消费者很多 的状况下,性能是十分好的~

很久以前我在集体的 queue 我的项目中有很简略的压测数据:GitHub – holmes1412/queue: some different implements of queue and test,不太欠缺,仅供参考,毕竟我智商摆在这._.

也举荐通过以下小我的项目,看看 msgqueue 如何重构线程池来达到性能的飞跃::https://github.com/Barenboim/msgqueue_thrdpool

PART – 3 代码详解

还是依照以前介绍过的万能七步,咱们能够跟着这 100 多行代码把队列学习一遍~

第一步:通过头文件看接口

关上msgqueue.h:

msgqueue_t *msgqueue_create(size_t maxlen, int linkoff);
void msgqueue_put(void *msg, msgqueue_t *queue);
void *msgqueue_get(msgqueue_t *queue);
void msgqueue_set_nonblock(msgqueue_t *queue);
void msgqueue_set_block(msgqueue_t *queue);
void msgqueue_destroy(msgqueue_t *queue);
  • msgqueue_create()函数创立一个音讯队列。
  • 参数 maxlen 代表生产队列的最大长度,默认的模式下,达到最大长度时生产者将会阻塞。
  • 而第二个参数 linkoff 是这个模块的一个亮点。它让用户指定一个音讯的偏移量,每条音讯的这个地位用户须要预留一个指针大小的空间,用于外部拉链。这一个简略的设计,防止进出音讯队列时,多一次内存调配与开释。

如果感觉这个解释还不清晰,也能够看看模块里作者的正文:

理解过 msgqueue_create() 接口,msgqueue_get()和 msgqueue_put() 就毋庸过多介绍了。留神 msg 的 linkoff 地位,须要预留一个指针。

第二步:.h 接口上的数据结构

上述接口上的msgqueue_t,是音讯队列的真身,看起来实现在 msgqueue.c 里。

typedef struct __msgqueue msgqueue_t;

第三步:.c 文件的外部数据结构

接下来是激动人心的时刻,为了不便起见,这里间接用了 void ** 去做链表,这样做的一大劣势是:

充分利用用户调配的 msg 内存,音讯队列外部能够省去调配开释空间的开销(我可真是个小机灵鬼(๑´ ▽ `๑)ノ

当然怎么实现不重要,无需纠结~

struct __msgqueue
{
    size_t msg_max;
    size_t msg_cnt;
    int linkoff;
    int nonblock;
    void *head1;
    void *head2;
    void **get_head;
    void **put_head;
    void **put_tail;
    pthread_mutex_t get_mutex;
    pthread_mutex_t put_mutex;
    pthread_cond_t get_cond;
    pthread_cond_t put_cond;
};

这里就能够对得上先前讲述的:

  • 两个外部队列:get_head, put_head
  • 两把锁:get_mutex, put_mutex
  • 两个条件变量:get_cond, put_cond
  • 另外的 msg_maxmsg_cnt很好了解,别离是外部生产者队列容许的最大长度,以及生产者队列以后理论长度。
  • nonblock 显然是标记这个队列 是否为阻塞模式,为了简化,咱们上面的代码都只探讨阻塞模式。
  • 咱们看到有 put_tail,然而没有 get_tail,因为消费者 get 时只管从 head 里拿就能够了,只有生产者 put 才须要通过 head 和 tail, 来保障音讯的全局有序
  • linkoff曾经介绍过,这是 外部链表算偏移量 的关键点。

第四步:看接口的实现

先看一下msgqueue_create(),根本足以看清楚外部数据管理形式了:

msgqueue_t *msgqueue_create(size_t maxlen, int linkoff)
{
    // 各种初始化,最初设置 queue 的成员变量如下:queue->msg_max = maxlen;
    queue->linkoff = linkoff;
    queue->head1 = NULL;
    queue->head2 = NULL;
    // 借助两个 head 别离作为两个外部队列的地位
    queue->get_head = &queue->head1;
    queue->put_head = &queue->head2;
    // 一开始队列为空,所以生产者队尾也等于队头
    queue->put_tail = &queue->head2;
    queue->msg_cnt = 0;
    queue->nonblock = 0;
    ...
}

msgqueue_create()的接口会传入linkoff,之后这个音讯队列里都是用这个来作为每一条音讯的理论长度,从而计算出下一个地位的偏移量应该是多少。

而后看看生产者接口msgqueue_put():

void msgqueue_put(void *msg, msgqueue_t *queue)
{
    // 1. 通过 create 的时候传进来的 linkoffset,算出音讯尾部的偏移量
    void **link = (void **)((char *)msg + queue->linkoff);

    // 2. 设置为空,用于示意生产者队列开端的前面没有其余数据
    *link = NULL;
    // 3. 加生产者锁
    pthread_mutex_lock(&queue->put_mutex);
    // 4. 如果以后曾经有 msg_max 个音讯的话
    //    就要期待消费者通过 put_cond 来叫醒我
    while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock)                
        pthread_cond_wait(&queue->put_cond, &queue->put_mutex);

    // 5. put_tail 指向这条音讯尾部,保护生产者队列的音讯个数
    *queue->put_tail = link;
    queue->put_tail = link;
    queue->msg_cnt++;
    pthread_mutex_unlock(&queue->put_mutex);
    // 6. 如果有消费者在等,通过 get_cond 叫醒他~
    pthread_cond_signal(&queue->get_cond);
} 

对应的,消费者接口msgqueue_get()

void *msgqueue_get(msgqueue_t *queue)
{
    void *msg;

    // 1. 加消费者锁
    pthread_mutex_lock(&queue->get_mutex);
    // 2. 如果目前 get_head 不为空,示意有数据;//    如果空,那么通过__msgqueue_swap()切换队列,也能够拿到数据
    if (*queue->get_head || __msgqueue_swap(queue) > 0)
    {
        // 3. 对应 put 中的计算形式,依据尾巴的偏移量把音讯起始偏移量算进去
        msg = (char *)*queue->get_head - queue->linkoff;
        // 4. 往后挪,这时候的 *get_head 就是下一条数据的偏移量尾部了
        *queue->get_head = *(void **)*queue->get_head;
    }
    else
    {
        // 5. 没有数据,同时设置 errno~~~
        msg = NULL;
        errno = ENOENT;
    }

    pthread_mutex_unlock(&queue->get_mutex);
    return msg;
}

第五步:其余外围函数的实现

毫无疑问,还有一个外围函数是__msgqueue_swap() ,这是切换队列的算法的要害:

static size_t __msgqueue_swap(msgqueue_t *queue)
{
    // 1. 用长期变量记录下以后的 get 队列偏移量
    void **get_head = queue->get_head;
    size_t cnt;

    // 2. 把方才的生产者队列换给消费者队列
    queue->get_head = queue->put_head;
    // 3. 只有这个中央才会同时持有消费者锁和生产者锁
    pthread_mutex_lock(&queue->put_mutex);
    // 4. 如果以后对列自身就是空的
    //    这里就会帮期待下一个降临的生产者通 get_cond 叫醒我
    while (queue->msg_cnt == 0 && !queue->nonblock)
        pthread_cond_wait(&queue->get_cond, &queue->put_mutex);                    

    cnt = queue->msg_cnt;
    // 5. 如果以后对列是满的,阐明可能有生产者在期待
    //    通过 put_cond 叫醒生产者(可能有多个,所以用 broadcast)if (cnt > queue->msg_max - 1)
        pthread_cond_broadcast(&queue->put_cond);

    // 6. 把第一行的长期变量换给生产者队列,清空生产者队列
    queue->put_head = get_head;
    queue->put_tail = get_head;
    queue->msg_cnt = 0;
    pthread_mutex_unlock(&queue->put_mutex);
    // 7. 返回方才多少个,这个会影响 get 里的逻辑
    return cnt;
}

第六步:把函数关联起来

这个模块接口比较简单,曾经在 put 和 get 中别离依照生产者和消费者的流程相互关联上了。这里为了加深了解,咱们换个角度,从两个锁和两个条件变量的维度 进行整顿:

  • put_mutex: 生产者之间抢 put 锁,只有当生产队列为空,且有消费者要进行 swap 的时候,才会同时抢put 锁
  • get_mutex: 消费者之间抢 get 锁,如果生产队列为空,那么抢到 get 锁 的那个消费者会进入 swap 去换队列;
  • put_cond: 等不到寄存空间的生产者会期待在 put 条件变量 上,由换完队列的那个消费者叫醒 0 到多个生产者;
  • get_cond: 如果发现生产队列为空,如上所述会有一个消费者进入 swap,此时会先看看生产队列有没有货色,如果连生产队列都没有余粮,这个消费者就会期待在get 条件变量 上,由下一个来的生产者唤醒这一个消费者;

第七步:其余流程

对于如此简略的 msgqueue 来说,其余流程就是设置 nonblock 了。值得咱们关注的是设置 nonblock 之后的一些行为:

void msgqueue_set_nonblock(msgqueue_t *queue)                                      
{
    queue->nonblock = 1;
    pthread_mutex_lock(&queue->put_mutex);
    // 叫醒一个消费者
    pthread_cond_signal(&queue->get_cond);
    // 叫醒所有生产者
    pthread_cond_broadcast(&queue->put_cond);
    pthread_mutex_unlock(&queue->put_mutex);
}

这里会发现,应用 signalbroadcast的逻辑和原先 put_cont、get_cond 的应用是十分对立的。

咱们方才看过两个中央有判断 nonblock,都依照 block 形式去了解了。那么如果设置了 nonblock 的话流程是怎么的呢?这个不细说了,留给大家当作课后习题→_→(不是作者懒

PART – 4 总结

其实写了这么多篇,可能大家能够发现,Workflow 外部有很多新思路、新算法、新数据结构,尽管并不都像 Executor 那样的大翻新,但许多轻微的中央在做法上都能够有 小翻新 ,从而实现 性能的大幅晋升

这也是我参加这个我的项目时最大的感叹:传统的、原始的代码,仍然值得咱们推敲、优化,努力做到精益求精。

另外的一点工程教训,就是这么多篇文章都在体现的 通用性:工程实际上有很多 trade off 的事件,比方在音讯队列里就是吞吐和提早,如果减少伎俩晋升吞吐,则往往意味着每一个申请的提早都会减少哪怕一点点的额定开销。

主观来说,msgqueue 自身实现还是 十分克服、极其精简 了,而实际上音讯队列有十分十分多的实现。为什么 workflow 没有用上更简单更高效的数据结构呢?起因都在通用性上:

  1. 吞吐与提早:用更简单的算法往往有整体收益,但是否所有场景下整体收益可能比简单代码引入的开销大,还是要实测能力晓得,一个通用的框架还是比拟适宜应用通用的模型;
  2. 剖析场景瓶颈所在:对于 Workflow 理论网络收发来说,只有音讯十分小、QPS 十分高、无需任何序列化 / 反序列化 / 业务计算逻辑的场景下,瓶颈才会落到 msgqueue 上;

但咱们对新做法都是抱着凋谢的态度 ^^(好啦次要先前也太忙了没有工夫去深刻调研)之后有空了我也会替换到 workflow 里跑跑看,期待到时候能够再引发一些新思路吧~~~

正文完
 0