乐趣区

关于zmq:ZMQ-指南第四章-可靠的请求应答模式

牢靠的申请 - 应答模式

第三章中咱们应用实例介绍了高级申请 - 应答模式,本章咱们会讲述申请 - 应答模式的可靠性问题,并应用 ZMQ 提供的套接字类型组建起牢靠的申请 - 应答音讯零碎。

本章将介绍的内容有:

  • 客户端申请 - 应答
  • 最近起码应用队列
  • 心跳机制
  • 面向服务的队列
  • 基于磁盘(脱机)队列
  • 主从备份服务
  • 无中间件的申请 - 应答

什么是可靠性?

要给可靠性下定义,咱们能够先界定它的相同面——故障。如果咱们能够解决某些类型的故障,那么咱们的模型对于这些故障就是牢靠的。上面咱们就来列举分布式 ZMQ 应用程序中可能产生的问题,从可能性高的故障开始:

  • 利用程序代码是最大的故障起源。程序会解体或停止,进行对数据起源的响应,或是响应得太慢,耗尽内存等。
  • 零碎代码,如应用 ZMQ 编写的中间件,也会意外停止。零碎代码应该要比利用程序代码更为牢靠,但毕竟也有可能解体。特地是当零碎代码与速度过慢的客户端交互时,很容易耗尽内存。
  • 音讯队列溢出,典型的状况是零碎代码中没有对蛮客户端做踊跃的解决,任由音讯队列溢出。
  • 网络长期中断,造成音讯失落。这类谬误 ZMQ 应用程序是无奈及时发现的,因为 ZMQ 会主动进行重连。
  • 硬件零碎解体,导致所有过程停止。
  • 网络会呈现非凡情景的中断,如交换机的某个端口产生故障,导致局部网络无法访问。
  • 数据中心可能蒙受雷击、地震、火灾、电压过载、冷却系统生效等。

想要让软件系统躲避上述所有的危险,须要大量的人力物力,故不在本指南的探讨范畴之内。

因为前五个故障类型涵盖了 99.9% 的情景(这一数据源自我近期进行的一项钻研),所以咱们会深入探讨。如果你的公司大到足以思考最初两种情景,那请及时分割我,因为我正愁没钱将我家后院的大坑建成游泳池。

可靠性设计

简略地来说,可靠性就是当程序产生故障时也能顺利地运行上来,这要比搭建一个音讯零碎来得艰难得多。咱们会依据 ZMQ 提供的每一种外围音讯模式,来看看如何保障代码的继续运行。

  • 申请 - 应答模式:当服务端在解决申请是中断,客户端可能得悉这一信息,并进行接管音讯,转而抉择期待重试、申请另一服务端等操作。这里咱们暂不探讨客户端产生问题的情景。
  • 公布 - 订阅模式:如果客户端收到一些音讯后意外停止,服务端是不晓得这一状况的。公布 - 订阅模式中的订阅者不会返回任何音讯给发布者。然而,订阅者能够通过其余形式分割服务端,如申请 - 应答模式,要求服务端重发消息。这里咱们暂不探讨服务端产生问题的情景。此外,订阅者能够通过某些形式查看本身是否运行得过慢,并采取相应措施(向操作者收回正告、停止等)。
  • 管道模式:如果 worker 意外终止,工作散发器将无从得悉。管道模式和公布 - 订阅模式相似,只朝一个方向发送音讯。然而,上游的后果收集器能够检测哪项工作没有实现,并通知工作散发器重新分配该工作。如果工作散发器或后果收集器意外停止了,那客户端收回的申请只能另作解决。所以说,零碎代码真的要缩小出错的几率,因为这很难解决。

本章次要解说申请 - 应答模式中的可靠性设计,其余模式将在后续章节中解说。

最根本的申请应答模式是 REQ 客户端发送一个同步的申请至 REP 服务端,这种模式的可靠性很低。如果服务端在解决申请时停止,那客户端会永远处于期待状态。

相比 TCP 协定,ZMQ 提供了主动重连机制、音讯散发的负载平衡等。然而,在实在环境中这也是不够的。惟一能够齐全信赖根本申请 - 应答模式的利用场景是同一过程的两个线程之间进行通信,没有网络问题或服务器生效的状况。

然而,只有稍加润饰,这种根本的申请 - 应答模式就能很好地在事实环境中工作了。我喜爱将其称为“海盗”模式。

粗略地讲,客户端连贯服务端有三种形式,每种形式都须要不同的可靠性设计:

  • 多个客户端间接和单个服务端进行通信。应用场景:只有一个单点服务器,所有客户端都须要和它通信。需解决的故障:服务器解体和重启;网络连接中断。
  • 多个客户端和单个队列安装通信,该安装将申请分发给多个服务端。应用场景:工作散发。需解决的故障:worker 解体和重启,死循环,过载;队列安装解体和重启;网络中断。
  • 多个客户端间接和多个服务端通信,无中间件。应用场景:相似域名解析的分布式服务。需解决的故障:服务端解体和重启,死循环,过载;网络连接中断。

以上每种设计都必须有所取舍,很多时候会混合应用。上面咱们具体阐明。

客户端的可靠性设计(懈怠海盗模式)

咱们能够通过在客户端进行简略的设置,来实现牢靠的申请 - 应答模式。我暂且称之为“懈怠的海盗”(Lazy Pirate)模式。

在接管应答时,咱们不进行同步期待,而是做以下操作:

  • 对 REQ 套接字进行轮询,当音讯到达时才进行接管;
  • 申请超时后重发消息,循环屡次;
  • 若仍无音讯,则完结以后事务。

应用 REQ 套接字时必须严格遵守发送 - 接管过程,因为它外部采纳了一个无限状态机来限定状态,这一个性会让咱们利用“海盗”模式时遇上一些麻烦。最简略的做法是将 REQ 套接字敞开重启,从而突破这一限定。

lpclient: Lazy Pirate client in C

//
//  Lazy Pirate client
//  应用 zmq_poll 轮询来实现平安的申请 - 应答
//  运行时可随机敞开或重启 lpserver 程序
//
#include "czmq.h"
 
#define REQUEST_TIMEOUT     2500    //  毫秒, (> 1000!)
#define REQUEST_RETRIES     3       //  尝试次数
#define SERVER_ENDPOINT     "tcp://localhost:5555"
 
int main (void)
{zctx_t *ctx = zctx_new ();
    printf ("I: 正在连接服务器...\n");
    void *client = zsocket_new (ctx, ZMQ_REQ);
    assert (client);
    zsocket_connect (client, SERVER_ENDPOINT);
 
    int sequence = 0;
    int retries_left = REQUEST_RETRIES;
    while (retries_left && !zctx_interrupted) {
        //  发送一个申请,并开始接管音讯
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);
 
        int expect_reply = 1;
        while (expect_reply) {
            //  对套接字进行轮询,并设置超时工夫
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0} };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
 
            //  如果接管到回复则进行解决
            if (items [0].revents & ZMQ_POLLIN) {
                //  收到服务器应答,必须和申请时的序号统一
                char *reply = zstr_recv (client);
                if (!reply)
                    break;      //  Interrupted
                if (atoi (reply) == sequence) {printf ("I: 服务器返回失常 (%s)\n", reply);
                    retries_left = REQUEST_RETRIES;
                    expect_reply = 0;
                }
                else
                    printf ("E: 服务器返回异样: %s\n",
                        reply);
 
                free (reply);
            }
            else
            if (--retries_left == 0) {printf ("E: 服务器不可用,勾销操作 \n");
                break;
            }
            else {printf ("W: 服务器没有响应,正在重试...\n");
                //  敞开旧套接字,并建设新套接字
                zsocket_destroy (ctx, client);
                printf ("I: 服务器重连中...\n");
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, SERVER_ENDPOINT);
                //  应用新套接字再次发送申请
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

lpserver: Lazy Pirate server in C

//
//  Lazy Pirate server
//  将 REQ 套接字连贯至 tcp://*:5555
//  和 hwserver 程序相似,除了以下两点://   - 间接输入申请内容
//   - 随机地降慢运行速度,或停止程序,模仿解体
//
#include "zhelpers.h"
 
int main (void)
{srandom ((unsigned) time (NULL));
 
    void *context = zmq_init (1);
    void *server = zmq_socket (context, ZMQ_REP);
    zmq_bind (server, "tcp://*:5555");
 
    int cycles = 0;
    while (1) {char *request = s_recv (server);
        cycles++;
 
        //  循环几次后开始模仿各种故障
        if (cycles > 3 && randof (3) == 0) {printf ("I: 模拟程序解体 \n");
            break;
        }
        else
        if (cycles > 3 && randof (3) == 0) {printf ("I: 模仿 CPU 过载 \n");
            sleep (2);
        }
        printf ("I: 失常申请 (%s)\n", request);
        sleep (1);              //  耗时的处理过程
        s_send (server, request);
        free (request);
    }
    zmq_close (server);
    zmq_term (context);
    return 0;
}

运行这个测试用例时,能够关上两个控制台,服务端会随机产生故障,你能够看看客户端的反馈。服务端的典型输入如下:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

客户端的输入是:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

客户端为每次申请都加上了序列号,并查看收到的应答是否和序列号统一,以保障没有申请或应答失落,同一个应答收到屡次或乱序。多运行几次实例,看看是否真的可能解决问题。事实环境中你不须要应用到序列号,那只是为了证实这一形式是可行的。

客户端应用 REQ 套接字进行申请,并在产生问题时关上一个新的套接字来,绕过 REQ 强制的发送 / 接管过程。可能你会想用 DEALER 套接字,但这并不是一个好主见。首先,DEALER 并不会像 REQ 那样解决信封(如果你不晓得信封是什么,那更不能用 DEALER 了)。其次,你可能会取得你并不想失去的后果。

这一计划的优劣是:

  • 长处:简单明了,容易施行;
  • 长处:能够不便地利用到现有的客户端和服务端程序中;
  • 长处:ZMQ 有主动重连机制;
  • 毛病:单点服务产生故障时不能定位到新的可用服务。

根本的牢靠队列(简略海盗模式)

在第二种模式中,咱们应用一个队列安装来扩大上述的“懈怠的海盗”模式,使客户端可能通明地和多个服务端通信。这里的服务端能够定义为 worker。咱们能够从最根底的模型开始,分阶段施行这个计划。

在所有的海盗模式中,worker 是无状态的,或者说存在着一个咱们所不晓得的公共状态,如共享数据库。队列安装的存在意味着 worker 能够在 client 毫不知情的状况下随便进出。一个 worker 死亡后,会有另一个 worker 接替它的工作。这种拓扑后果十分简洁,但惟一的毛病是队列安装自身会难以保护,可能造成单点故障。

在第三章中,队列安装的根本算法是最近起码应用算法。那么,如果 worker 死亡或阻塞,咱们须要做些什么?答案是很少很少。咱们曾经在 client 中退出了重试的机制,所以,应用根本的 LRU 队列就能够运作得很好了。这种做法也合乎 ZMQ 的逻辑,所以咱们能够通过在点对点交互中插入一个简略的队列安装来扩大它:

咱们能够间接应用“懈怠的海盗”模式中的 client,以下是队列安装的代码:

spqueue: Simple Pirate queue in C

//
//  简略海盗队列
//  
//  这个安装和 LRU 队列完全一致,不存在任何可靠性机制,依附 client 的重试来保障安装的运行
//
#include "czmq.h"
 
#define LRU_READY   "\001"      //  音讯:worker 准备就绪
 
int main (void)
{
    //  筹备上下文和套接字
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client 端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker 端点
 
    //  寄存可用 worker 的队列
    zlist_t *workers = zlist_new ();
 
    while (1) {zmq_pollitem_t items [] = {{ backend,  0, ZMQ_POLLIN, 0},
            {frontend, 0, ZMQ_POLLIN, 0}
        };
        //  当有可用的 woker 时,轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
        if (rc == -1)
            break;              //  中断
 
        //  解决后端端点的 worker 音讯
        if (items [0].revents & ZMQ_POLLIN) {
            //  应用 worker 的地址进行 LRU 排队
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);
 
            //  如果音讯不是 READY,则转发给 client
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取 client 申请,转发给第一个可用的 worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (msg) {zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
                zmsg_send (&msg, backend);
            }
        }
    }
    //  程序运行完结,进行清理
    while (zlist_size (workers)) {zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

以下是 worker 的代码,用到了“懈怠的海盗”服务,并将其调整为 LRU 模式(应用 REQ 套接字传递“已就绪”信号):

spworker: Simple Pirate worker in C

//
//  简略海盗模式 worker
//  
//  应用 REQ 套接字连贯 tcp://*:5556,应用 LRU 算法实现 worker
//
#include "czmq.h"
#define LRU_READY   "\001"      //  音讯:worker 已就绪
 
int main (void)
{zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
 
    //  应用随机符号来指定套接字标识,不便追踪
    srandom ((unsigned) time (NULL));
    char identity [10];
    sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
    zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
    zsocket_connect (worker, "tcp://localhost:5556");
 
    //  通知代理 worker 已就绪
    printf ("I: (%s) worker 准备就绪 \n", identity);
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);
 
    int cycles = 0;
    while (1) {zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              //  中断
 
        //  通过几轮循环后,模仿各种问题
        cycles++;
        if (cycles > 3 && randof (5) == 0) {printf ("I: (%s) 模仿解体 \n", identity);
            zmsg_destroy (&msg);
            break;
        }
        else
        if (cycles > 3 && randof (5) == 0) {printf ("I: (%s) 模仿 CPU 过载 \n", identity);
            sleep (3);
            if (zctx_interrupted)
                break;
        }
        printf ("I: (%s) 失常应答 \n", identity);
        sleep (1);              //  进行某些解决
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return 0;
}

运行上述事例,启动多个 worker,一个 client,以及一个队列安装,程序随便。你能够看到 worker 最终都会解体或死亡,client 则多次重试并最终放弃。安装从来不会进行,你能够任意重启 worker 和 client,这个模型能够和任意个 worker、client 交互。

强壮的牢靠队列(偏执海盗模式)

“简略海盗队列”模式工作得十分好,次要是因为它只是两个现有模式的结合体。不过,它也有一些毛病:

  • 该模式无奈解决队列的解体或重启。client 会进行重试,但 worker 不会重启。尽管 ZMQ 会主动重连 worker 的套接字,但对于新启动的队列安装来说,因为 worker 并没有发送“已就绪”的音讯,所以它相当于是不存在的。为了解决这一问题,咱们须要从队列发送心跳给 worker,这样 worker 就能晓得队列是否曾经死亡。
  • 队列没有检测 worker 是否曾经死亡,所以当 worker 在处于闲暇状态时死亡,队列安装只有在发送了某个申请之后才会将该 worker 从队列中移除。这时,client 什么都不能做,只能期待。这不是一个致命的问题,然而仍然是不够好的。所以,咱们须要从 worker 发送心跳给队列安装,从而让队列得悉 worker 什么时候沦亡。

咱们应用一个名为“偏执的海盗模式”来解决上述两个问题。

之前咱们应用 REQ 套接字作为 worker 的套接字类型,但在偏执海盗模式中咱们会改用 DEALER 套接字,从而使咱们可能任意地发送和承受音讯,而不是像 REQ 套接字那样必须实现发送 - 承受循环。而 DEALER 的毛病是咱们必须本人治理音讯信封。如果你不晓得信封是什么,那请浏览第三章。

咱们仍会应用懈怠海盗模式的 client,以下是偏执海盗的队列安装代码:

ppqueue: Paranoid Pirate queue in C

//
//  偏执海盗队列
//
#include "czmq.h"
 
#define HEARTBEAT_LIVENESS  3       //  心跳衰弱度,3- 5 是正当的
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒
 
//  偏执海盗协定的音讯代码
#define PPP_READY       "\001"      //  worker 已就绪
#define PPP_HEARTBEAT   "\002"      //  worker 心跳
 
 
//  应用以下构造示意 worker 队列中的一个无效的 worker
 
typedef struct {
    zframe_t *address;          //  worker 的地址
    char *identity;             //  可打印的套接字标识
    int64_t expiry;             //  过期工夫
} worker_t;
 
//  创立新的 worker
static worker_t *
s_worker_new (zframe_t *address)
{worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
    self->address = address;
    self->identity = zframe_strdup (address);
    self->expiry = zclock_time () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    return self;
}
 
//  销毁 worker 构造,包含标识
static void
s_worker_destroy (worker_t **self_p)
{assert (self_p);
    if (*self_p) {
        worker_t *self = *self_p;
        zframe_destroy (&self->address);
        free (self->identity);
        free (self);
        *self_p = NULL;
    }
}
 
//  worker 已就绪,将其移至列表开端
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {if (streq (self->identity, worker->identity)) {zlist_remove (workers, worker);
            s_worker_destroy (&worker);
            break;
        }
        worker = (worker_t *) zlist_next (workers);
    }
    zlist_append (workers, self);
}
 
//  返回下一个可用的 worker 地址
static zframe_t *
s_workers_next (zlist_t *workers)
{worker_t *worker = zlist_pop (workers);
    assert (worker);
    zframe_t *frame = worker->address;
    worker->address = NULL;
    s_worker_destroy (&worker);
    return frame;
}
 
