关于运维:Kafka最佳实践

6次阅读

共计 30754 个字符,预计需要花费 77 分钟才能阅读完成。

前言

Kafka 最佳实际,波及

  1. 典型应用场景
  2. Kafka 应用的最佳实际

Kafka 典型应用场景

Data Streaming

Kafka 可能对接到 Spark、Flink、Flume 等多个支流的流数据处理技术。利用 Kafka 高吞吐量的特点,客户能够通过 Kafka 建设传输通道,把利用侧的海量数据传输到流数据处理引擎中,数据通过解决剖析后,可反对后端大数据分析,AI 模型训练等多种业务。

日志平台

Kafka 最罕用也是我最相熟的场景是日志剖析零碎。典型的实现形式是在客户端部署 日志收集器(如 Fluentd、Filebeat 或者 Logstash 等)进行日志采集,并将数据发送到 Kafka,之后通过后端的 ES 等进行数据运算,再搭建一个展现层如 Kibana 进行统计分析数据的展现。

物联网

随着有价值的用例的呈现,物联网 (IoT) 正失去越来越多的关注。然而,一个要害的挑战是整合设施和机器来实时和大规模地解决数据。Apache Kafka®及其周边的生态系统,包含 Kafka Connect、Kafka Streams,曾经成为集成和解决这类数据集的首选技术。

Kafka 曾经被用于许多物联网部署,包含消费者物联网和工业物联网(IIoT)。大多数场景都须要牢靠、可伸缩和平安的端到端集成,从而反对实时的双向通信和数据处理。一些具体的用例是:

  • 联网的汽车基础设施
  • 智能城市和智能家居
  • 智能批发和客户 360
  • 智能制作

具体的实现架构如下图所示:

应用的最佳实际

可靠性最佳实际

基于生产者和消费者配置满足不同的可靠性

生产者 At Least Once

生产者须要设置 request.required.acks = ALL,服务端主节点写胜利且备节点同步胜利才 返回 Response。

消费者 At Least Once

消费者接管音讯后,应先进行对应业务操作,随后再进行 commit 标识音讯已被解决,通过这种解决形式能够确保一条音讯在业务解决失败时,可能从新被生产。留神消费者的 enable.auto.commit 参数须要设置为 False,确保 commit 动作手工管制。

生产者 At Most Once

保障一条音讯最多投放一次,须要设置 request.required.acks = 0,同时设置 retries = 0。这里的原理是生产者遇到任何异样都不重试,并且不思考 broker 是否响应写入胜利。

消费者 At Most Once

保障一条音讯最多被生产一次,须要消费者在 收到音讯后先进行 commit 标识音讯已被解决,随后再进行对应业务操作。这里的原理是消费者不须要管理论业务的处理结果,拿到音讯当前立即 commit 通知 broker 音讯解决胜利。留神消费者的 enable.auto.commit 参数须要设置为 False,确保 commit 动作手工管制。

生产者 Exactly-once

Kafka 0.11 版本起新增了 幂等音讯 的语义,通过设置 enable.idempotence=true 参数,能够实现 单个分区 的音讯幂等。

如果 Topic 波及多个分区或者须要多条音讯封装成一个事务保障幂等,则须要减少 Transaction 管制,样例如下:

// 开启幂等控制参数
producerProps.put("enbale.idempotence", "true");
// 初始化事务
producer.initTransactions();
// 设置事务 ID
producerProps.put("transactional.id", "id-001");

try{
  // 开始事务,并在事务中发送 2 条音讯
  producer.beginTranscation();
  producer.send(record0);
  producer.send(record1);
  // 提交事务
  producer.commitTranscation();} catch(Exception e) {producer.abortTransaction();
  producer.close();}
消费者 Exactly-once

须要设置 isolation.level=read_committed,并设置 enable.auto.commit = false,确保消费者只生产生产者曾经提交事务的音讯,消费者业务须要确保事务性防止反复解决音讯,比如说把音讯长久化到数据库,而后向服务端提交 commit。

