乐趣区

关于springboot:Spring-Boot-2x-快速集成Kafka

1 Kafka

Kafka 是一个开源分布式的流解决平台,一种高吞吐量的分布式公布订阅音讯零碎,它能够解决消费者在网站中的所有动作流数据。Kafka 由 Scala 和 Java 编写,2012 年成为 Apache 基金会下顶级我的项目。

2 Kafka 长处

  • 低提早:Kafka 反对低提早消息传递,速度极快,能达到 200w 写 / 秒
  • 高性能:Kafka 对于音讯的散布,订阅都有高吞吐量。即便存储了 TB 级的信息,仍然可能保障稳固的性能
  • 可靠性:Kafka 是分布式,分区,复制和容错的,保障零停机和零数据失落
  • 可扩大:用户能够从但个代理 Broker 开始作 POC,而后缓缓扩大到由三个 Broker 组成的小型开发集群,接着扩大到数十个甚至数百个 Broker 集群进入生产阶段,能够在集群联机时进行扩大,而不会影响整个零碎的可用性
  • 多个生产者:无论这些客户应用雷同 Topic 还是多个 Topic,Kafka 都能无缝解决多个生产者,使得零碎能够非常容易聚合来自许多前端零碎的数据并使其保持一致
  • 多个消费者:Kafka 具备多个消费者设计,能够读取任何但个音讯流而不会互相烦扰。多个 Kafka 消费者能够组成一个生产组进行操作并共享音讯流,从而确保每一条音讯只被整个生产组解决一次
  • 基于磁盘的保留:Kafka 应用分布式提交日志,音讯可能疾速长久化到磁盘上。音讯长久化意味着如果消费者落后,无论是因为处理速度迟缓还是忽然的音讯涌入都不会有失落数据的危险,也意味着消费者能够被进行。音讯将保留在 Kafka 中,容许消费者重新启动并且从中断处获取解决信息而不会失落数据

3 Kafka 相干术语

  • Broker:Kafka 集群蕴含一个或多个服务器,这种服务器称为 Broker
  • Topic:每条公布到 Kafka 的音讯都有一个类别,这个类别称为 Topic。物理上不同 Topic 的音讯离开存储,逻辑上 Topic 的音讯尽管保留在一个或多个 Broker 上,但用户只需指定音讯的 Topic 即可生产或生产数据而不用关怀数据寄存于何处
  • Partition:每个 Topic 蕴含一个或多个 Partition
  • Producer:生产者,负责公布音讯到 Broker
  • Consumer:消费者,向 Broker 读取音讯的客户端
  • Consumer Group:每个 Consumer 属于一个特定的 Consumer Group,能够为每个 Consumer 指定 Group Name,否则属于默认 Group

4 入手干活

4.1 环境

  • Spring Boot 2.3.1
  • IDEA 2020.1.1
  • OpenJDK 11.0.7
  • Kafka 2.5.0
  • Kotlin 1.3.72

4.2 下载 Kafka

官网戳这里。
下载并解压(留神须要 Kafka 与 Spring Boot 版本对应,能够参考这里):

tar -xvf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0

接着启动 ZooKeeper 与 Kafka:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Kafka 须要用到 ZooKeeper,须要在启动 Kafka 之前启动 ZooKeeper(ZooKeeper 是一个开源的分布式应用程序协调服务,是 Hadoop 的组件,次要用于解决分布式应用中的一些数据管理问题)。
Kafka 默认应用 9092 端口,部署在服务器上须要留神防火墙以及平安组的解决。

4.3 新建工程

思考到 Spring Boot 在 2.3.0M1 中(截至本文写作日期 2020.07.14Spring Boot 已更新到 2.4.0M1)首次采纳 Gradle 而不是 Maven 来构建我的项目,换句话说日后 Spring Boot 的构建工具将从 Maven 迁徙到 Gradle,Spring Boot 团队给出的次要起因是能够缩小我的项目构建所破费的工夫,详情能够戳这里瞧瞧。
另外因为另一个基于 JVM 的语言 Kotlin 的日渐崛起,后端开始逐步有人采纳 Kotlin(只管不多,不过语法糖真的香,JetBrains 家的语言配合 IDE,爽得飞起),因而本示例我的项目将采纳两种形式搭建:

  • Java+Maven
  • Kotlin+Gradle

抉择的依赖如下(当然您喜爱的话能够在 pom.xml 或者 build.gradle.kts 外面加,对于 Kotlin 不须要Lombok):

4.4 我的项目构造

Java 版:

Kotlin 版:

  • serialize:序列化 / 反序列化实体类
  • Constant.java/Constant.kt:常量类
  • Consumer.java/Consumer.kt:消费者类
  • Entity.java/Entity.kt:实体类
  • Producer.java/Product.kt:生产者类
  • TestApplicationTests:测试类

4.5 常量类

蕴含 Topic 与 GroupId,Java 版:

public class Constants {
    public static final String TOPIC = "TestTopic";
    public static final String GROUP_ID = "TestGroupId";
}

Kotlin 版:

object Constants
{
    const val TOPIC = "TestTopic"
    const val GROUP_ID = "TestGroupId"
}

4.6 实体类

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Entity {
    private long id;
    private String name;
    private int num;
}

说一下 Lombok 的几个注解:

  • @AllArgsConstructor/@NoArgsConstructor:生成所有参数 / 无参数构造方法
  • @Data:等价于 @Setter+@Getter+@RequiredArgsConstrucotr+@ToString+@EqualAndHashCode,主动生成Setter+Getter+toString()+equals()+hashCode(),还有@RequireArgsConstructor 为类的每一个 final 或非空字段生成一个构造方法
  • @Builder:能够通过建造者模式创建对象

Kotlin 版:

class Entity {
    var id: Long = 0
    var name: String = ""
    var num: Int = 0

    constructor()

    constructor(id:Long,name:String,num:Int)
    {
        this.id = id
        this.name = name
        this.num = num
    }
}

4.7 生产者

@Component
@Slf4j
// 防止出现 Field injection not recommended 正告,代替了原来的间接在字段上 @Autowired
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class Producer {

    private final KafkaTemplate<String, Entity> kafkaTemplate;

    public void send(Entity entity) {
        // 发送音讯
        // 类型个别为 String+ 自定义音讯内容,String 代表音讯 Topic,这里音讯内容用 Entity 示意
        ListenableFuture<SendResult<String, Entity>> future =
                kafkaTemplate.send(Constants.TOPIC, entity);
        // 回调函数
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable throwable) {log.info("Send message failed");
            }

            @Override
            public void onSuccess(SendResult<String, Entity> stringEntitySendResult) {log.info("Send message success");
            }
        });
    }
}

这里的 send 有两个参数,对应于 sendResult<> 中的参数类型,第一个为音讯的 Topic,第二个为音讯体,个别应用 String 或者 Json。

Kotlin 版:

@Component
class Producer
{
    @Autowired
    private var kafkaTemplate:KafkaTemplate<String,Entity> ? = null
    private val log = LoggerFactory.getLogger(this.javaClass)

    fun send(entity: Entity)
    {val future = kafkaTemplate!!.send(Constants.TOPIC,entity);
        future.addCallback(object : ListenableFutureCallback<SendResult<String?, Entity?>?>{override fun onSuccess(result : SendResult<String?,Entity?>?)
            {log.info("Send success");
            }

            override fun onFailure(e:Throwable)
            {log.info("Send failed");
            }
        })
    }
}

4.8 消费者

@Component
@Slf4j
public class Consumer {@KafkaListener(topics = Constants.TOPIC,groupId = Constants.GROUP_ID)
    public void consume(Entity entity)
    {log.info("Consume a entity, id is"+entity.getId());
    }
}

应用 @KafkaListener 注解,第一个参数示意须要生产的音讯的 Topic,能够是String [],第二个是消费者组的 id。生产者的音讯 Topic 必须与消费者的 Topic 保持一致否则不能生产,这里简略解决打印日志。
Kotlin 版:

@Component
class Consumer {private val log = LoggerFactory.getLogger(this.javaClass)

    @KafkaListener(topics = [Constants.TOPIC],groupId = Constants.GROUP_ID)
    fun consume(entity: Entity) {log.info("Consume a entity, id is"+entity.id.toString())
    }
}

4.9 序列化 / 反序列化

这里自定义了序列化 / 反序列化类,序列化 / 反序列化类须要实现 org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T> 接口,其中 T 是想要序列化的类型,这里是Entity。序列化接口反编译如下:

public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) { }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);
    }

    default void close() {}
}

反序列化反编译接口如下:

public interface Deserializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) { }

    T deserialize(String var1, byte[] var2);

    default T deserialize(String topic, Headers headers, byte[] data) {return this.deserialize(topic, data);
    }

    default void close() {}
}

也就是只须要实现其中的 serialize/deserialize 办法即可。这里序列化 / 反序列化用到了自带的 Jackson:

