乐趣区

开源一个kafka增强okmq100

本工具的核心思想就是:赌。只有两个基础组件同时死亡,才会受到 严重影响。哦,断电除外。

mq 是个好东西,我们都在用。这也决定了 mq 应该是高高高可用的。某团就因为这个组件,出了好几次生产事故,呵呵。

大部分业务系统,要求的消息语义都是at least once,即都会有重复消息,但保证不会丢。即使这样,依然有很多问题:

一、mq 可用性无法保证。 mq 的意外死亡,造成生产端发送失败。很多消息要通过扒取日志进行回放,成本高耗时长。

二、mq 阻塞业务正常进行。 mq 卡顿或者网络问题,会造成业务线程卡在 mq 的发送方法上,正常业务进行不下去,造成灾难性的后果。

三、消息延迟。 mq 死了就用不着说了,消息还没投胎就已死亡。消息延迟主要是客户端消费能力不强,或者是消费通道单一造成的。

使用组合存储来保证消息的可靠投递,就是okmq

注意:okmq 注重的是可靠性。对于顺序性、事务等其他要素,不予考虑。当然,速度是必须的。

设计想法

我即使用两套 redis 来模拟一些 mq 操作,都会比现有的一些解决方案要强。但这肯定不是我们需要的,因为 redis 的堆积能力太有限,内存占用率直线上升的感觉并不太好。

但我们可以用 redis 来作为额外的发送确认机制。这个想法,在《使用多线程增加 kafka 消费能力》一文中曾经提到过,现在到了实现的时候了。

首先看下使用 Api

OkmqKafkaProducer producer = new ProducerBuilder()
.defaultSerializer()
.eanbleHa("redis")
.any("okmq.redis.mode", "single")
.any("okmq.redis.endpoint", "127.0.0.1:6379")
.any("okmq.redis.poolConfig.maxTotal", 100)
.servers("localhost:9092")
.clientID("okMQProducerTest")
.build();

Packet packet = new Packet();
packet.setTopic("okmq-test-topic");
packet.setContent("i will send you a msg");
producer.sendAsync(packet, null);
producer.shutdown();

以 redis 为例


我们按照数字标号来介绍:

1、 在消息发送到 kafka 之前,首先入库 redis。由于后续回调需要用到一个唯一表示,我们在 packet 包里添加了一个 uuid。

2、 调用底层的 api,进行真正的消息投递。

3、 通过监听 kafka 的回调,删除 redis 中对应的 key。在这里可以得到某条消息确切的的 ack 时间。那么长时间没有删除的,就算是投递失败的消息。

4、 后台会有一个线程进行这些失败消息的遍历和重新投递。我们叫做 recovery。最复杂的也就是这一部分。对于 redis 来说,会首先争抢一个持续 5min 的锁,然后遍历相关 hashkey。

所以,对于以上代码,redis 发出以下命令:

1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354"
1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" ""1559206423.397300 [0 127.0.0.1:62858]"HSET""okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{\"content\":\"i will send you a msg104736623015238\",\"topic\":\"okmq-test-topic\",\"identify\":\"2b9b33fd-95fd-4cd6-8815-4c572f13f76e\",\"timestamp\":1559206423318}"
1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e"
1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000"
1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash"
1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0"
1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354"
1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock"
1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"

以上问题解答

所以对于以上的三个问题,回答如下:

一、mq 可用性无法保证。

为什么要要通过事后进行恢复呢?我把 recovery 机制带着不是更好么?通过对未收到 ack 的消息进行遍历,可以把这个过程做成自动化。

二、mq 阻塞业务正常进行。

通过设置 kafka 的 MAX_BLOCK_MS_CONFIG
参数,其实是可以不阻塞业务的,但会丢失消息。我可以使用其他存储来保证这些丢失的消息重新发送。

三、消息延迟。

mq 死掉了,依然有其他备用通道进行正常服务。也有的团队采用双写 mq 双消费的方式来保证这个过程,也是被逼急了:)。如果 kafka 死掉了,业务会切换到备用通道进行消费。

扩展自己的 HA

如果你不想用 redis,比如你先要用 hbase,那也是很简单的。
但需要实现一个 HA 接口。

public interface HA {void close();

    void configure(Properties properties);

    void preSend(Packet packet) throws HaException;

    void postSend(Packet packet) throws HaException;

    void doRecovery(AbstractProducer producer) throws HaException;
}

使用之前,还需要注册一下你的插件。

AbstractProducer.register("log", "com.sayhiai.arch.okmq.api.producer.ha.Ha2SimpleLog");

重要参数

okmq.ha.recoveryPeriod 恢复线程检测周期,默认 5 秒

okmq.redis.mode redis 的集群模式,可选:single、sentinel、cluster
okmq.redis.endpoint 地址,多个地址以, 分隔
okmq.redis.connectionTimeout 连接超时
okmq.redis.soTimeout socket 超时
okmq.redis.lockPx 分布式锁的持有时间,可默认,5min
okmq.redis.splitMillis 间隔时间,redis 换一个 key 进行运算,默认 5min
okmq.redis.poolConfig.* 兼容 jedis 的所有参数

1.0.0 版本功能

1、进行了生产端的高可用抽象,实现了 kafka 的样例。

2、增加了 SimpleLog 的 ping、pong 日志实现。

3、增加了 Redis 的生产端备用通道。包含 single、cluster、sentinel 三种模式。

4、可以自定义其他备用通道。

5、兼容 kakfa 的所有参数设置。

规划

2.0.0

1、实现 ActiveMQ 的集成。

2、实现消费者的备用通道集成。

3、增加嵌入式 kv 存储的生产者集成。

4、更精细的控制系统的行为。

5、加入开关和预热,避免新启动 mq 即被压垮。

6、redis 分片机制,大型系统专用。

3.0.0

1、监控功能添加。

2、rest 接口添加。

使用限制

当你把参数 ha 设置为 true,表明你已经收到以下的使用限制。反之,系统反应于原生无异。

使用限制:
本工具仅适用于非顺序性、非事务性的普通消息投递,且客户端已经做了幂等。一些订单系统、消息通知等业务,非常适合。如果你需要其他特性,请跳出此页面。

kafka 死亡,或者 redis 单独死亡,消息最终都会被发出,仅当 kafka 与 redis 同时死亡,消息才会发送失败,并记录在日志文件里。

正常情况下,redis 的使用容量极少极少。异常情况下,redis 的容量有限,会迅速占满。redis 的剩余时间就是你的StopWatch,你必须在这个时间内恢复你的消息系统,一定要顶住哇。

End

系统目前处于 1.0.0 版本,正在线上小规模试用。工具小众,但适用于大部分应用场景。如果你正在寻求这样的解决方案,欢迎一块完善代码。

github 地址:

https://github.com/sayhiai/okmq

也欢迎关注《小姐姐味道》微信公众号,进行交流。

退出移动版