乐趣区

关于mq:ZMQ-指南第二章-ZeroMQ进阶

第二章 ZeroMQ 进阶

第一章咱们简略试用了 ZMQ 的若干通信模式:申请 - 应答模式、公布 - 订阅模式、管道模式。这一章咱们将学习更多在理论开发中会应用到的货色:

本章波及的内容有:

  • 创立和应用 ZMQ 套接字
  • 应用套接字发送和接管音讯
  • 应用 ZMQ 提供的异步 I / O 套接字构建你的应用程序
  • 在繁多线程中应用多个套接字
  • 失当地解决致命和非致命谬误
  • 解决诸如 Ctrl- C 的中断信号
  • 正确地敞开 ZMQ 应用程序
  • 查看 ZMQ 应用程序的内存泄露
  • 发送和接管多帧音讯
  • 在网络中转发音讯
  • 建设简略的音讯队列代理
  • 应用 ZMQ 编写多线程应用程序
  • 应用 ZMQ 在线程间传递信号
  • 应用 ZMQ 协调网络中的节点
  • 应用标识创立长久化套接字
  • 在公布 - 订阅模式中创立和应用音讯信封
  • 如何让长久化的订阅者可能从解体中复原
  • 应用阈值(HWM)避免内存溢出

零的哲学

ØMQ 一词中的Ø让咱们纠结了很久。一方面,这个特殊字符会升高 ZMQ 在谷歌和推特中的收录量;另一方面,这会惹恼某些丹麦语种的民族,他们会嚷道Ø并不是一个奇怪的 0。

一开始 ZMQ 代表零中间件、零提早,同时,它又有了新的含意:零治理、零老本、零节约。总的来说,零示意最小、最简,这是贯通于该项目标哲理。咱们致力于缩小复杂程度,进步易用性。

套接字 API

说实话,ZMQ 有些移花接木的嫌疑。不过咱们并不会为此赔罪,因为这种概念上的切换相对不会有害处。ZMQ 提供了一套相似于 BSD 套接字的 API,但将很多音讯解决机制的细节暗藏了起来,你会逐步适应这种变动,并乐于用它进行编程。

套接字事实上是用于网络编程的标准接口,ZMQ 之所那么吸引人眼球,起因之一就是它是建设在规范套接字 API 之上。因而,ZMQ 的套接字操作非常容易了解,其生命周期次要蕴含四个局部:

  • 创立和销毁套接字:zmq_socket(), zmq_close()
  • 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
  • 为套接字建设连贯:zmq_bind(), zmq_connect()
  • 发送和接管音讯:zmq_send(), zmq_recv()

如以下 C 代码:

void *mousetrap;

//  Create socket for catching mice
mousetrap = zmq_socket (context, ZMQ_PULL);

//  Configure the socket
int64_t jawsize = 10000;
zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);

//  Plug socket into mouse hole
zmq_connect (mousetrap, "tcp://192.168.55.221:5001");

//  Wait for juicy mouse to arrive
zmq_msg_t mouse;
zmq_msg_init (&mouse);
zmq_recv (mousetrap, &mouse, 0);
//  Destroy the mouse
zmq_msg_close (&mouse);

//  Destroy the socket
zmq_close (mousetrap);

请留神,套接字永远是空指针类型的,而音讯则是一个数据结构(咱们下文会讲述)。所以,在 C 语言中你通过变量传递套接字,而用援用传递音讯。记住一点,在 ZMQ 中所有的套接字都是由 ZMQ 治理的,只有音讯是由程序员治理的。

创立、销毁、以及配置套接字的工作和解决一个对象差不多,但请记住 ZMQ 是异步的,伸缩性很强,因而在将其利用到网络结构中时,可能会须要多一些工夫来了解。

应用套接字构建拓扑构造

在连贯两个节点时,其中一个须要应用 zmq_bind(),另一个则应用 zmq_connect()。通常来讲,应用 zmq_bind()连贯的节点称之为服务端,它有着一个较为固定的网络地址;应用 zmq_connect()连贯的节点称为客户端,其地址不固定。咱们会有这样的说法:绑定套接字至端点;连贯套接字至端点。端点指的是某个广为周知网络地址。

ZMQ 连贯和传统的 TCP 连贯是有区别的,次要有:

  • 应用多种协定,inproc(过程内)、ipc(过程间)、tcp、pgm(播送)、epgm;
  • 当客户端应用 zmq_connect()时连贯就曾经建设了,并不要求该端点已有某个服务应用 zmq_bind()进行了绑定;
  • 连贯是异步的,并由一组音讯队列做缓冲;
  • 连贯会体现出某种音讯模式,这是由创立连贯的套接字类型决定的;
  • 一个套接字能够有多个输出和输入连贯;
  • ZMQ 没有提供相似 zmq_accept()的函数,因为当套接字绑定至端点时它就主动开始承受连贯了;
  • 应用程序无奈间接和这些连贯打交道,因为它们是被封装在 ZMQ 底层的。

在很多架构中都应用了相似于 C / S 的架构。服务端组件式比较稳定的,而客户端组件则较为动静,来去自如。所以说,服务端地址对客户端而言往往是可见的,反之则不然。这样一来,架构中应该将哪些组件作为服务端(应用 zmq_bind()),哪些作为客户端(应用 zmq_connect()),就很显著了。同时,这须要和你应用的套接字类型相分割起来,咱们下文会具体讲述。

让咱们试想一下,如果先关上了客户端,后关上服务端,会产生什么?传统网络连接中,咱们关上客户端时肯定会收到零碎的报错信息,但 ZMQ 让咱们可能自在地启动架构中的组件。当客户端应用 zmq_connect()连贯至某个端点时,它就曾经可能应用该套接字发送音讯了。如果这时,服务端启动起来了,并应用 zmq_bind()绑定至该端点,ZMQ 将主动开始转发音讯。

服务端节点能够仅应用一个套接字就能绑定至多个端点。也就是说,它可能应用不同的协定来建设连贯:

zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");

当然,你不能屡次绑定至同一端点,这样是会报错的。

每当有客户端节点应用 zmq_connect()连贯至上述某个端点时,服务端就会主动创立连贯。ZMQ 没有对连贯数量进行限度。此外,客户端节点也能够应用一个套接字同时建设多个连贯。

大多数状况下,哪个节点充当服务端,哪个作为客户端,是网络架构层面的内容,而非音讯流问题。不过也有一些非凡状况(如失去连贯后的音讯重发),同一种套接字应用绑定和连贯是会有一些不同的行为的。

所以说,当咱们在设计架构时,应该遵循“服务端是稳固的,客户端是灵便的“准则,这样就不太会出错。

套接字是有类型的,套接字类型定义了套接字的行为,它在发送和接管音讯时的规定等。你能够将不同品种的套接字进行连贯,如 PUB-SUB 组合,这种组合称之为公布 - 订阅模式,其余组合也会有相应的模式名称,咱们会在下文详述。

正是因为套接字能够应用不同的形式进行连贯,才形成了 ZMQ 最根本的音讯队列零碎。咱们还能够在此基础之上建设更为简单的安装、路由机制等,下文会详述。总的来说,ZMQ 为你提供了一套组件,供你在网络架构中拼装和应用。

应用套接字传递数据

发送和接管音讯应用的是 zmq_send()和 zmq_recv()这两个函数。尽管函数名称看起来很直白,但因为 ZMQ 的 I / O 模式和传统的 TCP 协定有很大不同,因而还是须要花点工夫去了解的。

让咱们看一看 TCP 套接字和 ZMQ 套接字之间在传输数据方面的区别:

  • ZMQ 套接字传输的是音讯,而不是字节(TCP)或帧(UDP)。音讯指的是一段指定长度的二进制数据块,咱们下文会讲到音讯,这种设计是为了性能优化而思考的,所以可能会比拟难以了解。
  • ZMQ 套接字在后盾进行 I / O 操作,也就是说无论是接管还是发送音讯,它都会先传送到一个本地的缓冲队列,这个内存队列的大小是能够配置的。
  • ZMQ 套接字能够和多个套接字进行连贯(如果套接字类型容许的话)。TCP 协定只能进行点对点的连贯,而 ZMQ 则能够进行一对多(相似于无线播送)、多对多(相似于邮局)、多对一(相似于信箱),当然也包含一对一的状况。
  • ZMQ 套接字能够发送音讯给多个端点(扇出模型),或从多个端点中接管音讯(扇入模型)

所以,向套接字写入一个音讯时可能会将音讯发送给很多节点,相应的,套接字又会从所有已建设的连贯中接管音讯。zmq_recv()办法应用了偏心队列的算法来决定接管哪个连贯的音讯。

调用 zmq_send()办法时其实并没有真正将音讯发送给套接字连贯。音讯会在一个内存队列中保留下来,并由后盾的 I / O 线程异步地进行发送。如果不出意外状况,这一行为是非阻塞的。所以说,即使 zmq_send()有返回值,并不能代表音讯曾经发送。当你在用 zmq_msg_init_data()初始化音讯后,你不能重用或是开释这条音讯,否则 ZMQ 的 I / O 线程会认为它在传输垃圾数据。这对初学者来讲是一个常犯的谬误,下文咱们会讲述如何正确地解决音讯。

单播传输

ZMQ 提供了一组单播传输协定(inporc, ipc, tcp),和两个播送协定(epgm, pgm)。播送协定是比拟高级的协定,咱们会在当前讲述。如果你不能答复我扇出比例会影响一对多的单播传输时,就先不要去学习播送协定了吧。