@Slf4j
public class Serializer implements org.apache.kafka.common.serialization.Serializer<Entity> {public byte [] serialize(String topic, Entity entity)
    {
        try {return entity == null ? null : new ObjectMapper().writeValueAsBytes(entity);
        } catch (JsonProcessingException e) {e.printStackTrace();
            log.error("Can not serialize entity in Serializer");
        }
        return null;
    }
}

反序列化:

@Slf4j
public class Deserializer implements org.apache.kafka.common.serialization.Deserializer<Entity> {public Entity deserialize(String topic,byte [] data)
    {
        try {return data == null ? null : new ObjectMapper().readValue(data,Entity.class);
        } catch (IOException e) {e.printStackTrace();
            log.error("Can not deserialize entity in Deserializer");
        }
        return null;
    }
}

Kotlin 版:

class Serializer : org.apache.kafka.common.serialization.Serializer<Entity?>
{private val log = LoggerFactory.getLogger(this.javaClass)

    override fun serialize(topic: String?, data: Entity?): ByteArray? {
        try {return if (data == null) null else ObjectMapper().writeValueAsBytes(data)
        }
        catch (e:JsonProcessingException)
        {e.printStackTrace()
            log.error("Can not serialize entity in Serializer")
        }
        return null
    }
}
class Deserializer : org.apache.kafka.common.serialization.Deserializer<Entity?>
{private val log = LoggerFactory.getLogger(this.javaClass)

    override fun deserialize(topic: String?, data: ByteArray?): Entity? {
        try
        {return ObjectMapper().readValue(data, Entity::class.java)
        }
        catch (e:IOException)
        {e.printStackTrace()
            log.error("Can not deserialize entity in Deserializer")
        }
        return null
    }
}

4.10 配置文件

application.properties

# 地址,本地间接 localhost,部署能够应用公网 ip
spring.kafka.bootstrap-servers=localhost:9092
# 消费者组 id
spring.kafka.consumer.group-id=TestGroupId
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者键反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费者值反序列化类
spring.kafka.consumer.value-deserializer=com.test.serialize.Deserializer

# 生产者键序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者值序列化类
spring.kafka.producer.value-serializer=com.test.serialize.Serializer

对于auto-offest-rest,该属性指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下怎么解决,有四个取值:

  • earliest:当各分区有已提交的 offest 时,从提交的 offest 开始生产,无提交的 offest 时,从头开始生产
  • latest(默认):当各分区有已提交的 offest 时,从提交的 offest 开始生产,无提交的 offest 时,生产新产生的该分区下的数据
  • none:各分区都存在已提交的 offest 时,从 offest 后生产,只有有一个分区不存在已提交的offest,则抛出异样
  • exception:其余状况将抛出异样给消费者

对于序列化 / 反序列化,String 能够应用自带的序列化 / 反序列化类:

org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringDeserializer

至于 Json 能够应用:

org.springframework.kafka.support.serializer.JsonSerializer
org.springframework.kafka.support.serializer.JsonDeserializer

其余自定义的请实现 org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T> 接口。

yml 版:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: TestGroupId
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.test.serialize.Deserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.test.serialize.Serializer

5 测试

5.1 测试类

@SpringBootTest
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
class TestApplicationTests {

    private final Producer producer;

    @Test
    void contextLoads() {Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            long id = i+1;
            String name = UUID.randomUUID().toString();
            int num = random.nextInt();
            producer.send(Entity.builder().id(id).name(name).num(num).build());
        }
    }
}

生产者发送 1000 条音讯。
Kotlin 版:

@SpringBootTest
class TestApplicationTests {

    @Autowired
    private val producer:Producer? = null

    @Test
    fun contextLoads() {for(i in 0..1000)
        {val id = (i + 1).toLong()
            val name = java.util.UUID.randomUUID().toString()
            val num = (0..100000).random()
            producer!!.send(Entity(id,name,num))
        }
    }
}

5.2 测试

控制台输入如下:

所有音讯被胜利发送并且被胜利生产。
最初能够去验证一下 Kafka 的 Topic 列表,能够看到配置文件中的 Topic 的值(TestTopic),进入 Kafka 目录:

bin/kafka-topics.sh --list --zookepper localhost:2181

6 源码

  • Github
  • 码云

7 参考

1、CSDN-Kafka 长处
2、简书 -Spring Boot 2.x 疾速集成整合消息中间件 Kafka
3、简书 -springboot 之集成 kafka

如果感觉文章难看,欢送点赞。

同时欢送关注微信公众号:氷泠之路。

退出移动版