1. Environment Setup

Objective

Build a simple Flink job on top of the Flink SQL Client that reads incremental data from the MySQL binlog and syncs it to a Kafka topic.

Software Checklist

  • Docker Desktop for Windows / Docker
  • Flink Docker image (Flink 1.11.2, Scala 2.12)
  • MySQL Docker image (latest)
  • Kafka Docker image (latest)

Connector Checklist

  • flink-format-changelog-json-1.1.0.jar
  • flink-sql-connector-kafka_2.12-1.11.2.jar
  • flink-sql-connector-mysql-cdc-1.1.0.jar

Note: the connector jars must be downloaded separately and placed under FLINK_HOME/lib; they can be fetched from the Aliyun Maven mirror.

Repository

https://maven.aliyun.com/

Use the command docker cp ${jar} ${containerName}:/${dir} to copy the jars from your local machine into the Docker containers.
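For example, a sketch of copying the three connector jars into the Flink containers once they are up (the names docker-flink_jobmanager_1 / docker-flink_taskmanager_1 assume the default Compose naming; FLINK_HOME is /opt/flink in the official image):

# Copy each connector jar into the JobManager's lib directory
docker cp flink-format-changelog-json-1.1.0.jar docker-flink_jobmanager_1:/opt/flink/lib/
docker cp flink-sql-connector-kafka_2.12-1.11.2.jar docker-flink_jobmanager_1:/opt/flink/lib/
docker cp flink-sql-connector-mysql-cdc-1.1.0.jar docker-flink_jobmanager_1:/opt/flink/lib/
# Repeat for the TaskManager, then restart both containers so the jars are picked up
docker restart docker-flink_jobmanager_1 docker-flink_taskmanager_1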

Docker Compose File Configuration

Flink: compose-flink.yml

version: "2.1" services:  jobmanager:  image: flink  expose:  - "6123"  ports:  - "8081:8081"  command: jobmanager  environment:  - JOB_MANAGER_RPC_ADDRESS=jobmanager  taskmanager:  image: flink  expose:  - "6121"  - "6122"  depends_on:  - jobmanager  command: taskmanager  links:  - "jobmanager:jobmanager"  environment:  - JOB_MANAGER_RPC_ADDRESS=jobmanager

MySQL: compose-mysql.yml

version: '2.1'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456

Kafka: compose-kafka.yml (replace ip below with your host's LAN IP)

version: '2.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://ip:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=ip:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"

Initializing the Containers

Place the yml files from the previous section into docker-flink, docker-mysql, and docker-kafka folders respectively, and run the corresponding command inside each folder:

docker-compose -f compose-flink.yml up -d
docker-compose -f compose-mysql.yml up -d
docker-compose -f compose-kafka.yml up -d

Open Docker Desktop; the three stacks should appear under Apps:
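You can also confirm from a terminal that all five containers are running:

# Should list jobmanager, taskmanager, mysql, kafka and zookeeper
docker ps --format "table {{.Names}}\t{{.Image}}\t{{.Status}}"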

2. Building a Simple Sync Job

1. MySQL Initialization Script

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- whether the order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
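One way to run this script, assuming the default Compose container name docker-mysql_mysql_1 (adjust to whatever docker ps shows on your machine):

# Open a MySQL shell inside the container (root password from compose-mysql.yml)
docker exec -it docker-mysql_mysql_1 mysql -uroot -p123456
# Then paste the script above at the mysql> prompt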

2. Open the Flink SQL Client and Run the Scripts

Enter the Flink container, change to the FLINK_HOME/bin directory, and run the following command:

./sql-client.sh embedded

This drops you into the sql-client interface:
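For reference, a sketch of the full sequence starting from the host, assuming the default Compose container name docker-flink_jobmanager_1 and the official image's FLINK_HOME of /opt/flink:

# Get a shell inside the JobManager container
docker exec -it docker-flink_jobmanager_1 /bin/bash
# Launch the SQL client in embedded mode
cd /opt/flink/bin && ./sql-client.sh embedded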

3. Flink SQL Test Script

Execute the following statements one by one and check the results. Here, ip is your machine's local IP; filling in localhost directly is a known pitfall with Kafka, because the advertised listener address must be reachable from inside the Flink containers.

-- Flink SQL
CREATE TABLE products (
  id INT,
  name STRING,
  description STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

CREATE TABLE kafka_gmv (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_gmv',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'ip:9092',
  'format' = 'changelog-json'
);

INSERT INTO kafka_gmv
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') AS day_str, SUM(price) AS gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');

-- Read the changelog data back from Kafka and observe the materialized result
SELECT * FROM kafka_gmv;
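Note that all three seed orders were inserted with order_status = false, so kafka_gmv stays empty until an order is updated. A minimal sketch for exercising the pipeline end to end, assuming the default Compose container names (docker-mysql_mysql_1, docker-kafka_kafka_1) and the wurstmeister image's Kafka scripts on the PATH:

# Flip one order to "placed" in MySQL; the CDC source picks up the change
docker exec -it docker-mysql_mysql_1 \
  mysql -uroot -p123456 mydb \
  -e "UPDATE orders SET order_status = true WHERE order_id = 10001;"

# Tail the kafka_gmv topic to watch the changelog-json records arrive
# (port 9094 is the INSIDE listener from compose-kafka.yml)
docker exec -it docker-kafka_kafka_1 \
  kafka-console-consumer.sh --bootstrap-server localhost:9094 \
  --topic kafka_gmv --from-beginning

After the UPDATE, the SELECT * FROM kafka_gmv query in the SQL client should materialize a row for 2020-07-30, and the topic should show the corresponding insert record.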