1. Environment Preparation
Objective
Build a simple Flink job, based on the Flink SQL Client, that reads the MySQL binlog and incrementally syncs the change data into a Kafka topic.
Software checklist
- Docker Desktop for Windows / Docker
- Flink Docker image (Flink 1.11.2, Scala 2.12)
- MySQL Docker image (latest)
- Kafka Docker image (latest)
Connector checklist
- flink-format-changelog-json-1.1.0.jar
- flink-sql-connector-kafka_2.12-1.11.2.jar
- flink-sql-connector-mysql-cdc-1.1.0.jar
Note: the connectors must be downloaded separately and placed under FLINK_HOME/lib; the jars can be downloaded from the Aliyun Maven mirror.
Repository:
https://maven.aliyun.com/
Use the command docker cp ${jar} ${containerName}:/${dir} to copy the jars from the local machine into the Docker containers.
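As a concrete sketch (the container names and the /opt/flink path are assumptions; they depend on the compose setup in the next section and on the official Flink image layout), the three jars can be copied into both Flink containers once they are running:

# Copy the connector jars into the jobmanager and taskmanager containers
# (container names are assumptions; check docker ps and adjust the lib path if needed)
for c in docker-flink_jobmanager_1 docker-flink_taskmanager_1; do
  docker cp flink-format-changelog-json-1.1.0.jar      $c:/opt/flink/lib/
  docker cp flink-sql-connector-kafka_2.12-1.11.2.jar  $c:/opt/flink/lib/
  docker cp flink-sql-connector-mysql-cdc-1.1.0.jar    $c:/opt/flink/lib/
done
# Restart both containers so Flink picks up the new jars
docker restart docker-flink_jobmanager_1 docker-flink_taskmanager_1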
Docker compose.yml configuration
flink:compose-flink.yml
version: "2.1" services: jobmanager: image: flink expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: image: flink expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager
mysql:compose-mysql.yml
version: '2.1'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
kafka:compose-kafka.yml
version: '2.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://ip:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=ip:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"
Initialize the containers
Place the yml files above into docker-flink, docker-mysql, and docker-kafka folders respectively, then run the following inside each folder:
docker-compose -f compose-flink.yml up -d
docker-compose -f compose-mysql.yml up -d
docker-compose -f compose-kafka.yml up -d
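A quick sanity check once the three stacks are up (a sketch; the container names depend on the folder names above):

# List the running containers from the three compose projects
docker ps --format "table {{.Names}}\t{{.Image}}\t{{.Status}}"
# The Flink Web UI should now be reachable at http://localhost:8081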
Open Docker Desktop; the Apps view shows the running containers as follows:
2. Build a Simple Sync Job
1. MySQL initialization script
CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products VALUES
  (default,"scooter","Small 2-wheel scooter"),
  (default,"car battery","12V car battery"),
  (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
  (default,"hammer","12oz carpenter's hammer"),
  (default,"hammer","14oz carpenter's hammer"),
  (default,"hammer","16oz carpenter's hammer"),
  (default,"rocks","box of assorted rocks"),
  (default,"jacket","water resistent black wind breaker"),
  (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- whether the order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders VALUES
  (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
  (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
  (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
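One way to run this script is through a mysql shell inside the MySQL container (a sketch; docker-mysql_mysql_1 is an assumed container name, check docker ps for the real one):

# Open a mysql prompt inside the MySQL container (container name is an assumption)
docker exec -it docker-mysql_mysql_1 mysql -uroot -p123456
# ...then paste the initialization script above at the mysql> prompt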
2. Open the Flink SQL Client and run the statements
Enter the Flink container, go to the FLINK_HOME/bin directory, and run:
./sql-client.sh embedded
This brings up the SQL client console:
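Starting from the host, the whole step looks roughly like this (the jobmanager container name is an assumption; check docker ps):

# Open a shell in the jobmanager container and start the SQL client
docker exec -it docker-flink_jobmanager_1 /bin/bash
cd $FLINK_HOME/bin
./sql-client.sh embedded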
3. Flink SQL test script
Run the following statements one by one and check the results. The host must be your machine's local IP; filling in localhost directly is a known pitfall with Kafka.
-- Flink SQL
CREATE TABLE products (
  id INT,
  name STRING,
  description STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

CREATE TABLE kafka_gmv (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_gmv',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'ip:9092',
  'format' = 'changelog-json'
);

INSERT INTO kafka_gmv
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') AS day_str, SUM(price) AS gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');

-- Read the changelog data back from Kafka and observe the materialized result
SELECT * FROM kafka_gmv;
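All three sample orders start with order_status = false, so the aggregation above produces nothing until some orders are marked as placed. A way to drive the pipeline and watch the topic, as a sketch (container names are assumptions, the console-consumer invocation may need adjusting for your Kafka image, and ip is the same local IP used above):

# Mark the sample orders as placed so they pass the order_status = true filter
docker exec -it docker-mysql_mysql_1 mysql -uroot -p123456 \
  -e "UPDATE mydb.orders SET order_status = true WHERE order_id IN (10001, 10002, 10003);"

# Watch the changelog-json records arriving on the kafka_gmv topic
docker exec -it docker-kafka_kafka_1 \
  kafka-console-consumer.sh --bootstrap-server ip:9092 --topic kafka_gmv --from-beginning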