一般而言咱们会应用 tcp 作为传输协定,这种 TCP 连贯是能够脱机运作的,它灵便、便携、且足够疾速。为什么称之为脱机,是因为 ZMQ 中的 TCP 连贯不须要该端点曾经有某个服务进行了绑定,客户端和服务端能够随时进行连贯和绑定,这对应用程序而言都是通明的。

过程间协定,即ipc,和 tcp 的行为差不多,但已从网络传输中形象进去,不须要指定 IP 地址或者域名。这种协定很多时候会很不便,本指南中的很多示例都会应用这种协定。ZMQ 中的 ipc 协定同样能够是脱机的,但有一个毛病——无奈在 Windows 操作系统上运作,这一点兴许会在将来的 ZMQ 版本中修复。咱们个别会在端点名称的开端附上.ipc 的扩展名,在 UNIX 零碎上,应用 ipc 协定还须要留神权限问题。你还须要保障所有的程序都可能找到这个 ipc 端点。

过程内协定,即inproc,能够在同一个过程的不同线程之间进行音讯传输,它比 ipc 或 tcp 要快得多。这种协定有一个要求,必须先绑定到端点,能力建设连贯,兴许将来也会修复。通常的做法是先启动服务端线程,绑定至端点,后启动客户端线程,连贯至端点。

ZMQ 不只是数据传输

常常有新人会问,如何应用 ZMQ 建设一项服务?我能应用 ZMQ 建设一个 HTTP 服务器吗?

他们冀望失去的答复是,咱们用一般的套接字来传输 HTTP 申请和应答,那用 ZMQ 套接字也可能实现这个工作,且能运行得更快、更好。

只惋惜答案并不是这样的。ZMQ 不只是一个数据传输的工具,而是在现有通信协议之上建设起来的新架构。它的数据帧和现有的协定并不兼容,如上面是一个 HTTP 申请和 ZMQ 申请的比照,同样应用的是 TCP/IPC 协定:

HTTP 申请应用 CR-LF(换行符)作为信息帧的距离,而 ZMQ 则应用指定长度来定义帧:

所以说,你确实是能够用 ZMQ 来写一个相似于 HTTP 协定的货色,然而这并不是 HTTP。

不过,如果有人问我如何更好地应用 ZMQ 建设一个新的服务,我会给出一个不错的答案,那就是:你能够自行设计一种通信协议,用 ZMQ 进行连贯,应用不同的语言提供服务和扩大,能够在本地,亦可通过近程传输。赛德•肖的 Mongrel2 网络服务的架构就是一个很好的示例。

I/ O 线程

咱们提过 ZMQ 是通过后盾的 I / O 线程进行音讯传输的。一个 I / O 线程曾经足以解决多个套接字的数据传输要求,当然,那些极其的应用程序除外。这也就是咱们在创立上下文时传入的 1 所代表的意思:

void *context = zmq_init (1);

ZMQ 应用程序和传统应用程序的区别之一就是你不须要为每个套接字都创立一个连贯。单个 ZMQ 套接字能够解决所有的发送和接管工作。如,当你须要向一千个订阅者公布音讯时,应用一个套接字就能够了;当你须要向二十个服务过程散发工作时,应用一个套接字就能够了;当你须要从一千个网页应用程序中获取数据时,也是应用一个套接字就能够了。

这一个性可能会颠覆网络应用程序的编写步骤,传统应用程序每个过程或线程会有一个近程连贯,它又只能解决一个套接字。ZMQ 让你突破这种构造,应用一个线程来实现所有工作,更易于扩大。

外围音讯模式

ZMQ 的套接字 API 中提供了多种音讯模式。如果你相熟企业级音讯利用,那这些模式会看起来很相熟。不过对于老手来说,ZMQ 的套接字还是会让人大吃一惊的。

让咱们回顾一下 ZMQ 会为你做些什么:它会将音讯疾速高效地发送给其余节点,这里的节点能够是线程、过程、或是其余计算机;ZMQ 为应用程序提供了一套简略的套接字 API,不必思考理论应用的协定类型(过程内、过程间、TPC、或播送);当节点调动时,ZMQ 会主动进行连贯或重连;无论是发送音讯还是接管音讯,ZMQ 都会先将音讯放入队列中,并保障过程不会因为内存溢出而解体,适时地将音讯写入磁盘;ZMQ 会解决套接字异样;所有的 I / O 操作都在后盾进行;ZMQ 不会产生死锁。

然而,以上种种的前提是用户可能正确地应用音讯模式,这种模式往往也体现出了 ZMQ 的智慧。音讯模式将咱们从实际中获取的教训进行形象和重组,用于解决之后遇到的所有问题。ZMQ 的音讯模式目前是编译在类库中的,不过将来的 ZMQ 版本可能会容许用户自行制订音讯模式。

ZMQ 的音讯模式是指不同类型套接字的组合。换句话说,要了解 ZMQ 的音讯模式,你须要了解 ZMQ 的套接字类型,它们是如何一起工作的。这一部分是须要死记硬背的。

ZMQ 的外围音讯模式有:

  • 申请 - 应答模式 将一组服务端和一组客户端相连,用于近程过程调用或工作散发。
  • 公布 - 订阅模式 将一组发布者和一组订阅者相连,用于数据散发。
  • 管道模式 应用扇入或扇出的模式组装多个节点,能够产生多个步骤或循环,用于构建并行处理架构。

咱们在第一章中曾经讲述了这些模式,不过还有一种模式是为那些依然认为 ZMQ 是相似 TCP 那样点对点连贯的人们筹备的:

  • 排他对接模式 将两个套接字一对一地连接起来,这种模式利用场景很少,咱们会在本章最开端看到一个示例。

zmq_socket()函数的阐明页中有对所有音讯模式的阐明,比较清楚,因而值得研读几次。咱们会介绍每种音讯模式的内容和利用场景。

以下是非法的套接字连贯 - 绑定对(一端绑定、一端连贯即可):

  • PUB – SUB
  • REQ – REP
  • REQ – ROUTER
  • DEALER – REP
  • DEALER – ROUTER
  • DEALER – DEALER
  • ROUTER – ROUTER
  • PUSH – PULL
  • PAIR – PAIR

其余的组合模式会产生不可预知的后果,在未来的 ZMQ 版本中可能会间接返回谬误。你也能够通过代码去理解这些套接字类型的行为。

下层音讯模式

上文中的四种外围音讯模式是内建在 ZMQ 中的,他们是 API 的一部分,在 ZMQ 的 C ++ 外围类库中实现,可能保障正确地运行。如果有朝一日 Linux 内核将 ZMQ 驳回了进来,那这些外围模式也必定会蕴含其中。

在这些音讯模式之上,咱们会建设更为_下层的音讯模式_。这种模式能够用任何语言编写,他们不属于外围类型的一部分,不随 ZMQ 发行,只在你本人的应用程序中呈现,或者在 ZMQ 社区中保护。

本指南的目标之一就是为你提供一些下层的音讯模式,有简略的(如何正确处理音讯),也有简单的(牢靠的公布 - 订阅模式)。

音讯的应用办法

ZMQ 的传输单位是音讯,即一个二进制块。你能够应用任意的序列化工具,如谷歌的 Protocal Buffers、XDR、JSON 等,将内容转化成 ZMQ 音讯。不过这种转化工具最好是便捷和疾速的,这个请本人掂量。

在内存中,ZMQ 音讯由 zmq_msg_t 构造示意(每种语言有特定的示意)。在 C 语言中应用 ZMQ 音讯时须要留神以下几点:

  • 你须要创立和传递 zmq_msg_t 对象,而不是一组数据块;
  • 读取音讯时,先用 zmq_msg_init()初始化一个空音讯,再将其传递给 zmq_recv()函数;
  • 写入音讯时,先用 zmq_msg_init_size()来创立音讯(同时也已初始化了一块内存区域),而后用 memcpy()函数将信息拷贝到该对象中,最初传给 zmq_send()函数;
  • 开释音讯(并不是销毁)时,应用 zmq_msg_close()函数,它会将对音讯对象的援用删除,最终由 ZMQ 将音讯销毁;
  • 获取音讯内容时需应用 zmq_msg_data()函数;若想晓得音讯的长度,能够应用 zmq_msg_size()函数;
  • 至于 zmq_msg_move()、zmq_msg_copy()、zmq_msg_init_data()函数,在充沛了解手册中的阐明之前,倡议不好贸然应用。

以下是一段解决音讯的典型代码,如果之前的代码你有看的话,那应该会感到相熟。这段代码其实是从 zhelpers.h 文件中抽出的:

//  从套接字中获取 ZMQ 字符串,并转换为 C 语言字符串
static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_recv (socket, &message, 0);
    int size = zmq_msg_size (&message);
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}

//  将 C 语言字符串转换为 ZMQ 字符串,并发送给套接字
static int
s_send (void *socket, char *string) {
    int rc;
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    rc = zmq_send (socket, &message, 0);
    assert (!rc);
    zmq_msg_close (&message);
    return (rc);
}

你能够对以上代码进行扩大,让其反对发送和承受任一长度的数据。

须要留神的是,当你将一个音讯对象传递给 zmq_send()函数后,该对象的长度就会被清零,因而你无奈发送同一个音讯对象两次,也无奈取得已发送音讯的内容。

如果你想发送同一个音讯对象两次,就须要在发送第一次前新建一个对象,应用 zmq_msg_copy()函数进行拷贝。这个函数不会拷贝音讯内容,只是拷贝援用。而后你就能够再次发送这个音讯了(或者任意屡次,只有进行了足够的拷贝)。当音讯最初一个援用被开释时,音讯对象就会被销毁。

