关于java:Kafka-Connect-实战-入门

9次阅读

共计 7911 个字符,预计需要花费 20 分钟才能阅读完成。

前提

首先你须要理解 MQ / Kafka 相干的常识

本文指标

理解 Kafka Connect 基本概念与性能

什么是 Kafka Connect

Kafka Connect 是一款可扩大并且牢靠地在 Apache Kafka 和其余零碎之间进行数据传输的工具。能够很简略的定义 connectors(连接器)将大量数据迁入、迁出 Kafka。

例如我当初想要把数据从 MySQL 迁徙到 ElasticSearch,为了保障高效和数据不会失落,咱们抉择 MQ 作为中间件保留数据。这时候咱们须要一个生产者线程,一直的从 MySQL 中读取数据并发送到 MQ,还须要一个消费者线程生产 MQ 的数据写到 ElasticSearch,这件事件仿佛很简略,不须要任何框架。

然而如果咱们想要保障生产者和消费者服务的高可用性,例如重启后生产者复原到之前读取的地位,分布式部署并且节点宕机后将工作转移到其余节点。如果要加上这些的话,这件事就变得复杂起来了,而 Kafka Connect 曾经为咱们造好这些轮子。

Kafka Connect 如何工作?

Kafka Connect 个性如下:

  • Kafka 连接器的通用框架:Kafka Connect 标准化了其余数据系统与 Kafka 的集成,从而简化了连接器的开发,部署和治理
  • 反对分布式模式和单机模式部署
  • Rest API:通过简略的 Rest API 治理连接器
  • 偏移量治理:针对 Source 和 Sink 都有相应的偏移量(Offset)治理计划,程序员毋庸关怀 Offset 的提交
  • 分布式模式可扩大的,反对故障转移

Kafka Connect Concepts

这里简略介绍下 Kafka Connect 的概念与组成
更多细节请参考 ???? https://docs.confluent.io/pla…

Connectors

连接器,分为两种 Source(从源数据库拉取数据写入 Kafka),Sink(从 Kafka 生产数据写入指标数据)

连接器其实并不参加理论的数据 copy,连接器负责管理Task。连接器中定义了对应 Task 的类型,对外提供配置选项(用户创立连接器时须要提供对应的配置信息)。并且连接器还能够决定启动多少个 Task 线程。

用户能够通过 Rest API 启停连接器,查看连接器状态

Confluent 曾经提供了许多成熟的连接器,传送门???? https://www.confluent.io/prod…

Task

理论进行数据传输的单元,和连接器一样同样分为 Source 和 Sink

Task 的配置和状态存储在 Kafka 的 Topic 中,config.storage.topicstatus.storage.topic。咱们能够随时启动,进行工作,以提供弹性、可扩大的数据管道

Worker

刚刚咱们讲的 Connectors 和 Task 属于逻辑单元,而 Worker 是理论运行逻辑单元的过程,Worker 分为两种模式,单机模式和分布式模式

单机模式:比较简单,然而性能也受限,只有一些非凡的场景会应用到,例如收集主机的日志,通常来说更多的是应用分布式模式

分布式模式:为 Kafka Connect 提供了可扩大和故障转移。雷同 group.id 的 Worker,会主动组成集群。当新增 Worker,或者有 Worker 挂掉时,集群会主动协调调配所有的 Connector 和 Task(这个过程称为 Rebalance)

当应用 Worker 集群时,创立连接器,或者连接器 Task 数量变动时,都会触发 Rebalance 以保障集群各个 Worker 节点负载平衡。然而当 Task 进入 Fail 状态的时候并不会触发 Rebalance,只能通过 Rest Api 对 Task 进行重启

Converters

Kafka Connect 通过 Converter 将数据在 Kafka(字节数组)与 Task(Object)之间进行转换

默认反对以下 Converter

  • AvroConverter io.confluent.connect.avro.AvroConverter: 须要应用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: 须要应用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: 须要应用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为 json 构造
  • StringConverter org.apache.kafka.connect.storage.StringConverter: 简略的字符串格局
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: 不做任何转换

Converters 与 Connector 是解耦的,下图展现了在 Kafka Connect 中,Converter 在何时进行数据转换

Transforms

连接器能够通过配置 Transform 实现对单个音讯(对应代码中的 Record)的转换和批改,能够配置多个 Transform 组成一个 。例如让所有音讯的 topic 加一个前缀、sink 无奈生产 source 写入的数据格式,这些场景都能够应用 Transform 解决

Transform 如果配置在 Source 则在 Task 之后执行,如果配置在 Sink 则在 Task 之前执行

Dead Letter Queue

与其余 MQ 不同,Kafka 并没有死信队列这个性能。然而 Kafka Connect 提供了这一性能。

当 Sink Task 遇到无奈解决的音讯,会依据 errors.tolerance 配置项决定如何解决,默认状况下(errors.tolerance=none) Sink 遇到无奈解决的记录会间接抛出异样,Task 进入 Fail 状态。开发人员须要依据 Worker 的谬误日志解决问题,而后重启 Task,能力持续生产数据

设置 errors.tolerance=all,Sink Task 会疏忽所有的谬误,持续解决。Worker 中不会有任何谬误日志。能够通过配置errors.deadletterqueue.topic.name = <dead-letter-topic-name> 让无奈解决的音讯路由到 Dead Letter Topic

疾速上手

上面我来实战一下,如何应用 Kafka Connect,咱们先定一个小指标 将 MySQL 中的全量数据同步到 Redis


  1. 新建文件 docker-compose.yaml