依据业务场景选用适合的语义

应用 At Least Once 语义撑持可承受大量音讯反复的业务

At Least Once 是最罕用的语义,可确保音讯只多不少的发送和生产,性能和可靠性上有较好的均衡,能够作为 默认选用 的模式。业务侧也能够通过在音讯体退出惟一的业务主键自行保障幂等性,在生产侧确保同一个业务主键的音讯只被解决一次。

应用 Exactly Once 语义撑持须要强幂等性业务

Exactly Once 语义个别用相对不容许反复的要害业务,典型案例是 订单和领取相干场景

应用 At Most Once 语义撑持非关键业务

At Most Once 语义个别用在 非关键业务 ,业务 对于音讯失落并不敏感 ,只须要尽量确保音讯胜利生产生产即可。典型应用 At Most Once 语义的场景是 音讯告诉,呈现大量脱漏音讯影响不大,相比之下反复发送告诉会造成较坏的用户体验。

性能调优最佳实际

正当设置 Topic 的 partition 数量

以下汇总了通过 partition 调优性能倡议思考的维度,建议您依据实践剖析配合压力测试对系统整体性能进行调优。

思考维度 阐明
吞吐量 减少 partition 的数量能够音讯生产的并发度,当零碎瓶颈在于生产端,而生产端又能够程度扩大的时候,减少 partition 能够减少零碎吞吐量。在 Kafka 外部每个 Topic 下的每个 partition 都是一个独立的音讯解决通道,一个 partition 内的音讯只能被同时被一个 consumer group 生产,当 consumer group 数量多于 partition 的数量时,多余的 consumer group 会呈现闲暇。
音讯程序 Kafka 能够保障一个 partition 内的音讯程序性,partition 之间的音讯程序无奈保障,减少 partition 的时候须要思考音讯程序对业务的影响。
实例 Partition 下限 Partition 减少会耗费底层更多的内存,IO 和文件句柄等资源。在布局 Topic 的 partition 数量时须要思考 Kafka 集群能反对的 partition 下限。

生产者,消费者与 partition 的关系阐明。

正当设置 batch 大小

如果 Topic 设置了多个分区,生产者发送音讯时须要先确认往哪个分区发送。在给同一个分区发送多条音讯时,Producer 客户端会将相干音讯打包成一个 Batch,批量发送到服务端。个别状况下,小 Batch 会导致 Producer 客户端产生大量申请,造成申请队列在客户端和服务端的排队,从而整体推高了音讯发送和生产提早。

一个适合的 batch 大小,能够缩小发送音讯时客户端向服务端发动的申请次数,在整体上进步音讯发送的吞吐和提早。

Batch 参数阐明如下:

