1 Kafka
Kafka是一个开源分布式的流解决平台,一种高吞吐量的分布式公布订阅音讯零碎,它能够解决消费者在网站中的所有动作流数据。Kafka由Scala和Java编写,2012年成为Apache基金会下顶级我的项目。
2 Kafka长处
- 低提早:Kafka反对低提早消息传递,速度极快,能达到200w写/秒
- 高性能:Kafka对于音讯的散布,订阅都有高吞吐量。即便存储了TB级的信息,仍然可能保障稳固的性能
- 可靠性:Kafka是分布式,分区,复制和容错的,保障零停机和零数据失落
- 可扩大:用户能够从但个代理Broker开始作POC,而后缓缓扩大到由三个Broker组成的小型开发集群,接着扩大到数十个甚至数百个Broker集群进入生产阶段,能够在集群联机时进行扩大,而不会影响整个零碎的可用性
- 多个生产者:无论这些客户应用雷同Topic还是多个Topic,Kafka都能无缝解决多个生产者,使得零碎能够非常容易聚合来自许多前端零碎的数据并使其保持一致
- 多个消费者:Kafka具备多个消费者设计,能够读取任何但个音讯流而不会互相烦扰。多个Kafka消费者能够组成一个生产组进行操作并共享音讯流,从而确保每一条音讯只被整个生产组解决一次
- 基于磁盘的保留:Kafka应用分布式提交日志,音讯可能疾速长久化到磁盘上。音讯长久化意味着如果消费者落后,无论是因为处理速度迟缓还是忽然的音讯涌入都不会有失落数据的危险,也意味着消费者能够被进行。音讯将保留在Kafka中,容许消费者重新启动并且从中断处获取解决信息而不会失落数据
3 Kafka相干术语
- Broker:Kafka集群蕴含一个或多个服务器,这种服务器称为Broker
- Topic:每条公布到Kafka的音讯都有一个类别,这个类别称为Topic。物理上不同Topic的音讯离开存储,逻辑上Topic的音讯尽管保留在一个或多个Broker上,但用户只需指定音讯的Topic即可生产或生产数据而不用关怀数据寄存于何处
- Partition:每个Topic蕴含一个或多个Partition
- Producer:生产者,负责公布音讯到Broker
- Consumer:消费者,向Broker读取音讯的客户端
- Consumer Group:每个Consumer属于一个特定的Consumer Group,能够为每个Consumer指定Group Name,否则属于默认Group
4 入手干活
4.1 环境
- Spring Boot 2.3.1
- IDEA 2020.1.1
- OpenJDK 11.0.7
- Kafka 2.5.0
- Kotlin 1.3.72
4.2 下载Kafka
官网戳这里。
下载并解压(留神须要Kafka与Spring Boot版本对应,能够参考这里):
tar -xvf kafka_2.12-2.5.0.tgzcd kafka_2.12-2.5.0
接着启动ZooKeeper与Kafka:
bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh config/server.properties
Kafka须要用到ZooKeeper,须要在启动Kafka之前启动ZooKeeper(ZooKeeper是一个开源的分布式应用程序协调服务,是Hadoop的组件,次要用于解决分布式应用中的一些数据管理问题)。
Kafka默认应用9092端口,部署在服务器上须要留神防火墙以及平安组的解决。
4.3 新建工程
思考到Spring Boot在2.3.0M1中(截至本文写作日期2020.07.14Spring Boot已更新到2.4.0M1)首次采纳Gradle而不是Maven来构建我的项目,换句话说日后Spring Boot的构建工具将从Maven迁徙到Gradle,Spring Boot团队给出的次要起因是能够缩小我的项目构建所破费的工夫,详情能够戳这里瞧瞧。
另外因为另一个基于JVM的语言Kotlin的日渐崛起,后端开始逐步有人采纳Kotlin(只管不多,不过语法糖真的香,JetBrains家的语言配合IDE,爽得飞起),因而本示例我的项目将采纳两种形式搭建:
- Java+Maven
- Kotlin+Gradle
抉择的依赖如下(当然您喜爱的话能够在pom.xml
或者build.gradle.kts
外面加,对于Kotlin不须要Lombok
):
4.4 我的项目构造
Java版:
Kotlin版:
serialize
:序列化/反序列化实体类Constant.java
/Constant.kt
:常量类Consumer.java
/Consumer.kt
:消费者类Entity.java
/Entity.kt
:实体类Producer.java
/Product.kt
:生产者类TestApplicationTests
:测试类
4.5 常量类
蕴含Topic与GroupId,Java版:
public class Constants { public static final String TOPIC = "TestTopic"; public static final String GROUP_ID = "TestGroupId";}
Kotlin版:
object Constants{ const val TOPIC = "TestTopic" const val GROUP_ID = "TestGroupId"}
4.6 实体类
@AllArgsConstructor@NoArgsConstructor@Data@Builderpublic class Entity { private long id; private String name; private int num;}
说一下Lombok的几个注解:
@AllArgsConstructor
/@NoArgsConstructor
:生成所有参数/无参数构造方法@Data
:等价于@Setter+@Getter+@RequiredArgsConstrucotr+@ToString+@EqualAndHashCode
,主动生成Setter+Getter+toString()+equals()+hashCode()
,还有@RequireArgsConstructor
为类的每一个final
或非空字段生成一个构造方法@Builder
:能够通过建造者模式创建对象
Kotlin版:
class Entity { var id: Long = 0 var name: String = "" var num: Int = 0 constructor() constructor(id:Long,name:String,num:Int) { this.id = id this.name = name this.num = num }}
4.7 生产者
@Component@Slf4j//防止出现Field injection not recommended正告,代替了原来的间接在字段上@Autowired@RequiredArgsConstructor(onConstructor = @__(@Autowired))public class Producer { private final KafkaTemplate<String, Entity> kafkaTemplate; public void send(Entity entity) { //发送音讯 //类型个别为String+自定义音讯内容,String代表音讯Topic,这里音讯内容用Entity示意 ListenableFuture<SendResult<String, Entity>> future = kafkaTemplate.send(Constants.TOPIC, entity); //回调函数 future.addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(Throwable throwable) { log.info("Send message failed"); } @Override public void onSuccess(SendResult<String, Entity> stringEntitySendResult) { log.info("Send message success"); } }); }}
这里的send
有两个参数,对应于sendResult<>
中的参数类型,第一个为音讯的Topic,第二个为音讯体,个别应用String或者Json。
Kotlin版:
@Componentclass Producer{ @Autowired private var kafkaTemplate:KafkaTemplate<String,Entity> ? = null private val log = LoggerFactory.getLogger(this.javaClass) fun send(entity: Entity) { val future = kafkaTemplate!!.send(Constants.TOPIC,entity); future.addCallback(object : ListenableFutureCallback<SendResult<String?, Entity?>?>{ override fun onSuccess(result : SendResult<String?,Entity?>?) { log.info("Send success"); } override fun onFailure(e:Throwable) { log.info("Send failed"); } }) }}
4.8 消费者
@Component@Slf4jpublic class Consumer { @KafkaListener(topics = Constants.TOPIC,groupId = Constants.GROUP_ID) public void consume(Entity entity) { log.info("Consume a entity, id is "+entity.getId()); }}
应用@KafkaListener
注解,第一个参数示意须要生产的音讯的Topic,能够是String []
,第二个是消费者组的id。生产者的音讯Topic必须与消费者的Topic保持一致否则不能生产,这里简略解决打印日志。
Kotlin版:
@Componentclass Consumer { private val log = LoggerFactory.getLogger(this.javaClass) @KafkaListener(topics = [Constants.TOPIC],groupId = Constants.GROUP_ID) fun consume(entity: Entity) { log.info("Consume a entity, id is "+entity.id.toString()) }}
4.9 序列化/反序列化
这里自定义了序列化/反序列化类,序列化/反序列化类须要实现org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口,其中T
是想要序列化的类型,这里是Entity
。序列化接口反编译如下:
public interface Serializer<T> extends Closeable { default void configure(Map<String, ?> configs, boolean isKey) { } byte[] serialize(String var1, T var2); default byte[] serialize(String topic, Headers headers, T data) { return this.serialize(topic, data); } default void close() { }}
反序列化反编译接口如下:
public interface Deserializer<T> extends Closeable { default void configure(Map<String, ?> configs, boolean isKey) { } T deserialize(String var1, byte[] var2); default T deserialize(String topic, Headers headers, byte[] data) { return this.deserialize(topic, data); } default void close() { }}
也就是只须要实现其中的serialize/deserialize
办法即可。这里序列化/反序列化用到了自带的Jackson:
@Slf4jpublic class Serializer implements org.apache.kafka.common.serialization.Serializer<Entity> { public byte [] serialize(String topic, Entity entity) { try { return entity == null ? null : new ObjectMapper().writeValueAsBytes(entity); } catch (JsonProcessingException e) { e.printStackTrace(); log.error("Can not serialize entity in Serializer"); } return null; }}
反序列化:
@Slf4jpublic class Deserializer implements org.apache.kafka.common.serialization.Deserializer<Entity> { public Entity deserialize(String topic,byte [] data) { try { return data == null ? null : new ObjectMapper().readValue(data,Entity.class); } catch (IOException e) { e.printStackTrace(); log.error("Can not deserialize entity in Deserializer"); } return null; }}
Kotlin版:
class Serializer : org.apache.kafka.common.serialization.Serializer<Entity?>{ private val log = LoggerFactory.getLogger(this.javaClass) override fun serialize(topic: String?, data: Entity?): ByteArray? { try { return if (data == null) null else ObjectMapper().writeValueAsBytes(data) } catch (e:JsonProcessingException) { e.printStackTrace() log.error("Can not serialize entity in Serializer") } return null }}
class Deserializer : org.apache.kafka.common.serialization.Deserializer<Entity?>{ private val log = LoggerFactory.getLogger(this.javaClass) override fun deserialize(topic: String?, data: ByteArray?): Entity? { try { return ObjectMapper().readValue(data, Entity::class.java) } catch (e:IOException) { e.printStackTrace() log.error("Can not deserialize entity in Deserializer") } return null }}
4.10 配置文件
application.properties
:
# 地址,本地间接localhost,部署能够应用公网ipspring.kafka.bootstrap-servers=localhost:9092# 消费者组idspring.kafka.consumer.group-id=TestGroupIdspring.kafka.consumer.auto-offset-reset=earliest# 消费者键反序列化类spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 消费者值反序列化类spring.kafka.consumer.value-deserializer=com.test.serialize.Deserializer# 生产者键序列化类spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer# 生产者值序列化类spring.kafka.producer.value-serializer=com.test.serialize.Serializer
对于auto-offest-rest
,该属性指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下怎么解决,有四个取值:
earliest
:当各分区有已提交的offest
时,从提交的offest
开始生产,无提交的offest
时,从头开始生产latest
(默认):当各分区有已提交的offest
时,从提交的offest
开始生产,无提交的offest
时,生产新产生的该分区下的数据none
:各分区都存在已提交的offest
时,从offest
后生产,只有有一个分区不存在已提交的offest
,则抛出异样exception
:其余状况将抛出异样给消费者
对于序列化/反序列化,String能够应用自带的序列化/反序列化类:
org.apache.kafka.common.serialization.StringSerializerorg.apache.kafka.common.serialization.StringDeserializer
至于Json能够应用:
org.springframework.kafka.support.serializer.JsonSerializerorg.springframework.kafka.support.serializer.JsonDeserializer
其余自定义的请实现org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口。
yml版:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: TestGroupId auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.test.serialize.Deserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.test.serialize.Serializer
5 测试
5.1 测试类
@SpringBootTest@Slf4j@RequiredArgsConstructor(onConstructor = @__(@Autowired))class TestApplicationTests { private final Producer producer; @Test void contextLoads() { Random random = new Random(); for (int i = 0; i < 1000; i++) { long id = i+1; String name = UUID.randomUUID().toString(); int num = random.nextInt(); producer.send(Entity.builder().id(id).name(name).num(num).build()); } }}
生产者发送1000条音讯。
Kotlin版:
@SpringBootTestclass TestApplicationTests { @Autowired private val producer:Producer? = null @Test fun contextLoads() { for(i in 0..1000) { val id = (i + 1).toLong() val name = java.util.UUID.randomUUID().toString() val num = (0..100000).random() producer!!.send(Entity(id,name,num)) } }}
5.2 测试
控制台输入如下:
所有音讯被胜利发送并且被胜利生产。
最初能够去验证一下Kafka的Topic列表,能够看到配置文件中的Topic的值(TestTopic
),进入Kafka目录:
bin/kafka-topics.sh --list --zookepper localhost:2181
6 源码
- Github
- 码云
7 参考
1、CSDN-Kafka长处
2、简书-Spring Boot 2.x 疾速集成整合消息中间件 Kafka
3、简书-springboot 之集成kafka
如果感觉文章难看,欢送点赞。
同时欢送关注微信公众号:氷泠之路。