ZMQ 反对多帧音讯,即在一条音讯中保留多个音讯帧。这在理论利用中被宽泛应用,咱们会在第三章进行解说。

对于音讯,还有一些须要留神的中央:

  • ZMQ 的音讯是作为一个整体来收发的,你不会只收到音讯的一部分;
  • ZMQ 不会立刻发送音讯,而是有肯定的提早;
  • 你能够发送 0 字节长度的音讯,作为一种信号;
  • 音讯必须可能在内存中保留,如果你想发送文件或超长的音讯,就须要将他们切割成小块,在独立的音讯中进行发送;
  • 必须应用 zmq_msg_close()函数来敞开音讯,但在一些会在变量超出作用域时主动开释音讯对象的语言中除外。

再反复一句,不要贸然应用 zmq_msg_init_data()函数。它是用于零拷贝,而且可能会造成麻烦。对于 ZMQ 还有太多货色须要你去学习,因而当初临时不必去思考如何削减几微秒的开销。

解决多个套接字

在之前的示例中,主程序的循环体内会做以下几件事:

  1. 期待套接字的音讯;
  2. 解决音讯;
  3. 返回第一步。

如果咱们想要读取多个套接字中的音讯呢?最简略的办法是将套接字连贯到多个端点上,让 ZMQ 应用偏心队列的机制来承受音讯。如果不同端点上的套接字类型是统一的,那能够应用这种办法。然而,如果一个套接字的类型是 PULL,另一个是 PUB 怎么办?如果当初开始混用套接字类型,那未来就没有可靠性可言了。

正确的办法应该是应用 zmq_poll()函数。更好的办法是将 zmq_poll()包装成一个框架,编写一个事件驱动的反应器,但这个就比较复杂了,咱们这里暂不探讨。

咱们先不应用 zmq_poll(),而用 NOBLOCK(非阻塞)的形式来实现从多个套接字读取音讯的性能。上面将气象信息服务和并行处理这两个示例联合起来:

msreader: Multiple socket reader in C

//
//  从多个套接字中获取音讯
//  本示例简略地再循环中应用 recv 函数
//
#include "zhelpers.h"
 
int main (void) 
{
    //  筹备上下文和套接字
    void *context = zmq_init (1);
 
    //  连贯至工作散发器
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
 
    //  连贯至天气服务
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001", 6);
 
    //  解决从两个套接字中接管到的音讯
    //  这里咱们会优先解决从工作散发器接管到的音讯
    while (1) {
        //  解决期待中的工作
        int rc;
        for (rc = 0; !rc;) {
            zmq_msg_t task;
            zmq_msg_init (&task);
            if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) {//  解决工作}
            zmq_msg_close (&task);
        }
        //  解决期待中的气象更新
        for (rc = 0; !rc;) {
            zmq_msg_t update;
            zmq_msg_init (&update);
            if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) {//  解决气象更新}
            zmq_msg_close (&update);
        }
        // 没有音讯,期待 1 毫秒
        s_sleep (1);
    }
    //  程序不会运行到这里,但还是做正确的退出清理工作
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

这种形式的毛病之一是,在收到第一条音讯之前会有 1 毫秒的提早,这在高压力的程序中还是会形成问题的。此外,你还须要翻阅诸如 nanosleep()的函数,不会造成循环次数的激增。

示例中将工作散发器的优先级晋升了,你能够做一个改良,轮流解决音讯,正如 ZMQ 外部做的偏心队列机制一样。

上面,让咱们看看如何用 zmq_poll()来实现同样的性能:

mspoller: Multiple socket poller in C

//
//  从多个套接字中接管音讯
//  本例应用 zmq_poll()函数
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  连贯工作散发器
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
 
    //  连贯气象更新服务
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001", 6);
 
    //  初始化轮询对象
    zmq_pollitem_t items [] = {{ receiver, 0, ZMQ_POLLIN, 0},
        {subscriber, 0, ZMQ_POLLIN, 0}
    };
    //  解决来自两个套接字的音讯
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {zmq_msg_init (&message);
            zmq_recv (receiver, &message, 0);
            //  解决工作
            zmq_msg_close (&message);
        }
        if (items [1].revents & ZMQ_POLLIN) {zmq_msg_init (&message);
            zmq_recv (subscriber, &message, 0);
            //  解决气象更新
            zmq_msg_close (&message);
        }
    }
    //  程序不会运行到这儿
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

处理错误和 ETERM 信号

ZMQ 的错误处理机制提倡的是疾速解体。咱们认为,一个过程对于本身外部的谬误来说要越软弱越好,而对外部的攻打和谬误要足够强壮。举个例子,活细胞会因检测到本身问题而瓦解,但对外界的攻打却能竭力抵制。在 ZMQ 编程中,断言用得是十分多的,如同细胞膜一样。如果咱们无奈确定一个谬误是来自于外部还是内部,那这就是一个设计缺点了,须要修复。

在 C 语言中,断言失败会让程序立刻停止。其余语言中能够应用异样来做到。

当 ZMQ 检测到来自内部的问题时,它会返回一个谬误给调用程序。如果 ZMQ 不能从谬误中复原,那它是不会宁静地将音讯抛弃的。某些状况下,ZMQ 也会去断言内部谬误,这些能够被归结为 BUG。

到目前为止,咱们很少看到 C 语言的示例中有对谬误进行解决。事实中的代码应该对每一次的 ZMQ 函数调用作错误处理。如果你不是应用 C 语言进行编程,可能那种语言的 ZMQ 类库曾经做了错误处理。但在 C 语言中,你须要本人入手。以下是一些惯例的错误处理伎俩,从 POSIX 标准开始:

  • 创建对象的办法如果失败了会返回 NULL;
  • 其余办法执行胜利时会返回 0,失败时会返回其余值(个别是 -1);
  • 错误代码能够从变量 errno 中取得,或者调用 zmq_errno()函数;
  • 谬误音讯能够调用 zmq_strerror()函数取得。

有两种状况不应该被认为是谬误:

  • 当线程应用 NOBLOCK 形式调用 zmq_recv()时,若没有接管到音讯,该办法会返回 -1,并设置 errno 为 EAGAIN;
  • 当线程调用 zmq_term()时,若其余线程正在进行阻塞式的解决,该函数会停止所有的解决,敞开套接字,并使得那些阻塞办法的返回值为 -1,errno 设置为 ETERM。

遵循以上规定,你就能够在 ZMQ 程序中应用断言了:

void *context = zmq_init (1);
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc;
rc = zmq_bind (socket, "tcp://*:5555");
assert (rc == 0);

第一版的程序中我将函数调用间接放在了 assert()函数外面,这样做会有问题,因为一些优化程序会间接将程序中的 assert()函数去除。

让咱们看看如何正确地敞开一个过程,咱们用管道模式举例。当咱们在后盾开启了一组 worker 时,咱们须要在工作执行结束后敞开它们。咱们能够向这些 worker 发送他杀的音讯,这项工作由后果收集器来实现会比拟失当。

如何将后果收集器和 worker 相连呢?PUSH-PULL 套接字是单向的。ZMQ 的准则是:如果须要解决一个新的问题,就该应用新的套接字。这里咱们应用公布 - 订阅模式来发送他杀的音讯:

  • 后果收集器创立 PUB 套接字,并连贯至一个新的端点;
  • worker 将 SUB 套接字连贯至这个端点;
  • 当后果收集器检测到工作执行结束时,会通过 PUB 套接字发送他杀信号;
  • worker 收到他杀信号后便会停止。

这一过程不会增加太多的代码:

    void *control = zmq_socket (context, ZMQ_PUB);
    zmq_bind (control, "tcp://*:5559");
    ...
    //  Send kill signal to workers
    zmq_msg_init_data (&message, "KILL", 5);
    zmq_send (control, &message, 0);
    zmq_msg_close (&message);

上面是 worker 过程的代码,它会关上三个套接字:用于接管工作的 PULL、用于发送后果的 PUSH、以及用于接管他杀信号的 SUB,应用 zmq_poll()进行轮询:

taskwork2: Parallel task worker with kill signaling in C

//
//  管道模式 - worker 设计 2
//  增加公布 - 订阅音讯流,用以接管他杀音讯
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  用于接管音讯的套接字
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
 
    //  用户发送音讯的套接字
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");
 
    //  用户接管管制音讯的套接字
    void *controller = zmq_socket (context, ZMQ_SUB);
    zmq_connect (controller, "tcp://localhost:5559");
    zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);
 
    //  解决接管到的工作或管制音讯
    zmq_pollitem_t items [] = {{ receiver, 0, ZMQ_POLLIN, 0},
        {controller, 0, ZMQ_POLLIN, 0}
    };
    //  解决音讯
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {zmq_msg_init (&message);
            zmq_recv (receiver, &message, 0);
 
            //  工作
            s_sleep (atoi ((char *) zmq_msg_data (&message)));
 
            //  发送后果
            zmq_msg_init (&message);
            zmq_send (sender, &message, 0);
 
            //  简略的工作进图批示
            printf (".");
            fflush (stdout);
 
            zmq_msg_close (&message);
        }
        //  任何管制命令都示意他杀
        if (items [1].revents & ZMQ_POLLIN)
            break;                      //  退出循环
    }
    //  完结程序
    zmq_close (receiver);
    zmq_close (sender);
    zmq_close (controller);
    zmq_term (context);
    return 0;
}

上面是批改后的后果收集器代码,在收集完后果后向所有 worker 发送他杀音讯:

tasksink2: Parallel task sink with kill signaling in C

