kafakarocketmq

71次阅读

共计 4189 个字符,预计需要花费 11 分钟才能阅读完成。

kafaka

官方:发布订阅,流处理管道和存储
https://kafka.apache.org/docu…

  • 组件

    broker
    topic(一个 queue)
    partition(物理分布,一个 topic 包含一个或多个 partition, 可以分布在不同的 broker 上)
    producer(与 broker leader 直连,负载均衡指定 partition, 可批次发,可设置要 ack 的副本数)
    consumer/consumer group
    partition:

    index 全部映射到内存,每个 partition 下自增 id

    元数据放在 zk 上. 分 partition, 每个 partition 副本分散在 broker 上,单 partition+ 单消费才能顺(rocketmq 一样)。每个 partition 一个索引,顺序写一个文件。流处理 + 批量处理,实时上有取舍。

    https://kafka.apache.org/docu…

  • 高可用和可扩展

    1) Producer 端使用 zookeeper 用来 ” 发现 ”broker 列表, 以及和 Topic 下每个 partition leader 建立 socket 连接并发送消息.
    2) Broker 端使用 zookeeper 用来注册 broker 信息, 已经监测 partition leader 存活性. 所有的 Kafka Broker 节点一起去 Zookeeper 上注册一个临时节点,成功的为 Broker controller,失效后 zk 后发现重新注册节点,controller 负责各 broker 内 partition 的选主(ISR 中,记录 replica 进度,随便选)ISR,在这个集合中的节点都是和 leader 保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为 leader. 如果 ISR 的大小超过某个最小值,则分区将仅接受写入,以防止丢失仅写入单个副本的消息(只关注 ISR,而不是共识多个都写入,多数(两个故障需要 5 个副本,一个要三个)对于主数据的写代价大)
    3) Consumer 端使用 zookeeper 用来注册 consumer 信息, 其中包括 consumer 消费的 partition 列表等, 同时也用来发现 broker 列表, 并和 partition leader 建立 socket 连接, 并获取消息。
    broker,partition,customer 组内线程可扩展。

  • 消费
    只保证一个 partition 被一个 customer 消费有序
    producter 推,customer 拉(拉需要存日志)
    partition 中的每个 message 只能被组(Consumer group)中的一个 consumer(consumer 线程)消费,若多个同时要配多个 Consumer group。
    kafka 中的消息是批量(通常以消息的条数或者 chunk 的尺寸为单位) 发送给 consumer,当消息被 consumer 接收之后, 负责维护消息的消费记录(JMS 等都是 broker 维护),consumer 可以在本地保存最后消息的 offset, 并间歇性的向 zookeeper 注册 offset. 也没有 ACK
    消息消费的可靠性,消费者控制,最多一次,先保存 offset 再处理;至少一次,先处理再保存 offset;只一次:最少 1 次+消费者的输出中额外增加已处理消息最大编号
  • 日志压缩
    确保有每个分区数据日志中每个 key 有最后已知值,offset 不能变。对同一 partition 的多个文件一起压缩合并。
    position 是文件的 bytes 偏移吧?压缩过程中要重建索引和位置?
    active 不动(不影响写入),对 cleaner point 后面的做压缩,选择日志 tail 和 header 比例小的,合并压缩每组 log 不超过 1G,index 不超过 10M。

    对于 tail 的压缩过程:【position 不变???】
    每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建 key 与 offset 的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历把每个 key 的哈希值和最后出现的 offset 都保存在 SkimpyOffsetMap 中,映射模型如下图所示。第二次遍历检查每个消息是否符合保留条件,如果符合就保留下来,否则就会被清理掉

rocketmq