参数 阐明
batch.size 发往每个分区(Partition)的音讯缓存量(音讯内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络申请,而后 Producer 客户端把音讯批量发往服务器。
linger.ms 每条音讯在缓存中的最长工夫。若超过这个工夫,Producer 客户端就会疏忽 batch.size 的限度,立刻把音讯发往服务器。
buffer.memory 所有缓存音讯的总体大小超过这个数值后,就会触发把音讯发往服务器,此时会疏忽 batch.sizelinger.ms 的限度。buffer.memory 的默认数值是 32MB,对于单个 Producer 而言,能够保障足够的性能。

Batch 相干参数值的抉择并没有通用的办法,倡议针对性能敏感的业务场景进行压测调优。

应用粘性分区解决大批量发送

Kafka 生产者与服务端发送音讯时有批量发送的机制,只有发送到雷同 Partition 的音讯才会被放到同一个 Batch 中。在大批量发送场景,如果音讯散落到多个 Partition 当中就可能会造成多个小 Batch,导致批量发送机制生效而升高性能。

Kafka 默认抉择分区的策略如下

场景 策略
音讯指定 Key 对音讯的 Key 进行哈希,而后依据哈希后果抉择分区,保障雷同 Key 的音讯会发送到同一个分区。
音讯没有指定 Key 默认策略是循环应用主题的所有分区,将音讯以轮询的形式发送到每一个分区上。

从默认机制可见 partition 的抉择随机性很强,因而在大批量传输的场景下,举荐设置 partitioner.class参数,指定自定义的分区抉择算法实现 粘性分区

其中一种实现办法是在固定的时间段内应用同一个 partition,过一段时间切换到下一个分区,防止数据散落到多个不同 partition。

通用最佳实际

Kafka 对音讯程序的保障

Kafka 会在同一个 partition 内保障音讯程序,如果 Topic 存在多个 partition 则无奈确保全局程序。如果须要保障全局程序,则须要管制 partition 数量为 1 个。

对音讯设置惟一的 Key

音讯队列 Kafka 的音讯有 Key(音讯标识)和 Value(音讯内容)两个字段。为了便于追踪,倡议为音讯设置一个惟一的 Key。之后能够通过 Key 追踪某音讯,打印发送日志和生产日志,理解该音讯的生产和生产状况。

正当设置队列的重试策略

分布式环境下,因为网络等起因,音讯偶然会呈现发送失败的状况,其起因可能是音讯曾经发送胜利然而 ACK 机制失败或者音讯的确没有发送胜利。默认的参数能满足大部分场景,但能够依据业务需要,按需设置以下重试参数:

参数 阐明
retries 重试次数,默认值为 3,但对于数据失落零容忍的利用而言,请思考设置为 Integer.MAX_VALUE(无效且最大)。
retry.backoff.ms 重试距离,倡议设置为 1000。

留神:

如果心愿实现 At Most Once 语义,重试须要敞开。

接入最佳实际

Spark Streaming 接入 Kafka

Spark Streaming 是 Spark Core 的一个扩大,用于高吞吐且容错地解决持续性的数据,目前反对的内部输出有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。

Spark Streaming 将间断数据抽象成 DStream(Discretized Stream),而 DStream 由一系列间断的 RDD(弹性分布式数据集)组成,每个 RDD 是肯定工夫距离内产生的数据。应用函数对 DStream 进行解决其实即为对这些 RDD 进行解决。

应用 Spark Streaming 作为 Kafka 的数据输出时,可反对 Kafka 稳固版本与试验版本:

Kafka Version spark-streaming-kafka-0.8 spark-streaming-kafka-0.10
Broker Version 0.8.2.1 or higher 0.10.0 or higher
Api Maturity Deprecated Stable
Language Support Scala、Java、Python Scala、Java
Receiver DStream Yes No
Direct DStream Yes Yes
SSL / TLS Support No Yes
Offset Commit Api No Yes
Dynamic Topic Subscription No Yes

本次实际应用 0.10.2.1 版本的 Kafka 依赖。

操作步骤

步骤 1:创立 Kafka 集群及 Topic

创立 Kafka 集群的步骤略,再创立一个名为 test 的 Topic。

步骤 2:筹备服务器环境

Centos6.8 零碎

package version
sbt 0.13.16
hadoop 2.7.3
spark 2.1.0
protobuf 2.5.0
ssh CentOS 默认装置
Java 1.8

具体装置步骤略,包含以下步骤:

  1. 装置 sbt
  2. 装置 protobuf
  3. 装置 Hadoop
  4. 装置 Spark
步骤 3:对接 Kafka
向 Kafka 中生产音讯

这里应用 0.10.2.1 版本的 Kafka 依赖。

  1. build.sbt 增加依赖:
name := "Producer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
  1. 配置 producer_example.scala

    import java.util.Properties
    import org.apache.kafka.clients.producer._
    object ProducerExample extends App {val  props = new Properties()
        props.put("bootstrap.servers", "172.0.0.1:9092") // 实例信息中的内网 IP 与端口
    
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
        val producer = new KafkaProducer[String, String](props)
        val TOPIC="test"  // 指定要生产的 Topic
        for(i<- 1 to 50){val record = new ProducerRecord(TOPIC, "key", s"hello $i") // 生产 key 是 "key",value 是 hello i 的音讯
            producer.send(record)
        }
        val record = new ProducerRecord(TOPIC, "key", "the end"+new java.util.Date)
        producer.send(record)
        producer.close() // 最初要断开}

更多无关 ProducerRecord 的用法请参考 ProducerRecord 文档。

从 Kafka 生产音讯
DirectStream
  1. build.sbt 增加依赖:
name := "Consumer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
  1. 配置 DirectStream_example.scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {def main(args: Array[String]) {val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "172.0.0.1:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark_stream_test1",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> "false"
        )

        val sparkConf = new SparkConf()
        sparkConf.setMaster("local")
        sparkConf.setAppName("Kafka")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val topics = Array("spark_test")

        val offsets : Map[TopicPartition, Long] = Map()

        for (i <- 0 until 3){val tp = new TopicPartition("spark_test", i)
            offsets.updated(tp , 0L)
        }
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
        println("directStream")
        stream.foreachRDD{ rdd=>
            // 输入取得的音讯
            rdd.foreach{iter =>
                val i = iter.value
                println(s"${i}")
            }
            // 取得 offset
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition { iter =>
                val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
                println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
            }
        }

        // Start the computation
        ssc.start()
        ssc.awaitTermination()}
}
RDD
  1. 配置build.sbt(配置同上,单击查看)。
  2. 配置RDD_example
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {def main(args: Array[String]) {val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "172.0.0.1:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "spark_stream",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val sc = new SparkContext("local", "Kafka", new SparkConf())
        val java_kafkaParams : java.util.Map[String, Object] = kafkaParams
        // 按程序向 parition 拉取相应 offset 范畴的音讯,如果拉取不到则阻塞直到超过等待时间或者新生产音讯达到拉取的数量
        val offsetRanges = Array[OffsetRange](OffsetRange("spark_test", 0, 0, 5),
            OffsetRange("spark_test", 1, 0, 5),
            OffsetRange("spark_test", 2, 0, 5)
        )
        val range = KafkaUtils.createRDD[String, String](
            sc,
            java_kafkaParams,
            offsetRanges,
            PreferConsistent
        )
        range.foreach(rdd=>println(rdd.value))
        sc.stop()}
}

更多 kafkaParams 用法参考 kafkaParams 文档。

Flume 接入 Kafka

Apache Flume 是一个分布式、牢靠、高可用的日志收集零碎,反对各种各样的数据起源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、挪动,最初存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜寻服务器等)。