//
//  管道模式 - 构造收集器 设计 2
//  增加公布 - 订阅音讯流,用以向 worker 发送他杀信号
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  用于接管音讯的套接字
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");
 
    //  用以发送管制信息的套接字
    void *controller = zmq_socket (context, ZMQ_PUB);
    zmq_bind (controller, "tcp://*:5559");
 
    //  期待工作开始
    char *string = s_recv (receiver);
    free (string);
 
    //  开始计时
    int64_t start_time = s_clock ();
 
    //  确认 100 个工作处理完毕
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {char *string = s_recv (receiver);
        free (string);
        if ((task_nbr / 10) * 10 == task_nbr)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }
    printf ("总执行工夫: %d msec\n", 
        (int) (s_clock () - start_time));
 
    //  发送他杀音讯给 worker
    s_send (controller, "KILL");
 
    //  完结
    sleep (1);              //  期待发送结束
 
    zmq_close (receiver);
    zmq_close (controller);
    zmq_term (context);
    return 0;
}

解决中断信号

事实环境中,当应用程序收到 Ctrl- C 或其余诸如 ETERM 的信号时须要可能正确地清理和退出。默认状况下,这一信号会杀掉过程,意味着尚未发送的音讯就此失落,文件不能被正确地敞开等。

在 C 语言中咱们是这样解决音讯的:

interrupt: Handling Ctrl-C cleanly in C

//
//  Shows how to handle Ctrl-C
//
#include <zmq.h>
#include <stdio.h>
#include <signal.h>
 
//  ---------------------------------------------------------------------
//  音讯解决
//
//  程序开始运行时调用 s_catch_signals()函数;//  在循环中判断 s_interrupted 是否为 1,是则跳出循环;//  很实用于 zmq_poll()。static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{s_interrupted = 1;}
 
static void s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);
}
 
int main (void)
{void *context = zmq_init (1);
    void *socket = zmq_socket (context, ZMQ_REP);
    zmq_bind (socket, "tcp://*:5555");
 
    s_catch_signals ();
    while (1) {
        //  阻塞式的读取会在收到信号时进行
        zmq_msg_t message;
        zmq_msg_init (&message);
        zmq_recv (socket, &message, 0);
 
        if (s_interrupted) {printf ("W: 收到中断音讯,程序停止...\n");
            break;
        }
    }
    zmq_close (socket);
    zmq_term (context);
    return 0;
}

这段程序应用 s_catch_signals()函数来捕获像 Ctrl-C(SIGINT)和 SIGTERM 这样的信号。收到任一信号后,该函数会将全局变量 s_interrupted 设置为 1。你的程序并不会主动进行,须要显式地做一些清理和退出工作。

  • 在程序开始时调用 s_catch_signals()函数,用来进行信号捕获的设置;
  • 如果程序在 zmq_recv()、zmq_poll()、zmq_send()等函数中阻塞,当有信号传来时,这些函数会返回 EINTR;
  • 像 s_recv()这样的函数会将这种中断包装为 NULL 返回;
  • 所以,你的应用程序能够查看是否有 EINTR 错误码、或是 NULL 的返回、或者 s_interrupted 变量是否为 1。

如果以下代码就非常典型:

s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {char *message = s_recv (client);
    if (!message)
        break;          //  按下了 Ctrl-C
}
zmq_close (client);

如果你在设置 s_catch_signals()之后没有进行相应的解决,那么你的程序将对 Ctrl- C 和 ETERM 免疫。

检测内存泄露

任何长时间运行的程序都应该妥善的治理内存,否则最终会产生内存溢出,导致程序解体。如果你所应用的编程序言会主动帮你实现内存治理,那就要祝贺你了。但若你应用相似 C /C++ 之类的语言时,就须要本人入手进行内存治理了。上面会介绍一个名为 valgrind 的工具,能够用它来报告内存泄露的问题。

  • 在 Ubuntu 或 Debian 操作系统上装置 valgrind:sudo apt-get install valgrind
  • 缺省状况下,ZMQ 会让 valgrind 不停地报错,想要屏蔽正告的话能够在编译 ZMQ 时应用 ZMQ_MAKE_VALGRIND_HAPPY 宏选项:
$ cd zeromq2
$ export CPPFLAGS=-DZMQ_MAKE_VALGRIND_HAPPY
$ ./configure
$ make clean; make
$ sudo make install
  • 应用程序应该正确地解决 Ctrl-C,特地是对于长时间运行的程序(如队列安装),如果不这么做,valgrind 会报告所有已调配的内存产生了谬误。
  • 应用 -DDEBUG 选项编译程序,这样能够让 valgrind 通知你具体是哪段代码产生了内存溢出。
  • 最初,应用如下办法运行 valgrind:
valgrind --tool=memcheck --leak-check=full someprog

解决完所有的问题后,你会看到以下信息:

==30536== ERROR SUMMARY: 0 errors from 0 contexts...

多帧音讯

ZMQ 音讯能够蕴含多个帧,这在理论利用中十分常见,特地是那些无关“信封”的利用,咱们下文谈判到。咱们这一节要讲的是如何正确地收发多帧音讯。

多帧音讯的每一帧都是一个 zmq_msg 构造,也就是说,当你在收发含有五个帧的音讯时,你须要解决五个 zmq_msg 构造。你能够将这些帧放入一个数据结构中,或者间接一个个地解决它们。

上面的代码演示如何发送多帧音讯:

zmq_send (socket, &message, ZMQ_SNDMORE);
...
zmq_send (socket, &message, ZMQ_SNDMORE);
...
zmq_send (socket, &message, 0);

而后咱们看看如何接管并解决这些音讯,这段代码对单帧音讯和多帧音讯都实用:

while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_recv (socket, &message, 0);
    // 解决一帧音讯
    zmq_msg_close (&message);
    int64_t more;
    size_t more_size = sizeof (more);
    zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
    if (!more)
        break; // 已达到最初一帧
}

对于多帧音讯,你须要理解的还有:

  • 在发送多帧音讯时,只有当最初一帧提交发送了,整个音讯才会被发送;
  • 如果应用了 zmq_poll()函数,当收到了音讯的第一帧时,其它帧其实也曾经收到了;
  • 多帧音讯是整体传输的,不会只收到一部分;
  • 多帧音讯的每一帧都是一个 zmq_msg 构造;
  • 无论你是否查看套接字的 ZMQ_RCVMORE 选项,你都会收到所有的音讯;
  • 发送时,ZMQ 会将开始的音讯帧缓存在内存中,直到收到最初一帧才会发送;
  • 咱们无奈在发送了一部分音讯后勾销发送,只能敞开该套接字。

中间件和安装

当网络组件的数量较少时,所有节点都晓得其它节点的存在。但随着节点数量的减少,这种构造的老本也会回升。因而,咱们须要将这些组件拆分成更小的模块,应用一个中间件来连贯它们。

这种构造在事实世界中是十分常见的,咱们的社会和经济体系中充斥了中间件的机制,用以升高复杂度,压缩构建大型网络的老本。中间件也会被称为批发商、分包商、管理者等等。

ZMQ 网络也是一样,如果规模一直增长,就肯定会须要中间件。ZMQ 中,咱们称其为“安装”。在构建 ZMQ 软件的初期,咱们会画出几个节点,而后将它们连接起来,不应用中间件:

随后,咱们对这个构造一直地进行裁减,将安装放到特定的地位,进一步减少节点数量:

ZMQ 安装没有具体的设计规定,但个别会有一组“前端”端点和一组“后端”端点。安装是无状态的,因而能够被宽泛地部署在网络中。你能够在过程中启动一个线程来运行安装,或者间接在一个过程中运行安装。ZMQ 外部也提供了根本的安装实现可供使用。

ZMQ 安装能够用作路由和寻址、提供服务、队列调度、以及其余你所能想到的事件。不同的音讯模式须要用到不同类型的安装来构建网络。如,申请 - 应答模式中能够应用队列安装、形象服务;公布 - 订阅模式中则可应用流安装、主题安装等。

ZMQ 安装比起其余中间件的劣势在于,你能够将它放在网络中任何一个中央,实现任何你想要的事件。

公布 - 订阅代理服务

咱们常常会须要将公布 - 订阅模式裁减到不同类型的网络中。比如说,有一组订阅者是在外网上的,咱们想用播送的形式公布音讯给内网的订阅者,而用 TCP 协定发送给外网订阅者。

咱们要做的就是写一个简略的代理服务安装,在发布者和外网订阅者之间搭起桥梁。这个安装有两个端点,一端连贯内网上的发布者,另一端连贯到外网上。它会从发布者处接管订阅的音讯,并转发给外网上的订阅者们。

wuproxy: Weather update proxy in C

//
//  气象信息代理服务安装
//
#include "zhelpers.h"
 
int main (void)
{void *context = zmq_init (1);
 
    //  订阅气象信息
    void *frontend = zmq_socket (context, ZMQ_SUB);
    zmq_connect (frontend, "tcp://192.168.55.210:5556");
 
    //  转发气象信息
    void *backend = zmq_socket (context, ZMQ_PUB);
    zmq_bind (backend, "tcp://10.1.1.0:8100");
 
    //  订阅所有音讯
    zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0);
 
    //  转发音讯
    while (1) {while (1) {
            zmq_msg_t message;
            int64_t more;
 
            //  解决所有的音讯帧
            zmq_msg_init (&message);
            zmq_recv (frontend, &message, 0);
            size_t more_size = sizeof (more);
            zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
            zmq_send (backend, &message, more? ZMQ_SNDMORE: 0);
            zmq_msg_close (&message);
            if (!more)
                break;      //  达到最初一帧
        }
    }
    //  程序不会运行到这里,但仍然要正确地退出
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}

