共计 2611 个字符,预计需要花费 7 分钟才能阅读完成。
摘要
我的项目须要应用 Kafka Stream 通过加载 mysql 数据库外面的数据, 而后做一个相似于 ETL 数据过滤性能。以此将导入到某一个主题的 kafka 数据跟通过 kafka connect 连贯 mysql 的数据库外面的数据进行过滤去重。
内容
一.kafka 装置
- kafka 连接器性能是 kafka1.0 版本以上引入的, 咱们首先须要查看对应版本是否反对 connect(直观的形式是:bin 目录蕴含 connect,conf 目录蕴含 connect);
bin 目录下:
conf 目录下:
- 咱们应用版本:kafka_2.11-1.0.1.jar. 其中 2.11 是 Scala 版本,1.0.1 是 Kafka 版本;
二. 下载 kafka-connect-jdbc 插件
去网站:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
下载;
抉择对应的版本
解压失去以下目录构造:
失去:
将插件中 lib 外面的 jar 文件提取进去,放到 kafka 的 libs 目录:
三. 将 java 的 MySQL 驱动拷贝到到 kafka 的 libs 目录
四:connect-mysql-source.properties 配置文件
将 kafka-connect-jdbc 中 etc 目录下文件复制到 kafka 的 config 目录下,并批改为 connect-mysql-source.properties;
拷贝到 kafka 的 config 下:
依据本地数据源批改配置:
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
#connection.url=jdbc:mysql://192.168.101.3:3306/databasename?user=xxx&password=xxx
connection.url=jdbc:mysql://127.0.01:3306/us_app?user=root&password=root
table.whitelist=ocm_blacklist_number
#bulk 为批量导入,此外还有 incrementing 和 imestamp 模式
mode=bulk
#timestamp.column.name=time
#incrementing.column.name=id
topic.prefix=connect-mysql-
配置阐明参考:https://www.jianshu.com/p/9b1dd28e92f0
五. 批改 kafka 目录下 config/connect-standalone.properties.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
六. 启动 kafka connect
bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties
留神:connect-standalone.sh 为单节点模式,此外还有 connect-distributed 集群模式,应用集群模式则需批改 connect-distributed.properties
七. 生产 kafka, 查看是否导入胜利
你能够启动一个消费者,从起始点开始生产 connect-mysql-ocm_blacklist_number 这个主题,如果能看到输入阐明你的连接器配置胜利了。
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic connect-mysql-ocm_blacklist_number --from-begin
参考:
https://blog.csdn.net/u014686399/article/details/84962377
正文完