version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zk
    ports:
      - 2182:2181

  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      # 宿主机 ip
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.21:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    depends_on:
      - zookeeper

在终端上执行 docker-compose -f docker-compose.yaml up -d 启动 docker 容器

筹备连接器,这里我是本人写了一个简略的连接器????。下载地址:https://github.com/TavenYin/k…

# 将连接器上传到 kafka 容器中
docker cp kafka-connector-example-bin.jar kafka:/opt/connectors
  1. 批改配置并启动 Worker
# 在配置文件开端追加 plugin.path=/opt/connectors
vi /opt/kafka/config/connect-distributed.properties

# 启动 Worker
bin/connect-distributed.sh -daemon config/connect-distributed.properties
  1. 筹备 MySQL

因为我宿主机里曾经装置了 MySQL,我就间接应用了,应用如下 Sql 创立表。创立之后轻易造几条数据

CREATE TABLE `test_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ;
  1. 创立连接器

新建 source.json

{
  "name" : "example-source",
  "config" : {
    "connector.class" : "com.github.taven.source.ExampleSourceConnector",
    "tasks.max" : "1",
    "database.url" : "jdbc:mysql://192.168.3.21:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true",
    "database.username" : "root",
    "database.password" : "root",
    "database.tables" : "test_user"
  }
}

向 Worker 发送申请,创立连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

source.json 中,有一些属性是 Kafka Connect 提供的,例如上述文件中 name, connector.class, tasks.max,剩下的属性能够在开发 Connector 时自定义。对于 Kafka Connect Configuration 相干请浏览这里 ???? https://docs.confluent.io/pla…

  1. 确认数据是否写入 Kafka

首先查看一下 Worker 中的运行状态,如果 Task 的 state = RUNNING,代表 Task 没有抛出任何异样,安稳运行

bash-4.4# curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},
"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"source"}

查看 kafka 中 Topic 是否创立

bash-4.4# bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
test_user

这些 Topic 都存储了什么?

  • __consumer_offsets: 记录所有 Kafka Consumer Group 的 Offset
  • connect-configs: 存储连接器的配置,对应 Connect 配置文件中config.storage.topic
  • connect-offsets: 存储 Source 的 Offset,对应 Connect 配置文件中offset.storage.topic
  • connect-status: 连接器与 Task 的状态,对应 Connect 配置文件中status.storage.topic

查看 topic 中数据,此时阐明 MySQL 数据曾经胜利写入 Kafka

bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":1,"name":"yyyyyy"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":2,"name":"qqqq"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":3,"name":"eeee"}}

数据结构为 Json,能够回顾一下下面咱们批改的 connect-distributed.properties,默认提供的 Converter 为 JsonConverter,所有的数据蕴含 schema 和 payload 两项是因为配置文件中默认启动了key.converter.schemas.enable=truevalue.converter.schemas.enable=true两个选项

  1. 启动 Sink

新建 sink.json

{
  "name" : "example-sink",
  "config" : {
    "connector.class" : "com.github.taven.sink.ExampleSinkConnector",
    "topics" : "test_user, test_order",
    "tasks.max" : "1",
    "redis.host" : "192.168.3.21",
    "redis.port" : "6379",
    "redis.password" : "","redis.database":"0"
  }
}

创立 Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sink.json

而后查看 Sink Connector Status,这里我发现因为我的 Redis 端口只对 localhost 开发,所以这里我的 Task Fail 了,批改了 Redis 配置之后,重启 Task curl -X POST localhost:8083/connectors/example-sink/tasks/0/restart

在确认了 Sink Status 为 RUNNING 后,能够确认下 Redis 中是否有数据

对于 Kafka Connect Rest api 文档,请参考????https://docs.confluent.io/pla…

  1. 如何查看 Sink Offset 生产状况

应用命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink

下图代表 test_user topic 三条数据曾经全副生产

Kafka Connect 高级性能

咱们的小指标曾经达成了。当初两个 Task 无事可做,正好借此机会咱们来体验一下可扩大和故障转移

集群扩大

我启动了开发环境中的 Kafka Connect Worker,依据官网文档所示通过注册同一个 Kafka 并且应用雷同的 group.id=connect-cluster 能够主动组成集群

启动我开发环境中的 Kafka Connect,之后查看两个连接器状态

bash-4.4#  curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.23.176.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.23.176.1:8083"}],"type":"source"}bash-4.4#

bash-4.4#  curl -X GET localhost:8083/connectors/example-sink/status
{"name":"example-sink","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"sink"}

察看 worker_id 能够发现,两个 Connectors 曾经别离运行在两个 Worker 上了

故障转移

此时咱们通过 kill pid 完结 docker 中的 Worker 过程察看是否宕机之后主动转移,然而发现 Task 并没有转移到仅存的 Worker 中,Task 状态变为 UNASSIGNED,这是为啥呢?难道是有什么操作错了?

在网上查阅了一番得悉,Kafka Connect 的集群扩大与故障转移机制是通过 Kafka Rebalance 协定实现的(Consumer 也是该协定),当 Worker 节点宕机工夫超过 scheduled.rebalance.max.delay.ms 时,Kafka 才会将其踢出集群。踢出后将该节点的连接器和任务分配给其余 Worker,scheduled.rebalance.max.delay.ms默认值为五分钟。

起初经测试发现,五分钟之后查看连接器信息,曾经转移到存活的 Worker 节点了

原本还打算写一下如何开发连接器和 Kafka Rebalance,然而这篇曾经够长了,所以打算后续更新这两篇文章

正文完
 0