activemq 不能分片。kafka 延时(上面知道基本上 partition 和 consumer 需要配置一样的,一个 consumer group 的线程数和 partition 数量一致, 受 partition 限制,rocketmq 多 partition 的扩展在于都用一个 commitlog,而不是一个 partition 单独一份顺序 log,cq 只存储位置,对 commitlog 中找数据。http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/)
  • 组件

    broker : 主从
    nameserver: 几乎无状态,可集群内部署,节点对等,不同步。数据是 broker 同步过来的
    tag
    topic
    queue
    producer: 连接 ns,主 brokers(心跳), 无状态
    consumer/group : 连接 ns,主从 brokers(心跳啊)
  • 高可用和可扩展

    负载均衡:Broker 上存 Topic 信息,Topic 由多个队列组成,队列会平均分散在多个 Broker 上,而 Producer 的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个 Broker 上。
    主从:机器级别,不依赖 zk,元数据: 在 Broker 启动的时候,其会将自己在本地存储的配置文件 (默认位于 $HOME/store/config/topics.json 目录) 中的所有话题加载到内存中去,然后会将这些所有的话题全部同步到所有的 Name 服务器中。与此同时,Broker 也会启动一个定时任务,默认每隔 30 秒来执行一次话题全同步.

       消息存储持久化:所有 broker 上的所有 topic 都顺序写入内存文件 mapedfile(1G),mapedfilelist 记录每个 mapedfile 在磁盘的偏移量,新消息写入最后一个文件。

    动态伸缩能力(非顺序消息,消息分散;有序消息只能放在一个 queue 中,切不支持迁移,只保证一个 queue 内顺序,但可以多消费线程保证顺序):Broker 的伸缩性体现在两个维度:Topic, Broker。
    1)Topic 维度:假如一个 Topic 的消息量特别大,但集群水位压力还是很低,就可以扩大该 Topic 的队列数,Topic 的队列数跟发送、消费速度成正比。
    2)Broker 维度:如果集群水位很高了,需要扩容,直接加机器部署 Broker 就可以。Broker 起来后想 Namesrv 注册,Producer、Consumer 通过 Namesrv 发现新 Broker,立即跟该 Broker 直连,收发消息。

     Broker 与 Namesrv 的心跳机制:

    单个 Broker 跟所有 Namesrv 保持心跳请求,心跳间隔为 30 秒,心跳请求中包括当前 Broker 所有的 Topic 信息。Namesrv 会反查 Broer 的心跳信息,如果某个 Broker 在 2 分钟之内都没有心跳,则认为该 Broker 下线,调整 Topic 跟 Broker 的对应关系。但此时 Namesrv 不会主动通知 Producer、Consumer 有 Broker 宕机。

  • 消费

    1. 消费者注册,消费者上有多有 topic 的 broker 地址和队列,消费者负载均衡选择;
    1)广播模式:每个 costumer 全量消费,消费偏移量保存在 costumer 中
    2)集群模式:constumer 均匀消费部分,每个消息只有一个 costumer 消费,保存在 broker 上

    2. 新消息发送到 q:brocker 上 commit log 和消费组信息


    每个 commmit log 消息发给 topic 的随机 queue 中(生产者的负载均衡,每个 msg 只发送到一个 q 中),每个 queue 有很多 consumequeue,发给所有。广播模式,cq 会在所有 q 上,集群模式 cq 会负载均衡到某个 q 上,消息根据这些配置数据落到 q 的所有 cq 上。

    3. 消费
    3.1)普通的并发消费:queue 的所有 cq 都直接发,所有 cq 发送后删除(q 以 TreeMap 结构存储)。内部 RocketMQ 的消息树是用 TreeMap 实现的,其内部基于消息偏移量维护了消息的有序性。每次消费请求都会从消息数中拿取偏移量最小的几条消息 (默认为 1 条) 给用户,以此来达到有序消费的目的。
    3.2)有序消费:在 3.1 的基础上加两个锁,costumer client 给消费的每个 queue 会加锁,保证同一时刻只有一个 costumer client 在消费 queue(否则发给一个 client 删除了消息,此消息在另一个 client 和后面的 client 的消息无法保证顺序), 默认 20s 加一次,queue 检测 60s 没有就释放,每次成功后才取下一条,反正只有一个客户端消费。第二把锁是在 client 中,将堆积的消息按照顺序加锁的写入线程池 task 队列中。

  • 高可用 & 高可靠
    高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。

    高可靠:所有发往 broker 的消息,有同步刷盘和异步刷盘机制;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失,broker 挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电

其他

  • bridgequeue
    内存。redis 实现。适合小型系统

  • mmqd 对大型延时系统的支持,引入 chronos

    这里的 kafka 去掉了。普通的直接用哪个 rocketmq. 延时消息和事务消息
    对延时消息,放入 rocketmq 一个内部的消费 topic 中,消费入 chronos 中(存 RocksDB,seektimestamp, while 从 leveldb 中取符合时间的再放入 rocketmq 中)
    事务消息:A 执行后要发送消息给 B,因为 ddmq 一旦接收是保证被消费的,所以增加发送方事务回查。

  • 对比

    分析:少 topic 时 kafka 性能好,rockemq 需要读 mq 后去读一个大的 cl。多 topic 是 rockemq 好,处理线程多。

正文完
 0