乐趣区

关于flink:Flink-SQL-Client-Mysql-CDC-部署实践

1. 环境准备

目标实现

构建一个以 Flink SQL Client 为基础,简单地读取 MySQL binlog、并将数据增量同步到 Kafka topic 中的 Flink 任务

应用准备清单

  • Docker Desktop windows 桌面版 /Docker
  • Flink docker 镜像 (Flink 1.11.2 scala 2.12)
  • MySQL docker 镜像 (latest)
  • kafka docker 镜像 (latest)

连接器清单

flink-format-changelog-json-1.1.0.jar

flink-sql-connector-kafka_2.12-1.11.2.jar

flink-sql-connector-mysql-cdc-1.1.0.jar

备注:连接器需要单独下载并放到 FLINK_HOME/lib 下,可从阿里云镜像站下载相关 jar 包

仓库服务

https://maven.aliyun.com/

使用命令 docker cp ${jar} ${containerName}:/${dir},从本地复制 jar 包到 docker 容器中

Docker compose.yml 文件配置

flink:compose-flink.yml



# Flink session cluster: one jobmanager service and one taskmanager service.
version: "2.1"
services:
  jobmanager:
    # Pinned to the release the connector jars in this guide are built for
    # (Flink 1.11.2, Scala 2.12) instead of the floating default tag.
    image: flink:1.11.2-scala_2.12
    expose:
      - "6123"
    ports:
      # Container port 8081 is published on host port 8081.
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    # Same pinned release as the jobmanager.
    image: flink:1.11.2-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      # Points the taskmanager at the jobmanager service by name.
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

mysql:compose-mysql.yml



# MySQL instance used as the CDC source.
version: '2.1'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      # Container port 3306 is published on host port 3306.
      - "3306:3306"
    environment:
      # Root password; the Flink SQL source tables in this guide log in as root with it.
      - MYSQL_ROOT_PASSWORD=123456

kafka:compose-kafka.yml



# ZooKeeper plus a single Kafka broker.
# NOTE: `ip` below is a placeholder; replace it with the address of the Docker host.
version: '2.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      # Two named listeners: INSIDE on 9094 and OUTSIDE on 9092, both PLAINTEXT.
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://ip:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=ip:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"

初始化容器

将上节的 yml 文件分别放置在 docker-flink、docker-mysql、docker-kafka 文件夹下,并分别在对应文件夹内执行:


# Run inside the docker-flink folder: start the Flink services in the background.
docker-compose --file compose-flink.yml up --detach

# Run inside the docker-mysql folder: start the MySQL service in the background.
docker-compose --file compose-mysql.yml up --detach

# Run inside the docker-kafka folder: start ZooKeeper and Kafka in the background.
docker-compose --file compose-kafka.yml up --detach

打开 docker 桌面版,apps 中显示如下:

2. 构建简单同步任务

1.MySQL 初始化脚本



-- Create the demo database and make it the default for the statements below.
CREATE DATABASE mydb;

USE mydb;

-- Product catalogue; `id` is generated by AUTO_INCREMENT.
CREATE TABLE products (
    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description VARCHAR(512)
);

-- Generated product ids start at 101.
ALTER TABLE products AUTO_INCREMENT = 101;

-- Seed rows. The column list is explicit so the statement keeps working if the
-- table gains columns, and literals are single-quoted so they are still parsed
-- as strings when sql_mode includes ANSI_QUOTES.
INSERT INTO products (id, name, description)
VALUES
    (DEFAULT, 'scooter', 'Small 2-wheel scooter'),
    (DEFAULT, 'car battery', '12V car battery'),
    (DEFAULT, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3'),
    (DEFAULT, 'hammer', '12oz carpenter''s hammer'),
    (DEFAULT, 'hammer', '14oz carpenter''s hammer'),
    (DEFAULT, 'hammer', '16oz carpenter''s hammer'),
    (DEFAULT, 'rocks', 'box of assorted rocks'),
    (DEFAULT, 'jacket', 'water resistent black wind breaker'),
    (DEFAULT, 'spare tire', '24 inch spare tire');

-- Orders table; generated order ids start at 10001.
CREATE TABLE orders (
    order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    order_date DATETIME NOT NULL,
    customer_name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 5) NOT NULL,
    product_id INTEGER NOT NULL,
    order_status BOOLEAN NOT NULL -- whether the order has been placed
) AUTO_INCREMENT = 10001;

-- Seed rows. The column list is explicit so values cannot silently shift into
-- the wrong column if the table definition changes.
INSERT INTO orders (order_id, order_date, customer_name, price, product_id, order_status)
VALUES
    (DEFAULT, '2020-07-30 10:08:22', 'Jark', 50.50, 102, FALSE),
    (DEFAULT, '2020-07-30 10:11:09', 'Sally', 15.00, 105, FALSE),
    (DEFAULT, '2020-07-30 12:00:30', 'Edward', 25.25, 106, FALSE);

2. 打开 Flink SQL Client,执行操作

进入到 flink 容器中,进入 FLINK_HOME/bin 目录下,执行以下命令:



# Launch the Flink SQL Client script with the `embedded` mode argument.
./sql-client.sh embedded

进入到 sql-client 界面:

3.Flink SQL 测试脚本

依次执行以下脚本,查看效果。其中 hostname 等处的 ip 请填写自己本机的 ip,直接填 localhost 在 kafka 上会有坑



-- Flink SQL
-- Source table over mydb.products, read through the mysql-cdc connector.
-- 'ip' is a placeholder for the address of the machine running MySQL.
CREATE TABLE products (
    id          INT,
    name        STRING,
    description STRING
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'ip',
    'port'          = '3306',
    'username'      = 'root',
    'password'      = '123456',
    'database-name' = 'mydb',
    'table-name'    = 'products'
);

-- Source table over mydb.orders, read through the mysql-cdc connector.
-- 'ip' is a placeholder for the address of the machine running MySQL.
CREATE TABLE orders (
    order_id      INT,
    order_date    TIMESTAMP(0),
    customer_name STRING,
    price         DECIMAL(10, 5),
    product_id    INT,
    order_status  BOOLEAN
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'ip',
    'port'          = '3306',
    'username'      = 'root',
    'password'      = '123456',
    'database-name' = 'mydb',
    'table-name'    = 'orders'
);

-- Kafka-backed table on topic 'kafka_gmv', encoded with the changelog-json format.
-- It is written by the INSERT job and read back by the verification query.
-- 'ip' is a placeholder for the address of the machine running Kafka.
CREATE TABLE kafka_gmv (
    day_str STRING,
    gmv     DECIMAL(10, 5)
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'kafka_gmv',
    'scan.startup.mode'            = 'earliest-offset',
    'properties.bootstrap.servers' = 'ip:9092',
    'format'                       = 'changelog-json'
);

-- Continuously aggregate the price of orders whose order_status is TRUE,
-- grouped by calendar day, and write the result to kafka_gmv.
INSERT INTO kafka_gmv
SELECT
    DATE_FORMAT(order_date, 'yyyy-MM-dd') AS day_str,
    SUM(price) AS gmv
FROM orders
WHERE order_status = TRUE
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');

-- Read the changelog data back from Kafka and observe the materialized result.
-- Columns are listed explicitly rather than with SELECT *.
SELECT
    day_str,
    gmv
FROM kafka_gmv;

退出移动版