关于消息队列:Kafka1初识Kafka

41次阅读

共计 5397 个字符,预计需要花费 14 分钟才能阅读完成。

消息中间件的背景介绍

消息中间件可能解决的问题

异步

​ 很多业务场景咱们须要把同步的工作变成异步的工作。

​ 拿一个电商平台的注册性能来简略剖析下,用 户注册这一个服务,不单单只是 insert 一条数据到数据库外面就完事了,还须要发送激活邮件、发送新人红包或者积分、发送营销短信等一系列操作。如果说这外面的每一个操作,都须要耗费 1s,那么整个注册过程就须要耗时 4s 能力响应给用户。

​ 那么咱们须要把这些操作拆出来,优化变成异步解决的逻辑。

  • 咱们能够应用阻塞队列 + 线程池来实现生产者消费者模式。

    • 然而这种形式只实用于单机,一旦机器宕机,那么原来在阻塞队列中存储的数据内容就失落了。
  • 应用消息中间件来解决

削峰

​ 用户提交过去的申请,先写入到音讯队列。音讯队列是有长度的,如果音讯队列长度超过指定长度,间接摈弃。这样就把流量的峰值削掉了。

限流

​ 秒杀的具体外围解决业务,接管音讯队列中音讯进行解决,这里的音讯解决能力取决于生产端自身的 吞吐量

解耦

​ 不同程序语言之间能够通过音讯队列来达到通信。

音讯长久化

​ 可能不必放心应用程序挂了而无奈生产音讯

​ 当然,消息中间件还有更多利用场景,比方在弱一致性事务模型中,能够采纳分布式音讯队列的实现最 大能力告诉形式来实现数据的最终一致性等等

思考消息中间件的设计

能够先从根本的需要开始思考

  • 最根本反对音讯的收发

    • 网络通信就会思考 NIO
  • 音讯的存储

    • 长久化,非长久化
  • 音讯的序列化,反序列化
  • 是否跨语言
  • 音讯的确认机制

    • 如何防止音讯的重发

高级性能

  • 音讯的有序性
  • 是否反对事物音讯
  • 音讯收发的性能,对高并发大数据的反对
  • 是否反对集群
  • 音讯的可靠性传输
  • 是否反对多协定

消息中间件的倒退过程

​ 实际上消息中间件的倒退也是挺有意思的,咱们晓得任何一个技术的呈现都是为了解决理论问题,这个 问题是 通过一种通用的软件 总线 也就是一种通信零碎,解决应用程序之间 沉重的信息通信 工作。

​ 最早的小白鼠就是金融交易畛域,因为在过后这个畛域中,交易员须要通过不同的终端实现交易,每台终端显示不同的信息。

​ 如果接入音讯总线,那么交易员只须要在一台终端上操作,而后订阅其余终端感兴趣 的音讯。于是就诞生了公布订阅模型(pubsub),同时诞生了世界上第一个古代音讯队列软件(TIB)The information Bus, TIB 容许开发者建设一系列规定去形容音讯内容,只有音讯依照这些规定公布出 去,任何消费者利用都能订阅感兴趣的音讯。

​ 随着 TIB 带来的苦头被广泛应用在各大畛域,IBM 也开始研 究开发本人的消息中间件,3 年后 IBM 的音讯队列 IBM MQ 产品系列公布,之后的一段时间 MQ 系列进化 成了 WebSphere MQ 统治商业音讯队列平台市场。

​ 包含前期微软也研发了本人的音讯队列(MSMQ)

​ 各大厂商纷纷钻研本人的 MQ,然而他们是以商业化模式经营本人的 MQ 软件,商业 MQ 想要解决的是利用互通的问题,而不是创立标准接口来容许不同 MQ 产品互通。

不同消息中间件切换的问题

​ 所以有些大型的金融公司可能会应用来 自多个供应商的 MQ 产品,来服务企业外部不同的利用。那么问题来了,如果利用曾经订阅了 TIB MQ 的 音讯而后忽然须要生产 IBM MQ 的音讯,那么整个实现过程会很麻烦。

JMS 规范

