消息中间件的背景介绍

消息中间件可能解决的问题

异步

很多业务场景咱们须要把同步的工作变成异步的工作。

拿一个电商平台的注册性能来简略剖析下,用 户注册这一个服务,不单单只是insert一条数据到数据库外面就完事了,还须要发送激活邮件、发送新人红包或者积分、发送营销短信等一系列操作。如果说这外面的每一个操作,都须要耗费1s,那么整个注册过程就须要耗时4s能力响应给用户。

那么咱们须要把这些操作拆出来,优化变成异步解决的逻辑。

  • 咱们能够应用阻塞队列+线程池来实现生产者消费者模式。

    • 然而这种形式只实用于单机,一旦机器宕机,那么原来在阻塞队列中存储的数据内容就失落了。
  • 应用消息中间件来解决

削峰

用户提交过去的申请,先写入到音讯队列。音讯队列是有长度的,如果音讯队列长度超过指定长度, 间接摈弃。这样就把流量的峰值削掉了。

限流

秒杀的具体外围解决业务,接管音讯队列中音讯进行解决,这里的音讯解决能力取决于生产端自身的 吞吐量

解耦

不同程序语言之间能够通过音讯队列来达到通信。

音讯长久化

可能不必放心应用程序挂了而无奈生产音讯

当然,消息中间件还有更多利用场景,比方在弱一致性事务模型中,能够采纳分布式音讯队列的实现最 大能力告诉形式来实现数据的最终一致性等等

思考消息中间件的设计

能够先从根本的需要开始思考
  • 最根本反对音讯的收发

    • 网络通信就会思考NIO
  • 音讯的存储

    • 长久化,非长久化
  • 音讯的序列化,反序列化
  • 是否跨语言
  • 音讯的确认机制

    • 如何防止音讯的重发
高级性能
  • 音讯的有序性
  • 是否反对事物音讯
  • 音讯收发的性能,对高并发大数据的反对
  • 是否反对集群
  • 音讯的可靠性传输
  • 是否反对多协定

消息中间件的倒退过程

实际上消息中间件的倒退也是挺有意思的,咱们晓得任何一个技术的呈现都是为了解决理论问题,这个 问题是 通过一种通用的软件总线也就是一种通信零碎,解决应用程序之间沉重的信息通信工作。

最早的小白鼠就是金融交易畛域,因为在过后这个畛域中,交易员须要通过不同的终端实现交易,每台终端显示不同的信息。

如果接入音讯总线,那么交易员只须要在一台终端上操作,而后订阅其余终端感兴趣 的音讯。于是就诞生了公布订阅模型(pubsub),同时诞生了世界上第一个古代音讯队列软件(TIB) The information Bus, TIB容许开发者建设一系列规定去形容音讯内容,只有音讯依照这些规定公布出 去,任何消费者利用都能订阅感兴趣的音讯。

随着TIB带来的苦头被广泛应用在各大畛域,IBM也开始研 究开发本人的消息中间件,3年后IBM的音讯队列IBM MQ产品系列公布,之后的一段时间MQ系列进化 成了WebSphere MQ统治商业音讯队列平台市场。

包含前期微软也研发了本人的音讯队列(MSMQ)

各大厂商纷纷钻研本人的MQ,然而他们是以商业化模式经营本人的MQ软件,商业MQ想要解决的是利用互通的问题,而不是创立标准接口来容许不同MQ产品互通。

不同消息中间件切换的问题

所以有些大型的金融公司可能会应用来 自多个供应商的MQ产品,来服务企业外部不同的利用。那么问题来了,如果利用曾经订阅了TIB MQ的 音讯而后忽然须要生产IBM MQ的音讯,那么整个实现过程会很麻烦。

JMS规范

为了解决这个问题,在2001年诞 生了 Java Message Service(JMS),JMS通过提供公共的Java API形式,暗藏独自MQ产品供应商的实现 接口,从而逾越了不同MQ生产和解决互通问题。从技术层面来说,Java应用程序只须要针对JMS API编 程,抉择适合的MQ驱动即可。JMS会解决其余局部。这种计划实际上是通过独自标准化接口来整合很 多不同的接口,成果还是不错的,然而碰到了互用性的问题。

两套应用两种不同编程语言的程序如何通过它们的异步消息传递机制互相通信呢。

AMQP的呈现

这个时候就须要定义一个异步消息传递的通用规范所以AMQP(Advanced Message Queuing Protocol)高级音讯队列协定产生了,它应用了一套规范的底层协定,退出了许多其余特色来反对互用性,为古代利用丰盛了消息传递需要,针对规范编码的任 何人都能够和任意AMQP供应商提供的MQ服务器进行交互。

MQTT

除了JMS和AMQP标准以外,还有一种MQTT(Message Queueing Telemetry[特莱米缺] Transport),它是专门为小设备设计的。因为计算性能不高的设施不能适应AMQP上的简单操作,它 们须要一种简略而且可互用的形式进行通信。

这是MQTT的根本要求,而现在,MQTT是物联网(IOT) 生态系统中次要成分之一

Kafka,它并没有遵循下面所说的协定标准,重视吞吐量,相似udp 和 tcp

Kafka的介绍

什么是Kafka

Kafka是一款分布式音讯公布和订阅零碎,它的特点是高性能、高吞吐量。

最早设计的目标是作为LinkedIn的流动流和经营数据的解决管道。这些数据次要是用来对用户做用户画 像剖析以及服务器性能数据的一些监控

所以kafka一开始设计的指标就是作为一个分布式、高吞吐量的音讯零碎,所以适宜使用在大数据传输 场景。

Kafka的利用场景

因为kafka具备更好的吞吐量、内置分区、冗余及容错性的长处(kafka每秒能够解决几十万音讯),让 kafka成为了一个很好的大规模音讯解决利用的解决方案。

