Abstract

My project needs to use Kafka Streams to load data from a MySQL database and perform ETL-like filtering on it: the data imported into a given Kafka topic is filtered and deduplicated against the MySQL data brought in through Kafka Connect.

Contents

1. Install Kafka

  • Kafka Connect has shipped with Kafka since version 0.9; first check that the version you installed actually supports it (the quick check: the bin directory contains connect scripts and the config directory contains connect properties files, as verified below);

In the bin directory you should see the connect-* scripts, and in the config directory the connect-* properties files.
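A quick way to verify, run from the Kafka root directory (the expected matches are shown as comments):

ls bin | grep connect       # connect-distributed.sh, connect-standalone.sh
ls config | grep connect    # connect-distributed.properties, connect-standalone.properties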

  • The version we use is kafka_2.11-1.0.1.jar, where 2.11 is the Scala version and 1.0.1 is the Kafka version;

2. Download the kafka-connect-jdbc plugin

Download it from: https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

Choose the version that matches your Kafka.

Unpack the archive. The plugin directory contains, among other things, a lib folder with the connector jars and an etc folder with sample configuration files.

Take the jar files out of the plugin's lib folder and put them into Kafka's libs directory:
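For example, assuming the plugin was unpacked to ./confluentinc-kafka-connect-jdbc and Kafka lives in ./kafka_2.11-1.0.1 (both paths are illustrative):

cp confluentinc-kafka-connect-jdbc/lib/*.jar kafka_2.11-1.0.1/libs/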

3. Copy the Java MySQL driver into Kafka's libs directory
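The plugin does not bundle the MySQL driver itself. A sketch, assuming the downloaded driver jar is mysql-connector-java-5.1.47.jar (the exact file name depends on the driver version you fetched):

cp mysql-connector-java-5.1.47.jar kafka_2.11-1.0.1/libs/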

4. The connect-mysql-source.properties configuration file

Copy the sample configuration file from the kafka-connect-jdbc etc directory into Kafka's config directory and rename it connect-mysql-source.properties;
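A sketch of the copy-and-rename step, under the same path assumptions as above (the sample file shipped in etc may be named differently depending on the plugin version):

cp confluentinc-kafka-connect-jdbc/etc/source-quickstart-sqlite.properties kafka_2.11-1.0.1/config/connect-mysql-source.properties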

Edit the configuration to match your local data source:

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
#connection.url=jdbc:mysql://192.168.101.3:3306/databasename?user=xxx&password=xxx
connection.url=jdbc:mysql://127.0.0.1:3306/us_app?user=root&password=root
table.whitelist=ocm_blacklist_number
# 'bulk' imports the whole table on every poll; 'incrementing' and 'timestamp' modes are also available
mode=bulk
#timestamp.column.name=time
#incrementing.column.name=id
topic.prefix=connect-mysql-

For an explanation of the configuration options, see: https://www.jianshu.com/p/9b1dd28e92f0
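bulk mode re-imports the entire table on every poll, which is simple but wasteful for large tables. A minimal sketch of incremental loading instead, assuming the table has an auto-increment id column:

mode=incrementing
incrementing.column.name=id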

5. Edit config/connect-standalone.properties under the Kafka directory

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
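Note that with key/value schemas.enable=true, the JsonConverter wraps every record in a schema/payload envelope. A rough sketch of what a message on the topic then looks like (the column name "number" here is hypothetical and depends on your table):

{"schema": {"type": "struct", "fields": [{"type": "string", "optional": false, "field": "number"}], "name": "ocm_blacklist_number"}, "payload": {"number": "13800000000"}}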

6. Start Kafka Connect

bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties

Note: connect-standalone.sh runs Connect in single-node (standalone) mode; there is also a connect-distributed cluster mode, for which you edit connect-distributed.properties instead (see the sketch below).
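A sketch of starting a distributed worker; in this mode connector configurations are not passed on the command line but are submitted afterwards through the Connect REST API:

bin/connect-distributed.sh config/connect-distributed.properties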

7. Consume from Kafka to verify the import succeeded

You can start a consumer and read the topic connect-mysql-ocm_blacklist_number from the beginning; if you see output, your connector is configured successfully.

./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic connect-mysql-ocm_blacklist_number --from-beginning
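The --zookeeper flag is deprecated in this Kafka version and removed in later ones; the equivalent form using --bootstrap-server (assuming the broker listens on localhost:9092):

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic connect-mysql-ocm_blacklist_number --from-beginning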

Reference:

https://blog.csdn.net/u014686399/article/details/84962377