​ 为了解决这个问题,在 2001 年诞 生了 Java Message Service(JMS),JMS 通过提供公共的 Java API 形式,暗藏独自 MQ 产品供应商的实现 接口,从而逾越了不同 MQ 生产和解决互通问题。从技术层面来说,Java 应用程序只须要针对 JMS API 编 程,抉择适合的 MQ 驱动即可。JMS 会解决其余局部。这种计划实际上是通过独自标准化接口来整合很 多不同的接口,成果还是不错的,然而碰到了互用性的问题。

两套应用两种不同编程语言的程序如何通过它们的异步消息传递机制互相通信呢。

AMQP 的呈现

​ 这个时候就须要定义一个异步消息传递的通用规范所以 AMQP(Advanced Message Queuing Protocol)高级音讯队列协定产生了,它应用了一套规范的底层协定,退出了许多其余特色来反对互用性,为古代利用丰盛了消息传递需要,针对规范编码的任 何人都能够和任意 AMQP 供应商提供的 MQ 服务器进行交互。

MQTT

​ 除了 JMS 和 AMQP 标准以外,还有一种 MQTT(Message Queueing Telemetry[特莱米缺] Transport),它是专门为小设备设计的。因为计算性能不高的设施不能适应 AMQP 上的简单操作,它 们须要一种简略而且可互用的形式进行通信。

​ 这是 MQTT 的根本要求,而现在,MQTT 是物联网(IOT)生态系统中次要成分之一

Kafka,它并没有遵循下面所说的协定标准,重视吞吐量,相似 udp 和 tcp

Kafka 的介绍

什么是 Kafka

​ Kafka 是一款分布式音讯公布和订阅零碎,它的特点是高性能、高吞吐量。

​ 最早设计的目标是作为 LinkedIn 的流动流和经营数据的解决管道。这些数据次要是用来对用户做用户画 像剖析以及服务器性能数据的一些监控

​ 所以 kafka 一开始设计的指标就是作为一个分布式、高吞吐量的音讯零碎,所以适宜使用在大数据传输 场景。

Kafka 的利用场景

​ 因为 kafka 具备更好的吞吐量、内置分区、冗余及容错性的长处(kafka 每秒能够解决几十万音讯),让 kafka 成为了一个很好的大规模音讯解决利用的解决方案。

​ 所以在企业级利用上,次要会利用于如下几 个方面

行为跟踪

​ kafka 能够用于跟踪用户浏览页面、搜寻及其他行为。通过公布 - 订阅模式实时记录到对应的 topic 中,通过后端大数据平台接入解决剖析,并做更进一步的实时处理和监控

日志收集

​ 日志收集方面,有很多比拟优良的产品,比方 Apache Flume,很多公司应用 kafka 代理日志 聚合。

​ 日志聚合示意从服务器上收集日志文件,而后放到一个集中的平台(文件服务器)进行解决。在 理论利用开发中,咱们应用程序的 log 都会输入到本地的磁盘上,排查问题的话通过 linux 命令来搞定,如果应用程序组成了负载平衡集群,并且集群的机器有几十台以上,那么想通过日志疾速定位到问题,就是很麻烦的事件了。所以个别都会做一个日志对立收集平台治理 log 日志用来疾速查问重要利用的问 题。所以很多公司的套路都是把利用日志集中到 kafka 上,而后别离导入到 es 和 hdfs 上,用来做实时检索剖析和离线统计数据备份等。而另一方面,kafka 自身又提供了很好的 api 来集成日志并且做日志收集

Kafka 自身的架构

​ 一个典型的 kafka 集群蕴含若干 Producer

  • 能够是利用节点产生的音讯
  • 也能够是通过 Flume 收集日志 产生的事件
  • 若干个 Broker(kafka 反对程度扩大)
  • 若干个 Consumer Group
  • 一个 zookeeper 集群。

​ kafka 通过 zookeeper 治理集群配置及服务协同。

​ Producer 应用 push 模式将音讯公布 到 broker,consumer 通过监听应用 pull 模式从 broker 订阅并生产音讯。

​ 多个 broker 协同工作,producer 和 consumer 部署在各个业务逻辑中。

​ 三者通过 zookeeper 治理协调申请和转发。这样就组成了一个高性能的分布式音讯公布和订阅零碎。

