乐趣区

关于java:光说不练假把式一起Kafka业务实战

7.1 程序性场景

7.1.1 场景概述

假如咱们要传输一批订单到另一个零碎,那么订单对应状态的演变是有程序性要求的。

已下单 → 已领取 → 已确认

不容许错乱!

7.1.2 程序级别

1)全局有序:

串行化。每条通过 kafka 的音讯必须严格保障有序性。

这就要求 kafka 单通道,每个 groupid 下单消费者

极大的影响性能,事实业务下简直没必要

2)部分有序:

业务部分有序。同一条订单有序即可,不同订单能够并行处理。不同订单的程序前后无所谓

充分利用 kafka 多分区的并发性,只须要想方法让须要程序的一批数据进同一分区即可。

7.1.3 实现计划

1)发送端:

指定 key 发送,key=order.id 即可,案例回顾:4.2.3,PartitionProducer

2)发送中:

给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理

吞吐量显然上不去,kafka 开多个分区还有何意义?

所以开多个消费者指定分区生产,现实情况下,每个分区配一个。

然而,这个吞吐量仍然无限,那如何解决呢?

计划:多线程

在每个消费者上再开多线程,是个解决办法。然而,要警觉程序性被突破!

参考下图:thread 解决后,会将 data 变成 2-1-3

改良:接管后散发二级内存队列

消费者取到音讯后不做解决,依据 key 二次散发到多个阻塞队列。

再开启多个线程,每个队列调配一个线程解决。晋升吞吐量

7.1.4 代码验证

1)新建一个 sort 队列,2 个分区

2)启动 order 我的项目

源码参考:

SortedProducer(程序性发送端)

SortedConsumer(程序性生产端 – 阻塞队列实现,不便大家了解设计思路)

SortedConsumer2(程序性生产端 – 线程池实现,事实中举荐这种形式!)

3)通过 swagger 申请

先按不同的 id 发送,查看控制台日志,id 被正确散发到对应的队列

同一个 key 调配到同一个 queue,程序性失去保障

7.2 海量同步场景

假如大数据部门须要大屏来展现用户的打车订单状况,须要把订单数据送入 druid

这里不波及程序,只有下单就传输,然而对实时性和并发量要求较高

7.2.1 惯例架构

在下单实现 mysql 后,通过程序代码打印,间接进入 kafka

或者 logback 和 kafka 集成,通过 log 输送

长处:

更合乎惯例的思维。将数据送给想要的部门

毛病:

耦合度高,将 kafka 发送音讯嵌入了订单下单的主业务,造成代码入侵。

下单不关怀,也不应该关注送入 kafka 的状况,一旦 kafka 不可用,程序受影响

7.2.2 解耦合

借助 canal,监听订单表的数据变动,不再影响主业务。

7.2.3 部署实现

1)mysql 部署

留神,须要关上 binlog,8.0 默认处于开启状态

# 启动 mysql8
docker run --name mysql8 -v /opt/data/mysql8:/var/lib/mysql -p 3389:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=thisisprizemysql8db -d daocloud.io/mysql:8.0

连上 mysql,执行以下 sql,增加 canal 用户

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

创立订单表

CREATE TABLE `orders` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

2)canal 部署

#canal.properties
#附带材料里有,放到服务器 /opt/data/canal/ 目录下
#批改 servers 为你的 kafka 的机器地址
canal.serverMode = kafka
kafka.bootstrap.servers = 52.82.98.209:10903,52.82.98.209:10904
#docker-compose.yml
#附带材料里有 canal.yml,轻易找个目录,重命名为 docker-compose.yml
#批改 mysql 的链接信息的链接信息
#而后在当前目录下执行 docker-compose up -d
version: '2'
services:
    canal:
        image: canal/canal-server
        container_name: canal
        restart: always
        ports:
            - "10908:11111"
        environment:
                #mysql 的链接信息
            canal.instance.master.address: 52.82.98.209:3389
            canal.instance.dbUsername: canal
            canal.instance.dbPassword: canal
            #投放到 kafka 的哪个主题?要提前准备好!canal.mq.topic: canal
        volumes:
            - "/opt/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"

3)数据通道验证

进入 kafka 容器,用下面 3.2.4 里的命令行形式监听 canal 队列

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal

在 mysql 上创立 orders 表,增删数据试一下

mysql> insert into orders (name) values ('张三');
Query OK, 1 row affected (0.03 sec)

在 kafka 控制台,能够看到同步的音讯

{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}

数据通道已买通,还短少的是 druid 作为生产端来接管音讯

4)druid 部署

#druid.yml
#在附带材料里有
#轻易找个目录,执行
docker-compose -f druid.yml up -d

5)验证

配置 druid 的数据源,从 kafka 读取数据,验证数据能够正确进入 druid。

注:

对于 druid 的具体应用,在大数据篇章里会具体解说。

7.3 kafka 监控

7.3.1 eagle 简介

Kafka Eagle 监控零碎是一款用来监控 Kafka 集群的工具,反对治理多个 Kafka 集群、治理 Kafka 主题(蕴含查看、删除、创立等)、消费者组合消费者实例监控、音讯阻塞告警、Kafka 集群衰弱状态查看等。

7.3.2 部署

举荐 docker-compose 启动

将装备的材料中 eagle.yml,拷贝到服务器任意目录

批改对应的 ip 地址为你服务器的地址

# 留神 ip 地址:52.82.98.209,全副换成你本人服务器的

version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13

    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
            - 10913:10913
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker 部署必须设置内部可拜访 ip 和端口,否则注册进 zk 的地址将不可达造成内部无奈连贯
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10903 
            KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=52.82.98.209 -Dcom.sun.management.jmxremote.rmi.port=10913"
            JMX_PORT: 10913
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
            - 10914:10914
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10904 
            KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=52.82.98.209 -Dcom.sun.management.jmxremote.rmi.port=10914"
            JMX_PORT: 10914
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 
    eagle:
        image: gui66497/kafka_eagle
        container_name: ke
        restart: always
        depends_on:
            - kafka-1
            - kafka-2
        ports:
            - "10907:8048"
        environment:
            ZKSERVER: "zookeeper:2181"

执行 docker-compose -f eagle.yml up -d

7.3.3 应用阐明

拜访:http://52.82.98.209:10907/ke/

默认用户名明码:admin / 123456

如果要删除 topic 等操作,须要治理 token:keadmin

与 km 到底选哪个呢?依据本人习惯,集体认为:

  • 界面好看水平和监控曲线优于 km,有登录权限管制
  • 性能操作上不如 km 简略直白,然而 km 须要配置肯定的连贯信息

本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源

退出移动版