Kafka是当下十分风行的消息中间件,据官网走漏,已有成千上万的公司在应用它。最近实际了一波Kafka,的确很好很弱小。明天咱们来从三个方面学习下Kafka:Kafaka在Linux下的装置,Kafka的可视化工具,Kafka和SpringBoot联合应用。心愿大家看完后能疾速入门Kafka,把握这个风行的消息中间件!
SpringBoot实战电商我的项目mall(40k+star)地址:https://github.com/macrozheng/mall
Kafka简介
Kafka是由LinkedIn
公司开发的一款开源分布式音讯流平台,由Scala和Java编写。次要作用是为解决实时数据提供一个对立、高吞吐、低提早的平台,其本质是基于公布订阅模式
的音讯引擎零碎。
Kafka具备以下个性:
- 高吞吐、低提早:Kafka收发音讯十分快,应用集群解决音讯提早可低至2ms。
- 高扩展性:Kafka能够弹性地扩大和膨胀,能够扩大到上千个broker,数十万个partition,每天解决数万亿条音讯。
- 永恒存储:Kafka能够将数据安全地存储在分布式的,长久的,容错的群集中。
- 高可用性:Kafka在可用区上能够无效地扩大群集,某个节点宕机,集群照样可能失常工作。
Kafka装置
咱们将采纳Linux下的装置形式,装置环境为CentOS 7.6。此处没有采纳Docker来装置部署,个人感觉间接装置更简略(次要是官网没提供Docker镜像)!
- 首先咱们须要下载Kafka的安装包,下载地址:https://mirrors.bfsu.edu.cn/a...
- 下载实现后将Kafka解压到指定目录:
cd /mydata/kafka/tar -xzf kafka_2.13-2.8.0.tgz
- 解压实现后进入到解压目录:
cd kafka_2.13-2.8.0
- 尽管有音讯称Kafka行将移除Zookeeper,然而在Kafka最新版本中尚未移除,所以启动Kafka前还是须要先启动Zookeeper;
- 启动Zookeeper服务,服务将运行在
2181
端口;
# 后盾运行服务,并把日志输入到以后文件夹下的zookeeper-out.file文件中nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-out.file 2>&1 &
- 因为目前Kafka是部署在Linux服务器上的,外网如果想要拜访,须要批改Kafka的配置文件
config/server.properties
,批改下Kafka的监听地址,否则会无奈连贯;
############################# Socket Server Settings ############################## The address the socket server listens on. It will get the value returned from# java.net.InetAddress.getCanonicalHostName() if not configured.# FORMAT:# listeners = listener_name://host_name:port# EXAMPLE:# listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.5.78:9092
- 最初启动Kafka服务,服务将运行在
9092
端口。
# 后盾运行服务,并把日志输入到以后文件夹下的kafka-out.file文件中nohup bin/kafka-server-start.sh config/server.properties > kafka-out.file 2>&1 &
Kafka命令行操作
接下来咱们应用命令行来操作下Kafka,相熟下Kafka的应用。
- 首先创立一个叫
consoleTopic
的Topic;
bin/kafka-topics.sh --create --topic consoleTopic --bootstrap-server 192.168.5.78:9092
- 接下来查看Topic;
bin/kafka-topics.sh --describe --topic consoleTopic --bootstrap-server 192.168.5.78:9092
- 会显示如下Topic信息;
Topic: consoleTopic TopicId: tJmxUQ8QRJGlhCSf2ojuGw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: consoleTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- 向Topic中发送音讯:
bin/kafka-console-producer.sh --topic consoleTopic --bootstrap-server 192.168.5.78:9092
- 间接在命令行中输出信息即可发送;
- 从新关上一个窗口,通过如下命令能够从Topic中获取音讯:
bin/kafka-console-consumer.sh --topic consoleTopic --from-beginning --bootstrap-server 192.168.5.78:9092
Kafka可视化
应用命令行操作Kafka的确有点麻烦,接下来咱们试试可视化工具kafka-eagle
。
装置JDK
如果你应用的是CentOS的话,默认没有装置完整版的JDK,须要自行装置!
- 下载JDK 8,下载地址:https://mirrors.tuna.tsinghua...
- 下载实现后将JDK解压到指定目录;
cd /mydata/javatar -zxvf OpenJDK8U-jdk_x64_linux_xxx.tar.gzmv OpenJDK8U-jdk_x64_linux_xxx.tar.gz jdk1.8
- 在
/etc/profile
文件中增加环境变量JAVA_HOME
。
vi /etc/profile# 在profile文件中增加export JAVA_HOME=/mydata/java/jdk1.8export PATH=$PATH:$JAVA_HOME/bin# 使批改后的profile文件失效. /etc/profile
装置kafka-eagle
- 下载
kafka-eagle
的安装包,下载地址:https://github.com/smartloli/...
- 下载实现后将
kafka-eagle
解压到指定目录;
cd /mydata/kafka/tar -zxvf kafka-eagle-web-2.0.5-bin.tar.gz
- 在
/etc/profile
文件中增加环境变量KE_HOME
;
vi /etc/profile# 在profile文件中增加export KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5export PATH=$PATH:$KE_HOME/bin# 使批改后的profile文件失效. /etc/profile
- 装置MySQL并增加数据库
ke
,kafka-eagle
之后会用到它; - 批改配置文件
$KE_HOME/conf/system-config.properties
,次要是批改Zookeeper的配置和数据库配置,正文掉sqlite配置,改为应用MySQL;
####################################### multi zookeeper & kafka cluster list######################################kafka.eagle.zk.cluster.alias=cluster1cluster1.zk.list=localhost:2181####################################### kafka eagle webui port######################################kafka.eagle.webui.port=8048####################################### kafka sqlite jdbc driver address####################################### kafka.eagle.driver=org.sqlite.JDBC# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db# kafka.eagle.username=root# kafka.eagle.password=www.kafka-eagle.org####################################### kafka mysql jdbc driver address######################################kafka.eagle.driver=com.mysql.cj.jdbc.Driverkafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNullkafka.eagle.username=rootkafka.eagle.password=root
- 应用如下命令启动
kafka-eagle
;
$KE_HOME/bin/ke.sh start
- 命令执行实现后会显示如下信息,但并不代表服务曾经启动胜利,还须要期待一会;
- 再介绍几个有用的
kafka-eagle
命令:
# 进行服务$KE_HOME/bin/ke.sh stop# 重启服务$KE_HOME/bin/ke.sh restart# 查看服务运行状态$KE_HOME/bin/ke.sh status# 查看服务状态$KE_HOME/bin/ke.sh stats# 动静查看服务输入日志tail -f $KE_HOME/logs/ke_console.out
- 启动胜利能够间接拜访,输出账号密码
admin:123456
,拜访地址:http://192.168.5.78:8048/
- 登录胜利后能够拜访到Dashboard,界面还是很棒的!
可视化工具应用
- 之前咱们应用命令行创立了Topic,这里能够间接通过界面来创立;
- 咱们还能够间接通过
kafka-eagle
来发送音讯;
- 咱们能够通过命令行来生产Topic中的音讯;
bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server 192.168.5.78:9092
- 控制台获取到信息显示如下;
- 还有一个很有意思的性能叫
KSQL
,能够通过SQL语句来查问Topic中的音讯;
- 可视化工具天然少不了监控,如果你想开启
kafka-eagle
对Kafka的监控性能的话,须要批改Kafka的启动脚本,裸露JMX的端口;
vi kafka-server-start.sh# 裸露JMX端口if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" export JMX_PORT="9999"fi
- 来看下监控图表界面;
- 还有一个很骚气的监控大屏性能;
- 还有Zookeeper的命令行性能,总之性能很全,很弱小!
SpringBoot整合Kafka
在SpringBoot中操作Kafka也是非常简单的,比方Kafka的音讯模式很简略,没有队列,只有Topic。
- 首先在利用的
pom.xml
中增加Spring Kafka依赖;
<!--Spring整合Kafka--><dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.1</version></dependency>
- 批改利用配置文件
application.yml
,配置Kafka服务地址及consumer的group-id
;
server: port: 8088spring: kafka: bootstrap-servers: '192.168.5.78:9092' consumer: group-id: "bootGroup"
- 创立一个生产者,用于向Kafka的Topic中发送音讯;
/** * Kafka音讯生产者 * Created by macro on 2021/5/19. */@Componentpublic class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void send(String message){ kafkaTemplate.send("bootTopic",message); }}
- 创立一个消费者,用于从Kafka中获取音讯并生产;
/** * Kafka音讯消费者 * Created by macro on 2021/5/19. */@Slf4j@Componentpublic class KafkaConsumer { @KafkaListener(topics = "bootTopic") public void processMessage(String content) { log.info("consumer processMessage : {}",content); }}
- 创立一个发送音讯的接口,调用生产者去发送音讯;
/** * Kafka功能测试 * Created by macro on 2021/5/19. */@Api(tags = "KafkaController", description = "Kafka功能测试")@Controller@RequestMapping("/kafka")public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @ApiOperation("发送音讯") @RequestMapping(value = "/sendMessage", method = RequestMethod.GET) @ResponseBody public CommonResult sendMessage(@RequestParam String message) { kafkaProducer.send(message); return CommonResult.success(null); }}
- 间接在Swagger中调用接口进行测试;
- 我的项目控制台会输入如下信息,表明音讯曾经被接管并生产掉了。
2021-05-19 16:59:21.016 INFO 2344 --- [ntainer#0-0-C-1] c.m.mall.tiny.component.KafkaConsumer : consumer processMessage : Spring Boot message!
总结
通过本文的一波实际,大家根本就能入门Kafka了。装置、可视化工具、联合SpringBoot,这些根本都是和开发者相干的操作,也是学习Kafka的必经之路。
参考资料
- Kafka官网文档:https://kafka.apache.org/quic...
kafka-eagle
官网文档:http://www.kafka-eagle.org/ar...- Kafka相干概念:https://juejin.cn/post/684490...
我的项目源码地址
https://github.com/macrozheng...
本文 GitHub https://github.com/macrozheng/mall-learning 曾经收录,欢送大家Star!