7.1 程序性场景
7.1.1 场景概述
假如咱们要传输一批订单到另一个零碎,那么订单对应状态的演变是有程序性要求的。
已下单 → 已领取 → 已确认
不容许错乱!
7.1.2 程序级别
1)全局有序:
串行化。每条通过kafka的音讯必须严格保障有序性。
这就要求kafka单通道,每个groupid下单消费者
极大的影响性能,事实业务下简直没必要
2)部分有序:
业务部分有序。同一条订单有序即可,不同订单能够并行处理。不同订单的程序前后无所谓
充分利用kafka多分区的并发性,只须要想方法让须要程序的一批数据进同一分区即可。
7.1.3 实现计划
1)发送端:
指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer
2)发送中:
给队列配置多分区保障并发性。
3)读取端:
单消费者:显然不合理
吞吐量显然上不去,kafka开多个分区还有何意义?
所以开多个消费者指定分区生产,现实情况下,每个分区配一个。
然而,这个吞吐量仍然无限,那如何解决呢?
计划:多线程
在每个消费者上再开多线程,是个解决办法。然而,要警觉程序性被突破!
参考下图:thread解决后,会将data变成 2-1-3
改良:接管后散发二级内存队列
消费者取到音讯后不做解决,依据key二次散发到多个阻塞队列。
再开启多个线程,每个队列调配一个线程解决。晋升吞吐量
7.1.4 代码验证
1)新建一个sort队列,2个分区
2)启动order我的项目
源码参考:
SortedProducer(程序性发送端)
SortedConsumer(程序性生产端 - 阻塞队列实现,不便大家了解设计思路)
SortedConsumer2(程序性生产端 - 线程池实现,事实中举荐这种形式!)
3)通过swagger申请
先按不同的id发送,查看控制台日志,id被正确散发到对应的队列
同一个key调配到同一个queue,程序性失去保障
7.2 海量同步场景
假如大数据部门须要大屏来展现用户的打车订单状况,须要把订单数据送入druid
这里不波及程序,只有下单就传输,然而对实时性和并发量要求较高
7.2.1 惯例架构
在下单实现mysql后,通过程序代码打印,间接进入kafka
或者logback和kafka集成,通过log输送
长处:
更合乎惯例的思维。将数据送给想要的部门
毛病:
耦合度高,将kafka发送音讯嵌入了订单下单的主业务,造成代码入侵。
下单不关怀,也不应该关注送入kafka的状况,一旦kafka不可用,程序受影响
7.2.2 解耦合
借助canal,监听订单表的数据变动,不再影响主业务。
7.2.3 部署实现
1)mysql部署
留神,须要关上binlog,8.0 默认处于开启状态
#启动mysql8docker run --name mysql8 -v /opt/data/mysql8:/var/lib/mysql -p 3389:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=thisisprizemysql8db -d daocloud.io/mysql:8.0
连上mysql,执行以下sql,增加canal用户
CREATE USER canal IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
创立订单表
CREATE TABLE `orders` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`));
2)canal部署
#canal.properties#附带材料里有,放到服务器 /opt/data/canal/ 目录下#批改servers为你的kafka的机器地址canal.serverMode = kafkakafka.bootstrap.servers = 52.82.98.209:10903,52.82.98.209:10904
#docker-compose.yml#附带材料里有canal.yml,轻易找个目录,重命名为docker-compose.yml#批改mysql的链接信息的链接信息#而后在当前目录下执行 docker-compose up -dversion: '2'services: canal: image: canal/canal-server container_name: canal restart: always ports: - "10908:11111" environment: #mysql的链接信息 canal.instance.master.address: 52.82.98.209:3389 canal.instance.dbUsername: canal canal.instance.dbPassword: canal #投放到kafka的哪个主题?要提前准备好! canal.mq.topic: canal volumes: - "/opt/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"
3)数据通道验证
进入kafka容器,用下面3.2.4里的命令行形式监听canal队列
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal
在mysql上创立orders表,增删数据试一下
mysql> insert into orders (name) values ('张三');Query OK, 1 row affected (0.03 sec)
在kafka控制台,能够看到同步的音讯
{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}
数据通道已买通,还短少的是druid作为生产端来接管音讯
4)druid部署
#druid.yml#在附带材料里有#轻易找个目录,执行docker-compose -f druid.yml up -d
5)验证
配置druid的数据源,从kafka读取数据,验证数据能够正确进入druid。
注:
对于druid的具体应用,在大数据篇章里会具体解说。
7.3 kafka监控
7.3.1 eagle简介
Kafka Eagle监控零碎是一款用来监控Kafka集群的工具,反对治理多个Kafka集群、治理Kafka主题(蕴含查看、删除、创立等)、消费者组合消费者实例监控、音讯阻塞告警、Kafka集群衰弱状态查看等。
7.3.2 部署
举荐docker-compose启动
将装备的材料中 eagle.yml , 拷贝到服务器任意目录
批改对应的ip地址为你服务器的地址
#留神ip地址:52.82.98.209,全副换成你本人服务器的version: '3'services: zookeeper: image: zookeeper:3.4.13 kafka-1: container_name: kafka-1 image: wurstmeister/kafka:2.12-2.2.2 ports: - 10903:9092 - 10913:10913 environment: KAFKA_BROKER_ID: 1 HOST_IP: 52.82.98.209 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 #docker部署必须设置内部可拜访ip和端口,否则注册进zk的地址将不可达造成内部无奈连贯 KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209 KAFKA_ADVERTISED_PORT: 10903 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=52.82.98.209 -Dcom.sun.management.jmxremote.rmi.port=10913" JMX_PORT: 10913 volumes: - /etc/localtime:/etc/localtime depends_on: - zookeeper kafka-2: container_name: kafka-2 image: wurstmeister/kafka:2.12-2.2.2 ports: - 10904:9092 - 10914:10914 environment: KAFKA_BROKER_ID: 2 HOST_IP: 52.82.98.209 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209 KAFKA_ADVERTISED_PORT: 10904 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=52.82.98.209 -Dcom.sun.management.jmxremote.rmi.port=10914" JMX_PORT: 10914 volumes: - /etc/localtime:/etc/localtime depends_on: - zookeeper eagle: image: gui66497/kafka_eagle container_name: ke restart: always depends_on: - kafka-1 - kafka-2 ports: - "10907:8048" environment: ZKSERVER: "zookeeper:2181"
执行 docker-compose -f eagle.yml up -d
7.3.3 应用阐明
拜访 : http://52.82.98.209:10907/ke/
默认用户名明码: admin / 123456
如果要删除topic等操作,须要治理token: keadmin
与km到底选哪个呢?依据本人习惯,集体认为:
- 界面好看水平和监控曲线优于km,有登录权限管制
- 性能操作上不如km简略直白,然而km须要配置肯定的连贯信息
本文由传智教育博学谷 - 狂野架构师教研团队公布,转载请注明出处!
如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源