共计 4261 个字符,预计需要花费 11 分钟才能阅读完成。
Apache Kafka 是由 Apache 治理的开源流解决平台,由 Scala 和 Java 编写,为解决实时数据提供了对立、高吞吐、低提早的性能个性。
其长久化层实质上是一个“依照分布式事务日志架构的大规模公布 / 订阅音讯队列”,这使它作为企业级基础设施来解决流式数据十分有价值。目前已被数千家公司用于高性能数据管道、流剖析、数据集成和工作要害型应用程序等畛域。
实现形式:kafka-logger
Apache APISIX 早在 1.2 版本开始就曾经提供了 kafka-logger
插件的反对,其后又通过屡次性能强化,目前已具备十分成熟且欠缺的性能。反对将 API 申请日志,甚至申请体和响应体以 JSON 格局推送至 Kafka 集群中。
应用 kafka-logger
时,用户能够发送多种数据并自定义发送的日志格局,同时还反对以批处理的形式打包发送日志或进行主动重试等性能。
如何应用
步骤一:启动 Kafka 集群
本文示例只演示了一种启动形式,其余启动形式细节可参考官网文档。
# 应用 docker-compose 启动一个具备 1 个 zookeeper 节点、3 个 kafka 节点的集群
# 同时还启动一个 EFAK 用于数据监控。version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.1
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zookeeper:2888:3888
kafka1:
image: confluentinc/cp-kafka:6.2.1
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
depends_on:
- zookeeper
kafka2:
image: confluentinc/cp-kafka:6.2.1
hostname: kafka2
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
depends_on:
- zookeeper
kafka3:
image: confluentinc/cp-kafka:6.2.1
hostname: kafka3
ports:
- "9094:9094"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
depends_on:
- zookeeper
efak:
image: nickzurich/kafka-eagle:2.0.9
hostname: efak
ports:
- "8048:8048"
depends_on:
- kafka1
- kafka2
- kafka3
步骤二:创立 Topic
接下来咱们创立 test Topic
用于收集日志。
步骤三:创立路由并开启插件
通过下方命令可进行路由创立与 kafka-logger
插件的开启。
curl -XPUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \
--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
--header 'Content-Type: application/json' \
--data-raw '{"uri":"/*","plugins": {"kafka-logger": {"batch_max_size": 1,"broker_list": {"127.0.0.1": 9092},
"disable": false,
"kafka_topic": "test",
"producer_type": "sync"
}
},
"upstream": {
"nodes": {"httpbin.org:80": 1},
"type": "roundrobin"
}
}'
上述代码中配置了 kafka broker
地址、指标 Topic、同步的生产模式和每一批次蕴含的最大日志数量。这里咱们能够先将 batch_max_size
设置为 1,即每产生一条日志就向 Kafka 写入一条音讯。
通过上述设置,就能够实现将 /*
门路下的 API 申请日志发送至 Kafka 的性能。
步骤四:发送申请
接下来咱们通过 API 发送一些申请,并记录下申请次数。
# 向 API 发送 10 次申请
curl http://127.0.0.1:9080/get
通过下图能够看到,有一些日志音讯曾经被写入到咱们创立的 test topic
中。点击查看日志内容,能够发现上述进行的 API 申请日志曾经被写入了。
自定义日志构造
当然,在应用过程中咱们也能够通过 kafka-logger
插件提供的元数据配置,来设置发送至 Kafka 的日志数据结构。通过设置 log_format
数据,能够管制发送的数据类型。
比方以下数据中的 $host
、$time_iso8601
等,都是来自于 Nginx 提供的内置变量;也反对如 $route_id
和 $service_id
等 APISIX 提供的变量配置。
curl -XPUT 'http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger' \
--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
--header 'Content-Type: application/json' \
--data-raw '{"log_format": {"host":"$host","@timestamp":"$time_iso8601","client_ip":"$remote_addr","route_id":"$route_id"}
}'
通过发送申请进行简略测试,能够看到上述日志构造设置已失效。目前 Apache APISIX 提供多种日志格局模板,在配置上具备极大的灵活性,更多日志格局细节可参考官网文档。
敞开插件
如应用结束,只需移除路由配置中 kafka-logger
插件相干配置并保留,即可敞开路由上的插件。得益于 Apache APISIX 的动态化劣势,开启敞开插件的过程都不须要重启 Apache APISIX,非常不便。
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '{"methods": ["GET"],"uri":"/hello","plugins": {},"upstream": {"type":"roundrobin","nodes": {"127.0.0.1:1980": 1}
}
}'
总结
本文为大家介绍了对于 kafka-logger
插件的性能前瞻与应用步骤,更多对于 kafka-logger
插件阐明和残缺配置列表,能够参考官网文档。
目前,咱们也在开发其余日志类插件以便与更多相干服务进行集成。如果您对此类集成我的项目感兴趣,也欢送随时在 GitHub Discussions 中发动探讨,或通过邮件列表进行交换。