​ 图上有一个细节是和其余 mq 中间件不同的点,producer 发送音讯到 broker 的过程是 push,而 consumer 从 broker 生产音讯的过程是 pull,被动去拉数据。而不是 broker 把数据被动发送给 consumer

名词解释

Broker

​ Kafka 集群蕴含一个或多个服务器,这种服务器被称为 broker。

​ broker 端不保护数据的生产状态,晋升了性能。

​ 间接应用磁盘进行存储,线性读写,速度快:

​ 防止了数据在 JVM 内存和零碎内存之间的复制,缩小耗性能的创建对象和垃圾回收。

Producer

​ 负责公布音讯到 Kafka broker

Consumer

​ 音讯消费者,向 Kafka broker 读取音讯的客户端,consumer 从 broker 拉取 (pull) 数据并进行解决。

Topic

​ 每条公布到 Kafka 集群的音讯都有一个类别,这个类别被称为 Topic。

  • 物理上

    • 不同 Topic 的音讯离开存 储
  • 逻辑上

    • 一个 Topic 的音讯尽管保留于一个或多个 broker 上但用户只需指定音讯的 Topic 即可生产或生产数据而不用关怀数据存于何处

Partition

​ Parition 是物理上的概念,每个 Topic 蕴含一个或多个 Partition.

Consumer Group

​ 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)

Topic & Partition

​ Topic 在逻辑上能够被认为是一个 queue,每条生产都必须指定它的 Topic,能够简略了解为必须指明

​ 把这条音讯放进哪个 queue 里。

​ 为了使得 Kafka 的吞吐率能够线性进步,物理上把 Topic 分成一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有音讯和索引文 件。若创立 topic1 和 topic2 两个 topic,且别离有 13 个和 19 个分区,则整个集群上会相应会生成共 32 个 文件夹(本文所用集群共 8 个节点,此处 topic1 和 topic2 replication-factor 均为 1)。

装置和配置

下载 Kafka

wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar -zxvf kafka_2.11-2.0.0.tgz -C /usr/local

配置 zookeeper

​ 因为 kafka 依赖于 zookeeper 来做 master 选举一起其余数据的保护,所以须要先启动 zookeeper 节点

​ kafka 内置了 zookeeper 的服务,所以在 bin 目录下提供了这些脚本

zookeeper-server-start.sh
zookeeper-server-stop.sh

​ 在 config 目录下,存在一些配置文件

zookeeper.properties
server.properties

所以咱们能够通过上面的脚本来启动 zk 服务,当然,也能够本人搭建 zk 的集群来实现

sudo sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties

启动和进行 kafka

​ 咱们能够应用本人的 zookeeper

​ 批改 server.properties, 减少 zookeeper 的配置

zookeeper.connect=localhost:2181
  • 启动 kafka

    sudo bin/kafka-server-start.sh -daemon config/server.properties
  • 进行 Kafka

    sh bin/kafka-server-stop.sh -daemon config/server.properties

Kafka 的基本操作

创立 topic

sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic test
  • Replication-factor

    • 示意该 topic 须要在不同的 broker 中保留几份,这里设置成 1,示意在两个 broker 中保留两份
  • partitions

    • 分区数

查看 topic

sh bin/kafka-topics.sh --list --zookeeper localhost:2181
[zzy@116 kafka_2.11-2.0.0]$ sh bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
first_topic
test
test_partition

创立消费者

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --from-beginning

发送音讯

sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first_topic

集群环境装置

环境筹备

  • 筹备三台虚拟机
  • 别离把 kafka 的安装包部署在三台机器上

批改配置

以下配置批改均为 server.properties

  • 别离批改三台机器的 server.properties 配置,同一个集群中的每个机器的 id 必须惟一
  • broker.id=0
    broker.id=1
    broker.id=2
  • 批改 zookeeper 的连贯配置(指定为以后启动 zookeeper 的机器)

    zookeeper.connect=192.168.30.2:2181
  • 批改 listeners 配置

    • 如果配置了 listeners,那么音讯生产者和消费者会应用 listeners 的配置来进行音讯的收发,否则,会应用 localhost
    • PLAINTEXT 示意协定,默认是明文,能够抉择其余加密协议

      • listeners=PLAINTEXT://192.168.13.102:9092
  • 别离启动三台服务器

正文完
 0