前提

首先你须要理解MQ / Kafka相干的常识

本文指标

理解 Kafka Connect 基本概念与性能

什么是Kafka Connect

Kafka Connect 是一款可扩大并且牢靠地在 Apache Kafka 和其余零碎之间进行数据传输的工具。 能够很简略的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。

例如我当初想要把数据从MySQL迁徙到ElasticSearch,为了保障高效和数据不会失落,咱们抉择MQ作为中间件保留数据。这时候咱们须要一个生产者线程,一直的从MySQL中读取数据并发送到MQ,还须要一个消费者线程生产MQ的数据写到ElasticSearch,这件事件仿佛很简略,不须要任何框架。

然而如果咱们想要保障生产者和消费者服务的高可用性,例如重启后生产者复原到之前读取的地位,分布式部署并且节点宕机后将工作转移到其余节点。如果要加上这些的话,这件事就变得复杂起来了,而Kafka Connect 曾经为咱们造好这些轮子。

Kafka Connect 如何工作?

Kafka Connect 个性如下:

  • Kafka 连接器的通用框架:Kafka Connect 标准化了其余数据系统与Kafka的集成,从而简化了连接器的开发,部署和治理
  • 反对分布式模式和单机模式部署
  • Rest API:通过简略的Rest API治理连接器
  • 偏移量治理:针对Source和Sink都有相应的偏移量(Offset)治理计划,程序员毋庸关怀Offset 的提交
  • 分布式模式可扩大的,反对故障转移

Kafka Connect Concepts

这里简略介绍下Kafka Connect 的概念与组成
更多细节请参考 ???? https://docs.confluent.io/pla...

Connectors

连接器,分为两种 Source(从源数据库拉取数据写入Kafka),Sink(从Kafka生产数据写入指标数据)

连接器其实并不参加理论的数据copy,连接器负责管理Task。连接器中定义了对应Task的类型,对外提供配置选项(用户创立连接器时须要提供对应的配置信息)。并且连接器还能够决定启动多少个Task线程。

用户能够通过Rest API 启停连接器,查看连接器状态

Confluent 曾经提供了许多成熟的连接器,传送门???? https://www.confluent.io/prod...

Task

理论进行数据传输的单元,和连接器一样同样分为 Source和Sink

Task的配置和状态存储在Kafka的Topic中,config.storage.topicstatus.storage.topic。咱们能够随时启动,进行工作,以提供弹性、可扩大的数据管道

Worker

刚刚咱们讲的Connectors 和Task 属于逻辑单元,而Worker 是理论运行逻辑单元的过程,Worker 分为两种模式,单机模式和分布式模式

单机模式:比较简单,然而性能也受限,只有一些非凡的场景会应用到,例如收集主机的日志,通常来说更多的是应用分布式模式

分布式模式:为Kafka Connect提供了可扩大和故障转移。雷同group.id的Worker,会主动组成集群。当新增Worker,或者有Worker挂掉时,集群会主动协调调配所有的Connector 和 Task(这个过程称为Rebalance)

当应用Worker集群时,创立连接器,或者连接器Task数量变动时,都会触发Rebalance 以保障集群各个Worker节点负载平衡。然而当Task 进入Fail状态的时候并不会触发 Rebalance,只能通过Rest Api 对Task进行重启

Converters

Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换

默认反对以下Converter

  • AvroConverter io.confluent.connect.avro.AvroConverter: 须要应用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: 须要应用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: 须要应用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为json构造
  • StringConverter org.apache.kafka.connect.storage.StringConverter: 简略的字符串格局
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: 不做任何转换

Converters 与 Connector 是解耦的,下图展现了在Kafka Connect中,Converter 在何时进行数据转换

Transforms

连接器能够通过配置Transform 实现对单个音讯(对应代码中的Record)的转换和批改,能够配置多个Transform 组成一个。例如让所有音讯的topic加一个前缀、sink无奈生产source 写入的数据格式,这些场景都能够应用Transform 解决

Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行

Dead Letter Queue

与其余MQ不同,Kafka 并没有死信队列这个性能。然而Kafka Connect提供了这一性能。

当Sink Task遇到无奈解决的音讯,会依据errors.tolerance配置项决定如何解决,默认状况下(errors.tolerance=none) Sink 遇到无奈解决的记录会间接抛出异样,Task进入Fail 状态。开发人员须要依据Worker的谬误日志解决问题,而后重启Task,能力持续生产数据

设置 errors.tolerance=all,Sink Task 会疏忽所有的谬误,持续解决。Worker中不会有任何谬误日志。能够通过配置errors.deadletterqueue.topic.name = <dead-letter-topic-name> 让无奈解决的音讯路由到 Dead Letter Topic

疾速上手

上面我来实战一下,如何应用Kafka Connect,咱们先定一个小指标 将MySQL中的全量数据同步到Redis


  1. 新建文件 docker-compose.yaml
version: '3.7'services:  zookeeper:    image: wurstmeister/zookeeper    container_name: zk    ports:      - 2182:2181  kafka:    image: wurstmeister/kafka:2.13-2.7.0    container_name: kafka    ports:      - 9092:9092    environment:      KAFKA_BROKER_ID: 0      # 宿主机ip      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.21:9092      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092    depends_on:      - zookeeper

在终端上执行 docker-compose -f docker-compose.yaml up -d 启动docker容器