//  寻找并销毁已过期的 worker。//  因为列表中最旧的 worker 排在最前,所以当找到第一个未过期的 worker 时就进行。static void
s_workers_purge (zlist_t *workers)
{worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {if (zclock_time () < worker->expiry)
            break;              //  worker 未过期,进行扫描
 
        zlist_remove (workers, worker);
        s_worker_destroy (&worker);
        worker = (worker_t *) zlist_first (workers);
    }
}
 
 
int main (void)
{zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend  = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client 端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker 端点
    //  List of available workers
    zlist_t *workers = zlist_new ();
 
    //  法则地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
 
    while (1) {zmq_pollitem_t items [] = {{ backend,  0, ZMQ_POLLIN, 0},
            {frontend, 0, ZMQ_POLLIN, 0}
        };
        //  当存在可用 worker 时轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
            HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        //  解决后端 worker 申请
        if (items [0].revents & ZMQ_POLLIN) {
            //  应用 worker 地址进行 LRU 路由
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断
 
            //  worker 的任何信号均示意其依然存活
            zframe_t *address = zmsg_unwrap (msg);
            worker_t *worker = s_worker_new (address);
            s_worker_ready (worker, workers);
 
            //  解决管制音讯,或者将应答转发给 client
            if (zmsg_size (msg) == 1) {zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_READY, 1)
                &&  memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {printf ("E: invalid message from worker");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取下一个 client 申请,交给下一个可用的 worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (!msg)
                break;          //  中断
            zmsg_push (msg, s_workers_next (workers));
            zmsg_send (&msg, backend);
        }
 
        //  发送心跳给闲暇的 worker
        if (zclock_time () >= heartbeat_at) {worker_t *worker = (worker_t *) zlist_first (workers);
            while (worker) {
                zframe_send (&worker->address, backend,
                             ZFRAME_REUSE + ZFRAME_MORE);
                zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
                zframe_send (&frame, backend, 0);
                worker = (worker_t *) zlist_next (workers);
            }
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;}
        s_workers_purge (workers);
    }
 
    //  程序完结后进行清理
    while (zlist_size (workers)) {worker_t *worker = (worker_t *) zlist_pop (workers);
        s_worker_destroy (&worker);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

该队列安装应用心跳机制扩大了 LRU 模式,看起来很简略,但要想出这个主见还挺难的。下文会更多地介绍心跳机制。

以下是偏执海盗的 worker 代码:

ppworker: Paranoid Pirate worker in C

//
//  偏执海盗 worker
//
#include "czmq.h"
 
#define HEARTBEAT_LIVENESS  3       //  正当值:3-5
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒
#define INTERVAL_INIT       1000    //  重试距离
#define INTERVAL_MAX       32000    //  回退算法最大值
 
//  偏执海盗标准的常量定义
#define PPP_READY       "\001"      //  音讯:worker 已就绪
#define PPP_HEARTBEAT   "\002"      //  音讯:worker 心跳
 
//  返回一个连贯至偏执海盗队列安装的套接字
 
static void *
s_worker_socket (zctx_t *ctx) {void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");
 
    //  告知队列 worker 已准备就绪
    printf ("I: worker 已就绪 \n");
    zframe_t *frame = zframe_new (PPP_READY, 1);
    zframe_send (&frame, worker, 0);
 
    return worker;
}
 
int main (void)
{zctx_t *ctx = zctx_new ();
    void *worker = s_worker_socket (ctx);
 
    //  如果心跳衰弱度为零,则示意队列安装已死亡
    size_t liveness = HEARTBEAT_LIVENESS;
    size_t interval = INTERVAL_INIT;
 
    //  法则地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
 
    srandom ((unsigned) time (NULL));
    int cycles = 0;
    while (1) {zmq_pollitem_t items [] = {{ worker,  0, ZMQ_POLLIN, 0} };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        if (items [0].revents & ZMQ_POLLIN) {
            //  获取音讯
            //  - 3 段音讯,信封 + 内容,示意一个申请
            //  - 1 段音讯,示意心跳
            zmsg_t *msg = zmsg_recv (worker);
            if (!msg)
                break;          //  中断
 
            if (zmsg_size (msg) == 3) {
                //  若干词循环后模仿各种问题
                cycles++;
                if (cycles > 3 && randof (5) == 0) {printf ("I: 模仿解体 \n");
                    zmsg_destroy (&msg);
                    break;
                }
                else
                if (cycles > 3 && randof (5) == 0) {printf ("I: 模仿 CPU 过载 \n");
                    sleep (3);
                    if (zctx_interrupted)
                        break;
                }
                printf ("I: 失常应答 \n");
                zmsg_send (&msg, worker);
                liveness = HEARTBEAT_LIVENESS;
                sleep (1);              //  做一些解决工作
                if (zctx_interrupted)
                    break;
            }
            else
            if (zmsg_size (msg) == 1) {zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
                    liveness = HEARTBEAT_LIVENESS;
                else {printf ("E: 非法音讯 \n");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else {printf ("E: 非法音讯 \n");
                zmsg_dump (msg);
            }
            interval = INTERVAL_INIT;
        }
        else
        if (--liveness == 0) {printf ("W: 心跳失败,无奈连贯队列安装 \n");
            printf ("W: %zd 毫秒后进行重连...\n", interval);
            zclock_sleep (interval);
 
            if (interval < INTERVAL_MAX)
                interval *= 2;
            zsocket_destroy (ctx, worker);
            worker = s_worker_socket (ctx);
            liveness = HEARTBEAT_LIVENESS;
        }
 
        //  适时发送心跳给队列
        if (zclock_time () > heartbeat_at) {heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
            printf ("I: worker 心跳 \n");
            zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
            zframe_send (&frame, worker, 0);
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

几点阐明:

  • 代码中蕴含了几处失败模仿,和先前一样。这会让代码极难保护,所以当投入使用时,该当移除这些模仿代码。
  • 偏执海盗模式中队列的心跳有时会不失常,下文会讲述这一点。
  • worker 应用了一种相似于懈怠海盗 client 的重试机制,但有两点不同:1、回退算法设置;2、永不言弃。

尝试运行以下代码,跑通流程:

ppqueue &
for i in 1 2 3 4; do
    ppworker &
    sleep 1
done
lpclient &

你会看到 worker 一一解体,client 在屡次尝试后放弃。你能够进行并重启队列安装,client 和 worker 会相继重连,并正确地发送、解决和接管申请,程序不会凌乱。所以说,整个通信过程只有两种情景:交互胜利,或 client 最终放弃。

心跳

当我在写偏执海盗模式的示例时,大概花了五个小时的工夫来协调队列至 worker 的心跳,剩下的申请 - 应答链路只花了约 10 分钟的工夫。心跳机制在可靠性上带来的好处有时还不迭它所引发的问题。应用过程中很有可能会产生“虚伪故障”的状况,即节点误认为他们已失去连贯,因为心跳没有正确地发送。

在了解和施行心跳时,须要思考以下几点:

  • 心跳不是一种申请 - 应答,它们异步地在节点之间传递,任一节点都能够通过它来判断对方曾经死亡,并停止通信。
  • 如果某个节点应用长久套接字(即设定了套接字标识),意味着发送给它的心跳可能会堆砌,并在重连后一起收到。所以说,worker 不应该应用长久套接字。示例代码应用长久套接字是为了便于调试,而且代码中应用了随机的套接字标识,防止重用之前的标识。
  • 应用过程中,应先让心跳工作起来,再进行前面的音讯解决。你须要保障启动任一节点后,心跳都能正确地执行。进行并重启他们,模仿解冻、解体等状况来进行测试。
  • 当你的主循环应用了 zmq_poll(),则应该应用另一个计时器来触发心跳。不要应用主循环来管制心跳的发送,这回导致适量地发送心跳(阻塞网络),或是发送得太少(导致节点断开)。zhelpers 包提供了 s_clock()函数返回以后零碎工夫戳,单位是毫秒,能够用它来管制心跳的发送距离。C 代码如下:
// 法则地发送心跳
uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
while (1) {
    …
    zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
    …
    // 无论 zmq_poll 的行为是什么,都应用以下逻辑判断是否发送心跳
    if (s_clock () > heartbeat_at) {
        … 发送心跳给所有节点
        // 设置下一次心跳的工夫
        heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;}
}
  • 主循环应该应用心跳距离作为超时工夫。显然不能应用无超时工夫的设置,而短于心跳距离也只是节约循环次数而已。
  • 应用简略的追踪形式来进行追踪,如间接输入至控制台。这里有一些追踪的诀窍:应用 zmsg()函数打印套接字内容;对音讯进行编号,判断是否会有距离。
  • 在实在的应用程序中,心跳必须是能够配置的,并能和节点共同商定。有些节点须要高频心跳,如 10 毫秒,另一些节点则可能只须要 30 秒发送一次心跳即可。
  • 如果你要对不同的节点发送不同频率的心跳,那么 poll 的超时工夫应设置为最短的心跳距离。
  • 兴许你会想要用一个独自的套接字来解决心跳,这看起来很棒,能够将同步的申请 - 应答和异步的心跳隔离开来。然而,这个主见并不好,起因有几点:首先、发送数据时其实是不须要发送心跳的;其次、套接字可能会因为网络问题而阻塞,你须要设法晓得用于发送数据的套接字进行响应的起因是死亡了还是过于忙碌而已,这样你就须要对这个套接字进行心跳。最初,解决两个套接字要比解决一个简单得多。
  • 咱们没有设置 client 至队列的心跳,因为这太过简单了,而且没有太大价值。

约定和协定

兴许你曾经留神到,因为心跳机制,偏执海盗模式和简略海盗模式是不兼容的。

其实,这里咱们须要写一个协定。兴许在试验阶段是不须要协定的,但这在实在的应用程序中是有必要。如果咱们想用其余语言来写 worker 怎么办?咱们是否须要通过源代码来查看通信过程?如果咱们想扭转协定怎么办?标准可能很简略,但并不显然。越是胜利的协定,就会越为简单。

一个不足约定的应用程序肯定是不可复用的,所以让咱们来为这个协定写一个标准,怎么做呢?

  • 位于 rfc.zeromq.org 的 wiki 页上,咱们顺便设置了一个用于寄存 ZMQ 协定的页面。

    • 要创立一个新的协定,你须要注册并依照领导进行。过程很间接,但并不一定所有人都能撰写技术性文档。

我大概花了 15 分钟的工夫草拟海盗模式标准(PPP),麻雀虽小,但五脏俱全。

要用 PPP 协定进行实在环境下的编程,你还须要:

  • 在 READY 命令中退出版本号,这样就能再日后平安地新增 PPP 版本号。
  • 目前,READY 和 HEARTBEAT 信号并没有指定其来源于申请还是应答。要辨别他们,须要新建一个音讯构造,其中蕴含“音讯类型”这一信息。

面向服务的牢靠队列(管家模式)

世上的事物往往瞬息万变,正当咱们期待有更好的协定来解决上一节的问题时,曾经有人制订好了:

  • http://rfc.zeromq.org/spec:7

这份协定只有一页,它将 PPP 协定变得更为坚硬。咱们在设计简单架构时应该这样做:首先写下约定,再用软件去实现它。

管家模式协定(MDP)在扩大 PPP 协定时引入了一个乏味的个性:client 发送的每一个申请都有一个“服务名称”,而 worker 在像队列安装注册时须要告知本人的服务类型。MDP 的劣势在于它来源于事实编程,协定简略,且容易晋升。

引入“服务名称”的机制,是对偏执海盗队列的一个简略补充,而后果是让其成为一个面向服务的代理。

在施行管家模式之前,咱们须要为 client 和 worker 编写一个框架。如果程序员能够通过简略的 API 来实现这种模式,那就没有必要让他们去理解管家模式的协定内容和实现办法了。
所以,咱们第一个协定(即管家模式协定)定义了分布式架构中节点是如何相互交互的,第二个协定则要定义应用程序应该如何通过框架来应用这一协定。
管家模式有两个端点,客户端和服务端。因为咱们要为 client 和 worker 都撰写框架,所以就须要提供两套 API。以下是用简略的面向对象办法设计的 client 端 API 雏形,应用的是 C 语言的 ZFL library。

mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
zmsg_t  *mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);

就这么简略。咱们创立了一个会话来和代理通信,发送并接管一个申请,最初敞开连贯。以下是 worker 端 API 的雏形。

mdwrk_t *mdwrk_new     (char *broker,char *service);
void     mdwrk_destroy (mdwrk_t **self_p);
zmsg_t  *mdwrk_recv    (mdwrk_t *self, zmsg_t *reply);

下面两段代码看起来差不多,然而 worker 端 API 略有不同。worker 第一次执行 recv()后会传递一个空的应答,之后才传递以后的应答,并取得新的申请。

两段的 API 都很容易开发,只需在偏执海盗模式代码的根底上批改即可。以下是 client API:

mdcliapi: Majordomo client API in C

/*  =====================================================================
    mdcliapi.c
 
    Majordomo Protocol Client API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "mdcliapi.h"
 
//  类构造
//  咱们会通过成员办法来拜访这些属性
 
struct _mdcli_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    void *client;               //  连贯至代理的套接字
    int verbose;                //  应用规范输入打印以后流动
    int timeout;                //  申请超时工夫
    int retries;                //  申请重试次数
};
 
 
//  ---------------------------------------------------------------------
//  连贯或重连代理
 
void s_mdcli_connect_to_broker (mdcli_t *self)
{if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_REQ);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接至代理 %s...", self->broker);
}
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
mdcli_t *
mdcli_new (char *broker, int verbose)
{assert (broker);
 
    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  毫秒
    self->retries = 3;              //  尝试次数
 
    s_mdcli_connect_to_broker (self);
    return self;
}
 
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
mdcli_destroy (mdcli_t **self_p)
{assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}
 
 
//  ---------------------------------------------------------------------
//  设定申请超时工夫
 
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{assert (self);
    self->timeout = timeout;
}
 
 
//  ---------------------------------------------------------------------
//  设定申请重试次数
 
void
mdcli_set_retries (mdcli_t *self, int retries)
{assert (self);
    self->retries = retries;
}
 
 
//  ---------------------------------------------------------------------
//  向代理发送申请,并尝试获取应答;//  对音讯放弃所有权,发送后销毁;//  返回应答音讯,或 NULL。zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;
 
    //  用协定前缀包装音讯
    //  Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    //  Frame 2: 服务名称 (可打印字符串)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    if (self->verbose) {zclock_log ("I: 发送申请给'%s'服务:", service);
        zmsg_dump (request);
    }
 
    int retries_left = self->retries;
    while (retries_left && !zctx_interrupted) {zmsg_t *msg = zmsg_dup (request);
        zmsg_send (&msg, self->client);
 
        while (TRUE) {
            //  轮询套接字以接管应答,有超时工夫
            zmq_pollitem_t items [] = {{ self->client, 0, ZMQ_POLLIN, 0} };
            int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
 
            //  收到应答后进行解决
            if (items [0].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (self->client);
                if (self->verbose) {zclock_log ("I: received reply:");
                    zmsg_dump (msg);
                }
                //  不要尝试处理错误,间接报错即可
                assert (zmsg_size (msg) >= 3);
 
                zframe_t *header = zmsg_pop (msg);
                assert (zframe_streq (header, MDPC_CLIENT));
                zframe_destroy (&header);
 
                zframe_t *reply_service = zmsg_pop (msg);
                assert (zframe_streq (reply_service, service));
                zframe_destroy (&reply_service);
 
                zmsg_destroy (&request);
                return msg;     //  胜利
            }
            else
            if (--retries_left) {if (self->verbose)
                    zclock_log ("W: no reply, reconnecting...");
                //  重连并重发消息
                s_mdcli_connect_to_broker (self);
                zmsg_t *msg = zmsg_dup (request);
                zmsg_send (&msg, self->client);
            }
            else {if (self->verbose)
                    zclock_log ("W: 产生严重错误,放弃重试。");
                break;          //  放弃
            }
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断音讯,完结 client 过程...\n");
    zmsg_destroy (&request);
    return NULL;
}

以下测试程序会执行 10 万次申请应答:

mdclient: Majordomo client application in C

//
//  管家模式协定 - 客户端示例
//  应用 mdcli API 暗藏管家模式协定的外部实现
//
 
//  让咱们间接编译这段代码,不生成类库
#include "mdcliapi.c"
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
 
    int count;
    for (count = 0; count < 100000; count++) {zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        zmsg_t *reply = mdcli_send (session, "echo", &request);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  中断或进行
    }
    printf ("已解决 %d 次申请 - 应答 \n", count);
    mdcli_destroy (&session);
    return 0;
}

上面是 worker 的 API:

mdwrkapi: Majordomo worker API in C

/*  =====================================================================
    mdwrkapi.c
 
    Majordomo Protocol Worker API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "mdwrkapi.h"
 
//  可靠性参数
#define HEARTBEAT_LIVENESS  3       //  正当值:3-5
 
//  类构造
//  应用成员函数拜访属性
 
struct _mdwrk_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    char *service;
    void *worker;               //  连贯至代理的套接字
    int verbose;                //  应用规范输入打印流动
 
    //  心跳设置
    uint64_t heartbeat_at;      //  发送心跳的工夫
    size_t liveness;            //  尝试次数
    int heartbeat;              //  心跳延时,单位:毫秒
    int reconnect;              //  重连延时,单位:毫秒
 
    //  外部状态
    int expect_reply;           //  初始值为 0
 
    //  应答地址,如果存在的话
    zframe_t *reply_to;
};
 
 
//  ---------------------------------------------------------------------
//  发送音讯给代理
//  如果没有提供音讯,则外部创立一个
 
static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
                        zmsg_t *msg)
{msg = msg? zmsg_dup (msg): zmsg_new ();
 
    //  将协定信封压入音讯顶部
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    zmsg_pushstr (msg, "");
 
    if (self->verbose) {
        zclock_log ("I: sending %s to broker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->worker);
}
 
 
//  ---------------------------------------------------------------------
//  连贯或重连代理
 
void s_mdwrk_connect_to_broker (mdwrk_t *self)
{if (self->worker)
        zsocket_destroy (self->ctx, self->worker);
    self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->worker, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);
 
    //  向代理注册服务类型
    s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);
 
    //  当心跳衰弱度为零,示意代理已断开连接
    self->liveness = HEARTBEAT_LIVENESS;
    self->heartbeat_at = zclock_time () + self->heartbeat;}
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
{assert (broker);
    assert (service);
 
    mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->service = strdup (service);
    self->verbose = verbose;
    self->heartbeat = 2500;     //  毫秒
    self->reconnect = 2500;     //  毫秒
 
    s_mdwrk_connect_to_broker (self);
    return self;
}
 
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
mdwrk_destroy (mdwrk_t **self_p)
{assert (self_p);
    if (*self_p) {
        mdwrk_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self->service);
        free (self);
        *self_p = NULL;
    }
}
 
 
//  ---------------------------------------------------------------------
//  设置心跳提早
 
void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{self->heartbeat = heartbeat;}
 
 
//  ---------------------------------------------------------------------
//  设置重连提早
 
void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{self->reconnect = reconnect;}
 
 
//  ---------------------------------------------------------------------
//  若有应答则发送给代理,并期待新的申请
 
zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
    //  格式化并发送申请传入的应答
    assert (reply_p);
    zmsg_t *reply = *reply_p;
    assert (reply || !self->expect_reply);
    if (reply) {assert (self->reply_to);
        zmsg_wrap (reply, self->reply_to);
        s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
        zmsg_destroy (reply_p);
    }
    self->expect_reply = 1;
 
    while (TRUE) {zmq_pollitem_t items [] = {{ self->worker,  0, ZMQ_POLLIN, 0} };
        int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        if (items [0].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (self->worker);
            if (!msg)
                break;          //  中断
            if (self->verbose) {zclock_log ("I: 从代理处取得音讯:");
                zmsg_dump (msg);
            }
            self->liveness = HEARTBEAT_LIVENESS;
 
            //  不要处理错误,间接报错即可
            assert (zmsg_size (msg) >= 3);
 
            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);
 
            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPW_WORKER));
            zframe_destroy (&header);
 
            zframe_t *command = zmsg_pop (msg);
            if (zframe_streq (command, MDPW_REQUEST)) {
                //  这里须要将音讯中空帧之前的所有地址都保存起来,//  但在这里咱们临时只保留一个
                self->reply_to = zmsg_unwrap (msg);
                zframe_destroy (&command);
                return msg;     //  解决申请
            }
            else
            if (zframe_streq (command, MDPW_HEARTBEAT))
                ;               //  不对心跳做任何解决
            else
            if (zframe_streq (command, MDPW_DISCONNECT))
                s_mdwrk_connect_to_broker (self);
            else {zclock_log ("E: 音讯不非法");
                zmsg_dump (msg);
            }
            zframe_destroy (&command);
            zmsg_destroy (&msg);
        }
        else
        if (--self->liveness == 0) {if (self->verbose)
                zclock_log ("W: 失去与代理的连贯 - 正在重试...");
            zclock_sleep (self->reconnect);
            s_mdwrk_connect_to_broker (self);
        }
        //  适时地发送音讯
        if (zclock_time () > self->heartbeat_at) {s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
            self->heartbeat_at = zclock_time () + self->heartbeat;}
    }
    if (zctx_interrupted)
        printf ("W: 收到中断音讯,停止 worker...\n");
    return NULL;
}

以下测试程序实现了名为 echo 的服务:

mdworker: Majordomo worker application in C

//
//  管家模式协定 - worker 示例
//  应用 mdwrk API 暗藏 MDP 协定的外部实现
//
 
//  让咱们间接编译代码,而不创立类库
#include "mdwrkapi.c"
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdwrk_t *session = mdwrk_new ("tcp://localhost:5555", "echo", verbose);
 
    zmsg_t *reply = NULL;
    while (1) {zmsg_t *request = mdwrk_recv (session, &reply);
        if (request == NULL)
            break;              //  worker 被停止
        reply = request;        //  echo 服务……其实很简单:)
    }
    mdwrk_destroy (&session);
    return 0;
}

几点阐明:

  • API 是单线程的,所以说 worker 不会再后盾发送心跳,而这也是咱们所冀望的:如果 worker 应用程序进行了,心跳就会跟着停止,代理便会进行向该 worker 发送新的申请。
  • wroker API 没有做回退算法的设置,因为这里不值得应用这一简单的机制。
  • API 没有提供任何报错机制,如果呈现问题,它会间接报断言(或异样,依语言而定)。这一做法对实验性的编程是有用的,这样能够立即看到执行后果。但在实在编程环境中,API 应该足够强壮,适合地解决非法音讯。

兴许你会问,worker API 为什么要敞开它的套接字并新开一个呢?特地是 ZMQ 是有重连机制的,可能在节点归来后进行重连。咱们能够回顾一下简略海盗模式中的 worker,以及偏执海盗模式中的 worker 来加以了解。ZMQ 的确会进行主动重连,但如果代理死亡并重连,worker 并不会从新进行注册。这个问题有两种解决方案:一是咱们这里用到的较为简便的计划,即当 worker 判断代理曾经死亡时,敞开它的套接字并重头来过;另一个计划是当代理收到未知 worker 的心跳时要求该 worker 对其提供的服务类型进行注册,这样一来就须要在协定中阐明这一规定。

上面让咱们设计管家模式的代理,它的外围代码是一组队列,每种服务对应一个队列。咱们会在 worker 呈现时创立相应的队列(worker 隐没时应该销毁对应的队列,不过咱们这里临时不思考)。额定的,咱们会为每种服务保护一个 worker 的队列。

为了让 C 语言代码更为易读易写,我应用了 ZFL 我的项目提供的哈希和链表容器,并命名为 zhash 和 zlist。如果应用古代语言编写,那天然能够应用其内置的容器。

mdbroker: Majordomo broker in C

//
//  管家模式协定 - 代理
//  协定 http://rfc.zeromq.org/spec:7 和 spec:8 的最简实现
//
#include "czmq.h"
#include "mdp.h"
 
//  个别咱们会从配置文件中获取以下值
 
#define HEARTBEAT_LIVENESS  3       //  正当值:3-5
#define HEARTBEAT_INTERVAL  2500    //  单位:毫秒
#define HEARTBEAT_EXPIRY    HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
 
//  定义一个代理
typedef struct {
    zctx_t *ctx;                //  上下文
    void *socket;               //  用于连贯 client 和 worker 的套接字
    int verbose;                //  应用规范输入打印流动信息
    char *endpoint;             //  代理绑定到的端点
    zhash_t *services;          //  已知服务的哈希表
    zhash_t *workers;           //  已知 worker 的哈希表
    zlist_t *waiting;           //  正在期待的 worker 队列
    uint64_t heartbeat_at;      //  发送心跳的工夫
} broker_t;
 
//  定义一个服务
typedef struct {
    char *name;                 //  服务名称
    zlist_t *requests;          //  客户端申请队列
    zlist_t *waiting;           //  正在期待的 worker 队列
    size_t workers;             //  可用 worker 数
} service_t;
 
//  定义一个 worker,状态为闲暇或占用
typedef struct {
    char *identity;             //  worker 的标识
    zframe_t *address;          //  地址帧
    service_t *service;         //  所属服务
    int64_t expiry;             //  过期工夫,从未收到心跳起计时
} worker_t;
 
 
//  ---------------------------------------------------------------------
//  代理应用的函数
static broker_t *
    s_broker_new (int verbose);
static void
    s_broker_destroy (broker_t **self_p);
static void
    s_broker_bind (broker_t *self, char *endpoint);
static void
    s_broker_purge_workers (broker_t *self);
 
//  服务应用的函数
static service_t *
    s_service_require (broker_t *self, zframe_t *service_frame);
static void
    s_service_destroy (void *argument);
static void
    s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
    s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);
 
//  worker 应用的函数
static worker_t *
    s_worker_require (broker_t *self, zframe_t *address);
static void
    s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
    s_worker_destroy (void *argument);
static void
    s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
    s_worker_send (broker_t *self, worker_t *worker, char *command,
                   char *option, zmsg_t *msg);
static void
    s_worker_waiting (broker_t *self, worker_t *worker);
 
//  客户端应用的函数
static void
    s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
 
 
//  ---------------------------------------------------------------------
//  主程序
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
 
    broker_t *self = s_broker_new (verbose);
    s_broker_bind (self, "tcp://*:5555");
 
    //  承受并解决音讯,直至程序被停止
    while (TRUE) {zmq_pollitem_t items [] = {{ self->socket,  0, ZMQ_POLLIN, 0} };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        //  Process next input message, if any
        if (items [0].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (self->socket);
            if (!msg)
                break;          //  中断
            if (self->verbose) {zclock_log ("I: 收到音讯:");
                zmsg_dump (msg);
            }
            zframe_t *sender = zmsg_pop (msg);
            zframe_t *empty  = zmsg_pop (msg);
            zframe_t *header = zmsg_pop (msg);
 
            if (zframe_streq (header, MDPC_CLIENT))
                s_client_process (self, sender, msg);
            else
            if (zframe_streq (header, MDPW_WORKER))
                s_worker_process (self, sender, msg);
            else {zclock_log ("E: 非法音讯:");
                zmsg_dump (msg);
                zmsg_destroy (&msg);
            }
            zframe_destroy (&sender);
            zframe_destroy (&empty);
            zframe_destroy (&header);
        }
        //  断开并删除过期的 worker
        //  适时地发送心跳给 worker
        if (zclock_time () > self->heartbeat_at) {s_broker_purge_workers (self);
            worker_t *worker = (worker_t *) zlist_first (self->waiting);
            while (worker) {s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *) zlist_next (self->waiting);
            }
            self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;}
    }
    if (zctx_interrupted)
        printf ("W: 收到中断音讯,敞开中...\n");
 
    s_broker_destroy (&self);
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  代理对象的构造函数
 
static broker_t *
s_broker_new (int verbose)
{broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
 
    //  初始化代理状态
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}
 
//  ---------------------------------------------------------------------
//  代理对象的析构函数
 
static void
s_broker_destroy (broker_t **self_p)
{assert (self_p);
    if (*self_p) {
        broker_t *self = *self_p;
        zctx_destroy (&self->ctx);
        zhash_destroy (&self->services);
        zhash_destroy (&self->workers);
        zlist_destroy (&self->waiting);
        free (self);
        *self_p = NULL;
    }
}
 
//  ---------------------------------------------------------------------
//  将代理套接字绑定至端点,能够反复调用该函数
//  咱们应用一个套接字来同时解决 client 和 worker
 
void
s_broker_bind (broker_t *self, char *endpoint)
{zsocket_bind (self->socket, endpoint);
    zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
}
 
//  ---------------------------------------------------------------------
//  删除闲暇状态中过期的 worker
 
static void
s_broker_purge_workers (broker_t *self)
{worker_t *worker = (worker_t *) zlist_first (self->waiting);
    while (worker) {if (zclock_time () < worker->expiry)
            continue;              //  该 worker 未过期,进行搜寻
        if (self->verbose)
            zclock_log ("I: 正在删除过期的 worker: %s",
                worker->identity);
 
        s_worker_delete (self, worker, 0);
        worker = (worker_t *) zlist_first (self->waiting);
    }
}
 
//  ---------------------------------------------------------------------
//  定位或创立新的服务项
 
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{assert (service_frame);
    char *name = zframe_strdup (service_frame);
 
    service_t *service =
        (service_t *) zhash_lookup (self->services, name);
    if (service == NULL) {service = (service_t *) zmalloc (sizeof (service_t));
        service->name = name;
        service->requests = zlist_new ();
        service->waiting = zlist_new ();
        zhash_insert (self->services, name, service);
        zhash_freefn (self->services, name, s_service_destroy);
        if (self->verbose)
            zclock_log ("I: 收到音讯:");
    }
    else
        free (name);
 
    return service;
}
 
//  ---------------------------------------------------------------------
//  当服务从 broker->services 中移除时销毁该服务对象
 
static void
s_service_destroy (void *argument)
{service_t *service = (service_t *) argument;
    //  销毁申请队列中的所有我的项目
    while (zlist_size (service->requests)) {zmsg_t *msg = zlist_pop (service->requests);
        zmsg_destroy (&msg);
    }
    zlist_destroy (&service->requests);
    zlist_destroy (&service->waiting);
    free (service->name);
    free (service);
}
 
//  ---------------------------------------------------------------------
//  可能时,散发申请给期待中的 worker
 
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
{assert (service);
    if (msg)                    //  将音讯退出队列
        zlist_append (service->requests, msg);
 
    s_broker_purge_workers (self);
    while (zlist_size (service->waiting)
        && zlist_size (service->requests))
    {worker_t *worker = zlist_pop (service->waiting);
        zlist_remove (self->waiting, worker);
        zmsg_t *msg = zlist_pop (service->requests);
        s_worker_send (self, worker, MDPW_REQUEST, NULL, msg);
        zmsg_destroy (&msg);
    }
}
 
//  ---------------------------------------------------------------------
//  应用 8 /MMI 协定解决外部服务
 
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
    char *return_code;
    if (zframe_streq (service_frame, "mmi.service")) {char *name = zframe_strdup (zmsg_last (msg));
        service_t *service =
            (service_t *) zhash_lookup (self->services, name);
        return_code = service && service->workers? "200": "404";
        free (name);
    }
    else
        return_code = "501";
 
    zframe_reset (zmsg_last (msg), return_code, strlen (return_code));
 
    //  移除并保留返回给 client 的信封,插入协定头信息和服务名称,并从新包装信封
    zframe_t *client = zmsg_unwrap (msg);
    zmsg_push (msg, zframe_dup (service_frame));
    zmsg_pushstr (msg, MDPC_CLIENT);
    zmsg_wrap (msg, client);
    zmsg_send (&msg, self->socket);
}
 
//  ---------------------------------------------------------------------
//  按需创立 worker
 
static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{assert (address);
 
    //  self->workers 应用 wroker 的标识为键
    char *identity = zframe_strhex (address);
    worker_t *worker =
        (worker_t *) zhash_lookup (self->workers, identity);
 
    if (worker == NULL) {worker = (worker_t *) zmalloc (sizeof (worker_t));
        worker->identity = identity;
        worker->address = zframe_dup (address);
        zhash_insert (self->workers, identity, worker);
        zhash_freefn (self->workers, identity, s_worker_destroy);
        if (self->verbose)
            zclock_log ("I: 正在注册新的 worker: %s", identity);
    }
    else
        free (identity);
    return worker;
}
 
//  ---------------------------------------------------------------------
//  从所有数据结构中删除 wroker,并销毁 worker 对象
 
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
{assert (worker);
    if (disconnect)
        s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);
 
    if (worker->service) {zlist_remove (worker->service->waiting, worker);
        worker->service->workers--;
    }
    zlist_remove (self->waiting, worker);
    //  以下办法间接调用了 s_worker_destroy()办法
    zhash_delete (self->workers, worker->identity);
}
 
//  ---------------------------------------------------------------------
//  当 worker 从 broker->workers 中移除时,销毁 worker 对象
 
static void
s_worker_destroy (void *argument)
{worker_t *worker = (worker_t *) argument;
    zframe_destroy (&worker->address);
    free (worker->identity);
    free (worker);
}
 
//  ---------------------------------------------------------------------
//  解决 worker 发送来的音讯
 
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{assert (zmsg_size (msg) >= 1);     //  音讯中至多蕴含命令帧
 
    zframe_t *command = zmsg_pop (msg);
    char *identity = zframe_strhex (sender);
    int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
    free (identity);
    worker_t *worker = s_worker_require (self, sender);
 
    if (zframe_streq (command, MDPW_READY)) {
        //  若 worker 队列中已有该 worker,但仍收到了它的“已就绪”音讯,则删除这个 worker。if (worker_ready)
            s_worker_delete (self, worker, 1);
        else
        if (zframe_size (sender) >= 4  //  服务名称为保留的服务
        &&  memcmp (zframe_data (sender), "mmi.", 4) == 0)
            s_worker_delete (self, worker, 1);
        else {
            //  将 worker 对应到服务,并置为闲暇状态
            zframe_t *service_frame = zmsg_pop (msg);
            worker->service = s_service_require (self, service_frame);
            worker->service->workers++;
            s_worker_waiting (self, worker);
            zframe_destroy (&service_frame);
        }
    }
    else
    if (zframe_streq (command, MDPW_REPLY)) {if (worker_ready) {
            //  移除并保留返回给 client 的信封,插入协定头信息和服务名称,并从新包装信封
            zframe_t *client = zmsg_unwrap (msg);
            zmsg_pushstr (msg, worker->service->name);
            zmsg_pushstr (msg, MDPC_CLIENT);
            zmsg_wrap (msg, client);
            zmsg_send (&msg, self->socket);
            s_worker_waiting (self, worker);
        }
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_HEARTBEAT)) {if (worker_ready)
            worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_DISCONNECT))
        s_worker_delete (self, worker, 0);
    else {zclock_log ("E: 非法音讯");
        zmsg_dump (msg);
    }
    free (command);
    zmsg_destroy (&msg);
}
 
//  ---------------------------------------------------------------------
//  发送音讯给 worker
//  如果指针指向了一条音讯,发送它,但不销毁它,因为这是调用者的工作
 
static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
               char *option, zmsg_t *msg)
{msg = msg? zmsg_dup (msg): zmsg_new ();
 
    //  将协定信封压入音讯顶部
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
 
    //  在音讯顶部插入路由帧
    zmsg_wrap (msg, zframe_dup (worker->address));
 
    if (self->verbose) {
        zclock_log ("I: 正在发送音讯给 worker %s",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->socket);
}
 
//  ---------------------------------------------------------------------
//  正在期待的 worker
 
static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
    //  将 worker 退出代理和服务的期待队列
    zlist_append (self->waiting, worker);
    zlist_append (worker->service->waiting, worker);
    worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
    s_service_dispatch (self, worker->service, NULL);
}
 
//  ---------------------------------------------------------------------
//  解决 client 发来的申请
 
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{assert (zmsg_size (msg) >= 2);     //  服务名称 + 申请内容
 
    zframe_t *service_frame = zmsg_pop (msg);
    service_t *service = s_service_require (self, service_frame);
 
    //  为应答内容设置申请方的地址
    zmsg_wrap (msg, zframe_dup (sender));
    if (zframe_size (service_frame) >= 4
    &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
        s_service_internal (self, service_frame, msg);
    else
        s_service_dispatch (self, service, msg);
    zframe_destroy (&service_frame);
}

这个例子应该是咱们见过最简单的一个示例了,大概有 500 行代码。编写这段代码并让其变的强壮,大概破费了两天的工夫。然而,这也仅仅是一个残缺的面向服务代理的一部分。

几点阐明:

  • 管家模式协定要求咱们在一个套接字中同时解决 client 和 worker,这一点对部署和治理代理很有好处:它只会在一个 ZMQ 端点上收发申请,而不是两个。
  • 代理很好地实现了 MDP/0.1 协定中标准的内容,包含当代理发送非法命令和心跳时断开的机制。
  • 能够将这段代码扩充为多线程,每个线程治理一个套接字、一组 client 和 worker。这种做法在大型架构的拆分中显得很乏味。C 语言代码曾经是这样的格局了,因而很容易实现。
  • 还能够将这段代码扩充为主备模式、双在线模式,进一步提高可靠性。因为从实质上来说,代理是无状态的,只是保留了服务的存在与否,因而 client 和 worker 能够自行抉择除此之外的代理来进行通信。
  • 示例代码核心跳的距离为 5 秒,次要是为了缩小调试时的输入。事实中的值应该设得低一些,然而,重试的过程应该设置得稍长一些,让服务有足够的工夫启动,如 10 秒钟。

异步管家模式

上文那种实现管家模式的办法比较简单,client 还是简略海盗模式中的,仅仅是用 API 重写了一下。我在测试机上运行了程序,解决 10 万条申请大概须要 14 秒的工夫,这和代码也有一些关系,因为复制音讯帧的工夫节约了 CPU 解决工夫。但真正的问题在于,咱们总是一一循环进行解决(round-trip),即发送 - 接管 - 发送 - 接管……ZMQ 外部禁用了 TCP 发包优化算法(Nagle’s algorithm),但一一解决循环还是比拟节约。

实践归实践,还是须要由实际来测验。咱们用一个简略的测试程序来看看一一解决循环是否真的耗时。这个测试程序会发送一组音讯,第一次它发一条收一条,第二次则一起发送再一起接管。两次后果应该是一样的,但速度截然不同。

tripping: Round-trip demonstrator in C

//
//  Round-trip 模仿
//
//  本示例程序应用多线程的形式启动 client、worker、以及代理,//  当 client 处理完毕时会发送信号给主程序。//
#include "czmq.h"
 
static void
client_task (void *args, zctx_t *ctx, void *pipe)
{void *client = zsocket_new (ctx, ZMQ_DEALER);
    zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
    zsocket_connect (client, "tcp://localhost:5555");
 
    printf ("开始测试...\n");
    zclock_sleep (100);
 
    int requests;
    int64_t start;
 
    printf ("同步 round-trip 测试...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {zstr_send (client, "hello");
        char *reply = zstr_recv (client);
        free (reply);
    }
    printf ("%d 次 / 秒 \n",
        (1000 * 10000) / (int) (zclock_time () - start));
 
    printf ("异步 round-trip 测试...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++)
        zstr_send (client, "hello");
    for (requests = 0; requests < 100000; requests++) {char *reply = zstr_recv (client);
        free (reply);
    }
    printf ("%d 次 / 秒 \n",
        (1000 * 100000) / (int) (zclock_time () - start));
 
    zstr_send (pipe, "实现");
}
 
static void *
worker_task (void *args)
{zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
    zsocket_connect (worker, "tcp://localhost:5556");
 
    while (1) {zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
static void *
broker_task (void *args)
{
    //  筹备上下文和套接字
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");
    zsocket_bind (backend,  "tcp://*:5556");
 
    //  初始化轮询对象
    zmq_pollitem_t items [] = {{ frontend, 0, ZMQ_POLLIN, 0},
        {backend,  0, ZMQ_POLLIN, 0}
    };
    while (1) {int rc = zmq_poll (items, 2, -1);
        if (rc == -1)
            break;              //  中断
        if (items [0].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (frontend);
            zframe_t *address = zmsg_pop (msg);
            zframe_destroy (&address);
            zmsg_pushstr (msg, "W");
            zmsg_send (&msg, backend);
        }
        if (items [1].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (backend);
            zframe_t *address = zmsg_pop (msg);
            zframe_destroy (&address);
            zmsg_pushstr (msg, "C");
            zmsg_send (&msg, frontend);
        }
    }
    zctx_destroy (&ctx);
    return NULL;
}
 
int main (void)
{
    //  创立线程
    zctx_t *ctx = zctx_new ();
    void *client = zthread_fork (ctx, client_task, NULL);
    zthread_new (ctx, worker_task, NULL);
    zthread_new (ctx, broker_task, NULL);
 
    //  期待 client 端管道的信号
    char *signal = zstr_recv (client);
    free (signal);
 
    zctx_destroy (&ctx);
    return 0;
}

在我的开发环境中运行后果如下:

Setting up test...
Synchronous round-trip test...
 9057 calls/second
Asynchronous round-trip test...
 173010 calls/second

须要留神的是 client 在运行开始会暂停一段时间,这是因为在向 ROUTER 套接字发送音讯时,若指定标识的套接字没有连贯,那么 ROUTER 会间接抛弃该音讯。这个示例中咱们没有应用 LRU 算法,所以当 worker 连贯速度稍慢时就有可能失落数据,影响测试后果。

咱们能够看到,一一解决循环比异步解决要慢将近 20 倍,让咱们把它利用到管家模式中去。

首先,让咱们批改 client 的 API,增加独立的发送和接管办法:

mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
int      mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t  *mdcli_recv    (mdcli_t *self);

而后花很短的工夫就能将同步的 client API 革新成异步的 API:

mdcliapi2: Majordomo asynchronous client API in C

/*  =====================================================================
    mdcliapi2.c
 
    Majordomo Protocol Client API (async version)
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "mdcliapi2.h"
 
//  类构造
//  应用成员函数拜访属性
 
struct _mdcli_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    void *client;               //  连贯至代理的套接字
    int verbose;                //  在规范输入打印运行状态
    int timeout;                //  申请超时工夫
};
 
 
//  ---------------------------------------------------------------------
//  连贯或重连代理
 
void s_mdcli_connect_to_broker (mdcli_t *self)
{if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);
}
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
mdcli_t *
mdcli_new (char *broker, int verbose)
{assert (broker);
 
    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  毫秒
 
    s_mdcli_connect_to_broker (self);
    return self;
}
 
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
mdcli_destroy (mdcli_t **self_p)
{assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}
 
 
//  ---------------------------------------------------------------------
//  设置申请超时工夫
 
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{assert (self);
    self->timeout = timeout;
}
 
 
//  ---------------------------------------------------------------------
//  发送申请给代理
//  获得申请音讯的所有权,发送后销毁
 
int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;
 
    //  在音讯顶部退出协定规定的帧
    //  Frame 0: empty (模仿 REQ 套接字的行为)
    //  Frame 1: "MDPCxy" (6 个字节, MDP/Client x.y)
    //  Frame 2: Service name (看打印字符串)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    zmsg_pushstr (request, "");
    if (self->verbose) {zclock_log ("I: 发送申请给'%s'服务:", service);
        zmsg_dump (request);
    }
    zmsg_send (&request, self->client);
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  获取应答音讯,若无则返回 NULL;//  该函数不会尝试从代理的解体中复原,//  因为咱们没有记录那些未收到应答的申请,所以也无奈重发。zmsg_t *
mdcli_recv (mdcli_t *self)
{assert (self);
 
    //  轮询套接字以获取应答
    zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0} };
    int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
    if (rc == -1)
        return NULL;            //  中断
 
    //  收到应答后进行解决
    if (items [0].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (self->client);
        if (self->verbose) {zclock_log ("I: received reply:");
            zmsg_dump (msg);
        }
        //  不要处理错误,间接报出
        assert (zmsg_size (msg) >= 4);
 
        zframe_t *empty = zmsg_pop (msg);
        assert (zframe_streq (empty, ""));
        zframe_destroy (&empty);
 
        zframe_t *header = zmsg_pop (msg);
        assert (zframe_streq (header, MDPC_CLIENT));
        zframe_destroy (&header);
 
        zframe_t *service = zmsg_pop (msg);
        zframe_destroy (&service);
 
        return msg;     //  Success
    }
    if (zctx_interrupted)
        printf ("W: 收到中断音讯,正在停止 client...\n");
    else
    if (self->verbose)
        zclock_log ("W: 严重错误,放弃申请");
 
    return NULL;
}

上面是对应的测试代码:

mdclient2: Majordomo client application in C

//
//  异步管家模式 - client 示例程序
//  应用 mdcli API 暗藏 MDP 协定的具体实现
//
//  间接编译源码,而不创立类库
#include "mdcliapi2.c"
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
 
    int count;
    for (count = 0; count < 100000; count++) {zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdcli_send (session, "echo", &request);
    }
    for (count = 0; count < 100000; count++) {zmsg_t *reply = mdcli_recv (session);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  应用 Ctrl- C 中断
    }
    printf ("收到 %d 个应答 \n", count);
    mdcli_destroy (&session);
    return 0;
}

代理和 worker 的代码没有变,因为咱们并没有扭转 MDP 协定。通过对 client 的革新,咱们能够显著看到速度的晋升。如以下是同步情况下解决 10 万条申请的工夫:

$ time mdclient
100000 requests/replies processed

real    0m14.088s
user    0m1.310s
sys     0m2.670s

以下是异步申请的状况:

$ time mdclient2
100000 replies received

real    0m8.730s
user    0m0.920s
sys     0m1.550s

让咱们建设 10 个 worker,看看成果如何:

$ time mdclient2
100000 replies received

real    0m3.863s
user    0m0.730s
sys     0m0.470s

因为 worker 取得音讯须要通过 LRU 队列机制,所以并不能做到齐全的异步。然而,worker 越多其成果也会越好。在我的测试机上,当 worker 的数量达到 8 个时,速度就不再晋升了——四核处理器只能做这么多。然而,咱们依然取得了近四倍的速度晋升,而革新过程只有几分钟而已。此外,代理其实还没有进行优化,它仍会复制音讯,而没有实现零拷贝。不过,咱们曾经做到每秒解决 2.5 万次申请 - 应答,曾经很不错了。

当然,异步的管家模式也并不完满,有一个显著的毛病:它无奈从代理的解体中复原。能够看到 mdcliapi2 的代码中并没有复原连贯的代码,从新连贯须要有以下几点作为前提:

  • 每个申请都做了编号,每次应答也含有相应的编号,这就须要批改协定,明确定义;
  • client 的 API 须要保留并跟踪所有已发送、但仍未收到应答的申请;
  • 如果代理产生解体,client 会重发所有音讯。

能够看到,高可靠性往往和复杂度成正比,值得在管家模式中利用这一机制吗?这就要看利用场景了。如果是一个名称查问服务,每次会话会调用一次,那不须要利用这一机制;如果是一个位于前端的网页服务,有数千个客户端相连,那可能就须要了。

服务查问

当初,咱们曾经有了一个面向服务的代理了,然而咱们无奈得悉代理是否提供了某项特定服务。如果申请失败,那当然就示意该项服务目前不可用,但具体起因是什么呢?所以,如果可能询问代理“echo 服务正在运行吗?”,那将会很有用途。最显著的办法是在 MDP/Client 协定中增加一种命令,客户端能够询问代理某项服务是否可用。然而,MDP/Client 最大的长处在于简略,如果增加了服务查问的性能就太过简单了。

另一种计划是学电子邮件的解决形式,将失败的申请从新返回。然而这同样会减少复杂度,因为咱们须要甄别收到的音讯是一个应答还是被退回的申请。

让咱们用之前的形式,在 MDP 的根底上建设新的机制,而不是扭转它。服务定位自身也是一项服务,咱们还能够提供相似于“禁用某服务”、“提供服务数据”等其余服务。咱们须要的是一个可能扩大协定但又不会影响协定自身的机制。

这样就诞生了一个玲珑的 RFC – MMI(管家接口)的应用层,建设在 MDP 协定之上:http://rfc.zeromq.org/spec:8。咱们在代理中其实曾经加以实现了,不知你是否曾经留神到。上面的代码演示了如何应用这项服务查问性能:

mmiecho: Service discovery over Majordomo in C

//
//  MMI echo 服务查问示例程序
//
 
//  让咱们间接编译,不生成类库
#include "mdcliapi.c"
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
 
    //  咱们须要查问的服务名称
    zmsg_t *request = zmsg_new ();
    zmsg_addstr (request, "echo");
 
    //  发送给“服务查问”服务的音讯
    zmsg_t *reply = mdcli_send (session, "mmi.service", &request);
 
    if (reply) {char *reply_code = zframe_strdup (zmsg_first (reply));
        printf ("查问 echo 服务的后果: %s\n", reply_code);
        free (reply_code);
        zmsg_destroy (&reply);
    }
    else
        printf ("E: 代理无响应,请确认它正在工作 \n");
 
    mdcli_destroy (&session);
    return 0;
}

代理在运行时会查看申请的服务名称,自行处理那些 mmi. 结尾的服务,而不转发给 worker。你能够在不开启 worker 的状况下运行以上代码,能够看到程序是报告 200 还是 404。MMI 在示例程序代理中的实现是很简略的,比方,当某个 worker 沦亡时,该服务依然标记为可用。实际中,代理应该在肯定距离后革除那些没有 worker 的服务。

幂等服务

幂等是指可能平安地反复执行某项操作。如,看钟是幂等的,但借钱给他人老婆就不是了。有些客户端至服务端的通信是幂等的,但有些则不是。幂等的通信示例有:

  • 无状态的任务分配,即管道模式中服务端是无状态的 worker,它的处理结果是依据客户端的申请状态生成的,因而能够反复解决雷同的申请;
  • 命名服务中将逻辑地址转化成理论绑定或连贯的端点,能够反复查问屡次,因而也是幂等的。

非幂等的通信示例有:

  • 日志服务,咱们不会心愿雷同的日志内容被记录屡次;
  • 任何会对上游节点有影响的服务,如该服务会向上游节点发送信息,若收到雷同的申请,那上游节点收到的信息就是反复的;
  • 当服务批改了某些共享的数据,且没有进行幂等方面的设置。如某项服务对银行账户进行了借操作(debit),这肯定是非幂等的。

如果应用程序提供的服务是非幂等的,那就须要思考它到底是在哪个阶段解体的。如果程序在闲暇或解决申请的过程中解体,那不会有什么问题。咱们能够应用数据库中的事务机制来保障借贷操作是同时产生的。如果应用程序在发送申请的时候解体了,那就会有问题,因为对于该程序来说,它曾经实现了工作。

如果在返回应答的过程中网络阻塞了,客户端会认为申请发送失败,并进行重发,这样服务端会再一次执行雷同的申请。这不是咱们想要的后果。

罕用的解决办法是在服务端检测并回绝反复的申请,这就须要:

  • 客户端为每个申请加注惟一的标识,包含客户端标识和音讯标识;
  • 服务端在发送应答时应用客户端标识和音讯标识作为键,保留应答内容;
  • 当服务端发现收到的申请已在应答哈希表中存在,它会跳过该次申请,间接返回应答内容。

脱机可靠性(伟人模式)

当你意识到管家模式是一种十分牢靠的音讯代理时,你可能会想要应用磁盘做一下音讯直达,从而进一步晋升可靠性。这种形式尽管在很多企业级音讯零碎中利用,但我还是有些拥护的,起因有:

  • 咱们能够看到,懈怠海盗模式的 client 能够工作得十分好,可能在多种架构中运行。惟一的问题是它会假如 worker 是无状态的,且提供的服务是幂等的。但这个问题咱们能够通过其余形式解决,而不是增加磁盘。
  • 增加磁盘会带来新的问题,须要额定的治理和维护费用。海盗模式的最大长处就是简单明了,不会解体。如果你还是放心硬件会出问题,能够改用点对点的通信模式,这会在本章最初一节讲到。

尽管有以上起因,但还是有一个正当的场景能够用到磁盘直达的——异步脱机网络。海盗模式有一个问题,那就是 client 发送申请后会始终期待应答。如果 client 和 worker 并不是长连贯(能够拿电子邮箱做个类比),咱们就无奈在 client 和 worker 之间建设一个无状态的网络,因而须要将这种状态保存起来。

于是咱们就有了伟人模式,该模式下会将音讯写到磁盘中,确保不会失落。当咱们进行服务查问时,会转向伟人这一层进行。伟人是建设在管家之上的,而不是改写了 MDP 协定。这样做的益处是咱们能够在一个特定的 worker 中实现这种可靠性,而不必去减少代理的逻辑。

  • 实现更为简略;

    • 代理用一种语言编写,worker 应用另一种语言编写;
    • 能够自在降级这种模式。

惟一的毛病是,代理和磁盘之间会有一层额定的分割,不过这也是值得的。

咱们有很多办法来实现一种长久化的申请 - 应答架构,而指标当然是越简略越好。我能想到的最简略的形式是提供一种成为“伟人”的代理服务,它不会影响现有 worker 的工作,若 client 想要立刻失去应答,它能够和代理进行通信;如果它不是那么焦急,那就能够和伟人通信:“嗨,伟人,麻烦帮我解决下这个申请,我去买些菜。”

这样一来,伟人就既是 worker 又是 client。client 和伟人之间的对话个别是:

  • Client: 请帮我解决这个申请。伟人:好的。
  • Client: 有要给我的应答吗?伟人:有的。(或者没有)
  • Client: OK,你能够开释那个申请了,工作曾经实现。伟人:好的。

伟人和代理之间的对话个别是:

  • 伟人:嗨,代理程序,你这里有个叫 echo 的服务吗?代理:恩,如同有。
  • 伟人:嗨,echo 服务,请帮我解决一下这个申请。Echo: 好了,这是应答。
  • 伟人:谢谢!

你能够设想一些产生故障的情景,看看上述模式是否能解决?worker 在解决申请的时候解体,伟人会一直地从新发送申请;应答在传输过程中失落了,伟人也会重试;如果申请曾经解决,但 client 没有失去应答,那它会再次询问伟人;如果伟人在解决申请或进行应答的时候解体了,客户端会进行重试;只有申请是被保留在磁盘上的,那它就不会失落。

这个机制中,握手的过程是比拟漫长的,但 client 能够应用异步的管家模式,一次发送多个申请,并一起期待应答。

咱们须要一种办法,让 client 会去申请应答内容。不同的 client 会拜访到雷同的服务,且 client 是来去自由的,有着不同的标识。一个简略、正当、平安的解决方案是:

  • 当伟人收到申请时,它会为每个申请生成惟一的编号(UUID),并将这个编号返回给 client;
  • client 在申请应答内容时须要提供这个编号。

这样一来 client 就须要负责将 UUID 平安地保存起来,不过这就省去了验证的过程。有其余计划吗?咱们能够应用长久化的套接字,即显式申明客户端的套接字标识。然而,这会造成治理上的麻烦,而且万一两个 client 的套接字标识雷同,那会引来无穷的麻烦。

在咱们开始制订一个新的协定之前,咱们先思考一下 client 如何和伟人通信。一种计划是提供一种服务,配合三个不同的命令;另一种计划则更为简略,提供三种独立的服务:

  • titanic.request – 保留一个申请,并返回 UUID
  • titanic.reply – 依据 UUID 获取应答内容
  • titanic.close – 确认某个申请已被正确地解决

咱们须要创立一个多线程的 worker,正如咱们之前用 ZMQ 进行多线程编程一样,很简略。然而,在咱们开始编写代码之前,先讲伟人模式的一些定义写下来:http://rfc.zeromq.org/spec:9。咱们称之为“伟人服务协定”,或 TSP。

应用 TSP 协定天然会让 client 多出额定的工作,上面是一个简略但足够强壮的 client:

ticlient: Titanic client example in C

//
//  伟人模式 client 示例
//  实现 http://rfc.zeromq.org/spec:9 协定中的 client 端
 
//  让咱们间接编译,不创立类库
#include "mdcliapi.c"
 
//  申请 TSP 协定下的服务
//  如果胜利则返回应答(状态码:200),否则返回 NULL
//
static zmsg_t *
s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
{zmsg_t *reply = mdcli_send (session, service, request_p);
    if (reply) {zframe_t *status = zmsg_pop (reply);
        if (zframe_streq (status, "200")) {zframe_destroy (&status);
            return reply;
        }
        else
        if (zframe_streq (status, "400")) {printf ("E: 客户端产生严重错误,勾销申请 \n");
            exit (EXIT_FAILURE);
        }
        else
        if (zframe_streq (status, "500")) {printf ("E: 服务端产生严重错误,勾销申请 \n");
            exit (EXIT_FAILURE);
        }
    }
    else
        exit (EXIT_SUCCESS);    //  中断或产生谬误
 
    zmsg_destroy (&reply);
    return NULL;        //  申请不胜利,但不返回失败起因
}
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
 
    //  1. 发送 echo 服务的申请给伟人
    zmsg_t *request = zmsg_new ();
    zmsg_addstr (request, "echo");
    zmsg_addstr (request, "Hello world");
    zmsg_t *reply = s_service_call (session, "titanic.request", &request);
 
    zframe_t *uuid = NULL;
    if (reply) {uuid = zmsg_pop (reply);
        zmsg_destroy (&reply);
        zframe_print (uuid, "I: request UUID");
    }
 
    //  2. 期待应答
    while (!zctx_interrupted) {zclock_sleep (100);
        request = zmsg_new ();
        zmsg_add (request, zframe_dup (uuid));
        zmsg_t *reply = s_service_call (session, "titanic.reply", &request);
 
        if (reply) {char *reply_string = zframe_strdup (zmsg_last (reply));
            printf ("Reply: %s\n", reply_string);
            free (reply_string);
            zmsg_destroy (&reply);
 
            //  3. 敞开申请
            request = zmsg_new ();
            zmsg_add (request, zframe_dup (uuid));
            reply = s_service_call (session, "titanic.close", &request);
            zmsg_destroy (&reply);
            break;
        }
        else {printf ("I: 尚未收到应答,筹备稍后重试...\n");
            zclock_sleep (5000);     //  5 秒后重试
        }
    }
    zframe_destroy (&uuid);
    mdcli_destroy (&session);
    return 0;
}

当然,下面的代码能够整合到一个框架中,程序员不须要理解其中的细节。如果我有工夫的话,我会尝试写一个这样的 API 的,让应用程序又变回短短的几行。这种理念和 MDP 中的统一:不要做反复的事。

上面是伟人的实现。这个服务端会应用三个线程来解决三种服务。它应用最原始的长久化办法来保留申请:为每个申请创立一个磁盘文件。尽管简略,但也挺恐怖的。比较复杂的局部是,伟人会保护一个队列来保留这些申请,从而防止反复地扫描目录。

titanic: Titanic broker example in C

//
//  伟人模式 - 服务
//
//  实现 http://rfc.zeromq.org/spec:9 协定的服务端
 
//  让咱们间接编译,不创立类库
#include "mdwrkapi.c"
#include "mdcliapi.c"
 
#include "zfile.h"
#include <uuid/uuid.h>
 
//  返回一个可打印的惟一编号(UUID)//  调用者负责开释 UUID 字符串的内存
 
static char *
s_generate_uuid (void)
{char hex_char [] = "0123456789ABCDEF";
    char *uuidstr = zmalloc (sizeof (uuid_t) * 2 + 1);
    uuid_t uuid;
    uuid_generate (uuid);
    int byte_nbr;
    for (byte_nbr = 0; byte_nbr < sizeof (uuid_t); byte_nbr++) {uuidstr [byte_nbr * 2 + 0] = hex_char [uuid [byte_nbr] >> 4];
        uuidstr [byte_nbr * 2 + 1] = hex_char [uuid [byte_nbr] & 15];
    }
    return uuidstr;
}
 
//  依据 UUID 生成用于保留申请内容的文件名,并返回
 
#define TITANIC_DIR ".titanic"
 
static char *
s_request_filename (char *uuid) {char *filename = malloc (256);
    snprintf (filename, 256, TITANIC_DIR "/%s.req", uuid);
    return filename;
}
 
//  依据 UUID 生成用于保留应答内容的文件名,并返回
 
static char *
s_reply_filename (char *uuid) {char *filename = malloc (256);
    snprintf (filename, 256, TITANIC_DIR "/%s.rep", uuid);
    return filename;
}
 
 
//  ---------------------------------------------------------------------
//  伟人模式 - 申请服务
 
static void
titanic_request (void *args, zctx_t *ctx, void *pipe)
{
    mdwrk_t *worker = mdwrk_new ("tcp://localhost:5555", "titanic.request", 0);
    zmsg_t *reply = NULL;
 
    while (TRUE) {
        //  若应答非空则发送,再从代理处取得新的申请
        zmsg_t *request = mdwrk_recv (worker, &reply);
        if (!request)
            break;      //  中断并退出
 
        //  确保音讯目录是存在的
        file_mkdir (TITANIC_DIR);
 
        //  生成 UUID,并将音讯保留至磁盘
        char *uuid = s_generate_uuid ();
        char *filename = s_request_filename (uuid);
        FILE *file = fopen (filename, "w");
        assert (file);
        zmsg_save (request, file);
        fclose (file);
        free (filename);
        zmsg_destroy (&request);
 
        //  将 UUID 退出队列
        reply = zmsg_new ();
        zmsg_addstr (reply, uuid);
        zmsg_send (&reply, pipe);
 
        //  将 UUID 返回给客户端
        //  将由循环顶部的 mdwrk_recv()函数实现
        reply = zmsg_new ();
        zmsg_addstr (reply, "200");
        zmsg_addstr (reply, uuid);
        free (uuid);
    }
    mdwrk_destroy (&worker);
}
 
 
//  ---------------------------------------------------------------------
//  伟人模式 - 应答服务
 
static void *
titanic_reply (void *context)
{
    mdwrk_t *worker = mdwrk_new ("tcp://localhost:5555", "titanic.reply", 0);
    zmsg_t *reply = NULL;
 
    while (TRUE) {zmsg_t *request = mdwrk_recv (worker, &reply);
        if (!request)
            break;      //  中断并退出
 
        char *uuid = zmsg_popstr (request);
        char *req_filename = s_request_filename (uuid);
        char *rep_filename = s_reply_filename (uuid);
        if (file_exists (rep_filename)) {FILE *file = fopen (rep_filename, "r");
            assert (file);
            reply = zmsg_load (file);
            zmsg_pushstr (reply, "200");
            fclose (file);
        }
        else {reply = zmsg_new ();
            if (file_exists (req_filename))
                zmsg_pushstr (reply, "300"); // 挂起
            else
                zmsg_pushstr (reply, "400"); // 未知
        }
        zmsg_destroy (&request);
        free (uuid);
        free (req_filename);
        free (rep_filename);
    }
    mdwrk_destroy (&worker);
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  伟人模式 - 敞开申请
 
static void *
titanic_close (void *context)
{
    mdwrk_t *worker = mdwrk_new ("tcp://localhost:5555", "titanic.close", 0);
    zmsg_t *reply = NULL;
 
    while (TRUE) {zmsg_t *request = mdwrk_recv (worker, &reply);
        if (!request)
            break;      //  中断并退出
 
        char *uuid = zmsg_popstr (request);
        char *req_filename = s_request_filename (uuid);
        char *rep_filename = s_reply_filename (uuid);
        file_delete (req_filename);
        file_delete (rep_filename);
        free (uuid);
        free (req_filename);
        free (rep_filename);
 
        zmsg_destroy (&request);
        reply = zmsg_new ();
        zmsg_addstr (reply, "200");
    }
    mdwrk_destroy (&worker);
    return 0;
}
 
//  解决某个申请,胜利则返回 1
 
static int
s_service_success (mdcli_t *client, char *uuid)
{
    //  读取申请内容,第一帧为服务名称
    char *filename = s_request_filename (uuid);
    FILE *file = fopen (filename, "r");
    free (filename);
 
    //  如果 client 曾经敞开了该申请,则返回 1
    if (!file)
        return 1;
 
    zmsg_t *request = zmsg_load (file);
    fclose (file);
    zframe_t *service = zmsg_pop (request);
    char *service_name = zframe_strdup (service);
 
    //  应用 MMI 协定查看服务是否可用
    zmsg_t *mmi_request = zmsg_new ();
    zmsg_add (mmi_request, service);
    zmsg_t *mmi_reply = mdcli_send (client, "mmi.service", &mmi_request);
    int service_ok = (mmi_reply
        && zframe_streq (zmsg_first (mmi_reply), "200"));
    zmsg_destroy (&mmi_reply);
 
    if (service_ok) {zmsg_t *reply = mdcli_send (client, service_name, &request);
        if (reply) {filename = s_reply_filename (uuid);
            FILE *file = fopen (filename, "w");
            assert (file);
            zmsg_save (reply, file);
            fclose (file);
            free (filename);
            return 1;
        }
        zmsg_destroy (&reply);
    }
    else
        zmsg_destroy (&request);
 
    free (service_name);
    return 0;
}
 
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
    zctx_t *ctx = zctx_new ();
 
    //  创立 MDP 客户端会话
    mdcli_t *client = mdcli_new ("tcp://localhost:5555", verbose);
    mdcli_set_timeout (client, 1000);  //  1 秒
    mdcli_set_retries (client, 1);     //  只尝试一次
 
    void *request_pipe = zthread_fork (ctx, titanic_request, NULL);
    zthread_new (ctx, titanic_reply, NULL);
    zthread_new (ctx, titanic_close, NULL);
 
    //  主循环
    while (TRUE) {
        //  如果没有流动,咱们将每秒循环一次
        zmq_pollitem_t items [] = { { request_pipe, 0, ZMQ_POLLIN, 0} };
        int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
        if (items [0].revents & ZMQ_POLLIN) {
            //  确保音讯目录是存在的
            file_mkdir (TITANIC_DIR);
 
            //  将 UUID 增加到队列中,应用“-”号标识期待中的申请
            zmsg_t *msg = zmsg_recv (request_pipe);
            if (!msg)
                break;          //  中断
            FILE *file = fopen (TITANIC_DIR "/queue", "a");
            char *uuid = zmsg_popstr (msg);
            fprintf (file, "-%s\n", uuid);
            fclose (file);
            free (uuid);
            zmsg_destroy (&msg);
        }
        //  分派
        //
        char entry [] = "?.......:.......:.......:.......:";
        FILE *file = fopen (TITANIC_DIR "/queue", "r+");
        while (file && fread (entry, 33, 1, file) == 1) {
            //  解决 UUID 前缀为“-”的申请
            if (entry [0] == '-') {if (verbose)
                    printf ("I: 开始解决申请 %s\n", entry + 1);
                if (s_service_success (client, entry + 1)) {
                    //  标记为已解决
                    fseek (file, -33, SEEK_CUR);
                    fwrite ("+", 1, 1, file);
                    fseek (file, 32, SEEK_CUR);
                }
            }
            //  跳过最初一行
            if (fgetc (file) == '\r')
                fgetc (file);
            if (zctx_interrupted)
                break;
        }
        if (file)
            fclose (file);
    }
    mdcli_destroy (&client);
    return 0;
}

测试时,关上 mdbroker 和 titanic,再运行 ticlient,而后开启任意个 mdworker,就能够看到 client 取得了应答。

几点阐明:

  • 咱们应用 MMI 协定去向代理询问某项服务是否可用,这一点和 MDP 中的逻辑统一;
  • 咱们应用 inproc(过程内)协定建设主循环和 titanic.request 服务间的分割,保留新的申请信息。这样能够防止主循环不断扫描磁盘目录,读取所有申请文件,并依照工夫日期排序。

这个示例程序不应关注它的性能(肯定会十分蹩脚,尽管我没有测试过),而是应该看到它是如何提供一种牢靠的通信模式的。你能够测试一下,关上代理、伟人、worker 和 client,应用 - v 参数显示跟踪信息,而后随便地开关代理、伟人、或 worker(client 不能敞开),能够看到所有的申请都能取得应答。

如果你想在实在环境中应用伟人模式,你必定会问怎样才能让速度快起来。以下是我的做法:

  • 应用一个磁盘文件保留所有数据。操作系统解决大文件的效率要比解决许多小文件来的高。
  • 应用一种循环的机制来组织该磁盘文件的构造,这样新的申请能够被间断地写入这个文件。单个线程在全速写入磁盘时的效率是比拟高的。
  • 将索引保留在内存中,能够在启动程序时重建这个索引。这样做能够节俭磁盘缓存,让索引平安地保留在磁盘上。你须要用到 fsync 的机制来保留每一条数据;或者能够期待几毫秒,如果不怕失落上千条数据的话。
  • 如果条件容许,应抉择应用固态硬盘;
  • 提前调配该磁盘文件的空间,或者将每次调配的空间调大一些,这样能够防止磁盘碎片的产生,并保障读写是间断的。

另外,我不倡议将音讯保留在数据库中,甚至不倡议交给那些所谓的高速键值缓存,它们比起一个磁盘文件要来得低廉。

如果你想让伟人模式变得更为牢靠,你能够将申请复制到另一台服务器上,这样就不须要放心主程序受到核武器袭击了。

如果你想让伟人模式变得更为疾速,但能够就义一些可靠性,那你能够将申请和应答都保留在内存中。这样做能够让该服务作为脱机网络运行,不过若伟人服务自身解体了,我也无能为力。

高牢靠对称节点(双子星模式)

概览

双子星模式是一对具备主从机制的高牢靠节点。任一时间,某个节点会充当主机,接管所有客户端的申请;另一个则作为一种备机存在。两个节点会相互监控对方,当主机从网络中隐没时,备机会代替主机的地位。

双子星模式由 Pieter Hintjens 和 Martin Sustrik 设计,利用在 iMatix 的 OpenAMQ 服务器中。它的设计理念是:

  • 提供一种扼要的高可靠性解决方案;
  • 易于了解和应用;
  • 可能进行牢靠的故障切换。

假如咱们有一组双子星模式的服务器,以下是可能产生的故障:

  1. 主机产生硬件故障(断电、失火等),应用程序发送后立即应用备机进行连贯;
  2. 主机的网络环境产生故障,可能某个路由器被雷击了,立即应用备机;
  3. 主机上的服务被保护人员误杀,无奈主动复原。

复原步骤如下:

  1. 保护人员排查主机故障;
  2. 将备机关闭,造成短时间的服务不可用;
  3. 待应用程序都连贯到主机后,保护人员重启备机。

复原过程是人工进行的,惨痛的教训通知咱们主动复原是很可怕的:

  • 故障的产生会造成 10-30 秒之间的服务暂停,如果这是一个真正的突发状况,那最好还是让主机暂停服务的好,因为立即重启服务可能造成另一个 10-30 秒的暂停,不如让用户停止使用。
  • 当有紧急状况产生时,能够在修复的过程中记录故障产生起因,而不是让零碎主动复原,管理员因而无奈用其教训抵挡下一次突发状况。
  • 最初,如果主动复原的确胜利了,管理员将无从得悉故障的产生起因,因此无奈进行剖析。

双子星模式的故障复原过程是:在修复了主机的问题后,将备机做敞开解决,稍后再从新开启:

双子星模式的敞开过程有两种:

  1. 先敞开备机,期待一段时间后再敞开主机;
  2. 同时敞开主机和备机,间隔时间不超过几秒。

敞开时,间隔时间要比故障切换工夫短,否则会导致应用程序失去连贯、从新连贯、并再次失去连贯,导致用户投诉。

具体要求

双子星模式能够非常简单,但能工作得很杰出。事实上,这里的实现办法曾经历经三个版本了,之前的版本都过于简单,想要做太多的事件,因此被咱们摈弃。咱们须要的只是最根本的性能,可能提供易了解、易开发、高牢靠的解决办法就能够了。

以下是该架构的具体需要:

  • 须要用到双子星模式的故障是:零碎蒙受灾难性的打击,如硬件解体、火灾、意外等。对于其余惯例的服务器故障,能够用更简略的办法。
  • 故障复原工夫应该在 60 秒以内,现实状况下应该在 10 秒以内;
  • 故障复原 (failover) 应该是主动实现的,而系统还原 (recover) 则是由人工实现的。咱们心愿应用程序可能在产生故障时主动从主机切换到备机,但不心愿在问题解决之前主动切换回主机,因为这很有可能让主机再次解体。
  • 程序的逻辑应该尽量简略,易于应用,最好能封装在 API 中;
  • 须要提供一个明确的批示,哪台主机正在提供服务,以防止“精神分裂”的症状,即两台服务器都认为本人是主机;
  • 两台服务器的启动程序不应该有限度;
  • 启动或敞开主从机时不须要更改客户端的配置,但有可能会中断连贯;
  • 管理员须要可能同时监控两台机器;
  • 两台机器之间必须有专用的高速网络连接,必须能应用特定 IP 进行路由。

咱们做如下架假如:

  • 单台备机可能提供足够的保障,不须要再进行其余备份机制;
  • 主从机应该都可能提供残缺的服务,承载雷同的压力,不须要进行负载平衡;
  • 估算中容许有这样一台长时间闲置的备机。

双子星模式不会用到:

  • 多台备机,或在主从机之间进行负载平衡。该模式中的备机将始终处于闲暇状态,只有主机产生问题时才会工作;
  • 解决长久化的音讯或事务。咱们假如所连贯的网络是不牢靠的(或不可信的)。
  • 主动搜寻网络。双子星模式是手工配置的,他们晓得对方的存在,应用程序则晓得双子星的存在。
  • 主从机之间状态的同步。所有服务端的状态必须能由应用程序进行重建。

以下是双子星模式中的几个术语:

  • 主机 – 通常状况下作为 master 的机器;
  • 备机 – 通常状况下作为 slave 的机器,只有当主机从网络中隐没时,备机才会切换成 master 状态,接管所有的应用程序申请;
  • master – 双子星模式中接管应用程序申请的机器;同一时刻只有一台 master;
  • slave – 当 master 隐没时用以顶替的机器。

配置双子星模式的步骤:

  1. 让主机晓得备机的地位;
  2. 让备机晓得主机的地位;
  3. 调整故障复原工夫,两台机器的配置必须雷同。

比拟重要的配置是应让两台机器距离多久查看一次对方的状态,以及多长时间后采取行动。在咱们的示例中,故障复原工夫设置为 2000 毫秒,超过这个工夫备机就会代替主机的地位。但若你将主机的服务包裹在一个 shell 脚本中进行重启,就须要缩短这个工夫,否则备机可能在主机复原连贯的过程中转换成 master。

要让客户端应用程序和双子星模式配合,你须要做的是:

  1. 晓得两台服务器的地址;
  2. 尝试连贯主机,若失败则连贯备机;
  3. 检测生效的连贯,个别应用心跳机制;
  4. 尝试重连主机,而后再连贯备机,其间的距离应比服务器故障复原工夫长;
  5. 重建服务器端须要的所有状态数据;
  6. 如果要保障可靠性,应重发故障期间的音讯。

这不是件容易的事,所以咱们个别会将其封装成一个 API,供程序员应用。

双子星模式的次要限度有:

  • 服务端过程不能波及到一个以上的双子星对称节点;
  • 主机只能有一个备机;
  • 当备机处于 slave 状态时,它不会解决任何申请;
  • 备机必须可能接受所有的应用程序申请;
  • 故障复原工夫不能在运行时调整;
  • 客户端应用程序须要做一些重连的工作。

避免精神分裂

“精神分裂”症状指的是一个集群中的不同局部同时认为本人是 master,从而进行对对方的检测。双子星模式中的算法会升高这种症状的产生几率:主备机在决定本人是否为 master 时会检测本身是否收到了应用程序的申请,以及对方是否曾经从网络中隐没。

但在某些状况下,双子星模式也会产生精神分裂。比如说,主备机被配置在两幢大楼里,每幢大楼的局域网中又散布了一些应用程序。这样,当两幢大楼的网络通信被阻断,双子星模式的主备机就会别离在两幢大楼里承受和解决申请。

为了避免精神分裂,咱们必须让主备机应用专用的网络进行连贯,最简略的办法当然是用一根双绞线将他们相连。

咱们不能将双子星部署在两个不同的岛屿上,为各自岛屿的应用程序服务。这种状况下,咱们会应用诸如联邦模式的机制进行可靠性设计。

最好但最夸大的做法是,将两台机器之间的连贯和应用程序的连贯齐全隔离开来,甚至是应用不同的网卡,而不仅仅是不同的端口。这样做也是为了日后排查谬误时更为明确。

实现双子星模式

闲话少说,上面是双子星模式的服务端代码:

bstarsrv: Binary Star server in C

//
//  双子星模式 - 服务端
//
#include "czmq.h"
 
//  发送状态信息的间隔时间
//  如果对方在两次心跳过后都没有应答,则视为断开
#define HEARTBEAT 1000          //  In msecs
 
//  服务器状态枚举
typedef enum {
    STATE_PRIMARY = 1,          //  主机,期待伙伴连贯
    STATE_BACKUP = 2,           //  备机,期待伙伴连贯
    STATE_ACTIVE = 3,           //  激活态,解决应用程序申请
    STATE_PASSIVE = 4           //  被动静,不接管申请
} state_t;
 
//  对话节点事件
typedef enum {
    PEER_PRIMARY = 1,           //  主机
    PEER_BACKUP = 2,            //  备机
    PEER_ACTIVE = 3,            //  激活态
    PEER_PASSIVE = 4,           //  被动静
    CLIENT_REQUEST = 5          //  客户端申请
} event_t;
 
//  无限状态机
typedef struct {
    state_t state;              //  以后状态
    event_t event;              //  以后事件
    int64_t peer_expiry;        //  断定节点死亡的时限
} bstar_t;
 
 
//  执行无限状态机(将事件绑定至状态);//  产生异样时返回 TRUE。static Bool
s_state_machine (bstar_t *fsm)
{
    Bool exception = FALSE;
    //  主机期待伙伴连贯
    //  该状态下接管 CLIENT_REQUEST 事件
    if (fsm->state == STATE_PRIMARY) {if (fsm->event == PEER_BACKUP) {printf ("I: 已连贯至备机(slave),能够作为 master 运行。\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_ACTIVE) {printf ("I: 已连贯至备机(master),能够作为 slave 运行。\n");
            fsm->state = STATE_PASSIVE;
        }
    }
    else
    //  备机期待伙伴连贯
    //  该状态下回绝 CLIENT_REQUEST 事件
    if (fsm->state == STATE_BACKUP) {if (fsm->event == PEER_ACTIVE) {printf ("I: 已连贯至主机(master),能够作为 slave 运行。\n");
            fsm->state = STATE_PASSIVE;
        }
        else
        if (fsm->event == CLIENT_REQUEST)
            exception = TRUE;
    }
    else
    //  服务器处于激活态
    //  该状态下承受 CLIENT_REQUEST 事件
    if (fsm->state == STATE_ACTIVE) {if (fsm->event == PEER_ACTIVE) {
            //  若呈现两台 master,则抛出异样
            printf ("E: 严重错误:双 master。正在退出。\n");
            exception = TRUE;
        }
    }
    else
    //  服务器处于被动静
    //  若伙伴已死,CLIENT_REQUEST 事件将触发故障复原
    if (fsm->state == STATE_PASSIVE) {if (fsm->event == PEER_PRIMARY) {
            //  伙伴正在重启 - 转为激活态,伙伴将转为被动静。printf ("I: 主机(slave)正在重启,可作为 master 运行。\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_BACKUP) {
            //  伙伴正在重启 - 转为激活态,伙伴将转为被动静。printf ("I: 备机(slave)正在重启,可作为 master 运行。\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_PASSIVE) {
            //  若呈现两台 slave,集群将无响应
            printf ("E: 严重错误:双 slave。正在退出 \n");
            exception = TRUE;
        }
        else
        if (fsm->event == CLIENT_REQUEST) {
            //  若心跳超时,伙伴将成为 master;//  此行为由客户端申请触发。assert (fsm->peer_expiry > 0);
            if (zclock_time () >= fsm->peer_expiry) {
                //  伙伴已死,转为激活态。printf ("I: 故障复原,可作为 master 运行。\n");
                fsm->state = STATE_ACTIVE;
            }
            else
                //  伙伴还在,拒绝请求。exception = TRUE;
        }
    }
    return exception;
}
 
 
int main (int argc, char *argv [])
{
    //  命令行参数能够为://      -p  作为主机启动, at tcp://localhost:5001
    //      -b  作为备机启动, at tcp://localhost:5002
    zctx_t *ctx = zctx_new ();
    void *statepub = zsocket_new (ctx, ZMQ_PUB);
    void *statesub = zsocket_new (ctx, ZMQ_SUB);
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    bstar_t fsm = {0};
 
    if (argc == 2 && streq (argv [1], "-p")) {printf ("I: 主机 master,期待备机(slave)连贯。\n");
        zsocket_bind (frontend, "tcp://*:5001");
        zsocket_bind (statepub, "tcp://*:5003");
        zsocket_connect (statesub, "tcp://localhost:5004");
        fsm.state = STATE_PRIMARY;
    }
    else
    if (argc == 2 && streq (argv [1], "-b")) {printf ("I: 备机 slave,期待主机(master)连贯。\n");
        zsocket_bind (frontend, "tcp://*:5002");
        zsocket_bind (statepub, "tcp://*:5004");
        zsocket_connect (statesub, "tcp://localhost:5003");
        fsm.state = STATE_BACKUP;
    }
    else {printf ("Usage: bstarsrv { -p | -b}\n");
        zctx_destroy (&ctx);
        exit (0);
    }
    //  设定下一次发送状态的工夫
    int64_t send_state_at = zclock_time () + HEARTBEAT;
 
    while (!zctx_interrupted) {zmq_pollitem_t items [] = {{ frontend, 0, ZMQ_POLLIN, 0},
            {statesub, 0, ZMQ_POLLIN, 0}
        };
        int time_left = (int) ((send_state_at - zclock_time ()));
        if (time_left < 0)
            time_left = 0;
        int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  上下文对象被敞开
 
        if (items [0].revents & ZMQ_POLLIN) {
            //  收到客户端申请
            zmsg_t *msg = zmsg_recv (frontend);
            fsm.event = CLIENT_REQUEST;
            if (s_state_machine (&fsm) == FALSE)
                //  返回应答
                zmsg_send (&msg, frontend);
            else
                zmsg_destroy (&msg);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  收到状态音讯,作为事件处理
            char *message = zstr_recv (statesub);
            fsm.event = atoi (message);
            free (message);
            if (s_state_machine (&fsm))
                break;          //  谬误,退出。fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;}
        //  定时发送状态信息
        if (zclock_time () >= send_state_at) {char message [2];
            sprintf (message, "%d", fsm.state);
            zstr_send (statepub, message);
            send_state_at = zclock_time () + HEARTBEAT;}
    }
    if (zctx_interrupted)
        printf ("W: 中断 \n");
 
    //  敞开套接字和上下文
    zctx_destroy (&ctx);
    return 0;
}

上面是客户端代码:

bstarcli: Binary Star client in C

//
//  双子星模式 - 客户端
//
#include "czmq.h"
 
#define REQUEST_TIMEOUT     1000    //  毫秒
#define SETTLE_DELAY        2000    //  超时工夫
 
int main (void)
{zctx_t *ctx = zctx_new ();
 
    char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002"};
    uint server_nbr = 0;
 
    printf ("I: 正在连接服务器 %s...\n", server [server_nbr]);
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, server [server_nbr]);
 
    int sequence = 0;
    while (!zctx_interrupted) {
        //  发送申请并期待应答
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);
 
        int expect_reply = 1;
        while (expect_reply) {
            //  轮询套接字
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0} };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
 
            //  解决应答
            if (items [0].revents & ZMQ_POLLIN) {
                //  审核应答编号
                char *reply = zstr_recv (client);
                if (atoi (reply) == sequence) {printf ("I: 服务端应答失常 (%s)\n", reply);
                    expect_reply = 0;
                    sleep (1);  //  每秒发送一个申请
                }
                else {
                    printf ("E: 谬误的应答内容: %s\n",
                        reply);
                }
                free (reply);
            }
            else {printf ("W: 服务器无响应,正在重试 \n");
                //  重开套接字
                zsocket_destroy (ctx, client);
                server_nbr = (server_nbr + 1) % 2;
                zclock_sleep (SETTLE_DELAY);
                printf ("I: 正在连接服务端 %s...\n",
                        server [server_nbr]);
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, server [server_nbr]);
 
                //  应用新套接字重发申请
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

运行以下命令进行测试,程序随便:

bstarsrv -p     # Start primary
bstarsrv -b     # Start backup
bstarcli

能够将主机过程杀掉,测试故障复原机制;再开启主机,杀掉备机,查看还原机制。要留神是由客户端触发这两个事件的。

下图展示了服务过程的状态图。绿色状态下会接管客户端申请,粉色状态会拒绝请求。事件指的是伙伴的状态,所以“伙伴激活态”指的是伙伴机器告知咱们它处于激活态。“客户申请”示意咱们从客户端取得了申请,“客户投票”则指咱们从客户端取得了申请并且伙伴曾经超时死亡。

须要留神的是,服务过程应用 PUB-SUB 套接字进行状态替换,其它类型的套接字在这里不实用。比方,PUSH 和 DEALER 套接字在没有节点相连的时候会产生阻塞;PAIR 套接字不会在节点断开后进行重连;ROUTER 套接字须要地址能力发送音讯。

These are the main limitations of the Binary Star pattern:

  • A server process cannot be part of more than one Binary Star pair.
  • A primary server can have a single backup server, no more.
  • The backup server cannot do useful work while in slave mode.
  • The backup server must be capable of handling full application loads.
  • Failover configuration cannot be modified at runtime.
  • Client applications must do some work to benefit from failover.

双子星反应堆

咱们能够将双子星模式打包成一个相似反应堆的类,供当前复用。在 C 语言中,咱们应用 czmq 的 zloop 类,其余语言应该会有相应的实现。以下是 C 语言版的 bstar 接口:

// 创立双子星模式实例,应用本地(绑定)和近程(连贯)端点来设置节点对。bstar_t *bstar_new (int primary, char *local, char *remote);
 
// 销毁实例
void bstar_destroy (bstar_t **self_p);
 
// 返回底层的 zloop 反应堆,用以增加定时器、读取器、注册和勾销等性能。zloop_t *bstar_zloop (bstar_t *self);
 
// 注册投票读取器
int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);
 
// 注册状态机处理器
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
 
// 开启反应堆,当回调函数返回 -1,或过程收到 SIGINT、SIGTERM 信号时停止。int bstar_start (bstar_t *self);

以下是类的实现:

bstar: Binary Star core class in C

/*  =====================================================================
    bstar - Binary Star reactor
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "bstar.h"
 
//  服务器状态枚举
typedef enum {
    STATE_PRIMARY = 1,          //  主机,期待伙伴连贯
    STATE_BACKUP = 2,           //  备机,期待伙伴连贯
    STATE_ACTIVE = 3,           //  激活态,解决应用程序申请
    STATE_PASSIVE = 4           //  被动静,不接管申请
} state_t;
 
//  对话节点事件
typedef enum {
    PEER_PRIMARY = 1,           //  主机
    PEER_BACKUP = 2,            //  备机
    PEER_ACTIVE = 3,            //  激活态
    PEER_PASSIVE = 4,           //  被动静
    CLIENT_REQUEST = 5          //  客户端申请
} event_t;
 
 
//  发送状态信息的间隔时间
//  如果对方在两次心跳过后都没有应答,则视为断开
#define BSTAR_HEARTBEAT     1000        //  In msecs
 
//  类构造
 
struct _bstar_t {
    zctx_t *ctx;                //  公有上下文
    zloop_t *loop;              //  反应堆循环
    void *statepub;             //  状态发布者
    void *statesub;             //  状态订阅者
    state_t state;              //  以后状态
    event_t event;              //  以后事件
    int64_t peer_expiry;        //  断定节点死亡的时限
    zloop_fn *voter_fn;         //  投票套接字处理器
    void *voter_arg;            //  投票处理程序的参数
    zloop_fn *master_fn;        //  成为 master 时回调
    void *master_arg;           //  参数
    zloop_fn *slave_fn;         //  成为 slave 时回调
    void *slave_arg;            //  参数
};
 
 
//  ---------------------------------------------------------------------
//  执行无限状态机(将事件绑定至状态);//  产生异样时返回 -1,正确时返回 0。static int
s_execute_fsm (bstar_t *self)
{
    int rc = 0;
    //  主机期待伙伴连贯
    //  该状态下接管 CLIENT_REQUEST 事件
    if (self->state == STATE_PRIMARY) {if (self->event == PEER_BACKUP) {zclock_log ("I: 已连贯至备机(slave),能够作为 master 运行。");
            self->state = STATE_ACTIVE;
            if (self->master_fn)
                (self->master_fn) (self->loop, NULL, self->master_arg);
        }
        else
        if (self->event == PEER_ACTIVE) {zclock_log ("I: 已连贯至备机(master),能够作为 slave 运行。");
            self->state = STATE_PASSIVE;
            if (self->slave_fn)
                (self->slave_fn) (self->loop, NULL, self->slave_arg);
        }
        else
        if (self->event == CLIENT_REQUEST) {zclock_log ("I: 收到客户端申请,可作为 master 运行。");
            self->state = STATE_ACTIVE;
            if (self->master_fn)
                (self->master_fn) (self->loop, NULL, self->master_arg);
        }
    }
    else
    //  备机期待伙伴连贯
    //  该状态下回绝 CLIENT_REQUEST 事件
    if (self->state == STATE_BACKUP) {if (self->event == PEER_ACTIVE) {zclock_log ("I: 已连贯至主机(master),能够作为 slave 运行。");
            self->state = STATE_PASSIVE;
            if (self->slave_fn)
                (self->slave_fn) (self->loop, NULL, self->slave_arg);
        }
        else
        if (self->event == CLIENT_REQUEST)
            rc = -1;
    }
    else
    //  服务器处于激活态
    //  该状态下承受 CLIENT_REQUEST 事件
    //  只有服务器死亡才会来到激活态
    if (self->state == STATE_ACTIVE) {if (self->event == PEER_ACTIVE) {
            //  若呈现两台 master,则抛出异样
            zclock_log ("E: 严重错误:双 master。正在退出。");
            rc = -1;
        }
    }
    else
    //  服务器处于被动静
    //  若伙伴已死,CLIENT_REQUEST 事件将触发故障复原
    if (self->state == STATE_PASSIVE) {if (self->event == PEER_PRIMARY) {
            //  伙伴正在重启 - 转为激活态,伙伴将转为被动静。zclock_log ("I: 主机(slave)正在重启,可作为 master 运行。");
            self->state = STATE_ACTIVE;
        }
        else
        if (self->event == PEER_BACKUP) {
            //  伙伴正在重启 - 转为激活态,伙伴将转为被动静。zclock_log ("I: 备机(slave)正在重启,可作为 master 运行。");
            self->state = STATE_ACTIVE;
        }
        else
        if (self->event == PEER_PASSIVE) {
            //  若呈现两台 slave,集群将无响应
            zclock_log ("E: 严重错误:双 slave。正在退出");
            rc = -1;
        }
        else
        if (self->event == CLIENT_REQUEST) {
            //  若心跳超时,伙伴将成为 master;//  此行为由客户端申请触发。assert (self->peer_expiry > 0);
            if (zclock_time () >= self->peer_expiry) {
                //  伙伴已死,转为激活态。zclock_log ("I: 故障复原,可作为 master 运行。");
                self->state = STATE_ACTIVE;
            }
            else
                //  伙伴还在,拒绝请求。rc = -1;
        }
        //  触发状态更改事件处理函数
        if (self->state == STATE_ACTIVE && self->master_fn)
            (self->master_fn) (self->loop, NULL, self->master_arg);
    }
    return rc;
}
 
 
//  ---------------------------------------------------------------------
//  反应堆事件处理程序
 
//  发送状态信息
int s_send_state (zloop_t *loop, void *socket, void *arg)
{bstar_t *self = (bstar_t *) arg;
    zstr_sendf (self->statepub, "%d", self->state);
    return 0;
}
 
//  接管状态信息,启动无限状态机
int s_recv_state (zloop_t *loop, void *socket, void *arg)
{bstar_t *self = (bstar_t *) arg;
    char *state = zstr_recv (socket);
    if (state) {self->event = atoi (state);
        self->peer_expiry = zclock_time () + 2 * BSTAR_HEARTBEAT;
        free (state);
    }
    return s_execute_fsm (self);
}
 
//  收到应用程序申请,判断是否接管
int s_voter_ready (zloop_t *loop, void *socket, void *arg)
{bstar_t *self = (bstar_t *) arg;
    //  如果可能解决申请,则调用函数
    self->event = CLIENT_REQUEST;
    if (s_execute_fsm (self) == 0) {puts ("CLIENT REQUEST");
        (self->voter_fn) (self->loop, socket, self->voter_arg);
    }
    else {
        //  销毁期待中的音讯
        zmsg_t *msg = zmsg_recv (socket);
        zmsg_destroy (&msg);
    }
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
bstar_t *
bstar_new (int primary, char *local, char *remote)
{
    bstar_t
        *self;
 
    self = (bstar_t *) zmalloc (sizeof (bstar_t));
 
    //  初始化双子星
    self->ctx = zctx_new ();
    self->loop = zloop_new ();
    self->state = primary? STATE_PRIMARY: STATE_BACKUP;
 
    //  创立状态 PUB 套接字
    self->statepub = zsocket_new (self->ctx, ZMQ_PUB);
    zsocket_bind (self->statepub, local);
 
    //  创立状态 SUB 套接字
    self->statesub = zsocket_new (self->ctx, ZMQ_SUB);
    zsocket_connect (self->statesub, remote);
 
    //  设置根本的反应堆事件处理器
    zloop_timer (self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
    zloop_reader (self->loop, self->statesub, s_recv_state, self);
    return self;
}
 
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
bstar_destroy (bstar_t **self_p)
{assert (self_p);
    if (*self_p) {
        bstar_t *self = *self_p;
        zloop_destroy (&self->loop);
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
 
 
//  ---------------------------------------------------------------------
//  返回底层 zloop 对象,用以增加额定的定时器、阅读器等。zloop_t *
bstar_zloop (bstar_t *self)
{return self->loop;}
 
 
//  ---------------------------------------------------------------------
//  创立套接字,连贯至本地端点,注册成为阅读器;//  只有当无限状态机容许时才会读取该套接字;//  从该套接字取得的音讯将作为一次“投票”;//  咱们要求双子星模式中只有一个“投票”套接字。int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
             void *arg)
{
    //  保留原始的回调函数和参数,稍后应用
    void *socket = zsocket_new (self->ctx, type);
    zsocket_bind (socket, endpoint);
    assert (!self->voter_fn);
    self->voter_fn = handler;
    self->voter_arg = arg;
    return zloop_reader (self->loop, socket, s_voter_ready, self);
}
 
//  ---------------------------------------------------------------------
//  注册状态变动事件处理器
 
void
bstar_new_master (bstar_t *self, zloop_fn handler, void *arg)
{assert (!self->master_fn);
    self->master_fn = handler;
    self->master_arg = arg;
}
 
void
bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg)
{assert (!self->slave_fn);
    self->slave_fn = handler;
    self->slave_arg = arg;
}
 
 
//  ---------------------------------------------------------------------
//  启用或禁止跟踪信息
void bstar_set_verbose (bstar_t *self, Bool verbose)
{zloop_set_verbose (self->loop, verbose);
}
 
 
//  ---------------------------------------------------------------------
//  开启反应堆,当回调函数返回 -1,或过程收到 SIGINT、SIGTERM 信号时停止。int
bstar_start (bstar_t *self)
{assert (self->voter_fn);
    return zloop_start (self->loop);
}

这样一来,咱们的服务端代码会变得十分简短:

bstarsrv2: Binary Star server, using core class in C

//
//  双子星模式服务端,应用 bstar 反应堆
//
 
//  间接编译,不建类库
#include "bstar.c"
 
//  Echo service
int s_echo (zloop_t *loop, void *socket, void *arg)
{zmsg_t *msg = zmsg_recv (socket);
    zmsg_send (&msg, socket);
    return 0;
}
 
int main (int argc, char *argv [])
{
    //  命令行参数能够为://      -p  作为主机启动, at tcp://localhost:5001
    //      -b  作为备机启动, at tcp://localhost:5002
    bstar_t *bstar;
    if (argc == 2 && streq (argv [1], "-p")) {printf ("I: 主机 master,期待备机(slave)连贯。\n");
        bstar = bstar_new (BSTAR_PRIMARY,
            "tcp://*:5003", "tcp://localhost:5004");
        bstar_voter (bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
    }
    else
    if (argc == 2 && streq (argv [1], "-b")) {printf ("I: 备机 slave,期待主机(master)连贯。\n");
        bstar = bstar_new (BSTAR_BACKUP,
            "tcp://*:5004", "tcp://localhost:5003");
        bstar_voter (bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
    }
    else {printf ("Usage: bstarsrvs { -p | -b}\n");
        exit (0);
    }
    bstar_start (bstar);
    bstar_destroy (&bstar);
    return 0;
}

无中间件的可靠性(自在者模式)

咱们讲了那么多对于中间件的示例,如同有些违反“ZMQ 是无中间件”的说法。但要晓得在现实生活中,中间件始终是让人又爱又恨的货色。实际中的很多音讯架构能都在应用中间件进行分布式架构的搭建,所以说最终的决定还是须要你本人去衡量的。这也是为什么尽管我能驾车 10 分钟到一个大型商场里购买五箱音量,但我还是会抉择走 10 分钟到楼下的便利店里去买。这种出于经济方面的思考(工夫、精力、老本等)不仅在日常生活中很常见,在软件架构中也很重要。

这就是为什么 ZMQ 不会强制应用带有中间件的架构,但仍提供了像内置安装这样的中间件供编程人员自在选用。

这一节咱们会突破以往应用中间件进行可靠性设计的架构,转而应用点对点架构,即自在者模式,来进行牢靠的音讯传输。咱们的示例程序会是一个名称解析服务。ZMQ 中的一个常见问题是:咱们如何得悉须要连贯的端点?在代码中间接写入 TCP/IP 地址必定是不适合的;应用配置文件会造成治理上的不便。试想一下,你要在上百台计算机中进行配置,只是为了让它们晓得 google.com 的 IP 地址是 74.125.230.82。

一个 ZMQ 的名称解析服务须要实现的性能有:

  • 将逻辑名称解析为一个或多个端点地址,包含绑定端和连接端。理论应用时,名称服务会提供一组端点。
  • 容许咱们在不同的环境下,即开发环境和生产环境,进行解析;
  • 该服务必须是牢靠的,否则应用程序将无奈连贯到网络。

为管家模式提供名称解析服务会很有用,尽管将代理程序的端点对外裸露也很简略,然而如果用好名称解析服务,那它将成为惟一一个对外裸露的接口,将更便于管理。

咱们须要解决的故障类型有:服务解体或重启、服务过载、网络因素等。为获取可靠性,咱们必须建设一个服务群,当某个服务端解体后,客户端能够连贯其余的服务端。实际中,两个服务端就曾经足够了,但事实上服务端的数量能够是任意个。

在这个架构中,大量客户端和大量服务端进行通信,服务端将套接字绑定至独自的端口,这和管家模式中的代理有很大不同。对于客户端来说,它有这样几种抉择:

  • 客户端能够应用 REQ 套接字和懈怠海盗模式,但须要有一个机制避免客户端一直地申请已进行的服务端。
  • 客户端能够应用 DEALER 套接字,向所有的服务端发送申请。很简略,但并不太妙;
  • 客户端应用 ROUTER 套接字,连贯特定的服务端。但客户端如何得悉服务端的套接字标识呢?一种形式是让服务端被动连贯客户端(很简单),或者将服务端标识写入代码进行固化(很凌乱)。

模型一:简略重试

让咱们先尝试简略的计划,重写懈怠海盗模式,让其可能和多个服务端进行通信。启动服务端时用命令行参数指定端口。而后启动多个服务端。

flserver1: Freelance server, Model One in C

//
//  自在者模式 - 服务端 - 模型 1
//  提供 echo 服务
//
#include "czmq.h"
 
int main (int argc, char *argv [])
{if (argc < 2) {printf ("I: syntax: %s <endpoint>\n", argv [0]);
        exit (EXIT_SUCCESS);
    }
    zctx_t *ctx = zctx_new ();
    void *server = zsocket_new (ctx, ZMQ_REP);
    zsocket_bind (server, argv [1]);
 
    printf ("I: echo 服务端点:%s\n", argv [1]);
    while (TRUE) {zmsg_t *msg = zmsg_recv (server);
        if (!msg)
            break;          //  中断
        zmsg_send (&msg, server);
    }
    if (zctx_interrupted)
        printf ("W: 中断 \n");
 
    zctx_destroy (&ctx);
    return 0;
}

启动客户端,指定一个或多个端点:

flclient1: Freelance client, Model One in C

//
//  自在者模式 - 客户端 - 模型 1
//  应用 REQ 套接字申请一个或多个服务端
//
#include "czmq.h"
 
#define REQUEST_TIMEOUT     1000
#define MAX_RETRIES         3       //  尝试次数
 
 
static zmsg_t *
s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request)
{printf ("I: 在端点 %s 上尝试申请 echo 服务...\n", endpoint);
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, endpoint);
 
    //  发送申请,并期待应答
    zmsg_t *msg = zmsg_dup (request);
    zmsg_send (&msg, client);
    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0} };
    zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
    zmsg_t *reply = NULL;
    if (items [0].revents & ZMQ_POLLIN)
        reply = zmsg_recv (client);
 
    //  敞开套接字
    zsocket_destroy (ctx, client);
    return reply;
}
 
 
int main (int argc, char *argv [])
{zctx_t *ctx = zctx_new ();
    zmsg_t *request = zmsg_new ();
    zmsg_addstr (request, "Hello world");
    zmsg_t *reply = NULL;
 
    int endpoints = argc - 1;
    if (endpoints == 0)
        printf ("I: syntax: %s <endpoint> ...\n", argv [0]);
    else
    if (endpoints == 1) {
        //  若只有一个端点,则尝试 N 次
        int retries;
        for (retries = 0; retries < MAX_RETRIES; retries++) {char *endpoint = argv [1];
            reply = s_try_request (ctx, endpoint, request);
            if (reply)
                break;          //  胜利
            printf ("W: 没有收到 %s 的应答, 筹备重试...\n", endpoint);
        }
    }
    else {
        //  若有多个端点,则每个尝试一次
        int endpoint_nbr;
        for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) {char *endpoint = argv [endpoint_nbr + 1];
            reply = s_try_request (ctx, endpoint, request);
            if (reply)
                break;          //  Successful
            printf ("W: 没有收到 %s 的应答 \n", endpoint);
        }
    }
    if (reply)
        printf ("服务运作失常 \n");
 
    zmsg_destroy (&request);
    zmsg_destroy (&reply);
    zctx_destroy (&ctx);
    return 0;
}

可用如下命令运行:

flserver1 tcp://*:5555 &
flserver1 tcp://*:5556 &
flclient1 tcp://localhost:5555 tcp://localhost:5556

客户端的外围机制是懈怠海盗模式,即取得一次胜利的应答后就完结。会有两种状况:

  • 如果只有一个服务端,客户端会再尝试 N 次后进行,这和懈怠海盗模式的逻辑统一;
  • 如果有多个服务端,客户端会每个尝试一次,收到应答后进行。

这种机制补充了海盗模式,使其可能克服只有一个服务端的状况。

然而,这种设计无奈在事实程序中应用:当有很多客户端连贯了服务端,而主服务端解体了,那所有客户端都须要在超时后能力继续执行。

模型二:批量发送

上面让咱们应用 DEALER 套接字。咱们的指标是能再最短的工夫里收到一个应答,不能受主服务端解体的影响。能够采取以下措施:

  • 连贯所有的服务端;
  • 当有申请时,一次性发送给所有的服务端;
  • 期待第一个应答;
  • 疏忽其余应答。

这样设计客户端时,当发送申请后,所有的服务端都会收到这个申请,并返回应答。如果某个服务端断开连接了,ZMQ 可能会将申请发给其余服务端,导致某些服务端会收到两次申请。

更麻烦的是客户端无奈得悉应答的数量,容易产生凌乱。

咱们能够为申请进行编号,疏忽不匹配的应答。咱们要对服务端进行革新,返回的音讯中须要蕴含申请编号:
flserver2: Freelance server, Model Two in C

//
//  自在者模式 - 服务端 - 模型 2
//  返回带有申请编号的 OK 信息
//
#include "czmq.h"
 
int main (int argc, char *argv [])
{if (argc < 2) {printf ("I: syntax: %s <endpoint>\n", argv [0]);
        exit (EXIT_SUCCESS);
    }
    zctx_t *ctx = zctx_new ();
    void *server = zsocket_new (ctx, ZMQ_REP);
    zsocket_bind (server, argv [1]);
 
    printf ("I: 服务已就绪 %s\n", argv [1]);
    while (TRUE) {zmsg_t *request = zmsg_recv (server);
        if (!request)
            break;          //  中断
        //  判断申请内容是否正确
        assert (zmsg_size (request) == 2);
 
        zframe_t *address = zmsg_pop (request);
        zmsg_destroy (&request);
 
        zmsg_t *reply = zmsg_new ();
        zmsg_add (reply, address);
        zmsg_addstr (reply, "OK");
        zmsg_send (&reply, server);
    }
    if (zctx_interrupted)
        printf ("W: interrupted\n");
 
    zctx_destroy (&ctx);
    return 0;
}

客户端代码:

flclient2: Freelance client, Model Two in C

//
//  自在者模式 - 客户端 - 模型 2
//  应用 DEALER 套接字发送批量音讯
//
#include "czmq.h"
 
//  超时工夫
#define GLOBAL_TIMEOUT 2500
 
//  将客户端 API 封装成一个类
 
#ifdef __cplusplus
extern "C" {
#endif
 
//  申明类构造
typedef struct _flclient_t flclient_t;
 
flclient_t *
    flclient_new (void);
void
    flclient_destroy (flclient_t **self_p);
void
    flclient_connect (flclient_t *self, char *endpoint);
zmsg_t *
    flclient_request (flclient_t *self, zmsg_t **request_p);
 
#ifdef __cplusplus
}
#endif
 
 
int main (int argc, char *argv [])
{if (argc == 1) {printf ("I: syntax: %s <endpoint> ...\n", argv [0]);
        exit (EXIT_SUCCESS);
    }
    //  创立自在者模式客户端
    flclient_t *client = flclient_new ();
 
    //  连贯至各个端点
    int argn;
    for (argn = 1; argn < argc; argn++)
        flclient_connect (client, argv [argn]);
 
    //  发送一组申请,并记录时间
    int requests = 10000;
    uint64_t start = zclock_time ();
    while (requests--) {zmsg_t *request = zmsg_new ();
        zmsg_addstr (request, "random name");
        zmsg_t *reply = flclient_request (client, &request);
        if (!reply) {printf ("E: 名称解析服务不可用,正在退出 \n");
            break;
        }
        zmsg_destroy (&reply);
    }
    printf ("均匀申请工夫: %d 微秒 \n",
        (int) (zclock_time () - start) / 10);
 
    flclient_destroy (&client);
    return 0;
}
 
 
 
//  --------------------------------------------------------------------
//  类构造
 
struct _flclient_t {
    zctx_t *ctx;        //  上下文
    void *socket;       //  用于和服务端通信的 DEALER 套接字
    size_t servers;     //  以连贯的服务端数量
    uint sequence;      //  已发送的申请数
};
 
 
//  --------------------------------------------------------------------
//  Constructor
 
flclient_t *
flclient_new (void)
{
    flclient_t
        *self;
 
    self = (flclient_t *) zmalloc (sizeof (flclient_t));
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
    return self;
}
 
//  --------------------------------------------------------------------
//  析构函数
 
void
flclient_destroy (flclient_t **self_p)
{assert (self_p);
    if (*self_p) {
        flclient_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
 
//  --------------------------------------------------------------------
//  连贯至新的服务端端点
 
void
flclient_connect (flclient_t *self, char *endpoint)
{assert (self);
    zsocket_connect (self->socket, endpoint);
    self->servers++;
}
 
//  --------------------------------------------------------------------
//  发送申请,接管应答
//  发送后销毁申请
 
zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p)
{assert (self);
    assert (*request_p);
    zmsg_t *request = *request_p;
 
    //  向音讯增加编号和空帧
    char sequence_text [10];
    sprintf (sequence_text, "%u", ++self->sequence);
    zmsg_pushstr (request, sequence_text);
    zmsg_pushstr (request, "");
 
    //  向所有已连贯的服务端发送申请
    int server;
    for (server = 0; server < self->servers; server++) {zmsg_t *msg = zmsg_dup (request);
        zmsg_send (&msg, self->socket);
    }
    //  接管来自任何服务端的应答
    //  因为咱们可能 poll 屡次,所以每次都进行计算
    zmsg_t *reply = NULL;
    uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
    while (zclock_time () < endtime) {zmq_pollitem_t items [] = {{ self->socket, 0, ZMQ_POLLIN, 0} };
        zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
        if (items [0].revents & ZMQ_POLLIN) {//  应答内容是 [empty][sequence][OK]
            reply = zmsg_recv (self->socket);
            assert (zmsg_size (reply) == 3);
            free (zmsg_popstr (reply));
            char *sequence = zmsg_popstr (reply);
            int sequence_nbr = atoi (sequence);
            free (sequence);
            if (sequence_nbr == self->sequence)
                break;
        }
    }
    zmsg_destroy (request_p);
    return reply;
}

几点阐明:

  • 客户端被封装成了一个 API 类,将简单的代码都包装了起来。
  • 客户端会在几秒之后放弃寻找可用的服务端;
  • 客户端须要创立一个非法的 REP 信封,所以须要增加一个空帧。

程序中,客户端收回了 1 万次名称解析申请(尽管是假的),并计算均匀消耗工夫。在我的测试机上,有一个服务端时,耗时 60 奥妙;三个时 80 奥妙。

该模型的优缺点是:

  • 长处:简略,容易了解和编写;
  • 长处:它工作迅速,有重试机制;
  • 毛病:占用了额定的网络带宽;
  • 毛病:咱们不能为服务端设置优先级,如主服务、次服务等;
  • 毛病:服务端不能同时解决多个申请。

Model Three – Complex and Nasty

批量发送模型看起来不太实在,那就让咱们来摸索最初这个极度简单的模型。很有可能在编写完之后咱们又会转而应用批量发送,哈哈,这就是我的风格。

咱们能够将客户端应用的套接字更换为 ROUTER,让咱们可能向特定的服务端发送申请,进行向已死亡的服务端发送申请,从而做得尽可能地智能。咱们还能够将服务端的套接字更换为 ROUTER,从而冲破单线程的瓶颈。

然而,应用 ROUTER-ROUTER 套接字连贯两个刹时套接字是不可行的,节点只有在收到第一条音讯时才会为对方生成套接字标识。惟一的办法是让其中一个节点应用长久化的套接字,比拟好的形式是让客户端晓得服务端的标识,即服务端作为长久化的套接字。

为了防止产生新的配置项,咱们间接应用服务端的端点作为套接字标识。

回忆一下 ZMQ 套接字标识是如何工作的。服务端的 ROUTER 套接字为本人设置一个标识(在绑定之前),当客户端连贯时,通过一个握手的过程来替换单方的标识。客户端的 ROUTER 套接字会先发送一条空音讯,服务端为客户端生成一个随机的 UUID。而后,服务端会向客户端发送本人的标识。

这样一来,客户端就能够将音讯发送给特定的服务端了。不过还有一个问题:咱们不晓得服务端会在什么时候实现这个握手的过程。如果服务端是在线的,那可能几毫秒就能实现。如果不在线,那可能须要很久很久。

这里有一个矛盾:咱们须要晓得服务端何时连贯胜利且可能开始工作。自在者模式不像中间件模式,它的服务端必须要先发送申请后能力的应答。所以在服务端发送音讯给客户端之前,客户端必须要先申请服务端,这看似是不可能的。

我有一个解决办法,那就是批量发送。这里发送的不是真正的申请,而是一个试探性的心跳(PING-PONG)。当收到应答时,就阐明对方是在线的。

上面让咱们制订一个协定,来定义自在者模式是如何传递这种心跳的:

  • http://rfc.zeromq.org/spec:10

实现这个协定的服务端很不便,上面就是通过革新的 echo 服务:

flserver3: Freelance server, Model Three in C

//
//  自在者模式 - 服务端 - 模型 3
//  应用 ROUTER-ROUTER 套接字进行通信;单线程。//
#include "czmq.h"
 
int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));
 
    zctx_t *ctx = zctx_new ();
 
    //  筹备服务端套接字,其标识和端点名雷同
    char *bind_endpoint = "tcp://*:5555";
    char *connect_endpoint = "tcp://localhost:5555";
    void *server = zsocket_new (ctx, ZMQ_ROUTER);
    zmq_setsockopt (server,
        ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
    zsocket_bind (server, bind_endpoint);
    printf ("I: 服务端已准备就绪 %s\n", bind_endpoint);
 
    while (!zctx_interrupted) {zmsg_t *request = zmsg_recv (server);
        if (verbose && request)
            zmsg_dump (request);
        if (!request)
            break;          //  中断
 
        //  Frame 0: 客户端标识
        //  Frame 1: 心跳,或客户端管制信息帧
        //  Frame 2: 申请内容
        zframe_t *address = zmsg_pop (request);
        zframe_t *control = zmsg_pop (request);
        zmsg_t *reply = zmsg_new ();
        if (zframe_streq (control, "PONG"))
            zmsg_addstr (reply, "PONG");
        else {zmsg_add (reply, control);
            zmsg_addstr (reply, "OK");
        }
        zmsg_destroy (&request);
        zmsg_push (reply, address);
        if (verbose && reply)
            zmsg_dump (reply);
        zmsg_send (&reply, server);
    }
    if (zctx_interrupted)
        printf ("W: 中断 \n");
 
    zctx_destroy (&ctx);
    return 0;
}

然而,自在者模式的客户端会变得大一写。为了清晰期间,咱们将其拆分为两个类来实现。首先是在下层应用的程序:

flclient3: Freelance client, Model Three in C

//
//  自在者模式 - 客户端 - 模型 3
//  应用 flcliapi 类来封装自在者模式
//
//  间接编译,不建类库
#include "flcliapi.c"
 
int main (void)
{
    //  创立自在者模式实例
    flcliapi_t *client = flcliapi_new ();
 
    //  链接至服务器端点
    flcliapi_connect (client, "tcp://localhost:5555");
    flcliapi_connect (client, "tcp://localhost:5556");
    flcliapi_connect (client, "tcp://localhost:5557");
 
    //  发送随机申请,计算工夫
    int requests = 1000;
    uint64_t start = zclock_time ();
    while (requests--) {zmsg_t *request = zmsg_new ();
        zmsg_addstr (request, "random name");
        zmsg_t *reply = flcliapi_request (client, &request);
        if (!reply) {printf ("E: 名称解析服务不可用,正在退出 \n");
            break;
        }
        zmsg_destroy (&reply);
    }
    printf ("均匀执行工夫:%d usec\n",
        (int) (zclock_time () - start) / 10);
 
    flcliapi_destroy (&client);
    return 0;
}

上面是该模式简单的实现过程:

flcliapi: Freelance client API in C

/*  =====================================================================
    flcliapi - Freelance Pattern agent class
    Model 3: uses ROUTER socket to address specific services
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "flcliapi.h"
 
//  申请超时工夫
#define GLOBAL_TIMEOUT  3000    //  msecs
//  心跳距离
#define PING_INTERVAL   2000    //  msecs
//  断定服务死亡的工夫
#define SERVER_TTL      6000    //  msecs
 
 
//  =====================================================================
//  同步局部,在应用程序层面运行
 
//  ---------------------------------------------------------------------
//  类构造
 
struct _flcliapi_t {
    zctx_t *ctx;        //  上下文
    void *pipe;         //  用于和主线程通信的套接字
};
 
//  这是运行后盾代理程序的线程
static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
flcliapi_t *
flcliapi_new (void)
{
    flcliapi_t
        *self;
 
    self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));
    self->ctx = zctx_new ();
    self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL);
    return self;
}
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
flcliapi_destroy (flcliapi_t **self_p)
{assert (self_p);
    if (*self_p) {
        flcliapi_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
 
//  ---------------------------------------------------------------------
//  连贯至新服务器端点
//  音讯内容:[CONNECT][endpoint]
 
void
flcliapi_connect (flcliapi_t *self, char *endpoint)
{assert (self);
    assert (endpoint);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, endpoint);
    zmsg_send (&msg, self->pipe);
    zclock_sleep (100);      //  期待连贯
}
 
//  ---------------------------------------------------------------------
//  发送并销毁申请,接管应答
 
zmsg_t *
flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
{assert (self);
    assert (*request_p);
 
    zmsg_pushstr (*request_p, "REQUEST");
    zmsg_send (request_p, self->pipe);
    zmsg_t *reply = zmsg_recv (self->pipe);
    if (reply) {char *status = zmsg_popstr (reply);
        if (streq (status, "FAILED"))
            zmsg_destroy (&reply);
        free (status);
    }
    return reply;
}
 
 
//  =====================================================================
//  异步局部,在后盾运行
 
//  ---------------------------------------------------------------------
//  单个服务端信息
 
typedef struct {
    char *endpoint;             //  服务端端点 / 套接字标识
    uint alive;                 //  是否在线
    int64_t ping_at;            //  下一次心跳工夫
    int64_t expires;            //  过期工夫
} server_t;
 
server_t *
server_new (char *endpoint)
{server_t *self = (server_t *) zmalloc (sizeof (server_t));
    self->endpoint = strdup (endpoint);
    self->alive = 0;
    self->ping_at = zclock_time () + PING_INTERVAL;
    self->expires = zclock_time () + SERVER_TTL;
    return self;
}
 
void
server_destroy (server_t **self_p)
{assert (self_p);
    if (*self_p) {
        server_t *self = *self_p;
        free (self->endpoint);
        free (self);
        *self_p = NULL;
    }
}
 
int
server_ping (char *key, void *server, void *socket)
{server_t *self = (server_t *) server;
    if (zclock_time () >= self->ping_at) {zmsg_t *ping = zmsg_new ();
        zmsg_addstr (ping, self->endpoint);
        zmsg_addstr (ping, "PING");
        zmsg_send (&ping, socket);
        self->ping_at = zclock_time () + PING_INTERVAL;}
    return 0;
}
 
int
server_tickless (char *key, void *server, void *arg)
{server_t *self = (server_t *) server;
    uint64_t *tickless = (uint64_t *) arg;
    if (*tickless > self->ping_at)
        *tickless = self->ping_at;
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  后盾处理程序信息
 
typedef struct {
    zctx_t *ctx;                //  上下文
    void *pipe;                 //  用于应用程序通信的套接字
    void *router;               //  用于服务端通信的套接字
    zhash_t *servers;           //  已连贯的服务端
    zlist_t *actives;           //  在线的服务端
    uint sequence;              //  申请编号
    zmsg_t *request;            //  以后申请
    zmsg_t *reply;              //  以后应答
    int64_t expires;            //  申请过期工夫
} agent_t;
 
agent_t *
agent_new (zctx_t *ctx, void *pipe)
{agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
    self->ctx = ctx;
    self->pipe = pipe;
    self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->servers = zhash_new ();
    self->actives = zlist_new ();
    return self;
}
 
void
agent_destroy (agent_t **self_p)
{assert (self_p);
    if (*self_p) {
        agent_t *self = *self_p;
        zhash_destroy (&self->servers);
        zlist_destroy (&self->actives);
        zmsg_destroy (&self->request);
        zmsg_destroy (&self->reply);
        free (self);
        *self_p = NULL;
    }
}
 
//  当服务端从列表中移除时,回调该函数。static void
s_server_free (void *argument)
{server_t *server = (server_t *) argument;
    server_destroy (&server);
}
 
void
agent_control_message (agent_t *self)
{zmsg_t *msg = zmsg_recv (self->pipe);
    char *command = zmsg_popstr (msg);
 
    if (streq (command, "CONNECT")) {char *endpoint = zmsg_popstr (msg);
        printf ("I: connecting to %s...\n", endpoint);
        int rc = zmq_connect (self->router, endpoint);
        assert (rc == 0);
        server_t *server = server_new (endpoint);
        zhash_insert (self->servers, endpoint, server);
        zhash_freefn (self->servers, endpoint, s_server_free);
        zlist_append (self->actives, server);
        server->ping_at = zclock_time () + PING_INTERVAL;
        server->expires = zclock_time () + SERVER_TTL;
        free (endpoint);
    }
    else
    if (streq (command, "REQUEST")) {assert (!self->request);    //  遵循申请 - 应答循环
        //  将申请编号和空帧退出音讯顶部
        char sequence_text [10];
        sprintf (sequence_text, "%u", ++self->sequence);
        zmsg_pushstr (msg, sequence_text);
        //  获取申请音讯的所有权
        self->request = msg;
        msg = NULL;
        //  设置申请过期工夫
        self->expires = zclock_time () + GLOBAL_TIMEOUT;}
    free (command);
    zmsg_destroy (&msg);
}
 
void
agent_router_message (agent_t *self)
{zmsg_t *reply = zmsg_recv (self->router);
 
    //  第一帧是应答的服务端标识
    char *endpoint = zmsg_popstr (reply);
    server_t *server =
        (server_t *) zhash_lookup (self->servers, endpoint);
    assert (server);
    free (endpoint);
    if (!server->alive) {zlist_append (self->actives, server);
        server->alive = 1;
    }
    server->ping_at = zclock_time () + PING_INTERVAL;
    server->expires = zclock_time () + SERVER_TTL;
 
    //  第二帧是应答的编号
    char *sequence = zmsg_popstr (reply);
    if (atoi (sequence) == self->sequence) {zmsg_pushstr (reply, "OK");
        zmsg_send (&reply, self->pipe);
        zmsg_destroy (&self->request);
    }
    else
        zmsg_destroy (&reply);
}
 
 
//  ---------------------------------------------------------------------
//  异步的后盾代理会保护一个服务端池,解决申请和应答。static void
flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
{agent_t *self = agent_new (ctx, pipe);
 
    zmq_pollitem_t items [] = {{ self->pipe, 0, ZMQ_POLLIN, 0},
        {self->router, 0, ZMQ_POLLIN, 0}
    };
    while (!zctx_interrupted) {
        //  计算超时工夫
        uint64_t tickless = zclock_time () + 1000 * 3600;
        if (self->request
        &&  tickless > self->expires)
            tickless = self->expires;
        zhash_foreach (self->servers, server_tickless, &tickless);
 
        int rc = zmq_poll (items, 2,
            (tickless - zclock_time ()) * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  上下文对象被敞开
 
        if (items [0].revents & ZMQ_POLLIN)
            agent_control_message (self);
 
        if (items [1].revents & ZMQ_POLLIN)
            agent_router_message (self);
 
        //  如果咱们须要解决一项申请,将其发送给下一个可用的服务端
        if (self->request) {if (zclock_time () >= self->expires) {
                //  申请超时
                zstr_send (self->pipe, "FAILED");
                zmsg_destroy (&self->request);
            }
            else {
                //  寻找可用的服务端
                while (zlist_size (self->actives)) {
                    server_t *server =
                        (server_t *) zlist_first (self->actives);
                    if (zclock_time () >= server->expires) {zlist_pop (self->actives);
                        server->alive = 0;
                    }
                    else {zmsg_t *request = zmsg_dup (self->request);
                        zmsg_pushstr (request, server->endpoint);
                        zmsg_send (&request, self->router);
                        break;
                    }
                }
            }
        }
        //  断开并删除已过期的服务端
        //  发送心跳给闲暇服务器
        zhash_foreach (self->servers, server_ping, self->router);
    }
    agent_destroy (&self);
}

这组 API 应用了较为简单的机制,咱们之前也有用到过:

异步后盾代理

客户端 API 由两局部组成:同步的 flcliapi 类,运行于应用程序线程;异步的 agent 类,运行于后盾线程。flcliapi 和 agent 类通过一个 inproc 套接字相互通信。所有和 ZMQ 相干的内容都封装在 API 中。agent 类本质上是作为一个迷你的代理程序在运行,负责在后盾与服务端进行通信,只有咱们发送申请,它就会设法连贯一个服务器来解决申请。

连贯期待机制

ROUTER 套接字的特点之一是会间接抛弃无奈路由的音讯,这就意味着当与服务器建设了 ROUTER-ROUTER 连贯后,如果立即发送一条音讯,该音讯是会失落的。flcliapi 类则提早了一会儿后再发送音讯。之后的通信中,因为服务端套接字是长久的,客户端就不再抛弃音讯了。

Ping silence

0MQ will queue messages for a dead server indefinitely. So if a client repeatedly PINGs a dead server, when that server comes back to life it’ll get a whole bunch of PING messages all at once. Rather than continuing to ping a server we know is offline, we count on 0MQ’s handling of durable sockets to deliver the old PING messages when the server comes back online. As soon as a server reconnects, it’ll get PINGs from all clients that were connected to it, it’ll PONG back, and those clients will recognize it as alive again.

调整轮询工夫

在之前的示例程序中,咱们个别会为轮询设置固定的超时工夫(如 1 秒),这种做法尽管简略,然而对于用电较为敏感的设施来说(如笔记本电脑或手机)唤醒 CPU 是须要额定的电力的。所以,为了完满也好,好玩也好,咱们这里调整了轮询工夫,将其设置为达到过期工夫时才超时,这样就能节俭一部分轮询次数了。咱们能够将过期工夫放入一个列表中存储,不便查问。

总结

这一章中咱们看到了很多牢靠的申请 - 应答机制,每种机制都有其优劣性。大部分示例代码是能够间接用于生产环境的,不过还能够进一步优化。有两个模式会比拟典型:应用了中间件的管家模式,以及未应用中间件的自在者模式。

退出移动版