共计 22070 个字符,预计需要花费 56 分钟才能阅读完成。
ZMQ 指南
作者: Pieter Hintjens <ph@imatix.com>, CEO iMatix Corporation.
翻译: 张吉 <jizhang@anjuke.com>, 安居客团体 好租网工程师
With thanks to Bill Desmarais, Brian Dorsey, CAF, Daniel Lin, Eric Desgranges, Gonzalo Diethelm, Guido Goldstein, Hunter Ford, Kamil Shakirov, Martin Sustrik, Mike Castleman, Naveen Chawla, Nicola Peduzzi, Oliver Smith, Olivier Chamoux, Peter Alexander, Pierre Rouleau, Randy Dryburgh, John Unwin, Alex Thomas, Mihail Minkov, Jeremy Avnet, Michael Compton, Kamil Kisiel, Mark Kharitonov, Guillaume Aubert, Ian Barber, Mike Sheridan, Faruk Akgul, Oleg Sidorov, Lev Givon, Allister MacLeod, Alexander D’Archangel, Andreas Hoelzlwimmer, Han Holl, Robert G. Jakabosky, Felipe Cruz, Marcus McCurdy, Mikhail Kulemin, Dr. Gergő Érdi, Pavel Zhukov, Alexander Else, Giovanni Ruggiero, Rick “Technoweenie”, Daniel Lundin, Dave Hoover, Simon Jefford, Benjamin Peterson, Justin Case, Devon Weller, Richard Smith, Alexander Morland, Wadim Grasza, Michael Jakl, and Zed Shaw for their contributions, and to Stathis Sideris for Ditaa.
Please use the issue tracker for all comments and errata. This version covers the latest stable release of 0MQ and was published on Mon 10 October, 2011.
The Guide is mainly in C, but also in PHP and Lua.
This work is licensed under a Creative Commons Attribution-ShareAlike 3.0 License.
第一章 ZeroMQ 根底
援救世界
如何解释 ZMQ?有些人会先说一堆 ZMQ 的好:它是一套用于疾速构建的套接字组件;它的信箱零碎有超强的路由能力;它太快了!而有些人则喜爱分享他们被 ZMQ 点悟的时刻,那些被灵感击中的霎时:所有的事件忽然变得简单明了,让人大开眼界。另一些人则会拿 ZMQ 同其余产品做个比拟:它更小,更简略,但却让人感觉如此相熟。对于我集体而言,我则更偏向于和他人分享 ZMQ 的诞生史,置信会和各位读者有所共鸣。
编程是一门迷信,但往往会乔装成一门艺术。咱们从不去理解软件最底层的机理,或者说基本没有人在乎这些。软件并不只是算法、数据结构、编程语言、或者形象云云,这些不过是一些工具而已,被咱们发明、应用、最初摈弃。软件真正的实质,其实是人的实质。
举例来说,当咱们遇到一个高度简单的问题时,咱们会集思广益,分工合作,将问题拆分为若干个局部,一起解决。这里就体现了编程的迷信:创立一组小型的构建模块,让人们易于了解和应用,那么大家就会一起用它来解决问题。
咱们生存在一个广泛分割的世界里,须要古代的编程软件为咱们做指引。所以,将来咱们所须要的用于解决大规模计算的构建模块,必须是广泛分割的,而且可能并行运作。那时,程序代码不能再只关注本人,它们须要相互交换,变得足够健谈。程序代码须要像人脑一样,数以兆计的神经元高速地传输信号,在一个没有地方管制的环境下,没有单点故障的环境下,解决问题。这一点其实并不意外,因为就当今的网络来讲,每个节点其实就像是连贯了一个人脑一样。
如果你曾和线程、协定、或网络打过交道,你会感觉我下面的话像是天方夜谭。因为在理论利用过程中,只是连贯几个程序或网络就曾经十分艰难和麻烦了。数以兆计的节点?那真是无奈设想的。现今只有资金雄厚的企业能力负担得起这种软件和服务。
当今世界的网络结构曾经远远超过了咱们本身的驾驭能力。十九世纪八十年代的软件危机,弗莱德•布鲁克斯曾说过,这个世上[没有银弹](http://en.wikipedia.org/wiki/…
)。起初,收费和开源解决了这次软件危机,让咱们可能高效地分享常识。现在,咱们又面临一次新的软件危机,只不过咱们议论得不多。只有那些大型的、富足的企业才有财力建设高度分割的应用程序。那里有云的存在,但它是公有的。咱们的数据和常识正在从咱们的个人电脑中隐没,流入云端,无奈取得或与其竞争。是谁坐拥咱们的社交网络?这真像一次巨型主机的反动。
咱们暂且不谈其中的政治因素,光那些就能够另外出本书了。目前的现状是,尽管互联网可能让千万个程序相连,但咱们之中的大多数却无奈做到这些。这样一来,那些真正乏味的大型问题(如衰弱、教育、经济、交通等畛域),依然无奈解决。咱们没有能力将代码连接起来,也就不能像大脑中的神经元一样解决那些大规模的问题。
曾经有人尝试用各种办法来连贯应用程序,如数以千计的 IETF 标准,每种标准解决一个特定问题。对于开发人员来说,HTTP 协定是比较简单和易用的,但这也往往让问题变得更糟,因为它激励人们造成一种重服务端、轻客户端的思维。
所以迄今为止人们还在应用原始的 TCP/UDP 协定、公有协定、HTTP 协定、网络套接字等模式连贯应用程序。这种做法仍旧让人苦楚,速度慢又不易扩大,须要集中化治理。而分布式的 P2P 协定又仅仅实用于娱乐,而非真正的利用。有谁会应用 Skype 或者 Bittorrent 来替换数据呢?
这就让咱们回归到编程迷信的问题上来。想要援救这个世界,咱们须要做两件事件:一,如何在任何地点连贯任何两个应用程序;二、将这个解决方案用最为简略的形式包装起来,供程序员应用。
兴许这听起来太简略了,但事实的确如此。
ZMQ 简介
ZMQ(ØMQ、ZeroMQ, 0MQ)看起来像是一套嵌入式的网络链接库,但工作起来更像是一个并发式的框架。它提供的套接字能够在多种协定中传输音讯,如线程间、过程间、TCP、播送等。你能够应用套接字构建多对多的连贯模式,如扇出、公布 - 订阅、工作散发、申请 - 应答等。ZMQ 的疾速足以胜任集群利用产品。它的异步 I / O 机制让你可能构建多核应用程序,实现异步音讯解决工作。ZMQ 有着多语言反对,并能在简直所有的操作系统上运行。ZMQ 是 [iMatix][] 公司的产品,以 LGPL 开源协定公布。
须要具备的常识
- 应用最新的 ZMQ 稳固版本;
- 应用 Linux 零碎或其余类似的操作系统;
- 可能浏览 C 语言代码,这是本指南示例程序的默认语言;
- 当咱们书写诸如 PUSH 或 SUBSCRIBE 等常量时,你可能找到相应语言的实现,如 ZMQ_PUSH、ZMQ_SUBSCRIBE。
获取示例
本指南的所有示例都寄存于 github 仓库中,最简略的获取形式是运行以下代码:
git clone git://github.com/imatix/zguide.git
浏览 examples 目录,你能够看到多种语言的实现。如果其中短少了某种你正在应用的语言,咱们很心愿你能够提交一份补充。这也是本指南实用的起因,要感激所有做出过奉献的人。
所有的示例代码都以 MIT/X11 协定公布,若在源代码中有其余限定的除外。
发问 - 答复
让咱们从简略的代码开始,一段传统的 Hello World 程序。咱们会创立一个客户端和一个服务端,客户端发送 Hello 给服务端,服务端返回 World。下文是 C 语言编写的服务端,它在 5555 端口关上一个 ZMQ 套接字,期待申请,收到后应答 World。
hwserver.c: Hello World server
//
// Hello World 服务端
// 绑定一个 REP 套接字至 tcp://*:5555
// 从客户端接管 Hello,并应答 World
//
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
int main (void)
{void *context = zmq_init (1);
// 与客户端通信的套接字
void *responder = zmq_socket (context, ZMQ_REP);
zmq_bind (responder, "tcp://*:5555");
while (1) {
// 期待客户端申请
zmq_msg_t request;
zmq_msg_init (&request);
zmq_recv (responder, &request, 0);
printf ("收到 Hello\n");
zmq_msg_close (&request);
// 做些“解决”sleep (1);
// 返回应答
zmq_msg_t reply;
zmq_msg_init_size (&reply, 5);
memcpy (zmq_msg_data (&reply), "World", 5);
zmq_send (responder, &reply, 0);
zmq_msg_close (&reply);
}
// 程序不会运行到这里,以下只是演示咱们应该如何完结
zmq_close (responder);
zmq_term (context);
return 0;
}
应用 REQ-REP 套接字发送和承受音讯是须要遵循肯定法则的。客户端首先应用 zmq_send()发送音讯,再用 zmq_recv()接管,如此循环。如果打乱了这个程序(如间断发送两次)则会报错。相似地,服务端必须先进行接管,后进行发送。
ZMQ 应用 C 语言作为它参考手册的语言,本指南也以它作为示例程序的语言。如果你正在浏览本指南的在线版本,你能够看到示例代码的下方有其余语言的实现。如以下是 C ++ 语言:
hwserver.cpp: Hello World server
//
// Hello World 服务端 C++ 语言版
// 绑定一个 REP 套接字至 tcp://*:5555
// 从客户端接管 Hello,并应答 World
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
int main () {
// 筹备上下文和套接字
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
zmq::message_t request;
// 期待客户端申请
socket.recv (&request);
std::cout << "收到 Hello" << std::endl;
// 做一些“解决”sleep (1);
// 应答 World
zmq::message_t reply (5);
memcpy ((void *) reply.data (), "World", 5);
socket.send (reply);
}
return 0;
}
能够看到 C 语言和 C ++ 语言的 API 代码差不多,而在 PHP 这样的语言中,代码就会更为简洁:
hwserver.php: Hello World server
<?php
/**
* Hello World 服务端
* 绑定 REP 套接字至 tcp://*:5555
* 从客户端接管 Hello,并应答 World
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext(1);
// 与客户端通信的套接字
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");
while(true) {
// 期待客户端申请
$request = $responder->recv();
printf ("Received request: [%s]\n", $request);
// 做一些“解决”sleep (1);
// 应答 World
$responder->send("World");
}
上面是客户端的代码:
hwclient: Hello World client in C
//
// Hello World 客户端
// 连贯 REQ 套接字至 tcp://localhost:5555
// 发送 Hello 给服务端,并接管 World
//
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main (void)
{void *context = zmq_init (1);
// 连贯至服务端的套接字
printf ("正在连接至 hello world 服务端...\n");
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
zmq_msg_t request;
zmq_msg_init_size (&request, 5);
memcpy (zmq_msg_data (&request), "Hello", 5);
printf ("正在发送 Hello %d...\n", request_nbr);
zmq_send (requester, &request, 0);
zmq_msg_close (&request);
zmq_msg_t reply;
zmq_msg_init (&reply);
zmq_recv (requester, &reply, 0);
printf ("接管到 World %d\n", request_nbr);
zmq_msg_close (&reply);
}
zmq_close (requester);
zmq_term (context);
return 0;
}
这看起来是否太简略了?ZMQ 就是这样一个货色,你往里加点儿料就能制作出一枚无穷能量的原子弹,用它来援救世界吧!
实践上你能够连贯千万个客户端到这个服务端上,同时连贯都没问题,程序仍会运作得很好。你能够尝试一下先关上客户端,再关上服务端,能够看到程序依然会失常工作,想想这意味着什么。
让我简略介绍一下这两段程序到底做了什么。首先,他们创立了一个 ZMQ 上下文,而后是一个套接字。不要被这些生疏的名词吓到,前面咱们都会讲到。服务端将 REP 套接字绑定到 5555 端口上,并开始期待申请,收回应答,如此循环。客户端则是发送申请并期待服务端的应答。
这些代码背地其实产生了很多很多事件,然而程序员齐全不用理睬这些,只有晓得这些代码短小精悍,极少出错,耐低压。这种通信模式咱们称之为申请 - 应答模式,是 ZMQ 最间接的一种利用。你能够拿它和 RPC 及经典的 C / S 模型做类比。
对于字符串
ZMQ 不会关怀发送音讯的内容,只有晓得它所蕴含的字节数。所以,程序员须要做一些工作,保障对方节点可能正确读取这些音讯。如何将一个对象或简单数据类型转换成 ZMQ 能够发送的音讯,这有相似 Protocol Buffers 的序列化软件能够做到。但对于字符串,你也是须要有所留神的。
在 C 语言中,字符串都以一个空字符结尾,你能够像这样发送一个残缺的字符串:
zmq_msg_init_data (&request, "Hello", 6, NULL, NULL);
然而,如果你用其余语言发送这个字符串,很可能不会蕴含这个空字节,如你应用 Python 发送:
socket.send ("Hello")
理论发送的音讯是:
如果你从 C 语言中读取该音讯,你会读到一个相似于字符串的内容,甚至它可能就是一个字符串(第六位在内存中正好是一个空字符),然而这并不适合。这样一来,客户端和服务端对字符串的定义就不对立了,你会失去一些奇怪的后果。
当你用 C 语言从 ZMQ 中获取字符串,你不可能置信该字符串有一个正确的结尾。因而,当你在承受字符串时,应该建设多一个字节的缓冲区,将字符串放进去,并增加结尾。
所以,让咱们做如下假如:ZMQ 的字符串是有长度的,且传送时不加结束符。在最简略的状况下,ZMQ 字符串和 ZMQ 音讯中的一帧是等价的,就如上图所展示的,由一个长度属性和一串字节示意。
上面这个性能函数会帮忙咱们在 C 语言中正确的承受字符串音讯:
// 从 ZMQ 套接字中接管字符串,并转换为 C 语言的字符串
static char *
s_recv (void *socket) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
int size = zmq_msg_size (&message);
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
zmq_msg_close (&message);
string [size] = 0;
return (string);
}
这段代码咱们会在日后的示例中应用,咱们能够棘手写一个 s_send()办法,并打包成一个.h 文件供咱们应用。
这就诞生了 zhelpers.h,一个供 C 语言应用的 ZMQ 性能函数库。它的源代码比拟长,而且只对 C 语言程序员有用,你能够在空闲时看一看。
获取版本号
ZMQ 目前有多个版本,而且仍在继续更新。如果你遇到了问题,兴许这在下一个版本中曾经解决了。想晓得目前的 ZMQ 版本,你能够在程序中运行如下:
version: ØMQ version reporting in C
//
// 返回以后 ZMQ 的版本号
//
#include "zhelpers.h"
int main (void)
{
int major, minor, patch;
zmq_version (&major, &minor, &patch);
printf ("以后 ZMQ 版本号为 %d.%d.%d\n", major, minor, patch);
return EXIT_SUCCESS;
}
让音讯流动起来
第二种经典的音讯模式是单向数据散发:服务端将更新事件发送给一组客户端。让咱们看一个天气信息公布的例子,包含邮编、温度、相对湿度。咱们生成这些随机信息,用来模仿气象站所做的那样。
上面是服务端的代码,应用 5556 端口:
wuserver: Weather update server in C
//
// 气象信息更新服务
// 绑定 PUB 套接字至 tcp://*:5556 端点
// 公布随机气象信息
//
#include "zhelpers.h"
int main (void)
{
// 筹备上下文和 PUB 套接字
void *context = zmq_init (1);
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5556");
zmq_bind (publisher, "ipc://weather.ipc");
// 初始化随机数生成器
srandom ((unsigned) time (NULL));
while (1) {
// 生成数据
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// 向所有订阅者发送音讯
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_term (context);
return 0;
}
这项更新服务没有开始、没有完结,就像永不隐没的电波一样。
上面是客户端程序,它会承受发布者的音讯,只解决特定邮编标注的信息,如纽约的邮编是 10001:
wuclient: Weather update client in C
//
// 气象信息客户端
// 连贯 SUB 套接字至 tcp://*:5556 端点
// 收集指定邮编的气象信息,并计算平均温度
//
#include "zhelpers.h"
int main (int argc, char *argv [])
{void *context = zmq_init (1);
// 创立连贯至服务端的套接字
printf ("正在收集气象信息...\n");
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
// 设置订阅信息,默认为纽约,邮编 10001
char *filter = (argc > 1)? argv [1]: "10001";
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
// 解决 100 条更新信息
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("地区邮编'%s'的平均温度为 %dF\n",
filter, (int) (total_temp / update_nbr));
zmq_close (subscriber);
zmq_term (context);
return 0;
}
须要留神的是,在应用 SUB 套接字时,必须应用 zmq_setsockopt()办法来设置订阅的内容。如果你不设置订阅内容,那将什么音讯都收不到,老手很容易犯这个谬误。订阅信息能够是任何字符串,能够设置屡次。只有音讯满足其中一条订阅信息,SUB 套接字就会收到。订阅者能够抉择不接管某类音讯,也是通过 zmq_setsockopt()办法实现的。
PUB-SUB 套接字组合是异步的。客户端在一个循环体中应用 zmq_recv()接管音讯,如果向 SUB 套接字发送音讯则会报错;相似地,服务端能够一直地应用 zmq_send()发送音讯,但不能在 PUB 套接字上应用 zmq_recv()。
对于 PUB-SUB 套接字,还有一点须要留神:你无奈得悉 SUB 是何时开始接管音讯的。就算你先关上了 SUB 套接字,后关上 PUB 发送音讯,这时 SUB 还是会失落一些音讯的,因为建设连贯是须要一些工夫的。很少,但并不是零。
这种“慢连贯”的症状一开始会让很多人困惑,所以这里我要具体解释一下。还记得 ZMQ 是在后盾进行异步的 I / O 传输的,如果你有两个节点用以下程序相连:
- 订阅者连贯至端点接管音讯并计数;
- 发布者绑定至端点并立即发送 1000 条音讯。
运行的后果很可能是订阅者一条音讯都收不到。这时你可能会傻眼,忙于查看有没有设置订阅信息,并从新尝试,但后果还是一样。
咱们晓得在建设 TCP 连贯时须要进行三次握手,会消耗几毫秒的工夫,而当节点数减少时这个数字也会回升。在这么短的工夫里,ZMQ 就能够发送很多很多音讯了。举例来说,如果建设连贯须要耗时 5 毫秒,而 ZMQ 只须要 1 毫秒就能够发送完这 1000 条音讯。
第二章中我会解释如何使发布者和订阅者同步,只有当订阅者筹备好时发布者才会开始发送音讯。有一种简略的办法来同步 PUB 和 SUB,就是让 PUB 提早一段时间再发送音讯。事实编程中我不倡议应用这种形式,因为它太软弱了,而且不好管制。不过这里咱们先暂且应用 sleep 的形式来解决,等到第二章的时候再讲述正确的解决形式。
另一种同步的形式则是认为发布者的音讯流是无穷无尽的,因而失落了后面一部分信息也没有关系。咱们的气象信息客户端就是这么做的。
示例中的气象信息客户端会收集指定邮编的一千条信息,其间大概有 1000 万条信息被公布。你能够先关上客户端,再关上服务端,工作一段时间后重启服务端,这时客户端仍会失常工作。当客户端收集完所需信息后,会计算并输入平均温度。
对于公布 - 订阅模式的几点阐明:
- 订阅者能够连贯多个发布者,轮流接管音讯;
- 如果发布者没有订阅者与之相连,那它发送的音讯将间接被抛弃;
- 如果你应用 TCP 协定,那当订阅者处理速度过慢时,音讯会在发布者处沉积。当前咱们会探讨如何应用阈值(HWM)来爱护发布者。
- 在目前版本的 ZMQ 中,音讯的过滤是在订阅者处进行的。也就是说,发布者会向订阅者发送所有的音讯,订阅者会将未订阅的音讯抛弃。
我在本人的四核计算机上尝试公布 1000 万条音讯,速度很快,但没什么特地的:
ph@ws200901:~/work/git/0MQGuide/examples/c$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001' was 18F
real 0m5.939s
user 0m1.590s
sys 0m2.290s
分布式解决
上面一个示例程序中,咱们将应用 ZMQ 进行超级计算,也就是并行处理模型:
- 工作散发器会生成大量能够并行计算的工作;
- 有一组 worker 会解决这些工作;
- 后果收集器会在末端接管所有 worker 的处理结果,进行汇总。
事实中,worker 可能散落在不同的计算机中,利用 GPU(图像处理单元)进行简单计算。上面是工作散发器的代码,它会生成 100 个工作,工作内容是让收到的 worker 提早若干毫秒。
taskvent: Parallel task ventilator in C
//
// 工作散发器
// 绑定 PUSH 套接字至 tcp://localhost:5557 端点
// 发送一组工作给已建设连贯的 worker
//
#include "zhelpers.h"
int main (void)
{void *context = zmq_init (1);
// 用于发送音讯的套接字
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// 用于发送开始信号的套接字
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("筹备好 worker 后按任意键开始:");
getchar ();
printf ("正在向 worker 分配任务...\n");
// 发送开始信号
s_send (sink, "0");
// 初始化随机数生成器
srandom ((unsigned) time (NULL));
// 发送 100 个工作
int task_nbr;
int total_msec = 0; // 预计执行工夫(毫秒)for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// 随机产生 1 -100 毫秒的工作量
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("预计执行工夫: %d 毫秒 \n", total_msec);
sleep (1); // 提早一段时间,让工作散发实现
zmq_close (sink);
zmq_close (sender);
zmq_term (context);
return 0;
}
上面是 worker 的代码,它承受信息并提早指定的毫秒数,并发送执行结束的信号:
taskwork: Parallel task worker in C
//
// 工作执行器
// 连贯 PULL 套接字至 tcp://localhost:5557 端点
// 从工作散发器处获取工作
// 连贯 PUSH 套接字至 tcp://localhost:5558 端点
// 向后果采集器发送后果
//
#include "zhelpers.h"
int main (void)
{void *context = zmq_init (1);
// 获取工作的套接字
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// 发送后果的套接字
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// 循环解决工作
while (1) {char *string = s_recv (receiver);
// 输入解决进度
fflush (stdout);
printf ("%s.", string);
// 开始解决
s_sleep (atoi (string));
free (string);
// 发送后果
s_send (sender, "");
}
zmq_close (receiver);
zmq_close (sender);
zmq_term (context);
return 0;
}
上面是后果收集器的代码。它会收集 100 个处理结果,并计算总的执行工夫,让咱们由此判断工作是否是并行计算的。
tasksink: Parallel task sink in C
//
// 工作收集器
// 绑定 PULL 套接字至 tcp://localhost:5558 端点
// 从 worker 处收集处理结果
//
#include "zhelpers.h"
int main (void)
{
// 筹备上下文和套接字
void *context = zmq_init (1);
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// 期待开始信号
char *string = s_recv (receiver);
free (string);
// 开始计时
int64_t start_time = s_clock ();
// 确定 100 个工作均已解决
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
// 计算并输入总执行工夫
printf ("执行工夫: %d 毫秒 \n",
(int) (s_clock () - start_time));
zmq_close (receiver);
zmq_term (context);
return 0;
}
一组工作的均匀执行工夫在 5 秒左右,以下是别离开始 1 个、2 个、4 个 worker 时的执行后果:
# 1 worker
Total elapsed time: 5034 msec
# 2 workers
Total elapsed time: 2421 msec
# 4 workers
Total elapsed time: 1018 msec
对于这段代码的几个细节:
- worker 上游和工作散发器相连,上游和后果收集器相连,这就意味着你能够开启任意多个 worker。但若 worker 是绑定至端点的,而非连贯至端点,那咱们就须要筹备更多的端点,并配置工作散发器和后果收集器。所以说,工作散发器和后果收集器是这个网络结构中较为稳固的局部,因而应该由它们绑定至端点,而非 worker,因为它们较为动静。
- 咱们须要做一些同步的工作,期待 worker 全副启动之后再散发工作。这点在 ZMQ 中很重要,且不易解决。连贯套接字的动作会消耗肯定的工夫,因而当第一个 worker 连贯胜利时,它会一下收到很多工作。所以说,如果咱们不进行同步,那这些工作基本就不会被并行地执行。你能够本人试验一下。
- 工作散发器应用 PUSH 套接字向 worker 平均地散发工作(假如所有的 worker 都曾经连贯上了),这种机制称为_负载平衡_,当前咱们会见得更多。
- 后果收集器的 PULL 套接字会平均地从 worker 处收集音讯,这种机制称为_偏心队列_:
管道模式也会呈现慢连贯的状况,让人误以为 PUSH 套接字没有进行负载平衡。如果你的程序中某个 worker 接管到了更多的申请,那是因为它的 PULL 套接字连贯得比拟快,从而在别的 worker 连贯之前获取了额定的音讯。
应用 ZMQ 编程
看着这些示例程序后,你肯定急不可待想要用 ZMQ 进行编程了。不过在开始之前,我还有几条倡议想给到你,这样能够省去将来的一些麻烦:
- 学习 ZMQ 要循序渐进,尽管它只是一套 API,但却提供了无尽的可能。一步一步学习它提供的性能,并齐全把握。
- 编写丑陋的代码。俊俏的代码会暗藏问题,让想要帮忙你的人无从下手。比方,你会习惯于应用无意义的变量名,但读你代码的人并不知道。应应用有意义的变量名称,而不是随便起一个。代码的缩进要对立,布局清晰。丑陋的代码能够让你的世界变得更美妙。
- 边写边测试,当代码呈现问题,你就能够疾速定位到某些行。这一点在编写 ZMQ 应用程序时尤为重要,因为很多时候你无奈第一次就编写出正确的代码。
- 当你发现自己编写的代码无奈失常工作时,你能够将其拆分成一些代码片段,看看哪段没有正确地执行。ZMQ 能够让你构建十分模块化的代码,所以应该好好利用这一点。
- 须要时应应用形象的办法来编写程序(类、成员函数等等),不要随便拷贝代码,因为拷贝代码的同时也是在拷贝谬误。
咱们看看上面这段代码,是某位同仁让我帮忙批改的:
// 留神:不要应用这段代码!static char *topic_str = "msg.x|";
void* pub_worker(void* arg){
void *ctx = arg;
assert(ctx);
void *qskt = zmq_socket(ctx, ZMQ_REP);
assert(qskt);
int rc = zmq_connect(qskt, "inproc://querys");
assert(rc == 0);
void *pubskt = zmq_socket(ctx, ZMQ_PUB);
assert(pubskt);
rc = zmq_bind(pubskt, "inproc://publish");
assert(rc == 0);
uint8_t cmd;
uint32_t nb;
zmq_msg_t topic_msg, cmd_msg, nb_msg, resp_msg;
zmq_msg_init_data(&topic_msg, topic_str, strlen(topic_str) , NULL, NULL);
fprintf(stdout,"WORKER: ready to recieve messages\n");
// 留神:不要应用这段代码,它不能工作!// e.g. topic_msg will be invalid the second time through
while (1){zmq_send(pubskt, &topic_msg, ZMQ_SNDMORE);
zmq_msg_init(&cmd_msg);
zmq_recv(qskt, &cmd_msg, 0);
memcpy(&cmd, zmq_msg_data(&cmd_msg), sizeof(uint8_t));
zmq_send(pubskt, &cmd_msg, ZMQ_SNDMORE);
zmq_msg_close(&cmd_msg);
fprintf(stdout, "recieved cmd %u\n", cmd);
zmq_msg_init(&nb_msg);
zmq_recv(qskt, &nb_msg, 0);
memcpy(&nb, zmq_msg_data(&nb_msg), sizeof(uint32_t));
zmq_send(pubskt, &nb_msg, 0);
zmq_msg_close(&nb_msg);
fprintf(stdout, "recieved nb %u\n", nb);
zmq_msg_init_size(&resp_msg, sizeof(uint8_t));
memset(zmq_msg_data(&resp_msg), 0, sizeof(uint8_t));
zmq_send(qskt, &resp_msg, 0);
zmq_msg_close(&resp_msg);
}
return NULL;
}
上面是我为他重写的代码,顺便修复了一些 BUG:
static void *
worker_thread (void *arg) {
void *context = arg;
void *worker = zmq_socket (context, ZMQ_REP);
assert (worker);
int rc;
rc = zmq_connect (worker, "ipc://worker");
assert (rc == 0);
void *broadcast = zmq_socket (context, ZMQ_PUB);
assert (broadcast);
rc = zmq_bind (broadcast, "ipc://publish");
assert (rc == 0);
while (1) {char *part1 = s_recv (worker);
char *part2 = s_recv (worker);
printf ("Worker got [%s][%s]\n", part1, part2);
s_sendmore (broadcast, "msg");
s_sendmore (broadcast, part1);
s_send (broadcast, part2);
free (part1);
free (part2);
s_send (worker, "OK");
}
return NULL;
}
上段程序的最初,它将套接字在两个线程之间传递,这会导致莫名其妙的问题。这种行为在 ZMQ 2.1 中尽管是非法的,然而不提倡应用。
ZMQ 2.1 版
历史通知咱们,ZMQ 2.0 是一个低提早的分布式音讯零碎,它从泛滥同类软件中怀才不遇,解脱了各种豪华的名目,向世界宣告“无极限”的口号。这是咱们始终在应用的稳固发行版。
时过境迁,2010 年风行的货色在 2011 年就不肯定了。当 ZMQ 的开发者和社区开发者在强烈地探讨 ZMQ 的种种问题时,ZMQ 2.1 横空出世了,成为新的稳固发行版。
本指南次要针对 ZMQ 2.1 进行形容,因而对于从 ZMQ 2.0 迁徙过去的开发者来说有一些须要留神的中央:
- 在 2.0 中,调用 zmq_close()和 zmq_term()时会抛弃所有尚未发送的音讯,所以在发送完音讯后不能间接关闭程序,2.0 的示例中往往应用 sleep(1)来躲避这个问题。然而在 2.1 中就不须要这样做了,程序会期待音讯全副发送结束后再退出。
- 相同地,2.0 中能够在尚有套接字关上的状况下调用 zmq_term(),这在 2.1 中会变得不平安,会造成程序的阻塞。所以,在 2.1 程序中咱们_会先敞开所有的套接字_,而后才退出程序。如果套接字中有尚未发送的音讯,程序就会始终处于期待状态,_除非手工设置了套接字的 LINGER 选项_(如设置为零),那么套接字会在相应的工夫后敞开。
int zero = 0;
zmq_setsockopt (mysocket, ZMQ_LINGER, &zero, sizeof (zero));
- 2.0 中,zmq_poll()函数没有定时性能,它会在满足条件时立即返回,咱们须要在循环体中查看还有多少残余。但在 2.1 中,zmq_poll()会在指定工夫后返回,因而能够作为定时器应用。
- 2.0 中,ZMQ 会疏忽零碎的中断音讯,这就意味着对 libzmq 的调用是不会收到 EINTR 音讯的,这样就无奈对 SIGINT(Ctrl-C)等音讯进行解决了。在 2.1 中,这个问题得以解决,像相似 zmq_recv()的办法都会接管并返回零碎的 EINTR 音讯。
正确地应用上下文
ZMQ 应用程序的一开始总是会先创立一个上下文,并用它来创立套接字。在 C 语言中,创立上下文的函数是 zmq_init()。一个过程中只应该创立一个上下文。从技术的角度来说,上下文是一个容器,蕴含了该过程下所有的套接字,并为 inproc 协定提供实现,用以高速连贯过程内不同的线程。如果一个过程中创立了两个上下文,那就相当于启动了两个 ZMQ 实例。如果这正是你须要的,那没有问题,但个别状况下:
在一个过程中应用 zmq_init()函数创立一个上下文,并在完结时应用 zmq_term()函数敞开它
如果你应用了 fork()零碎调用,那每个过程须要本人的上下文对象。如果在调用 fork()之前调用了 zmq_init()函数,那每个子过程都会有本人的上下文对象。通常状况下,你会须要在子过程中做些乏味的事,而让父过程来治理它们。
正确地退出和清理
程序员的一个良好习惯是:总是在完结时进行清理工作。当你应用像 Python 那样的语言编写 ZMQ 应用程序时,零碎会主动帮你实现清理。但如果应用的是 C 语言,那就须要小心地解决了,否则可能产生内存泄露、应用程序不稳固等问题。
内存泄露只是问题之一,其实 ZMQ 是很在意程序的退出形式的。个中起因比较复杂,但简略的来说,如果仍有套接字处于关上状态,调用 zmq_term()时会导致程序挂起;就算敞开了所有的套接字,如果仍有音讯处于待发送状态,zmq_term()也会造成程序的期待。只有当套接字的 LINGER 选项设为 0 时能力防止。
咱们须要关注的 ZMQ 对象包含:音讯、套接字、上下文。好在内容并不多,至多在个别的应用程序中是这样:
- 解决完音讯后,记得用 zmq_msg_close()函数敞开音讯;
- 如果你同时关上或敞开了很多套接字,那可能须要从新布局一下程序的构造了;
- 退出程序时,应该先敞开所有的套接字,最初调用 zmq_term()函数,销毁上下文对象。
如果要用 ZMQ 进行多线程的编程,须要思考的问题就更多了。咱们会在下一章中详述多线程编程,但如果你耐不住性子想要尝试一下,以下是在退出时的一些倡议:
- 不要在多个线程中应用同一个套接字。不要去想为什么,反正别这么干就是了。
- 敞开所有的套接字,并在主程序中敞开上下文对象。
- 如果仍有处于阻塞状态的 recv 或 poll 调用,应该在主程序中捕获这些谬误,并在相应的线程中敞开套接字。不要反复敞开上下文,zmq_term()函数会期待所有的套接字平安地敞开后才完结。
看吧,过程是简单的,所以不同语言的 API 实现者可能会将这些步骤封装起来,让完结程序变得不那么简单。
咱们为什么须要 ZMQ
当初咱们曾经将 ZMQ 运行起来了,让咱们回顾一下为什么咱们须要 ZMQ:
目前的应用程序很多都会蕴含跨网络的组件,无论是局域网还是因特网。这些程序的开发者都会用到某种音讯通信机制。有些人会应用某种音讯队列产品,而大多数人则会本人手工来做这些事,应用 TCP 或 UDP 协定。这些协定应用起来并不艰难,然而,简略地将音讯从 A 发给 B,和在任何状况下都能进行牢靠的音讯传输,这两种状况显然是不同的。
让咱们看看在应用纯 TCP 协定进行音讯传输时会遇到的一些典型问题。任何可复用的音讯传输层必定或多或少地会要解决以下问题:
- 如何解决 I /O?是让程序阻塞期待响应,还是在后盾解决这些事?这是软件设计的关键因素。阻塞式的 I / O 操作会让程序架构难以扩大,而后盾解决 I / O 也是比拟艰难的。
- 如何解决那些长期的、来去自由的组件?咱们是否要将组件分为客户端和服务端两种,并要求服务端永不隐没?那如果咱们想要将服务端相连怎么办?咱们要每隔几秒就进行重连吗?
- 咱们如何示意一条音讯?咱们怎么通过拆分音讯,让其变得易读易写,不必放心缓存溢出,既能高效地传输小音讯,又能胜任视频等大型文件的传输?
- 如何解决那些不能立即发送进来的音讯?比方咱们须要期待一个网络组件从新连贯的时候?咱们是间接抛弃该条音讯,还是将它存入数据库,或是内存中的一个队列?
- 要在哪里保留音讯队列?如果某个组件读取音讯队列的速度很慢,造成音讯的沉积怎么办?咱们要采取什么样的策略?
- 如何解决失落的音讯?咱们是期待新的数据,申请重发,还是须要建设一套新的可靠性机制以保障音讯不会失落?如果这个机制本身解体了呢?
- 如果咱们想换一种网络连接协定,如用播送代替 TCP 单播?或者改用 IPv6?咱们是否须要重写所有的应用程序,或者将这种协定形象到一个独自的层中?
- 咱们如何对音讯进行路由?咱们能够将音讯同时发送给多个节点吗?是否能将应答音讯返回给申请的发送方?
- 咱们如何为另一种语言写一个 API?咱们是否须要齐全重写某项协定,还是从新打包一个类库?
- 怎样才能做到在不同的架构之间传送音讯?是否须要为音讯规定一种编码?
- 咱们如何解决网络通信谬误?期待并重试,还是间接疏忽或勾销?
咱们能够找一个开源软件来做例子,如 Hadoop Zookeeper,看一下它的 C 语言 API 源码,src/c/src/zookeeper.c。这段代码大概有 3200 行,没有正文,实现了一个 C / S 网络通信协定。它工作起来很高效,因为应用了 poll()来代替 select()。然而,Zookeeper 应该被形象进去,作为一种通用的音讯通信层,并加以具体的正文。像这样的模块应该失去最大水平上的复用,而不是反复地制作轮子。
然而,如何编写这样一个可复用的音讯层呢?为什么长久以来人们宁愿在本人的代码中反复书写管制原始 TCP 套接字的代码,而不愿编写这样一个公共库呢?
其实,要编写一个通用的音讯层是件十分艰难的事,这也是为什么 FOSS 我的项目一直在尝试,一些商业化的音讯产品如此之简单、低廉、生硬、软弱。2006 年,iMatix 设计了 AMQP 协定,为 FOSS 我的项目的开发者提供了可能是过后第一个可复用的音讯零碎。[AMQP][]比其余同类产品要来得好,但依然是简单、低廉和软弱的。它须要破费几周的工夫去学习,破费数月的工夫去创立一个真正能用的架构,到那时可能为时已晚了。
大多数音讯零碎我的项目,如 AMQP,为了解决下面提到的种种问题,创造了一些新的概念,如“代理”的概念,将寻址、路由、队列等性能都蕴含了进来。后果就是在一个没有任何正文的协定之上,又构建了一个 C / S 协定和相应的 API,让应用程序和代理互相通信。代理确实是一个不错的解决方案,帮忙升高大型网络结构的复杂度。然而,在 Zookeeper 这样的我的项目中利用代理机制的音讯零碎,可能是件更加蹩脚的事,因为这象征了须要增加一台新的计算机,并形成一个新的单点故障。代理会逐步成为新的瓶颈,治理起来更具危险。如果软件反对,咱们能够增加第二个、第三个、第四个代理,形成某种冗余容错的模式。有人就是这么做的,这让零碎架构变得更为简单,减少了隐患。
在这种以代理为核心的架构下,须要一支专门的运维团队。你须要昼夜不停地察看代理的状态,不断地用棍棒调教他们。你须要增加计算机,以及更多的备份机,你须要有专人治理这些机器。这样做只对那些大型的网络应用程序才有意义,因为他们有更多可挪动的模块,有多个团队进行开发和保护,而且曾经通过了多年的建设。
这样一来,中小应用程序的开发者们就机关用尽了。他们只能设法防止编写网络应用程序,转而编写那些不须要扩大的程序;或者能够应用原始的形式进行网络编程,但编写的软件会十分软弱和简单,难以保护;亦或者他们抉择一种音讯通信产品,尽管可能开发出扩展性强的应用程序,但须要领取昂扬的代价。仿佛没有一种抉择是正当的,这也是为什么在上个世纪音讯零碎会成为一个宽泛的问题。
咱们真正须要的是这样一种音讯软件,它可能做大型音讯软件所能做的所有,但应用起来又非常简单,老本很低,能够用到所有的应用程序中,没有任何依赖条件。因为没有了额定的模块,就升高了出错的概率。这种软件须要可能在所有的操作系统上运行,并能反对所有的编程语言。
ZMQ 就是这样一种软件:它高效,提供了嵌入式的类库,使应用程序可能很好地在网络中扩大,老本低廉。
ZMQ 的次要特点有:
- ZMQ 会在后盾线程异步地解决 I / O 操作,它应用一种不会死锁的数据结构来存储音讯。
- 网络组件能够来去自如,ZMQ 会负责主动重连,这就意味着你能够以任何程序启动组件;用它创立的面向服务架构(SOA)中,服务端能够随便地退出或退出网络。
- ZMQ 会在有必要的状况下主动将音讯放入队列中保留,一旦建设了连贯就开始发送。
- ZMQ 有阈值(HWM)的机制,能够防止音讯溢出。当队列已满,ZMQ 会主动阻塞发送者,或抛弃局部音讯,这些行为取决于你所应用的音讯模式。
- ZMQ 能够让你用不同的通信协议进行连贯,如 TCP、播送、过程内、过程间。扭转通信协议时你不须要去批改代码。
- ZMQ 会失当地处理速度较慢的节点,会依据音讯模式应用不同的策略。
- ZMQ 提供了多种模式进行音讯路由,如申请 - 应答模式、公布 - 订阅模式等。这些模式能够用来搭建网络拓扑构造。
- ZMQ 中能够依据音讯模式建设起一些两头安装(很玲珑),能够用来升高网络的复杂程度。
- ZMQ 会发送整个音讯,应用音讯帧的机制来传递。如果你发送了 10KB 大小的音讯,你就会收到 10KB 大小的音讯。
- ZMQ 不强制应用某种音讯格局,音讯能够是 0 字节的,或是大到 GB 级的数据。当你示意这些音讯时,能够选用诸如谷歌的 protocol buffers,XDR 等序列化产品。
- ZMQ 可能智能地解决网络谬误,有时它会进行重试,有时会告知你某项操作产生了谬误。
- ZMQ 甚至能够升高对环境的净化,因为节俭了 CPU 工夫意味着节俭了电能。
其实 ZMQ 能够做的还不止这些,它会颠覆人们编写网络应用程序的模式。尽管从外表上看,它不过是提供了一套解决套接字的 API,可能用 zmq_recv()和 zmq_send()进行音讯的收发,然而,音讯解决将成为应用程序的外围局部,很快你的程序就会变成一个个音讯解决模块,这既好看又天然。它的扩展性还很强,每项工作由一个节点(节点是一个线程)、同一台机器上的两个节点(节点是一个过程)、同一网络上的两台机器(节点是一台机器)来解决,而不须要改变应用程序。
套接字的扩展性
咱们来用实例看看 ZMQ 套接字的扩展性。这个脚本会启动气象信息服务及多个客户端:
wuserver &
wuclient 12345 &
wuclient 23456 &
wuclient 34567 &
wuclient 45678 &
wuclient 56789 &
执行过程中,咱们能够通过 top 命令查看过程状态(以下是一台四核机器的状况):
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
7136 ph 20 0 1040m 959m 1156 R 157 12.0 16:25.47 wuserver
7966 ph 20 0 98608 1804 1372 S 33 0.0 0:03.94 wuclient
7963 ph 20 0 33116 1748 1372 S 14 0.0 0:00.76 wuclient
7965 ph 20 0 33116 1784 1372 S 6 0.0 0:00.47 wuclient
7964 ph 20 0 33116 1788 1372 S 5 0.0 0:00.25 wuclient
7967 ph 20 0 33072 1740 1372 S 5 0.0 0:00.35 wuclient
咱们想想当初产生了什么:气象信息服务程序有一个独自的套接字,却能同时向五个客户端并行地发送音讯。咱们能够有成千盈百个客户端并行地运作,服务端看不到这些客户端,不能操纵它们。
如果解决失落音讯的问题
在编写 ZMQ 应用程序时,你遇到最多的问题可能是无奈取得音讯。上面有一个问题解决路线图,列举了最根本的出错起因。不必放心其中的某些术语你没有见过,在前面的几章里都会讲到。
如果 ZMQ 在你的应用程序中表演十分重要的角色,那你可能就须要好好打算一下了。首先,创立一个原型,用以测试设计方案的可行性。采取一些压力测试的伎俩,确保它足够的强壮。其次,主攻测试代码,也就是编写测试框架,保障有足够的电力供应和工夫,来进行高强度的测试。现实状态下,应该由一个团队编写程序,另一个团队负责击垮它。最初,让你的公司及时分割 iMatix,取得技术上的反对。
简而言之,如果你没有足够理由阐明设计进去的架构可能在事实环境中运行,那么很有可能它就会在最紧要的关头解体。
正告:你的想法可能会被颠覆!
传统网络编程的一个规定是套接字只能和一个节点建设连贯。尽管也有播送的协定,但毕竟是第三方的。当咱们认定“一个套接字 = 一个连贯”的时候,咱们会用一些特定的形式来扩大应用程序架构:咱们为每一块逻辑创立线程,该线程独立地保护一个套接字。
但在 ZMQ 的世界里,套接字是智能的、多线程的,可能主动地保护一组残缺的连贯。你无奈看到它们,甚至不能间接操纵这些连贯。当你进行音讯的收发、轮询等操作时,只能和 ZMQ 套接字打交道,而不是连贯自身。所以说,ZMQ 世界里的连贯是公有的,不对外部凋谢,这也是 ZMQ 易于扩大的起因之一。
因为你的代码只会和某个套接字进行通信,这样就能够解决任意多个连贯,应用任意一种网络协议。而 ZMQ 的音讯模式又能够进行更为便宜和便捷的扩大。
这样一来,传统的思维就无奈在 ZMQ 的世界里利用了。在你浏览示例程序代码的时候,兴许你脑子里会千方百计地将这些代码和传统的网络编程相关联:当你读到“套接字”的时候,会认为它就示意与另一个节点的连贯——这种想法是谬误的;当你读到“线程”时,会认为它是与另一个节点的连贯——这也是谬误的。
如果你是第一次浏览本指南,应用 ZMQ 进行了一两天的开发(或者更长),可能会感觉纳闷,ZMQ 怎么会让事件便得如此简略。你再次尝试用以往的思维去了解 ZMQ,但又无功而返。最初,你会被 ZMQ 的理念所折服,拨云见雾,开始享受 ZMQ 带来的乐趣。