1. Environment Setup
Goal
Build a simple Flink job, based on the Flink SQL Client, that reads the MySQL binlog and incrementally syncs the change data to a Kafka topic.
Prerequisites
- Docker Desktop for Windows / Docker
- Flink Docker image (Flink 1.11.2, Scala 2.12)
- MySQL Docker image (latest)
- Kafka Docker image (latest)
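If the images are not present locally, they can be pulled up front. The tags below match the compose files in the next section; the Flink tag is pinned to match the connector jar versions listed below:
docker pull flink:1.11.2-scala_2.12
docker pull debezium/example-mysql:1.1
docker pull wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.12-2.2.1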
Connectors
flink-format-changelog-json-1.1.0.jar
flink-sql-connector-kafka_2.12-1.11.2.jar
flink-sql-connector-mysql-cdc-1.1.0.jar
Note: the connectors must be downloaded separately and placed under FLINK_HOME/lib; the jars can be fetched from the Aliyun Maven mirror.
Repository
https://maven.aliyun.com/
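A download sketch, assuming the Aliyun mirror exposes the standard Maven repository layout and that the CDC artifacts live under the com.alibaba.ververica group (verify the paths in the repository browser before relying on them):
REPO=https://maven.aliyun.com/repository/central
wget $REPO/com/alibaba/ververica/flink-format-changelog-json/1.1.0/flink-format-changelog-json-1.1.0.jar
wget $REPO/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.1.0/flink-sql-connector-mysql-cdc-1.1.0.jar
wget $REPO/org/apache/flink/flink-sql-connector-kafka_2.12/1.11.2/flink-sql-connector-kafka_2.12-1.11.2.jar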
Use the command docker cp ${jar} ${containerName}:/${dir} to copy the jars from the local machine into the Docker containers.
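For example, assuming compose names the containers docker-flink_jobmanager_1 and docker-flink_taskmanager_1 (check yours with docker ps) and that FLINK_HOME inside the official image is /opt/flink:
docker cp flink-format-changelog-json-1.1.0.jar docker-flink_jobmanager_1:/opt/flink/lib/
docker cp flink-sql-connector-kafka_2.12-1.11.2.jar docker-flink_jobmanager_1:/opt/flink/lib/
docker cp flink-sql-connector-mysql-cdc-1.1.0.jar docker-flink_jobmanager_1:/opt/flink/lib/
Repeat for the taskmanager container, then restart both (docker restart <name>) so the jars are picked up.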
Docker compose.yml configuration
Flink: compose-flink.yml
version: "2.1"
services:
  jobmanager:
    image: flink:1.11.2-scala_2.12
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.11.2-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
MySQL: compose-mysql.yml
version: '2.1'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
Kafka: compose-kafka.yml
version: '2.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://ip:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=ip:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"
Note: replace ip in KAFKA_ADVERTISED_LISTENERS and KAFKA_ZOOKEEPER_CONNECT with your machine's local IP address; localhost does not work here (see the note before the Flink SQL test script).
Start the containers
Place the yml files above in folders named docker-flink, docker-mysql, and docker-kafka respectively, then run the matching command inside each folder:
docker-compose -f compose-flink.yml up -d
docker-compose -f compose-mysql.yml up -d
docker-compose -f compose-kafka.yml up -d
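You can verify from the command line that all containers are up:
docker ps --format "table {{.Names}}\t{{.Image}}\t{{.Status}}"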
Alternatively, open Docker Desktop; the Apps view should show all three stacks running.
2. Build a Simple Sync Job
1. MySQL initialization script
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- whether the order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
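One way to run this script (the container name docker-mysql_mysql_1 is an assumption; adjust to your actual name) is to open a MySQL shell inside the container and paste the statements at the mysql> prompt:
docker exec -it docker-mysql_mysql_1 mysql -uroot -p123456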
2. Open the Flink SQL Client
Enter the Flink container, change to the FLINK_HOME/bin directory, and run:
./sql-client.sh embedded
This drops you into the sql-client CLI.
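Equivalently, you can launch it in one step from the host (again assuming the default compose container name):
docker exec -it docker-flink_jobmanager_1 ./bin/sql-client.sh embedded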
3. Flink SQL test script
Run the following statements one by one and check the results. Replace ip with your machine's local IP address; filling in localhost is a known pitfall with Kafka, since the advertised listener must be reachable from inside the containers.
-- Flink SQL
CREATE TABLE products (
id INT,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'ip',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'ip',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
CREATE TABLE kafka_gmv (
day_str STRING,
gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'ip:9092',
'format' = 'changelog-json'
);
INSERT INTO kafka_gmv
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-- Read the changelog data back from Kafka and observe the materialized result
SELECT * FROM kafka_gmv;
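The query stays empty at first because every sample order has order_status = false. To see the changelog materialize, mark some orders as placed in the MySQL shell and watch the SELECT result (and the Kafka topic) update, for example:
-- in the MySQL client
UPDATE orders SET order_status = true WHERE order_id = 10001;
UPDATE orders SET order_status = true WHERE order_id = 10002;
You can also consume the topic directly to inspect the raw changelog-json records (container name assumed; ip as above):
docker exec -it docker-kafka_kafka_1 kafka-console-consumer.sh --bootstrap-server ip:9092 --topic kafka_gmv --from-beginning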