所以在企业级利用上,次要会利用于如下几 个方面

行为跟踪

kafka能够用于跟踪用户浏览页面、搜寻及其他行为。通过公布-订阅模式实时记录到对应的 topic中,通过后端大数据平台接入解决剖析,并做更进一步的实时处理和监控

日志收集

日志收集方面,有很多比拟优良的产品,比方Apache Flume,很多公司应用kafka代理日志 聚合。

日志聚合示意从服务器上收集日志文件,而后放到一个集中的平台(文件服务器)进行解决。在 理论利用开发中,咱们应用程序的log都会输入到本地的磁盘上,排查问题的话通过linux命令来搞定, 如果应用程序组成了负载平衡集群,并且集群的机器有几十台以上,那么想通过日志疾速定位到问题, 就是很麻烦的事件了。所以个别都会做一个日志对立收集平台治理log日志用来疾速查问重要利用的问 题。所以很多公司的套路都是把利用日志集中到kafka上,而后别离导入到es和hdfs上,用来做实时检索剖析和离线统计数据备份等。而另一方面,kafka自身又提供了很好的api来集成日志并且做日志收集

Kafka自身的架构

一个典型的kafka集群蕴含若干Producer

  • 能够是利用节点产生的音讯
  • 也能够是通过Flume收集日志 产生的事件
  • 若干个Broker(kafka反对程度扩大)
  • 若干个Consumer Group
  • 一个 zookeeper集群。

kafka通过zookeeper治理集群配置及服务协同。

Producer应用push模式将音讯公布 到broker,consumer通过监听应用pull模式从broker订阅并生产音讯。

多个broker协同工作,producer和consumer部署在各个业务逻辑中。

三者通过zookeeper治理协调申请和转发。这样就组成了一个高性能的分布式音讯公布和订阅零碎。

图上有一个细节是和其余mq中间件不同的点,producer 发送音讯到broker的过程是push,而 consumer从broker生产音讯的过程是pull,被动去拉数据。而不是broker把数据被动发送给consumer

名词解释

Broker

Kafka集群蕴含一个或多个服务器,这种服务器被称为broker。

broker端不保护数据的生产状态,晋升了性能。

间接应用磁盘进行存储,线性读写,速度快:

防止了数据在JVM内存和零碎内存之间的复制, 缩小耗性能的创建对象和垃圾回收。

Producer

负责公布音讯到Kafka broker

Consumer

音讯消费者,向Kafka broker读取音讯的客户端,consumer从broker拉取(pull)数据并进行解决。

Topic

每条公布到Kafka集群的音讯都有一个类别,这个类别被称为Topic。

  • 物理上

    • 不同Topic的音讯离开存 储
  • 逻辑上

    • 一个Topic的音讯尽管保留于一个或多个broker上但用户只需指定音讯的Topic即可生产或生产数据而不用关怀数据存于何处

Partition

Parition是物理上的概念,每个Topic蕴含一个或多个Partition.

Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定 group name则属于默认的group)

Topic & Partition

Topic在逻辑上能够被认为是一个queue,每条生产都必须指定它的Topic,能够简略了解为必须指明

把这条音讯放进哪个queue里。

为了使得Kafka的吞吐率能够线性进步,物理上把Topic分成一个或多个 Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有音讯和索引文 件。若创立topic1和topic2两个topic,且别离有13个和19个分区,则整个集群上会相应会生成共32个 文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1)。

装置和配置

下载Kafka

wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar -zxvf kafka_2.11-2.0.0.tgz -C /usr/local

配置zookeeper

因为kafka依赖于zookeeper来做master选举一起其余数据的保护,所以须要先启动zookeeper节点

kafka内置了zookeeper的服务,所以在bin目录下提供了这些脚本

zookeeper-server-start.shzookeeper-server-stop.sh

在config目录下,存在一些配置文件

zookeeper.propertiesserver.properties

所以咱们能够通过上面的脚本来启动zk服务,当然,也能够本人搭建zk的集群来实现

sudo sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties

启动和进行kafka

咱们能够应用本人的zookeeper

批改server.properties, 减少zookeeper的配置

zookeeper.connect=localhost:2181
  • 启动kafka

    sudo bin/kafka-server-start.sh -daemon config/server.properties
  • 进行Kafka

    sh bin/kafka-server-stop.sh -daemon config/server.properties

Kafka的基本操作

创立topic

sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic test
  • Replication-factor

    • 示意该topic须要在不同的broker中保留几份,这里设置成1,示意在两个broker中保留两份
  • partitions

    • 分区数

查看topic

sh bin/kafka-topics.sh --list --zookeeper localhost:2181
[zzy@116 kafka_2.11-2.0.0]$ sh bin/kafka-topics.sh --list --zookeeper localhost:2181__consumer_offsetsfirst_topictesttest_partition

创立消费者

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --from-beginning

发送音讯

sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first_topic

集群环境装置

环境筹备

  • 筹备三台虚拟机
  • 别离把kafka的安装包部署在三台机器上

批改配置

以下配置批改均为server.properties

  • 别离批改三台机器的server.properties配置,同一个集群中的每个机器的id必须惟一
  • broker.id=0broker.id=1broker.id=2
  • 批改zookeeper的连贯配置(指定为以后启动zookeeper的机器)

    zookeeper.connect=192.168.30.2:2181
  • 批改listeners配置

    • 如果配置了listeners,那么音讯生产者和消费者会应用listeners的配置来进行音讯的收发,否则, 会应用localhost
    • PLAINTEXT示意协定,默认是明文,能够抉择其余加密协议

      • listeners=PLAINTEXT://192.168.13.102:9092
  • 别离启动三台服务器