咱们称这个安装为代理,因为它既是订阅者,又是发布者。这就意味着,增加该安装时不须要更改其余程序的代码,只需让外网订阅者晓得新的网络地址即可。

能够留神到,这段程序可能正确处理多帧音讯,会将它残缺的转发给订阅者。如果咱们在发送时不指定 ZMQ_SNDMORE 选项,那么上游节点收到的音讯就可能是破损的。编写安装时应该要保障可能正确地解决多帧音讯,否则会造成音讯的失落。

申请 - 应答代理

上面让咱们在申请 - 应答模式中编写一个小型的音讯队列代理安装。

在 Hello World 客户 / 服务模型中,一个客户端和一个服务端进行通信。但在实在环境中,咱们会须要让多个客户端和多个服务端进行通信。关键问题在于,服务端应该是无状态的,所有的状态都应该蕴含在一次申请中,或者寄存其它介质中,如数据库。

咱们有两种形式来连贯多个客户端和多个服务端。第一种是让客户端间接和多个服务端进行连贯。客户端套接字能够连贯至多个服务端套接字,它所发送的申请会通过负载平衡的形式分发给服务端。比如说,有一个客户端连贯了三个服务端,A、B、C,客户端产生了 R1、R2、R3、R4 四个申请,那么,R1 和 R4 会由服务 A 解决,R2 由 B 解决,R3 由 C 解决:

这种设计的益处在于能够不便地增加客户端,但若要增加服务端,那就得批改每个客户端的配置。如果你有 100 个客户端,须要增加三个服务端,那么这些客户端都须要从新进行配置,让其晓得新服务端的存在。

这种形式必定不是咱们想要的。一个网络结构中如果有太多固化的模块就越不容易扩大。因而,咱们须要有一个模块位于客户端和服务端之间,将所有的常识都汇聚到这个网络拓扑构造中。现实状态下,咱们能够任意地增减客户端或是服务端,不须要更改任何组件的配置。

上面就让咱们编写这样一个组件。这个代理会绑定到两个端点,前端端点供客户端连贯,后端端点供服务端连贯。它会应用 zmq_poll()来轮询这两个套接字,接管音讯并进行转发。安装中不会有队列的存在,因为 ZMQ 曾经主动在套接字中实现了。

在应用 REQ 和 REP 套接字时,其申请 - 应答的会话是严格同步。客户端发送申请,服务端接管申请并发送应答,由客户端接管。如果客户端或服务端中的一个产生问题(如间断两次发送申请),程序就会报错。

然而,咱们的代理安装必须要是非阻塞式的,尽管能够应用 zmq_poll()同时解决两个套接字,但这里显然不能应用 REP 和 REQ 套接字。

侥幸的是,咱们有 DEALER 和 ROUTER 套接字能够胜任这项工作,进行非阻塞的音讯收发。DEALER 过来被称为 XREQ,ROUTER 被称为 XREP,但新的代码中应尽量应用 DEALER/ROUTER 这种名称。在第三章中你会看到如何用 DEALER 和 ROUTER 套接字构建不同类型的申请 - 应答模式。

上面就让咱们看看 DEALER 和 ROUTER 套接字是怎么在安装中工作的。

下方的简图形容了一个申请 - 应答模式,REQ 和 ROUTER 通信,DEALER 再和 REP 通信。ROUTER 和 DEALER 之间咱们则须要进行音讯转发:

申请 - 应答代理会将两个套接字别离绑定到前端和后端,供客户端和服务端套接字连贯。在应用该安装之前,还须要对客户端和服务端的代码进行调整。

rrclient: Request-reply client in C

//
//  Hello world 客户端
//  连贯 REQ 套接字至 tcp://localhost:5559 端点
//  发送 Hello 给服务端,期待 World 应答
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  用于和服务端通信的套接字
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5559");
 
    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {s_send (requester, "Hello");
        char *string = s_recv (requester);
        printf ("收到应答 %d [%s]\n", request_nbr, string);
        free (string);
    }
    zmq_close (requester);
    zmq_term (context);
    return 0;
}

上面是服务代码:

rrserver: Request-reply service in C

//
//  Hello World 服务端
//  连贯 REP 套接字至 tcp://*:5560 端点
//  接管 Hello 申请,返回 World 应答
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  用于何客户端通信的套接字
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_connect (responder, "tcp://localhost:5560");
 
    while (1) {
        //  期待下一个申请
        char *string = s_recv (responder);
        printf ("Received request: [%s]\n", string);
        free (string);
 
        //  做一些“工作”sleep (1);
 
        //  返回应答信息
        s_send (responder, "World");
    }
    //  程序不会运行到这里,不过还是做好清理工作
    zmq_close (responder);
    zmq_term (context);
    return 0;
}

最初是代理程序,能够看到它是可能解决多帧音讯的:

rrbroker: Request-reply broker in C

//
//  繁难申请 - 应答代理
//
#include "zhelpers.h"
 
