关于zmq:ZMQ-指南第四章-可靠的请求应答模式
牢靠的申请-应答模式第三章中咱们应用实例介绍了高级申请-应答模式,本章咱们会讲述申请-应答模式的可靠性问题,并应用ZMQ提供的套接字类型组建起牢靠的申请-应答音讯零碎。 本章将介绍的内容有: 客户端申请-应答最近起码应用队列心跳机制面向服务的队列基于磁盘(脱机)队列主从备份服务无中间件的申请-应答什么是可靠性?要给可靠性下定义,咱们能够先界定它的相同面——故障。如果咱们能够解决某些类型的故障,那么咱们的模型对于这些故障就是牢靠的。上面咱们就来列举分布式ZMQ应用程序中可能产生的问题,从可能性高的故障开始: 利用程序代码是最大的故障起源。程序会解体或停止,进行对数据起源的响应,或是响应得太慢,耗尽内存等。零碎代码,如应用ZMQ编写的中间件,也会意外停止。零碎代码应该要比利用程序代码更为牢靠,但毕竟也有可能解体。特地是当零碎代码与速度过慢的客户端交互时,很容易耗尽内存。音讯队列溢出,典型的状况是零碎代码中没有对蛮客户端做踊跃的解决,任由音讯队列溢出。网络长期中断,造成音讯失落。这类谬误ZMQ应用程序是无奈及时发现的,因为ZMQ会主动进行重连。硬件零碎解体,导致所有过程停止。网络会呈现非凡情景的中断,如交换机的某个端口产生故障,导致局部网络无法访问。数据中心可能蒙受雷击、地震、火灾、电压过载、冷却系统生效等。想要让软件系统躲避上述所有的危险,须要大量的人力物力,故不在本指南的探讨范畴之内。 因为前五个故障类型涵盖了99.9%的情景(这一数据源自我近期进行的一项钻研),所以咱们会深入探讨。如果你的公司大到足以思考最初两种情景,那请及时分割我,因为我正愁没钱将我家后院的大坑建成游泳池。 可靠性设计简略地来说,可靠性就是当程序产生故障时也能顺利地运行上来,这要比搭建一个音讯零碎来得艰难得多。咱们会依据ZMQ提供的每一种外围音讯模式,来看看如何保障代码的继续运行。 申请-应答模式:当服务端在解决申请是中断,客户端可能得悉这一信息,并进行接管音讯,转而抉择期待重试、申请另一服务端等操作。这里咱们暂不探讨客户端产生问题的情景。公布-订阅模式:如果客户端收到一些音讯后意外停止,服务端是不晓得这一状况的。公布-订阅模式中的订阅者不会返回任何音讯给发布者。然而,订阅者能够通过其余形式分割服务端,如申请-应答模式,要求服务端重发消息。这里咱们暂不探讨服务端产生问题的情景。此外,订阅者能够通过某些形式查看本身是否运行得过慢,并采取相应措施(向操作者收回正告、停止等)。管道模式:如果worker意外终止,工作散发器将无从得悉。管道模式和公布-订阅模式相似,只朝一个方向发送音讯。然而,上游的后果收集器能够检测哪项工作没有实现,并通知工作散发器重新分配该工作。如果工作散发器或后果收集器意外停止了,那客户端收回的申请只能另作解决。所以说,零碎代码真的要缩小出错的几率,因为这很难解决。本章次要解说申请-应答模式中的可靠性设计,其余模式将在后续章节中解说。 最根本的申请应答模式是REQ客户端发送一个同步的申请至REP服务端,这种模式的可靠性很低。如果服务端在解决申请时停止,那客户端会永远处于期待状态。 相比TCP协定,ZMQ提供了主动重连机制、音讯散发的负载平衡等。然而,在实在环境中这也是不够的。惟一能够齐全信赖根本申请-应答模式的利用场景是同一过程的两个线程之间进行通信,没有网络问题或服务器生效的状况。 然而,只有稍加润饰,这种根本的申请-应答模式就能很好地在事实环境中工作了。我喜爱将其称为“海盗”模式。 粗略地讲,客户端连贯服务端有三种形式,每种形式都须要不同的可靠性设计: 多个客户端间接和单个服务端进行通信。应用场景:只有一个单点服务器,所有客户端都须要和它通信。需解决的故障:服务器解体和重启;网络连接中断。多个客户端和单个队列安装通信,该安装将申请分发给多个服务端。应用场景:工作散发。需解决的故障:worker解体和重启,死循环,过载;队列安装解体和重启;网络中断。多个客户端间接和多个服务端通信,无中间件。应用场景:相似域名解析的分布式服务。需解决的故障:服务端解体和重启,死循环,过载;网络连接中断。以上每种设计都必须有所取舍,很多时候会混合应用。上面咱们具体阐明。 客户端的可靠性设计(懈怠海盗模式)咱们能够通过在客户端进行简略的设置,来实现牢靠的申请-应答模式。我暂且称之为“懈怠的海盗”(Lazy Pirate)模式。 在接管应答时,咱们不进行同步期待,而是做以下操作: 对REQ套接字进行轮询,当音讯到达时才进行接管;申请超时后重发消息,循环屡次;若仍无音讯,则完结以后事务。 应用REQ套接字时必须严格遵守发送-接管过程,因为它外部采纳了一个无限状态机来限定状态,这一个性会让咱们利用“海盗”模式时遇上一些麻烦。最简略的做法是将REQ套接字敞开重启,从而突破这一限定。 lpclient: Lazy Pirate client in C //// Lazy Pirate client// 应用zmq_poll轮询来实现平安的申请-应答// 运行时可随机敞开或重启lpserver程序//#include "czmq.h" #define REQUEST_TIMEOUT 2500 // 毫秒, (> 1000!)#define REQUEST_RETRIES 3 // 尝试次数#define SERVER_ENDPOINT "tcp://localhost:5555" int main (void){ zctx_t *ctx = zctx_new (); printf ("I: 正在连接服务器...\n"); void *client = zsocket_new (ctx, ZMQ_REQ); assert (client); zsocket_connect (client, SERVER_ENDPOINT); int sequence = 0; int retries_left = REQUEST_RETRIES; while (retries_left && !zctx_interrupted) { // 发送一个申请,并开始接管音讯 char request [10]; sprintf (request, "%d", ++sequence); zstr_send (client, request); int expect_reply = 1; while (expect_reply) { // 对套接字进行轮询,并设置超时工夫 zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } }; int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC); if (rc == -1) break; // 中断 // 如果接管到回复则进行解决 if (items [0].revents & ZMQ_POLLIN) { // 收到服务器应答,必须和申请时的序号统一 char *reply = zstr_recv (client); if (!reply) break; // Interrupted if (atoi (reply) == sequence) { printf ("I: 服务器返回失常 (%s)\n", reply); retries_left = REQUEST_RETRIES; expect_reply = 0; } else printf ("E: 服务器返回异样: %s\n", reply); free (reply); } else if (--retries_left == 0) { printf ("E: 服务器不可用,勾销操作\n"); break; } else { printf ("W: 服务器没有响应,正在重试...\n"); // 敞开旧套接字,并建设新套接字 zsocket_destroy (ctx, client); printf ("I: 服务器重连中...\n"); client = zsocket_new (ctx, ZMQ_REQ); zsocket_connect (client, SERVER_ENDPOINT); // 应用新套接字再次发送申请 zstr_send (client, request); } } } zctx_destroy (&ctx); return 0;}lpserver: Lazy Pirate server in C ...