关于java:吊炸天的-Kafka-图形化工具-Eagle必须推荐给你

25次阅读

共计 6718 个字符,预计需要花费 17 分钟才能阅读完成。

Kafka 是当下十分风行的消息中间件,据官网走漏,已有成千上万的公司在应用它。最近实际了一波 Kafka,的确很好很弱小。明天咱们来从三个方面学习下 Kafka:Kafaka 在 Linux 下的装置,Kafka 的可视化工具,Kafka 和 SpringBoot 联合应用。心愿大家看完后能疾速入门 Kafka,把握这个风行的消息中间件!

SpringBoot 实战电商我的项目 mall(40k+star)地址:https://github.com/macrozheng/mall

Kafka 简介

Kafka 是由 LinkedIn 公司开发的一款开源分布式音讯流平台,由 Scala 和 Java 编写。次要作用是为解决实时数据提供一个对立、高吞吐、低提早的平台,其本质是基于 公布订阅模式 的音讯引擎零碎。

Kafka 具备以下个性:

  • 高吞吐、低提早:Kafka 收发音讯十分快,应用集群解决音讯提早可低至 2ms。
  • 高扩展性:Kafka 能够弹性地扩大和膨胀,能够扩大到上千个 broker,数十万个 partition,每天解决数万亿条音讯。
  • 永恒存储:Kafka 能够将数据安全地存储在分布式的,长久的,容错的群集中。
  • 高可用性:Kafka 在可用区上能够无效地扩大群集,某个节点宕机,集群照样可能失常工作。

Kafka 装置

咱们将采纳 Linux 下的装置形式,装置环境为 CentOS 7.6。此处没有采纳 Docker 来装置部署,个人感觉间接装置更简略(次要是官网没提供 Docker 镜像)!

  • 首先咱们须要下载 Kafka 的安装包,下载地址:https://mirrors.bfsu.edu.cn/a…

  • 下载实现后将 Kafka 解压到指定目录:
cd /mydata/kafka/
tar -xzf kafka_2.13-2.8.0.tgz
  • 解压实现后进入到解压目录:
cd kafka_2.13-2.8.0
  • 尽管有音讯称 Kafka 行将移除 Zookeeper,然而在 Kafka 最新版本中尚未移除,所以启动 Kafka 前还是须要先启动 Zookeeper;

  • 启动 Zookeeper 服务,服务将运行在 2181 端口;
# 后盾运行服务,并把日志输入到以后文件夹下的 zookeeper-out.file 文件中
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-out.file 2>&1 &
  • 因为目前 Kafka 是部署在 Linux 服务器上的,外网如果想要拜访,须要批改 Kafka 的配置文件config/server.properties,批改下 Kafka 的监听地址,否则会无奈连贯;
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.5.78:9092
  • 最初启动 Kafka 服务,服务将运行在 9092 端口。
# 后盾运行服务,并把日志输入到以后文件夹下的 kafka-out.file 文件中
nohup bin/kafka-server-start.sh config/server.properties > kafka-out.file 2>&1 &

Kafka 命令行操作

接下来咱们应用命令行来操作下 Kafka,相熟下 Kafka 的应用。

  • 首先创立一个叫 consoleTopic 的 Topic;
bin/kafka-topics.sh --create --topic consoleTopic --bootstrap-server 192.168.5.78:9092
  • 接下来查看 Topic;
bin/kafka-topics.sh --describe --topic consoleTopic --bootstrap-server 192.168.5.78:9092
  • 会显示如下 Topic 信息;
Topic: consoleTopic    TopicId: tJmxUQ8QRJGlhCSf2ojuGw    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: consoleTopic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
  • 向 Topic 中发送音讯:
bin/kafka-console-producer.sh --topic consoleTopic --bootstrap-server 192.168.5.78:9092
  • 间接在命令行中输出信息即可发送;

  • 从新关上一个窗口,通过如下命令能够从 Topic 中获取音讯:
bin/kafka-console-consumer.sh --topic consoleTopic --from-beginning --bootstrap-server 192.168.5.78:9092

Kafka 可视化

应用命令行操作 Kafka 的确有点麻烦,接下来咱们试试可视化工具kafka-eagle

装置 JDK

如果你应用的是 CentOS 的话,默认没有装置完整版的 JDK,须要自行装置!

  • 下载 JDK 8,下载地址:https://mirrors.tuna.tsinghua…

  • 下载实现后将 JDK 解压到指定目录;
cd /mydata/java
tar -zxvf OpenJDK8U-jdk_x64_linux_xxx.tar.gz
mv OpenJDK8U-jdk_x64_linux_xxx.tar.gz jdk1.8
  • /etc/profile 文件中增加环境变量JAVA_HOME
vi /etc/profile
# 在 profile 文件中增加
export JAVA_HOME=/mydata/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
# 使批改后的 profile 文件失效
. /etc/profile

装置kafka-eagle

  • 下载 kafka-eagle 的安装包,下载地址:https://github.com/smartloli/…

  • 下载实现后将 kafka-eagle 解压到指定目录;