Flume 根本构造如下:

Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM,单个 agent 由 Source、Sink 和 Channel 三大组件形成。

Flume 与 Kafka

把数据存储到 HDFS 或者 HBase 等上游存储模块或者计算模块时须要思考各种简单的场景,例如并发写入的量以及零碎承载压力、网络提早等问题。Flume 作为灵便的分布式系统具备多种接口,同时提供可定制化的管道。
在生产解决环节中,当生产与处理速度不统一时,Kafka 能够充当缓存角色。Kafka 领有 partition 构造以及采纳 append 追加数据,使 Kafka 具备优良的吞吐能力;同时其领有 replication 构造,使 Kafka 具备很高的容错性。
所以将 Flume 和 Kafka 联合起来,能够满足生产环境中绝大多数要求。

筹备工作

  • 下载 Apache Flume(1.6.0 以上版本兼容 Kafka)
  • 下载 Kafka 工具包(0.9.x 以上版本,0.8 曾经不反对)
  • 确认 Kafka 的 Source、Sink 组件曾经在 Flume 中。

接入形式

Kafka 可作为 Source 或者 Sink 端对音讯进行导入或者导出。

Kafka Source

配置 kafka 作为消息来源,行将本人作为消费者,从 Kafka 中拉取数据传入到指定 Sink 中。次要配置选项如下:

