一.kafka命令行操作
1.1 创立topic
[v2admin@hadoop10 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181 --create --replication-factor 2 --partitions 1 --topic firstCreated topic first.
1.2 查看topic
[v2admin@hadoop10 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181 --listdemofirst
1.3 删除topic
[v2admin@hadoop10 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181 -delete --topic firstTopic first is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.
1.4 发送音讯
[v2admin@hadoop10 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop10:9092 --topic demo>hello>world>haha>women>tintian
1.5 生产音讯
[v2admin@hadoop11 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop10:9092 --from-beginning --topic demoworldwomenhellohahatintian
1.6 查看某个topic
^C[v2admin@hadoop11 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181 --describe --topic demoTopic: demo PartitionCount: 2 ReplicationFactor: 2 Configs: Topic: demo Partition: 0 Leader: 11 Replicas: 11,12 Isr: 12,11 Topic: demo Partition: 1 Leader: 12 Replicas: 12,10 Isr: 10,12
二.Kafka架构图
一个典型的Kafka集群由以下角色组成:
1)若干Producer,能够是web前端产生的Page View,或者是服务器日志,零碎CPU、Memory等,
2)若干broker(Kafka反对程度扩大,个别broker数量越多,集群吞吐率越高)
3)若干Consumer Group
4)一个Zookeeper集群
Kafka通过Zookeeper治理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer应用push模式将音讯公布到broker,Consumer应用pull模式从broker订阅并生产音讯。
Kafka中音讯是以topic进行分类的,生产者生产音讯,消费者生产音讯,都是面向topic的。
首先明确的是,topic是逻辑上的概念,而partition则是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。
Producer生产的数据会被一直追加到该log文件末端,且每条数据都有本人的offset。
消费者组中的每个消费者,都会实时记录本人生产到了哪个offset,以便出错复原时,从上次的地位持续生产。
生产者生产的音讯会一直追加到log文件开端,这样log文件会一直增大,为避免log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。
每个segment对应两个文件——“.index”文件和“.log”文件。
如下就是示例
-rw-rw-r--. 1 v2admin v2admin 10485760 1月 6 21:17 00000000000000000000.index-rw-rw-r--. 1 v2admin v2admin 0 1月 6 21:17 00000000000000000000.log-rw-rw-r--. 1 v2admin v2admin 10485756 1月 6 21:17 00000000000000000000.timeindex-rw-rw-r--. 1 v2admin v2admin 8 1月 6 21:17 leader-epoch-checkpoint
三.分区准则
通过后面晓得,Producer生产的数据进入kafka中,会保留到一个topic中,但topic是逻辑上的概念,实际上,一个topic中,在物理是宰割成一个或者多个Partition保留的。
那生产者的数据是怎么进行分区的?这个分区准则是什么?
Producer发送的数据封装成一个ProducerRecord对象,咱们来看下API中,它的源码
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) { if (topic == null) { throw new IllegalArgumentException("Topic cannot be null."); } else if (timestamp != null && timestamp < 0L) { throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); } else if (partition != null && partition < 0) { throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); } else { this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; this.headers = new RecordHeaders(headers); } } public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { this(topic, partition, timestamp, key, value, (Iterable)null); } public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) { this(topic, partition, (Long)null, key, value, headers); } public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, K key, V value) { this(topic, (Integer)null, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, V value) { this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null); }
1)在指定Partition时,间接将Partition的值作为Partition值
2)在没指定Partition时,但指定了key的状况下,则将key的hash与topic的Partition值进行取余,失去的值作为Partition值
3)都没有指定状况下,第一次调用就随机生成一个整数,前面每次调用在这个整数上自增。
这个随机整数与topic可用的Partition总数取余失去Partition值,这个也就是roundp-robin算法。
四.数据可靠性
Producer发送的数据,如何保障可能达到指定的topic?可靠性如何保障?
topic的每个partition收到producer发送的数据后,都要向producer发送ack,示意确认收到,如果producer收到ack,就会进行下一轮的发送,否则从新发送数据。
那ack什么时候发送呢?
有以下两种同步策略
计划 | 长处 | 毛病 |
---|---|---|
半数以上实现同步,就发送ack | 提早低 | 选举新的leader时,容忍n台节点的故障,须要2n+1个正本 |
全副实现同步,才发送ack | 选举新的leader时,容忍n台节点的故障,须要n+1个正本 | 提早高 |
kafka采纳了第二种计划,
1)如果是第一种计划,为了容忍n个节点的故障,第一种计划须要2n+1个正本,而第二种计划则须要n+1个正本,Kafka的每个分区都有大量的数据,那第一种计划会造成大量数据的冗余,资源节约。
2)第二种计划的网络提早会比拟高,不过网络提早对Kafka的影响不大,能够忽略不计。
kafka采纳第二种计划后,那还有一个问题,
那看下这个场景,leader收到数据,这时候所有follower开始同步数据,但这个过程中,有一个follower,因为某种原因(自身故障或者网络故障缘故等),长时间不能与leader进行同步,那leader就要始终等上来,直到它实现同步,能力发送ack。
为了解决这个问题,Leader保护了一个动静的in-sync replica set (ISR),意为和leader放弃同步的follower汇合。当ISR中的follower实现数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该工夫阈值由replica.lag.time.max.ms参数设定。Leader产生故障之后,就会从ISR中选举新的leader。
但还有一个问题,有些时候,咱们发送的数据并不是那么多重要,对可靠性要求也就不高,所以没必要等ISR中的Follower全副接管胜利。
Kafka提供三种牢靠级别,咱们能够自行抉择:
acks参数 | 形容 |
---|---|
0 | producer不期待broker的ack,这一操作提供了一个最低的提早,broker一接管到还没有写入磁盘就曾经返回,当broker故障时有可能失落数据; |
1 | producer期待broker的ack,partition的leader落盘胜利后返回ack,如果在follower同步胜利之前leader故障,那么将会失落数据; |
-1(all) | producer期待broker的ack,partition的leader和follower全副落盘胜利后才返回ack。然而如果在follower同步实现后,broker发送ack之前,leader产生故障,那么会造成数据反复。 |
那么具体故障是如何解决的呢?如下图示例
故障 | 解决细节 |
---|---|
Follower故障 | follower产生故障,会被长期踢出ISR,待该follower复原后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的局部截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就能够重新加入ISR了 |
Leader故障 | leader产生故障之后,会从ISR中选出一个新的leader,之后,为保障多个正本之间的数据一致性,其余的follower会先将各自的log文件高于HW的局部截掉,而后从新的leader同步数据。 |
五.Exactly Once和事务
将服务器ACK级别设置为0,能够保障生产者每条音讯只会被发送一次,即At Most Once语义,但不能保证数据不失落。
将服务器的ACK级别设置为-1,能够保障Producer到Server之间不会失落数据,即At Least Once语义,但不不能保证数据不反复。
这就难堪了,咱们对于一些重要的数据,既要保证数据不失落,也要保证数据不反复,也就是Exactly Once语义。
在0.11版本之前的Kafka是没啥方法的,只能在生产者发送数据保证数据不丢,而后在消费者时,对数据去重。
在0.11版本之后的Kafka引入一个个性:幂等性,就是用户对于同一操作发动的一次申请或者屡次申请的后果是统一的,不会因为屡次点击而产生了副作用。
在kafka中,就是指Producer不管向Server发送多少次反复数据,Server端都只会长久化一条。
幂等性与At Least Once联合,就形成kafka的Exactly Once语义:
At Least Once + 幂等性 = Exactly Once
启动幂等性,要将Producer的参数中enable.idompotence设置为true即可。
开启幂等性的Producer在初始化的时候会被调配一个PID,发往同一Partition的音讯会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具备雷同主键的音讯提交时,Broker只会长久化一条。
然而留神,不同的Partition也有不同主键,所以幂等性无奈保障跨分区的Exactly Once,而且PID重启也会发生变化。
那这个怎么解决?就是事务,事务,就是要么全胜利,要么全失败,kafka从0.11版本开始引入事务反对,这个事务能够保障Kafka在Exactly Once语义根底上,生产和消费者能够跨分区和会话,要么全胜利,要么全失败。
1)Producer事务
为了实现跨分区跨会话的事务,这里引入一个全局惟一的Transaction ID,并将Producer取得的PID和Transaction ID绑定。这样当Producer重启后就能够通过正在进行的Transaction ID取得原来的PID。
为了治理Transaction,Kafka引入了一个新的组件Transaction Coordinator。
Producer就是通过和Transaction Coordinator交互取得Transaction ID对应的工作状态。
Transaction Coordinator还负责将事务所有写入Kafka的一个外部Topic,这样即便整个服务重启,因为事务状态失去保留,进行中的事务状态能够失去复原,从而持续进行。
2)Consumer事务
对于Consumer而言,事务的保障就会绝对较弱,很难保障Commit的信息被准确生产。
这是因为Consumer能够通过offset拜访任意信息,而不同的Segment File生命周期不同,同一事务的音讯可能会呈现重启后被删除的状况。
解决这个问题,下一章Kafka API上会说。
六.Consumer消费者
6.1 生产形式
Consumer采纳pull模式从broker中读取数据。
为什么不采纳push模式呢?
因为采纳push模式,音讯发送速率由broker决定,所以push模式很难适应生产速率不同的Consumer。
比方说,push模式下
broker的推送音讯速率每秒100条,但Consumer解决音讯的速率是每秒10条,那每秒就有90条音讯来不及解决,典型的结果就是拒绝服务、网络梗塞。
采纳pull模式,Consumer能够依据本人的生产能力以适当的速率生产音讯。
pull模式也有不足之处,那就是broker没有数据,Consumer很可能会陷入循环中,始终返回空数据。
针对这一点,Kafka的消费者在生产数据时会传入一个时长参数timeout,如果以后没有数据可供生产,consumer会期待一段时间之后再返回,这段时长即为timeout。
6.2 分区调配策略
一个topic中有多个Partition,一个Consumer group中也会有多个Consumer,哪个Partition由哪个Consumer来解决?这就是Partition的调配问题。
kafka中有两种调配形式,round-robin和range。
1)round-bin 轮询分区
比方我1,2,3,4,5,6,7条数据,有a、b、c三个消费者,那怎么调配呢?
这种模式下这样调配
1给a,2给b,3给c,4给a,5给b,6给c,7给a,就这样轮询调配。
最初调配
a:1,4,7 ; b:2,5 c:3,6
2)range模式
如果有10个分区,3个消费者,把分区依照序号排列0,1,2,3,4,5,6,7,8,9;消费者为a,b,c,那么用分区数除以消费者数来决定每个Consumer生产几个Partition,除不尽的后面几个消费者将会多生产一个
最初调配后果如下
a:0,1,2,3
b:4,5,6
c:7,8,9
如果有11个分区将会是:
C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10
如果咱们有两个主题T1,T2,别离有10个分区,最初的调配后果将会是这样:
C1:T1(0,1,2,3) T2(0,1,2,3)
C2:T1(4,5,6) T2(4,5,6)
C3:T1(7,8,9) T2(7,8,9)
在这种状况下,C1多生产了两个分区
6.3 offset
consumer在生产过程中可能会呈现断电宕机等故障,consumer复原后,须要从故障前的地位的持续生产,所以consumer须要实时记录本人生产到了哪个offset,以便故障复原后持续生产。
Kafka 0.9版本之前,consumer默认将offset保留在Zookeeper中,从0.9版本开始,consumer默认将offset保留在Kafka一个内置的topic中,该topic为__consumer_offsets。
6.4 Zookeeper的作用
Kafka集群中broker的上线下线、topic的分区正本调配、Leader的选举等等,这些须要有一个角色来治理,这个角色就是Controller,Controller是从集群中的broker中选举进去的,这个Controller的工作是依赖Zookeeper实现的。