cd /mydata/kafka/
tar -zxvf kafka-eagle-web-2.0.5-bin.tar.gz
  • /etc/profile 文件中增加环境变量KE_HOME
vi /etc/profile
# 在 profile 文件中增加
export KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5
export PATH=$PATH:$KE_HOME/bin
# 使批改后的 profile 文件失效
. /etc/profile
  • 装置 MySQL 并增加数据库 kekafka-eagle 之后会用到它;
  • 批改配置文件$KE_HOME/conf/system-config.properties,次要是批改 Zookeeper 的配置和数据库配置,正文掉 sqlite 配置,改为应用 MySQL;
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka sqlite jdbc driver address
######################################
# kafka.eagle.driver=org.sqlite.JDBC
# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# kafka.eagle.username=root
# kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
  • 应用如下命令启动kafka-eagle
$KE_HOME/bin/ke.sh start
  • 命令执行实现后会显示如下信息,但并不代表服务曾经启动胜利,还须要期待一会;

  • 再介绍几个有用的 kafka-eagle 命令:
# 进行服务
$KE_HOME/bin/ke.sh stop
# 重启服务
$KE_HOME/bin/ke.sh restart
# 查看服务运行状态
$KE_HOME/bin/ke.sh status
# 查看服务状态
$KE_HOME/bin/ke.sh stats
# 动静查看服务输入日志
tail -f $KE_HOME/logs/ke_console.out
  • 启动胜利能够间接拜访,输出账号密码admin:123456,拜访地址:http://192.168.5.78:8048/

  • 登录胜利后能够拜访到 Dashboard,界面还是很棒的!

可视化工具应用

  • 之前咱们应用命令行创立了 Topic,这里能够间接通过界面来创立;

  • 咱们还能够间接通过 kafka-eagle 来发送音讯;

  • 咱们能够通过命令行来生产 Topic 中的音讯;
bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server 192.168.5.78:9092
  • 控制台获取到信息显示如下;

  • 还有一个很有意思的性能叫KSQL,能够通过 SQL 语句来查问 Topic 中的音讯;

  • 可视化工具天然少不了监控,如果你想开启 kafka-eagle 对 Kafka 的监控性能的话,须要批改 Kafka 的启动脚本,裸露 JMX 的端口;
vi kafka-server-start.sh
# 裸露 JMX 端口
if ["x$KAFKA_HEAP_OPTS" = "x"]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
  • 来看下监控图表界面;

  • 还有一个很骚气的监控大屏性能;

  • 还有 Zookeeper 的命令行性能,总之性能很全,很弱小!

SpringBoot 整合 Kafka

在 SpringBoot 中操作 Kafka 也是非常简单的,比方 Kafka 的音讯模式很简略,没有队列,只有 Topic。

  • 首先在利用的 pom.xml 中增加 Spring Kafka 依赖;
<!--Spring 整合 Kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.1</version>
</dependency>
  • 批改利用配置文件application.yml,配置 Kafka 服务地址及 consumer 的group-id
server:
  port: 8088
spring:
  kafka:
    bootstrap-servers: '192.168.5.78:9092'
    consumer:
      group-id: "bootGroup"
  • 创立一个生产者,用于向 Kafka 的 Topic 中发送音讯;
/**
 * Kafka 音讯生产者
 * Created by macro on 2021/5/19.
 */
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String message){kafkaTemplate.send("bootTopic",message);
    }
}
  • 创立一个消费者,用于从 Kafka 中获取音讯并生产;
/**
 * Kafka 音讯消费者
 * Created by macro on 2021/5/19.
 */
@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = "bootTopic")
    public void processMessage(String content) {log.info("consumer processMessage : {}",content);
    }

}
  • 创立一个发送音讯的接口,调用生产者去发送音讯;
/**
 * Kafka 功能测试
 * Created by macro on 2021/5/19.
 */
@Api(tags = "KafkaController", description = "Kafka 功能测试")
@Controller
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @ApiOperation("发送音讯")
    @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult sendMessage(@RequestParam String message) {kafkaProducer.send(message);
        return CommonResult.success(null);
    }
}
  • 间接在 Swagger 中调用接口进行测试;

  • 我的项目控制台会输入如下信息,表明音讯曾经被接管并生产掉了。
2021-05-19 16:59:21.016  INFO 2344 --- [ntainer#0-0-C-1] c.m.mall.tiny.component.KafkaConsumer    : consumer processMessage : Spring Boot message!

总结

通过本文的一波实际,大家根本就能入门 Kafka 了。装置、可视化工具、联合 SpringBoot,这些根本都是和开发者相干的操作,也是学习 Kafka 的必经之路。

参考资料

  • Kafka 官网文档:https://kafka.apache.org/quic…
  • kafka-eagle官网文档:http://www.kafka-eagle.org/ar…
  • Kafka 相干概念:https://juejin.cn/post/684490…

我的项目源码地址

https://github.com/macrozheng…

本文 GitHub https://github.com/macrozheng/mall-learning 曾经收录,欢送大家 Star!

正文完
 0