配置项 阐明
channels 本人配置的 Channel
type 必须为:org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers Kafka Broker 的服务器地址
kafka.consumer.group.id 作为 Kafka 生产端的 Group ID
kafka.topics Kafka 中数据起源 Topic
batchSize 每次写入 Channel 的大小
batchDurationMillis 每次写入最大间隔时间

示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

更多内容请参考 Apache Flume 官网。

Kafka Sink

配置 Kafka 作为内容接管方,行将本人作为生产者,推到 Kafka Server 中期待后续操作。次要配置选项如下:

配置项 阐明
channel 本人配置的 Channel
type 必须为:org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers Kafka Broker 的服务器
kafka.topics Kafka 中数据起源 Topic
kafka.flumeBatchSize 每次写入的 Bacth 大小
kafka.producer.acks Kafka 生产者的生产策略

示例:

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

更多内容请参考 Apache Flume 官网。

Storm 接入 Kafka

Storm 是一个分布式实时计算框架,可能对数据进行流式解决和提供通用性分布式 RPC 调用,能够实现处理事件亚秒级的提早,实用于对提早要求比拟高的实时数据处理场景。

Storm 工作原理

在 Storm 的集群中有两种节点,管制节点 Master Node 和工作节点 Worker NodeMaster Node 上运行 Nimbus 过程,用于资源分配与状态监控。Worker Node上运行 Supervisor 过程,监听工作工作,启动 executor 执行。整个 Storm 集群依赖 zookeeper 负责公共数据寄存、集群状态监听、任务分配等性能。

用户提交给 Storm 的数据处理程序称为 topology,它解决的最小音讯单位是tuple,一个任意对象的数组。topologyspoutbolt 形成,spout是产生 tuple 的源头,bolt能够订阅任意 spoutbolt收回的 tuple 进行解决。

Storm with Kafka

Storm 能够把 Kafka 作为spout,生产数据进行解决;也能够作为bolt,寄存通过解决后的数据提供给其它组件生产。

Centos6.8 零碎

package version
maven 3.5.0
storm 2.1.0
ssh 5.3
Java 1.8

前提条件

  • 下载并装置 JDK 8。具体操作,请参见 Download JDK 8。
  • 下载并装置 Storm,参考 Apache Storm downloads。
  • 已创立 Kafka 集群。

操作步骤

步骤 1:创立 Topic
步骤 2:增加 Maven 依赖

pom.xml 配置如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>storm</name> 
     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>ExclamationTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
步骤 3:生产音讯
应用 spout/bolt

topology 代码:

//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerSpout {
    // 申请的 kafka 实例 ip:port
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // 指定要将音讯写入的 topic
    private final static String TOPIC = "storm_test";
    public static void main(String[] args) throws Exception {
        // 设置 producer 属性
        // 函数参考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        // 属性参考:http://kafka.apache.org/0102/documentation.html
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创立写入 kafka 的 bolt,默认应用 fields("key" "message")作为生产音讯的 key 和 message,也能够在 FieldNameBasedTupleToKafkaMapper()中指定
        KafkaBolt kafkaBolt = new KafkaBolt()
                .withProducerProperties(properties)
                .withTopicSelector(new DefaultTopicSelector(TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        TopologyBuilder builder = new TopologyBuilder();
        // 一个程序生成音讯的 spout 类,输入 field 是 sentence
        SerialSentenceSpout spout = new SerialSentenceSpout();
        AddMessageKeyBolt bolt = new AddMessageKeyBolt();
        builder.setSpout("kafka-spout", spout, 1);
        // 为 tuple 加上生产到 kafka 所须要的 fields
        builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
        // 写入 kafka
        builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");
    
        Config config = new Config();
        if (args != null && args.length > 0) {
            // 集群模式,用于打包 jar,并放到 storm 运行
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        } else {
            // 本地模式
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();}
    
    }
}

创立一个程序生成音讯的 spout 类:

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class SerialSentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;
    
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector = spoutOutputCollector;}
    
    @Override
    public void nextTuple() {Utils.sleep(1000);
        // 生产一个 UUID 字符串发送给下一个组件
        spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

tuple 加上 key、message 两个字段,当 key 为 null 时,生产的音讯平均调配到各个 partition,指定了 key 后将依照 key 值 hash 到特定 partition 上:

//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AddMessageKeyBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // 取出第一个 filed 值
        String messae = tuple.getString(0);
        //System.out.println(messae);
        // 发送给下一个组件
        basicOutputCollector.emit(new Values(null, messae));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // 创立发送给下一个组件的 schema
        outputFieldsDeclarer.declare(new Fields("key", "message"));
    }
}
应用 trident