int main (void) 
{
    //  筹备上下文和套接字
    void *context = zmq_init (1);
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    void *backend  = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (frontend, "tcp://*:5559");
    zmq_bind (backend,  "tcp://*:5560");
 
    //  初始化轮询汇合
    zmq_pollitem_t items [] = {{ frontend, 0, ZMQ_POLLIN, 0},
        {backend,  0, ZMQ_POLLIN, 0}
    };
    //  在套接字间转发音讯
    while (1) {
        zmq_msg_t message;
        int64_t more;           //  检测多帧音讯
 
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {while (1) {
                //  解决所有音讯帧
                zmq_msg_init (&message);
                zmq_recv (frontend, &message, 0);
                size_t more_size = sizeof (more);
                zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
                zmq_send (backend, &message, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  最初一帧
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {while (1) {
                //  解决所有音讯帧
                zmq_msg_init (&message);
                zmq_recv (backend, &message, 0);
                size_t more_size = sizeof (more);
                zmq_getsockopt (backend, ZMQ_RCVMORE, &more, &more_size);
                zmq_send (frontend, &message, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  最初一帧
            }
        }
    }
    //  程序不会运行到这里,不过还是做好清理工作
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}

应用申请 - 应答代理能够让你的 C / S 网络结构更易于扩大:客户端不晓得服务端的存在,服务端不晓得客户端的存在。网络中惟一稳固的组件是两头的代理安装:

内置安装

ZMQ 提供了一些内置的安装,不过大多数人须要本人手动编写这些安装。内置安装有:

  • QUEUE,可用作申请 - 应答代理;
  • FORWARDER,可用作公布 - 订阅代理服务;
  • STREAMER,可用作管道模式代理。

能够应用 zmq_device()来启动一个安装,须要传递两个套接字给它:

zmq_device (ZMQ_QUEUE, frontend, backend);

启动了 QUEUE 队列就如同在网络中退出了一个申请 - 应答代理,只需为其创立已绑定或连贯的套接字即可。上面这段代码是应用内置安装的情景:

msgqueue: Message queue broker in C

//
//  简略音讯队列代理
//  性能和申请 - 应答代理雷同,但应用了内置的安装
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  客户端套接字
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (frontend, "tcp://*:5559");
 
    //  服务端套接字
    void *backend = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (backend, "tcp://*:5560");
 
    //  启动内置安装
    zmq_device (ZMQ_QUEUE, frontend, backend);
 
    //  程序不会运行到这里
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}

内置安装会失当地处理错误,而咱们手工实现的代理并没有退出错误处理机制。所以说,当你可能在程序中应用内置安装的时候就尽量用吧。

可能你会像某些 ZMQ 开发者一样提出这样一个问题:如果我将其余类型的套接字传入这些安装中会产生什么?答案是:别这么做。你能够随便传入不同类型的套接字,然而执行后果会十分奇怪。所以,QUEUE 安装应应用 ROUTER/DEALER 套接字、FORWARDER 应应用 SUB/PUB、STREAMER 应应用 PULL/PUSH。

当你须要其余的套接字类型进行组合时,那就须要本人编写安装了。

ZMQ 多线程编程

应用 ZMQ 进行多线程编程(MT 编程)将会是一种享受。在多线程中应用 ZMQ 套接字时,你不须要思考额定的货色,让它们自若地运作就好。

应用 ZMQ 进行多线程编程时,不须要思考互斥、锁、或其余并发程序中要思考的因素,你惟一要关怀的仅仅是线程之间的音讯

什么叫“完满”的多线程编程,指的是代码易写易读,能够跨零碎、跨语言地应用同一种技术,可能在任意颗外围的计算机上运行,没有状态,没有速度的瓶颈。

如果你有多年的多线程编程教训,晓得如何应用锁、信号灯、临界区等机制来使代码运行得正确(尚未思考疾速),那你可能会很丧气,因为 ZMQ 将扭转这所有。三十多年来,并发式利用程序开发所总结的教训是:不要共享状态。这就好比两个醉汉想要分享一杯啤酒,如果他们不是铁哥们儿,那他们很快就会打起来。当有更多的醉汉退出时,状况就会更糟。多线程编程有时就像醉汉争夺啤酒那样蹩脚。

进行多线程编程往往是苦楚的,当程序因为压力过大而解体时,你会不知所然。有人写过一篇《多线程代码中的 11 个谬误易发点》的文章,在大公司中广为流传,列举其中的几项:没有进行同步、谬误的粒度、读写拆散、无锁排序、锁传递、优先级抵触等。

假如某一天的下午三点,当证券市场正交易得热火朝天的时候,忽然之间,应用程序因为锁的问题解体了,那将会是何等的场景?所以,作为程序员的咱们,为解决那些简单的多线程问题,只能用上更简单的编程机制。

有人曾这样比喻,那些多线程程序本来应作为大型公司的外围支柱,但往往又最容易出错;那些想要通过网络一直进行延长的产品,最初总以失败告终。

如何用 ZMQ 进行多线程编程,以下是一些规定:

  • 不要在不同的线程之间拜访同一份数据,如果要用到传统编程中的互斥机制,那就有违 ZMQ 的思维了。惟一的例外是 ZMQ 上下文对象,它是线程平安的。
  • 必须为过程创立 ZMQ 上下文,并将其传递给所有你须要应用 inproc 协定进行通信的线程;
  • 你能够将线程作为独自的工作来看待,应用本人的上下文,然而这些线程之间就不能应用 inproc 协定进行通信了。这样做的益处是能够在日后不便地将程序拆分为不同的过程来运行。
  • 不要在不同的线程之间传递套接字对象,这些对象不是线程平安的。从技术上来说,你是能够这样做的,然而会用到互斥和锁的机制,这会让你的应用程序变得迟缓和软弱。惟一正当的情景是,在某些语言的 ZMQ 类库外部,须要应用垃圾回收机制,这时可能会进行套接字对象的传递。

当你须要在应用程序中应用两个安装时,可能会将套接字对象从一个线程传递给另一个线程,这样做一开始可能会胜利,但最初肯定会随机地产生谬误。所以说,应在同一个线程中关上和敞开套接字。

如果你能遵循下面的规定,就会发现多线程程序能够很容易地拆分成多个过程。程序逻辑能够在线程、过程、或是计算机中运行,依据你的需要进行部署即可。

ZMQ 应用的是零碎原生的线程机制,而不是某种“绿色线程”。这样做的益处是你不须要学习新的多线程编程 API,而且能够和指标操作系统进行很好的联合。你能够应用相似英特尔的 ThreadChecker 工具来查看线程工作的状况。毛病在于,如果程序创立了太多的线程(如上千个),则可能导致操作系统负载过高。

上面咱们举一个实例,让原来的 Hello World 服务变得更为弱小。原来的服务是单线程的,如果申请较少,天然没有问题。ZMQ 的线程能够在一个外围上高速地运行,执行大量的工作。然而,如果有一万次申请同时发送过去会怎么样?因而,事实环境中,咱们会启动多个 worker 线程,他们会尽可能地接管客户端申请,解决并返回应答。

当然,咱们能够应用启动多个 worker 过程的形式来实现,然而启动一个过程总比启动多个过程要来的不便且易于治理。而且,作为线程启动的 worker,所占用的带宽会比拟少,提早也会较低。
以下是多线程版的 Hello World 服务:

mtserver: Multithreaded service in C

//
//  多线程版 Hello World 服务
//
#include "zhelpers.h"
#include <pthread.h>
 
static void *
worker_routine (void *context) {
    //  连贯至代理的套接字
    void *receiver = zmq_socket (context, ZMQ_REP);
    zmq_connect (receiver, "inproc://workers");
 
    while (1) {char *string = s_recv (receiver);
        printf ("Received request: [%s]\n", string);
        free (string);
        //  工作
        sleep (1);
        //  返回应答
        s_send (receiver, "World");
    }
    zmq_close (receiver);
    return NULL;
}
 
int main (void)
{void *context = zmq_init (1);
 
    //  用于和 client 进行通信的套接字
    void *clients = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (clients, "tcp://*:5555");
 
    //  用于和 worker 进行通信的套接字
    void *workers = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (workers, "inproc://workers");
 
    //  启动一个 worker 池
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }
    //  启动队列安装
    zmq_device (ZMQ_QUEUE, clients, workers);
 
    //  程序不会运行到这里,但仍进行清理工作
    zmq_close (clients);
    zmq_close (workers);
    zmq_term (context);
    return 0;
}

所有的代码应该都曾经很相熟了:

  • 服务端启动一组 worker 线程,每个 worker 创立一个 REP 套接字,并解决收到的申请。worker 线程就像是一个单线程的服务,惟一的区别是应用了 inproc 而非 tcp 协定,以及绑定 - 连贯的方向调换了。
  • 服务端创立 ROUTER 套接字用以和 client 通信,因而提供了一个 TCP 协定的内部接口。
  • 服务端创立 DEALER 套接字用以和 worker 通信,应用了外部接口(inproc)。
  • 服务端启动了 QUEUE 外部安装,连贯两个端点上的套接字。QUEUE 安装会将收到的申请分发给连贯上的 worker,并将应答路由给申请方。

须要留神的是,在某些编程语言中,创立线程并不是特地不便,POSIX 提供的类库是 pthreads,但 Windows 中就须要应用不同的 API 了。咱们会在第三章中讲述如何包装一个多线程编程的 API。

示例中的“工作”仅仅是 1 秒钟的停留,咱们能够在 worker 中进行任意的操作,包含与其余节点进行通信。音讯的流向是这样的:REQ-ROUTER-queue-DEALER-REP。

线程间的信号传输

当你刚开始应用 ZMQ 进行多线程编程时,你可能会问:要如何协调两个线程的工作呢?可能会想要应用 sleep()这样的办法,或者应用诸如信号、互斥等机制。事实上,你惟一要用的就是 ZMQ 自身。回顾一下那个醉汉抢啤酒的例子吧。

上面的示例演示了三个线程之间须要如何进行同步:

咱们应用 PAIR 套接字和 inproc 协定。

mtrelay: Multithreaded relay in C

//
// 多线程同步
//
#include "zhelpers.h"
#include <pthread.h>
 
static void *
step1 (void *context) {
    //  连贯至步骤 2,告知我已就绪
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step2");
    printf ("步骤 1 就绪,正在告诉步骤 2……\n");
    s_send (xmitter, "READY");
    zmq_close (xmitter);
 
    return NULL;
}
 
static void *
step2 (void *context) {
    //  启动步骤 1 前先绑定至 inproc 套接字
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step2");
    pthread_t thread;
    pthread_create (&thread, NULL, step1, context);
 
    //  期待信号
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);
 
    //  连贯至步骤 3,告知我已就绪
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step3");
    printf ("步骤 2 就绪,正在告诉步骤 3……\n");
    s_send (xmitter, "READY");
    zmq_close (xmitter);
 
    return NULL;
}
 
int main (void)
{void *context = zmq_init (1);
 
    //  启动步骤 2 前先绑定至 inproc 套接字
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step3");
    pthread_t thread;
    pthread_create (&thread, NULL, step2, context);
 
    //  期待信号
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);
 
    printf ("测试胜利!\n");
    zmq_term (context);
    return 0;
}

这是一个 ZMQ 多线程编程的典型示例:

  1. 两个线程通过 inproc 协定进行通信,应用同一个上下文;
  2. 父线程创立一个套接字,绑定至 inproc:// 端点,而后再启动子线程,将上下文对象传递给它;
  3. 子线程创立第二个套接字,连贯至 inproc:// 端点,而后发送已就绪信号给父线程。

须要留神的是,这段代码无奈扩大到多个过程之间的协调。如果你应用 inproc 协定,只能建设构造十分严密的应用程序。在延迟时间必须严格控制的状况下能够应用这种办法。对其余应用程序来说,每个线程应用同一个上下文,协定选用 ipc 或 tcp。而后,你就能够自在地将应用程序拆分为多个过程甚至是多台计算机了。

这是咱们第一次应用 PAIR 套接字。为什么要应用 PAIR?其余类型的套接字也能够应用,但都有一些毛病会影响到线程间的通信:

  • 你能够让信号发送方应用 PUSH,接管方应用 PULL,这看上去可能能够,然而须要留神的是,PUSH 套接字发送音讯时会进行负载平衡,如果你不小心开启了两个接管方,就会“失落”一半的信号。而 PAIR 套接字建设的是一对一的连贯,具备排他性。
  • 能够让发送方应用 DEALER,接管方应用 ROUTER。然而,ROUTER 套接字会在音讯的外层包裹一个起源地址,这样一来本来零字节的信号就可能要成为一个多段音讯了。如果你不在乎这个问题,并且不会反复读取那个套接字,天然能够应用这种办法。然而,如果你想要应用这个套接字接管真正的数据,你就会发现 ROUTER 提供的音讯是谬误的。至于 DEALER 套接字,它同样有负载平衡的机制,和 PUSH 套接字有雷同的危险。
  • 能够让发送方应用 PUB,接管方应用 SUB。一来音讯能够照原样发送,二来 PUB 套接字不会进行负载平衡。然而,你须要对 SUB 套接字设置一个空的订阅信息(用以接管所有音讯);而且,如果 SUB 套接字没有及时和 PUB 建设连贯,音讯很有可能会失落。

综上,应用 PAIR 套接字进行线程间的协调是最合适的。

节点协调

当你想要对节点进行协调时,PAIR 套接字就不怎么适合了,这也是线程和节点之间的不同之处。一般来说,节点是来去自由的,而线程则较为稳固。应用 PAIR 套接字时,若近程节点断开连接后又进行重连,PAIR 不会予以理睬。

第二个区别在于,线程的数量个别是固定的,而节点数量则会常常变动。让咱们以气象信息模型为根底,看看要怎么进行节点的协调,以保障客户端不会失落最开始的那些音讯。

上面是程序运行逻辑:

  • 发布者晓得预期的订阅者数量,这个数字能够任意指定;
  • 发布者启动后会先期待所有订阅者进行连贯,也就是节点协调。每个订阅者会应用另一个套接字来告知发布者本人已就绪;
  • 当所有订阅者准备就绪后,发布者才开始发送音讯。