筹备连接器,这里我是本人写了一个简略的连接器????。下载地址:https://github.com/TavenYin/k...

# 将连接器上传到kafka 容器中docker cp kafka-connector-example-bin.jar kafka:/opt/connectors
  1. 批改配置并启动Worker
#在配置文件开端追加 plugin.path=/opt/connectorsvi /opt/kafka/config/connect-distributed.properties# 启动Workerbin/connect-distributed.sh -daemon config/connect-distributed.properties
  1. 筹备MySQL

因为我宿主机里曾经装置了MySQL,我就间接应用了,应用如下Sql创立表。创立之后轻易造几条数据

CREATE TABLE `test_user` (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `name` varchar(255) DEFAULT NULL,  PRIMARY KEY (`id`)) ;
  1. 创立连接器

新建 source.json

{  "name" : "example-source",  "config" : {    "connector.class" : "com.github.taven.source.ExampleSourceConnector",    "tasks.max" : "1",    "database.url" : "jdbc:mysql://192.168.3.21:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true",    "database.username" : "root",    "database.password" : "root",    "database.tables" : "test_user"  }}

向Worker 发送申请,创立连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

source.json 中,有一些属性是Kafka Connect 提供的,例如上述文件中 name, connector.class, tasks.max,剩下的属性能够在开发Connector 时自定义。对于Kafka Connect Configuration 相干请浏览这里 ???? https://docs.confluent.io/pla...
  1. 确认数据是否写入Kafka

首先查看一下Worker中的运行状态,如果Task的state = RUNNING,代表Task没有抛出任何异样,安稳运行

bash-4.4# curl -X GET localhost:8083/connectors/example-source/status{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"source"}

查看kafka 中Topic 是否创立

bash-4.4# bin/kafka-topics.sh --list --zookeeper zookeeper:2181__consumer_offsetsconnect-configsconnect-offsetsconnect-statustest_user

这些Topic 都存储了什么?

  • __consumer_offsets: 记录所有Kafka Consumer Group的Offset
  • connect-configs: 存储连接器的配置,对应Connect 配置文件中config.storage.topic
  • connect-offsets: 存储Source 的Offset,对应Connect 配置文件中offset.storage.topic
  • connect-status: 连接器与Task的状态,对应Connect 配置文件中status.storage.topic

查看topic中数据,此时阐明MySQL数据曾经胜利写入Kafka

bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":1,"name":"yyyyyy"}}{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":2,"name":"qqqq"}}{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":3,"name":"eeee"}}

数据结构为Json,能够回顾一下下面咱们批改的connect-distributed.properties,默认提供的Converter 为JsonConverter,所有的数据蕴含schema 和 payload 两项是因为配置文件中默认启动了key.converter.schemas.enable=truevalue.converter.schemas.enable=true两个选项

  1. 启动 Sink

新建sink.json

{  "name" : "example-sink",  "config" : {    "connector.class" : "com.github.taven.sink.ExampleSinkConnector",    "topics" : "test_user, test_order",    "tasks.max" : "1",    "redis.host" : "192.168.3.21",    "redis.port" : "6379",    "redis.password" : "",    "redis.database" : "0"  }}

创立Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sink.json

而后查看Sink Connector Status,这里我发现因为我的Redis端口只对localhost开发,所以这里我的Task Fail了,批改了Redis配置之后,重启Task curl -X POST localhost:8083/connectors/example-sink/tasks/0/restart

在确认了Sink Status 为RUNNING 后,能够确认下Redis中是否有数据

对于Kafka Connect Rest api 文档,请参考????https://docs.confluent.io/pla...
  1. 如何查看Sink Offset生产状况

应用命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink

下图代表 test_user topic 三条数据曾经全副生产

Kafka Connect 高级性能

咱们的小指标曾经达成了。当初两个Task无事可做,正好借此机会咱们来体验一下可扩大和故障转移

集群扩大

我启动了开发环境中的Kafka Connect Worker,依据官网文档所示通过注册同一个Kafka 并且应用雷同的 group.id=connect-cluster 能够主动组成集群

启动我开发环境中的Kafka Connect,之后查看两个连接器状态

bash-4.4#  curl -X GET localhost:8083/connectors/example-source/status{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.23.176.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.23.176.1:8083"}],"type":"source"}bash-4.4#bash-4.4#  curl -X GET localhost:8083/connectors/example-sink/status{"name":"example-sink","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"sink"}

察看worker_id 能够发现,两个Connectors 曾经别离运行在两个Worker上了

故障转移

此时咱们通过kill pid完结docker中的Worker过程察看是否宕机之后主动转移,然而发现Task并没有转移到仅存的Worker中,Task 状态变为UNASSIGNED,这是为啥呢?难道是有什么操作错了?

在网上查阅了一番得悉,Kafka Connect 的集群扩大与故障转移机制是通过Kafka Rebalance 协定实现的(Consumer也是该协定),当Worker节点宕机工夫超过 scheduled.rebalance.max.delay.ms 时,Kafka才会将其踢出集群。踢出后将该节点的连接器和任务分配给其余Worker,scheduled.rebalance.max.delay.ms默认值为五分钟。

起初经测试发现,五分钟之后查看连接器信息,曾经转移到存活的Worker节点了

原本还打算写一下如何开发连接器和Kafka Rebalance,然而这篇曾经够长了,所以打算后续更新这两篇文章