应用 trident 类生成 topology:

//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerTrident {
    // 申请的 kafka 实例 ip:port
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // 指定要将音讯写入的 topic
    private final static String TOPIC = "storm_test";
    public static void main(String[] args) throws Exception {
        // 设置 producer 属性
        // 函数参考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        // 属性参考:http://kafka.apache.org/0102/documentation.html
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置 Trident
        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withProducerProperties(properties)
                .withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
                // 设置应用 fields("key", "value")作为音讯写入  不像 FieldNameBasedTupleToKafkaMapper 有默认值
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
        TridentTopology builder = new TridentTopology();
        // 一个批量产生句子的 spout, 输入 field 为 sentence
        builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
                .each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
                .partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());

        Config config = new Config();
        if (args != null && args.length > 0) {
            // 集群模式,用于打包 jar,并放到 storm 运行
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
        } else {
            // 本地模式
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.build());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();}
    
    }
    
    private static class AddMessageKey extends BaseFunction {
    
        @Override
        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            // 取出第一个 filed 值
            String messae = tridentTuple.getString(0);
            //System.out.println(messae);
            // 发送给下一个组件
            //tridentCollector.emit(new Values(Integer.toString(messae.hashCode()), messae));
            tridentCollector.emit(new Values(null, messae));
        }
    }
}

创立一个批量生成音讯的 spout 类:

//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class TridentSerialSentenceSpout implements IBatchSpout {

    private final int batchCount;
    
    public TridentSerialSentenceSpout(int batchCount) {this.batchCount = batchCount;}
    
    @Override
    public void open(Map map, TopologyContext topologyContext) { }
    
    @Override
    public void emitBatch(long l, TridentCollector tridentCollector) {Utils.sleep(1000);
        for(int i = 0; i < batchCount; i++){tridentCollector.emit(new Values(UUID.randomUUID().toString()));
        }
    }
    
    @Override
    public void ack(long l) { }
    
    @Override
    public void close() {}
    
    @Override
    public Map<String, Object> getComponentConfiguration() {Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }
    
    @Override
    public Fields getOutputFields() {return new Fields("sentence");
    }
}
步骤 4:生产音讯
应用 spout/bolt
//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;

public class TopologyKafkaConsumerSpout {
    // 申请的 kafka 实例 ip:port
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // 指定要将音讯写入的 topic
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        // 设置重试策略
        KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE,
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
        );
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        // 设置 consumer 参数
        // 函数参考 http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        // 参数参考 http://kafka.apache.org/0102/documentation.html
        KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // 设置 group
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // 设置 session 超时
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // 设置申请超时
                }})
                .setOffsetCommitPeriodMs(10_000) // 设置主动确认工夫
                .setFirstPollOffsetStrategy(LATEST) // 设置拉取最新消息
                .setRetry(kafkaSpoutRetryService)
                .setRecordTranslator(trans)
                .build();
    
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt", new BaseRichBolt(){
            private OutputCollector outputCollector;
            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
    
            @Override
            public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.outputCollector = outputCollector;}
    
            @Override
            public void execute(Tuple tuple) {System.out.println(tuple.getStringByField("value"));
                outputCollector.ack(tuple);
            }
        }, 1).shuffleGrouping("kafka-spout");
    
        Config config = new Config();
        config.setMaxSpoutPending(20);
        if (args != null && args.length > 0) {config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        }
        else {LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(20000);
            cluster.killTopology("test");
            cluster.shutdown();}
    }
}
应用 trident
//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;