这里咱们会应用 REQ-REP 套接字来同步发布者和订阅者。发布者的代码如下:

syncpub: Synchronized publisher in C

//
//  发布者 - 同步版
//
#include "zhelpers.h"
 
//  期待 10 个订阅者连贯
#define SUBSCRIBERS_EXPECTED  10
 
int main (void)
{void *context = zmq_init (1);
 
    //  用于和客户端通信的套接字
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5561");
 
    //  用于接管信号的套接字
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");
 
    //  接管订阅者的就绪信号
    printf ("正在期待订阅者就绪 \n");
    int subscribers = 0;
    while (subscribers < SUBSCRIBERS_EXPECTED) {
        //  - 期待就绪信息
        char *string = s_recv (syncservice);
        free (string);
        //  - 发送应答
        s_send (syncservice, "");
        subscribers++;
    }
    //  开始发送 100 万条数据
    printf ("正在播送音讯 \n");
    int update_nbr;
    for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
        s_send (publisher, "Rhubarb");
 
    s_send (publisher, "END");
 
    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_term (context);
    return 0;
}

以下是订阅者的代码:

syncsub: Synchronized subscriber in C

//
//  订阅者 - 同步版
//
#include "zhelpers.h"
 
int main (void)
{void *context = zmq_init (1);
 
    //  一、连贯 SUB 套接字
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5561");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
 
    //  ZMQ 太快了,咱们提早一会儿……
    sleep (1);
 
    //  二、与发布者进行同步
    void *syncclient = zmq_socket (context, ZMQ_REQ);
    zmq_connect (syncclient, "tcp://localhost:5562");
 
    //  - 发送申请
    s_send (syncclient, "");
 
    //  - 期待应答
    char *string = s_recv (syncclient);
    free (string);
 
    //  三、解决音讯
    int update_nbr = 0;
    while (1) {char *string = s_recv (subscriber);
        if (strcmp (string, "END") == 0) {free (string);
            break;
        }
        free (string);
        update_nbr++;
    }
    printf ("收到 %d 条音讯 \n", update_nbr);
 
    zmq_close (subscriber);
    zmq_close (syncclient);
    zmq_term (context);
    return 0;
}

以下这段 shell 脚本会启动 10 个订阅者、1 个发布者:

echo "正在启动订阅者..."
for a in 1 2 3 4 5 6 7 8 9 10; do
    syncsub &
done
echo "正在启动发布者..."
syncpub

后果如下:

正在启动订阅者...
正在启动发布者...
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯
收到 1000000 条音讯

当 REQ-REP 申请实现时,咱们仍无奈保障 SUB 套接字已胜利建设连贯。除非应用 inproc 协定,否则对外连贯的程序是不肯定的。因而,示例程序中应用了 sleep(1)的形式来进行解决,随后再发送同步申请。

更牢靠的模型能够是:

  • 发布者关上 PUB 套接字,开始发送 Hello 音讯(非数据);
  • 订阅者连贯 SUB 套接字,当收到 Hello 音讯后再应用 REQ-REP 套接字进行同步;
  • 当发布者取得所有订阅者的同步音讯后,才开始发送真正的数据。

零拷贝

第一章中咱们曾提过零拷贝是很危险的,其实那是吓唬你的。既然你曾经读到这里了,阐明你曾经具备了足够的常识,可能应用零拷贝。但须要记住,条条大路通天堂,过早地对程序进行优化其实是没有必要的。简略的说,如果你用不好零拷贝,那可能会让程序架构变得更糟。

ZMQ 提供的 API 能够让你间接发送和接管音讯,不必思考缓存的问题。正因为音讯是由 ZMQ 在后盾收发的,所以应用零拷贝须要一些额定的工作。

做零拷贝时,应用 zmq_msg_init_data()函数创立一条音讯,其内容指向某个曾经调配好的内存区域,而后将该消息传递给 zmq_send()函数。创立音讯时,你还须要提供一个用于开释音讯内容的函数,ZMQ 会在音讯发送结束时调用。上面是一个简略的例子,咱们假如曾经调配好的内存区域为 1000 个字节:

void my_free (void *data, void *hint) {free (data);
}
//  Send message from buffer, which we allocate and 0MQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_send (socket, &message, 0);

在接管音讯的时候是无奈应用零拷贝的:ZMQ 会将收到的音讯放入一块内存区域供你读取,但不会将音讯写入程序指定的内存区域。

ZMQ 的多段音讯可能很好地反对零拷贝。在传统音讯零碎中,你须要将不同缓存中的内容保留到同一个缓存中,而后能力发送。但 ZMQ 会将来自不同内存区域的内容作为音讯的一个帧进行发送。而且在 ZMQ 外部,一条音讯会作为一个整体进行收发,因此十分高效。

刹时套接字和长久套接字

在传统网络编程中,套接字是一个 API 对象,它们的生命周期不会长过程序的生命周期。但认真端详一下套接字,它会占用一项特定的资源——缓存,这时 ZMQ 的开发者可能会问:是否有方法在程序解体时让这些套接字缓存得以保留,稍后可能复原?

这种个性应该会十分有用,尽管不能应答所有的危险,但至多能够挽回一部分损失,特地是多公布 - 订阅模式来说。让咱们来讨论一下。

这里有两个套接字正在欢快地传送着气象信息:

如果接管方(SUB、PULL、REQ)指定了套接字标识,当它们断开网络时,发送方(PUB、PUSH、REP)会为它们缓存信息,直至达到阈值(HWM)。这里发送方不须要有套接字标识。

须要留神,ZMQ 的套接字缓存对程序员来说是不可见的,正如 TCP 缓存一样。

到目前为止,咱们应用的套接字都是刹时套接字。要将刹时套接字转化为长久套接字,须要为其设定一个套接字标识。所有的 ZMQ 套接字都会有一个标识,不过是由 ZMQ 主动生成的 UUID。

在 ZMQ 外部,两个套接字相连时会先替换各自的标识。如果产生对方没有 ID,则会自行生成一个用以标识对方:

但套接字也能够告知对方本人的标识,那当它们第二次连贯时,就能晓得对方的身份:

        +-----------+
        |           |
        |  Sender   |
        |           |
        +-----------+
        |  Socket   |
        \-----------/
              ^  "Lucy! Nice to see you again..."
              |
              |
              |  "My name's Lucy"
        /-----+-----\
        |  Socket   |
        +-----------+
        |           |
        | Receiver  |
        |           |
        +-----------+


        Figure # - Durable socket

上面这行代码就能够为套接字设置标识,从而建设了一个长久的套接字:

zmq_setsockopt (socket, ZMQ_IDENTITY, "Lucy", 4);

对于套接字标识还有几点阐明:

  • 如果要为套接字设置标识,必须在连贯或绑定至端点之前设置;
  • 接管方会抉择应用套接字标识,正如 cookie 在 HTTP 网页利用中的性质,是由服务器去抉择要应用哪个 cookie 的;
  • 套接字标识是二进制字符串;以字节 0 结尾的套接字标识为 ZMQ 保留标识;
  • 不必为多个套接字指定雷同的标识,若套接字应用的标识已被占用,它将无奈连贯至其余套接字;
  • 不要应用随机的套接字标识,这样会生成很多长久化套接字,最终让节点解体;
  • 如果你想获取对方套接字的标识,只有 ROUTER 套接字会帮你主动实现这件事,应用其余套接字类型时,须要将标识作为音讯的一帧发送过去;
  • 说了以上这些,应用长久化套接字其实并不明智,因为它会让发送者越来越凌乱,让架构变得软弱。如果咱们能从新设计 ZMQ,很可能会去掉这种显式申明套接字标识的性能。

其余信息能够查看 zmq_setsockopt()函数的 ZMQ_IDENTITY 一节。留神,该办法只能获取程序中套接字的标识,而不能取得对方套接字的标识。

公布 - 订阅音讯信封

咱们简略介绍了多帧音讯,上面就来看看它的典型用法——音讯信封。信封是指为音讯注明起源地址,而不批改音讯内容。

在公布 - 订阅模式中,信封蕴含了订阅信息,用以过滤掉不须要接管的音讯。

如果你想要应用公布 - 订阅信封,就须要自行生成和设置。这个动作是可选的,咱们在之前的示例中也没有应用到。在公布 - 订阅模式中应用信封可能会比拟麻烦,但在事实利用中还是很有必要的,毕竟信封和音讯确实是两块不想干的数据。

这是公布 - 订阅模式中一个带有信封的音讯:

咱们回顾一下,公布 - 订阅模式中,音讯的接管是依据订阅信息来的,也就是音讯的前缀。将这个前缀放入独自的音讯帧,能够让匹配变得非常明显。因为不会有一个应用程序恰好只匹配了一部分数据。

上面是一个最简的公布 - 订阅音讯信封示例。发布者会发送两类音讯:A 和 B,信封中指明了音讯类型:

psenvpub: Pub-sub envelope publisher in C

//
//  公布 - 订阅音讯信封 - 发布者
//  s_sendmore()函数也是 zhelpers.h 提供的
//
#include "zhelpers.h"
 
int main (void)
{
    //  筹备上下文和 PUB 套接字
    void *context = zmq_init (1);
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5563");
 
    while (1) {
        //  公布两条音讯,A 类型和 B 类型
        s_sendmore (publisher, "A");
        s_send (publisher, "We don't want to see this");
        s_sendmore (publisher, "B");
        s_send (publisher, "We would like to see this");
        sleep (1);
    }
    //  正确退出
    zmq_close (publisher);
    zmq_term (context);
    return 0;
}

