摘要
我的项目须要应用Kafka Stream通过加载mysql数据库外面的数据,而后做一个相似于ETL数据过滤性能。以此将导入到某一个主题的kafka数据跟通过kafka connect连贯mysql的数据库外面的数据进行过滤去重。
内容
一.kafka装置
- kafka连接器性能是kafka1.0版本以上引入的,咱们首先须要查看对应版本是否反对connect(直观的形式是:bin目录蕴含connect,conf目录蕴含connect);
bin目录下:
conf目录下:
- 咱们应用版本:kafka_2.11-1.0.1.jar. 其中2.11是Scala版本,1.0.1是Kafka版本;
二.下载kafka-connect-jdbc插件
去网站:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
下载;
抉择对应的版本
解压失去以下目录构造:
失去:
将插件中lib外面的jar文件提取进去,放到kafka的libs目录:
三.将java的MySQL驱动拷贝到到kafka的libs目录
四:connect-mysql-source.properties配置文件
将kafka-connect-jdbc中etc目录下文件复制到kafka的config目录下,并批改为connect-mysql-source.properties;
拷贝到kafka的config下:
依据本地数据源批改配置:
# A simple example that copies all tables from a SQLite database. The first few settings are# required for all connectors: a name, the connector class to run, and the maximum number of# tasks to create:name=test-source-mysql-jdbc-autoincrementconnector.class=io.confluent.connect.jdbc.JdbcSourceConnectortasks.max=10# The remaining configs are specific to the JDBC source connector. In this example, we connect to a# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.#connection.url=jdbc:mysql://192.168.101.3:3306/databasename?user=xxx&password=xxxconnection.url=jdbc:mysql://127.0.01:3306/us_app?user=root&password=roottable.whitelist=ocm_blacklist_number#bulk为批量导入,此外还有incrementing和imestamp模式mode=bulk#timestamp.column.name=time#incrementing.column.name=idtopic.prefix=connect-mysql-
配置阐明参考:https://www.jianshu.com/p/9b1dd28e92f0
五.批改kafka目录下config/connect-standalone.properties.
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=truevalue.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverterinternal.value.converter=org.apache.kafka.connect.json.JsonConverterinternal.key.converter.schemas.enable=falseinternal.value.converter.schemas.enable=falseoffset.storage.file.filename=/tmp/connect.offsetsoffset.flush.interval.ms=10000
六.启动kafka connect
bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties
留神:connect-standalone.sh为单节点模式,此外还有connect-distributed集群模式,应用集群模式则需批改connect-distributed.properties
七.生产kafka,查看是否导入胜利
你能够启动一个消费者,从起始点开始生产connect-mysql-ocm_blacklist_number这个主题,如果能看到输入阐明你的连接器配置胜利了。
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic connect-mysql-ocm_blacklist_number --from-begin
参考:
https://blog.csdn.net/u014686399/article/details/84962377