public class TopologyKafkaConsumerTrident {
    // 申请的 kafka 实例 ip:port
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // 指定要将音讯写入的 topic
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        // 设置 consumer 参数
        // 函数参考 http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        // 参数参考 http://kafka.apache.org/0102/documentation.html
        KafkaTridentSpoutConfig spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // 设置 group
                    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 设置主动确认
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // 设置 session 超时
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // 设置申请超时
                }})
                .setFirstPollOffsetStrategy(LATEST) // 设置拉取最新消息
                .setRecordTranslator(trans)
                .build();
    
        TridentTopology builder = new TridentTopology();
//      Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); // 事务型
        Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
        spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
            @Override
            public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {System.out.println(tridentTuple.getStringByField("value"));
                tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
            }
        }, new Fields("message"));

        Config conf = new Config();
        conf.setMaxSpoutPending(20);conf.setNumWorkers(1);
        if (args != null && args.length > 0) {conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
        }
        else {StormTopology stormTopology = builder.build();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, stormTopology);
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();stormTopology.clear();
        }
    }
}
步骤 5:提交 Storm

应用 mvn package 编译后,能够提交到本地集群进行 debug 测试,也能够提交到正式集群进行运行。

storm jar your_jar_name.jar topology_name
storm jar your_jar_name.jar topology_name tast_name

Logstash 接入 Kafka

Logstash 是一个开源的日志解决工具,能够从多个源头收集数据、过滤收集的数据并对数据进行存储作为其余用处。

Logstash 灵活性强,领有弱小的语法分析性能,插件丰盛,反对多种输出和输入源。Logstash 作为程度可伸缩的数据管道,与 Elasticsearch 和 Kibana 配合,在日志收集检索方面功能强大。

Logstash 工作原理

Logstash 数据处理能够分为三个阶段:inputs → filters → outputs。

  1. inputs:产生数据起源,例如文件、syslog、redis 和 beats 此类起源。
  2. filters:批改过滤数据,在 Logstash 数据管道中属于中间环节,能够依据条件去对事件进行更改。一些常见的过滤器包含:grok、mutate、drop 和 clone 等。
  3. outputs:将数据传输到其余中央,一个事件能够传输到多个 outputs,当传输实现后这个事件就完结。Elasticsearch 就是最常见的 outputs。

同时 Logstash 反对编码解码,能够在 inputs 和 outputs 端指定格局。

Logstash 接入 Kafka 的劣势

  • 能够异步解决数据:避免突发流量。
  • 解耦:当 Elasticsearch 异样的时候不会影响上游工作。

留神:​
Logstash 过滤耗费资源,如果部署在生产 server 上会影响其性能。

操作步骤

筹备工作
  • 下载并装置 Logstash,参考 Download Logstash。
  • 下载并装置 JDK 8,参考 Download JDK 8。
  • 已创立 Kafka 集群。
步骤 1:创立 Topic

创立一个名为 logstash_test的 Topic。

步骤 2:接入 Kafka
作为 inputs 接入
  1. 执行 bin/logstash-plugin list,查看曾经反对的插件是否含有 logstash-input-kafka
  2. .bin/ 目录下编写配置文件 input.conf
    此处将规范输入作为数据起点,将 Kafka 作为数据起源。

    input {
        kafka {
            bootstrap_servers => "xx.xx.xx.xx:xxxx" // kafka 实例接入地址
            group_id => "logstash_group"  // kafka groupid 名称
            topics => ["logstash_test"] // kafka topic 名称
            consumer_threads => 3 // 生产线程数,个别与 kafka 分区数统一
            auto_offset_reset => "earliest"
        }
    }
    output {stdout{codec=>rubydebug}
    }
  3. 执行以下命令启动 Logstash,进行音讯生产。

    ./logstash -f input.conf