假如订阅者只须要 B 类型的音讯:

psenvsub: Pub-sub envelope subscriber in C

//
//  公布 - 订阅音讯信封 - 订阅者
//
#include "zhelpers.h"
 
int main (void)
{
    //  筹备上下文和 SUB 套接字
    void *context = zmq_init (1);
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5563");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "B", 1);
 
    while (1) {
        //  读取音讯信封
        char *address = s_recv (subscriber);
        //  读取音讯内容
        char *contents = s_recv (subscriber);
        printf ("[%s] %s\n", address, contents);
        free (address);
        free (contents);
    }
    //  正确退出
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

执行下面的程序时,订阅者会打印如下信息:

[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
...

这个示例阐明订阅者会抛弃未订阅的音讯,且接管残缺的多帧音讯——你不会只取得音讯的一部分。

如果你订阅了多个套接字,又想晓得这些套接字的标识,从而通过另一个套接字来发送音讯给它们(这个用例很常见),你能够让发布者创立一条含有三帧的音讯:

(半)长久订阅者和阈值(HWM)

所有的套接字类型都能够应用标识。如果你在应用 PUB 和 SUB 套接字,其中 SUB 套接字为本人申明了标识,那么,当 SUB 断开连接时,PUB 会保留要发送给 SUB 的音讯。

这种机制有好有坏。好的中央在于发布者会暂存这些音讯,当订阅者重连后进行发送;不好的中央在于这样很容易让发布者因内存溢出而解体。

如果你在应用长久化的 SUB 套接字(即为 SUB 设置了套接字标识),那么你必须设法防止音讯在发布者队列中堆砌并溢出,应该应用阈值(HWM)来爱护发布者套接字。发布者的阈值会别离影响所有的订阅者。

咱们能够运行一个示例来证实这一点,用第一章中的 wuclient 和 wuserver 具体,在 wuclient 中进行套接字连贯前退出这一行:

    zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Hello", 5);

编译并运行这两段程序,所有看起来都很平时。然而察看一下发布者的内存占用状况,能够看到当订阅者一一退出后,发布者的内存占用会逐步回升。若此时你重启订阅者,会发现发布者的内存占用不再增长了,一旦订阅者进行,就又会增长。很快地,它就会耗尽系统资源。

咱们先来看看如何设置阈值,而后再看如何设置得正确。上面的发布者和订阅者应用了上文提到的“节点协调”机制。发布者会每隔一秒发送一条音讯,这时你能够中断订阅者,重新启动它,看看会产生什么。

以下是发布者的代码:

durapub: Durable publisher in C

//
//  发布者 - 连贯长久化的订阅者
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  订阅者会发送已就绪的音讯
    void *sync = zmq_socket (context, ZMQ_PULL);
    zmq_bind (sync, "tcp://*:5564");
 
    //  应用该套接字公布音讯
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5565");
 
    //  期待同步音讯
    char *string = s_recv (sync);
    free (string);
 
    //  播送 10 条音讯,一秒一条
    int update_nbr;
    for (update_nbr = 0; update_nbr < 10; update_nbr++) {char string [20];
        sprintf (string, "Update %d", update_nbr);
        s_send (publisher, string);
        sleep (1);
    }
    s_send (publisher, "END");
 
    zmq_close (sync);
    zmq_close (publisher);
    zmq_term (context);
    return 0;
}

上面是订阅者的代码:

durasub: Durable subscriber in C

//
//  长久化的订阅者
//
#include "zhelpers.h"
 
int main (void)
{void *context = zmq_init (1);
 
    //  连贯 SUB 套接字
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Hello", 5);
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
    zmq_connect (subscriber, "tcp://localhost:5565");
 
    //  发送同步音讯
    void *sync = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sync, "tcp://localhost:5564");
    s_send (sync, "");
 
    //  获取更新,并按指令退出
    while (1) {char *string = s_recv (subscriber);
        printf ("%s\n", string);
        if (strcmp (string, "END") == 0) {free (string);
            break;
        }
        free (string);
    }
    zmq_close (sync);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

运行以上代码,在不同的窗口中先后关上发布者和订阅者。当订阅者获取了一至两条音讯后按 Ctrl- C 停止,而后重新启动,看看执行后果:

$ durasub
Update 0
Update 1
Update 2
^C
$ durasub
Update 3
Update 4
Update 5
Update 6
Update 7
^C
$ durasub
Update 8
Update 9
END

能够看到订阅者的惟一区别是为套接字设置了标识,发布者就会将音讯缓存起来,待重建连贯后发送。设置套接字标识能够让刹时套接字转变为长久套接字。实际中,你须要小心地给套接字起名字,能够从配置文件中获取,或者生成一个 UUID 并保存起来。

当咱们为 PUB 套接字设置了阈值,发布者就会缓存指定数量的音讯,转而抛弃溢出的音讯。让咱们将阈值设置为 2,看看会产生什么:

uint64_t hwm = 2;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));

运行程序,中断订阅者后期待一段时间再重启,能够看到后果如下:

$ durasub
Update 0
Update 1
^C
$ durasub
Update 2
Update 3
Update 7
Update 8
Update 9
END

看认真了,发布者只为咱们保留了两条音讯(2 和 3)。阈值使得 ZMQ 抛弃溢出队列的音讯。

简而言之,如果你要应用长久化的订阅者,就必须在发布者端设置阈值,否则可能造成服务器因内存溢出而解体。然而,还有另一种办法。ZMQ 提供了名为替换区(swap)的机制,它是一个磁盘文件,用于寄存从队列中溢出的音讯。启动它很简略:

// 指定替换区大小,单位:字节。uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));

咱们能够将下面的办法综合起来,编写一个既能承受长久化套接字,又不至于内存溢出的发布者:

durapub2: Durable but cynical publisher in C

//
//  发布者 - 连贯长久化订阅者
//
#include "zhelpers.h"
 
int main (void) 
{void *context = zmq_init (1);
 
    //  订阅者会告知咱们它已就绪
    void *sync = zmq_socket (context, ZMQ_PULL);
    zmq_bind (sync, "tcp://*:5564");
 
    //  应用该套接字发送音讯
    void *publisher = zmq_socket (context, ZMQ_PUB);
 
    //  防止慢长久化订阅者音讯溢出的问题
    uint64_t hwm = 1;
    zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
 
    //  设置替换区大小,供所有订阅者应用
    uint64_t swap = 25000000;
    zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));
    zmq_bind (publisher, "tcp://*:5565");
 
    //  期待同步音讯
    char *string = s_recv (sync);
    free (string);
 
    //  公布 10 条音讯,一秒一条
    int update_nbr;
    for (update_nbr = 0; update_nbr < 10; update_nbr++) {char string [20];
        sprintf (string, "Update %d", update_nbr);
        s_send (publisher, string);
        sleep (1);
    }
    s_send (publisher, "END");
 
    zmq_close (sync);
    zmq_close (publisher);
    zmq_term (context);
    return 0;
}

若在事实环境中将阈值设置为 1,以致所有待发送的音讯都保留到磁盘上,会大大降低处理速度。这里有一些典型的办法用以解决不同的订阅者:

  • 必须为 PUB 套接字设置阈值,具体数字能够通过最大订阅者数、可供队列应用的最大内存区域、以及音讯的均匀大小来掂量。举例来说,你预计会有 5000 个订阅者,有 1G 的内存可供使用,音讯大小在 200 个字节左右,那么,一个正当的阈值是 1,000,000,000 / 200 / 5,000 = 1,000。
  • 如果你不心愿慢速或解体的订阅者失落音讯,能够设置一个替换区,在高峰期的时候寄存这些音讯。替换区的大小能够依据订阅者数、顶峰音讯比率、音讯均匀大小、暂存工夫等来掂量。比方,你预计有 5000 个订阅者,音讯大小为 200 个字节左右,每秒会有 10 万条音讯。这样,你每秒就须要 100MB 的磁盘空间来寄存音讯。加总起来,你会须要 6GB 的磁盘空间,而且必须足够的快(这超出了本指南的解说范畴)。

对于长久化订阅者:

  • 数据可能会失落,这要看音讯公布的频率、网络缓存大小、通信协议等。长久化的订阅者比起刹时套接字要牢靠一些,但也并不是完满的。
  • 替换区文件是无奈复原的,所以当发布者或代理沦亡时,替换区中的数据依然会失落。

对于阈值:

  • 这个选项会同时影响套接字的发送和接管队列。当然,PUB、PUSH 不会有接管队列,SUB、PULL、REQ、REP 不会有发送队列。而像 DEALER、ROUTER、PAIR 套接字时,他们既有发送队列,又有接管队列。
  • 当套接字达到阈值时,ZMQ 会产生阻塞,或间接抛弃音讯。
  • 应用 inproc 协定时,发送者和接受者共享同一个队列缓存,所以说,真正的阈值是两个套接字阈值之和。如果一方套接字没有设置阈值,那么它就不会有缓存方面的限度。

这就是你想要的!

ZMQ 就像是一盒积木,只有你有足够的想象力,就能够用它组装出任何造型的网络架构。

这种高可扩、高弹性的架构肯定会关上你的眼界。其实这并不是 ZMQ 原创的,早就有像 Erlang 这样的基于流的编程语言曾经可能做到了,只是 ZMQ 提供了更为友善和易用的接口。

正如 Gonzo Diethelm 所言:“我想用一句话来总结,‘如果 ZMQ 不存在,那它就应该被创造进去。’作为一个有着多年相干工作教训的人,ZMQ 太能引起我的共鸣了。我只能说,‘这就是我想要的!’”

退出移动版