标签:Kafka3.Kafka-eagle3;

一、简介

Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流剖析、数据集成和要害工作利用,基于Zookeeper协调的解决平台,也是一种音讯零碎,具备更好的吞吐量、内置分区、复制和容错,这使得它成为大规模音讯解决应用程序的一个很好的解决方案;

二、环境搭建

1、Kafka部署

1、下载安装包:kafka_2.13-3.5.0.tgz2、配置环境变量open -e ~/.bash_profileexport KAFKA_HOME=/本地门路/kafka3.5export PATH=$PATH:$KAFKA_HOME/binsource ~/.bash_profile3、该目录【kafka3.5/bin】启动zookeeperzookeeper-server-start.sh ../config/zookeeper.properties4、该目录【kafka3.5/bin】启动kafkakafka-server-start.sh ../config/server.properties

2、Kafka测试

1、生产者kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic>id-1-message>id-2-message2、消费者kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topicid-1-messageid-2-message3、查看topic列表kafka-topics.sh --bootstrap-server localhost:9092 --listtest-topic4、查看音讯列表kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0id-1-messageid-2-message

3、可视化工具

配置和部署

1、下载安装包:kafka-eagle-bin-3.0.2.tar.gz2、配置环境变量open -e ~/.bash_profileexport KE_HOME=/本地门路/efak-web-3.0.2export PATH=$PATH:$KE_HOME/binsource ~/.bash_profile3、批改配置文件:system-config.propertiesefak.zk.cluster.alias=cluster1cluster1.zk.list=localhost:2181efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle4、本地新建数据库:kafka-eagle,留神用户名和明码是否统一5、启动命令efak-web-3.0.2/bin/ke.sh start命令语法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}6、本地拜访【localhost:8048】 username:admin password:123456

KSQL语句测试

select * from `test-topic` where `partition` in (0)  order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建

1、工程构造

2、依赖治理

这里对于依赖的治理就比较复杂了,首先spring-kafka组件抉择与boot框架中spring雷同的依赖,即6.0.10版本,在spring-kafka最近的版本中3.0.8合乎;

然而该版本应用的是kafka-clients组件的3.3.2版本,在Spring文档的kafka模块中,明确阐明spring-boot:3.1要应用kafka-clients:3.4,所以从spring-kafka组件中排除掉,从新依赖kafka-clients组件;

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId>    <version>${spring-kafka.version}</version>    <exclusions>        <exclusion>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-clients</artifactId>        </exclusion>    </exclusions></dependency><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>${kafka-clients.version}</version></dependency>

3、配置文件

配置kafka连贯地址,监听器的音讯应答机制,消费者的根底模式;

spring:  # kafka配置  kafka:    bootstrap-servers: localhost:9092    listener:      missing-topics-fatal: false      ack-mode: manual_immediate    consumer:      group-id: boot-kafka-group      enable-auto-commit: false      max-poll-records: 10      properties:        max.poll.interval.ms: 3600000

四、根底用法

1、音讯生产

模板类KafkaTemplate用于执行高级的操作,封装各种音讯发送的办法,在该办法中,通过topickey以及音讯主体,实现音讯的生产;

@RestControllerpublic class ProducerWeb {    @Resource    private KafkaTemplate<String, String> kafkaTemplate;    @GetMapping("/send/msg")    public String sendMsg (){        try {            // 构建音讯主体            JsonMapper jsonMapper = new JsonMapper();            String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));            // 发送音讯            kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);        } catch (JsonProcessingException e) {            e.printStackTrace();        }        return "OK" ;    }}

2、音讯生产

编写音讯监听类,通过KafkaListener注解管制监听的具体信息,在实现音讯生产和生产的办法测试后,应用可视化工具kafka-eagle查看topic和音讯列表;

@Componentpublic class ConsumerListener {    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);    @KafkaListener(topics = "boot-kafka-topic")    public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {        try {            String key =  String.valueOf(record.key());            String body = record.value();            log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);        } catch (Exception e){            e.printStackTrace();        } finally {            acknowledgment.acknowledge();        }    }}

五、参考源码

文档仓库:https://gitee.com/cicadasmile/butte-java-note源码仓库:https://gitee.com/cicadasmile/butte-spring-parent