会看到方才 Topic 中的数据被生产进去。

作为 outputs 接入
  1. 执行 bin/logstash-plugin list,查看曾经反对的插件是否含有 logstash-output-kafka
  2. 在.bin/目录下编写配置文件 output.conf
    此处将规范输出作为数据起源,将 Kafka 作为数据目的地。

    input {
        input {stdin{}
      }
    }
    
    output {
       kafka {
            bootstrap_servers => "xx.xx.xx.xx:xxxx"  // ckafka 实例接入地址
            topic_id => "logstash_test" // ckafka topic 名称
           }
    }
  3. 执行如下命令启动 Logstash,向创立的 Topic 发送音讯。

    ./logstash -f output.conf
  4. 启动 Kafka 消费者,测验上一步的生产数据。

    ./kafka-console-consumer.sh --bootstrap-server 172.0.0.1:9092 --topic logstash_test --from-begging --new-consumer

Filebeats 接入 Kafka

Beats 平台 汇合了多种繁多用处数据采集器。这些采集器装置后可用作轻量型代理,从成千盈百或成千上万台机器向指标发送采集数据。

Beats 有多种采集器,您能够依据本身的需要下载对应的采集器。本文以 Filebeat(轻量型日志采集器)为例,向您介绍 Filebeat 接入 Kafka 的操作指办法,及接入后常见问题的解决办法。

前提条件

  • 下载并装置 Filebeat(参见 Download Filebeat)
  • 下载并装置 JDK 8(参见 Download JDK 8)
  • 已 创立 Kafka 集群

操作步骤

步骤 1:创立 Topic

创立一个名为 test 的 Topic。

步骤 2:筹备配置文件

进入 Filebeat 的装置目录,创立配置监控文件 filebeat.yml。

#======= Filebeat prospectors ==========
filebeat.prospectors:
- input_type: log 
# 此处为监听文件门路
  paths:
    - /var/log/messages

#=======  Outputs =========

#------------------ kafka -------------------------------------
output.kafka:
  version:0.10.2 // 依据不同 Kafka 集群版本配置
  # 设置为 Kafka 实例的接入地址
  hosts: ["xx.xx.xx.xx:xxxx"]
  # 设置指标 topic 的名称
  topic: 'test'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: none
  max_message_bytes: 1000000

  # SASL 须要配置下列信息,如果不须要则上面两个选项可不配置
  username: "yourinstance#yourusername"  //username 须要拼接实例 ID 和用户名
  password: "yourpassword"
步骤 4:Filebeat 发送音讯
  1. 执行如下命令启动客户端。

    sudo ./filebeat -e -c filebeat.yml 
  2. 为监控文件减少数据(示例为写入监听的 testlog 文件)。

    echo ckafka1 >> testlog
    echo ckafka2 >> testlog
    echo ckafka3 >> testlog
  3. 开启 Consumer 生产对应的 Topic,取得以下数据。

    {"@timestamp":"2017-09-29T10:01:27.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka1","offset":500,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
    {"@timestamp":"2017-09-29T10:01:30.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka2","offset":508,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
    {"@timestamp":"2017-09-29T10:01:33.937Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka3","offset":516,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
SASL/PLAINTEXT 模式

如果您须要进行 SALS/PLAINTEXT 配置,则须要配置用户名与明码。在 Kafka 配置区域新减少 username 和 password 配置即可。

参考链接

音讯队列 CKafka – 文档核心 – 腾讯云 (tencent.com)

三人行, 必有我师; 常识共享, 天下为公. 本文由东风微鸣技术博客 EWhisper.cn 编写.

正文完
 0