关于kafka:kafka整合Flume

2次阅读

共计 1290 个字符,预计需要花费 4 分钟才能阅读完成。

kafka 整合 Flume

前提是曾经装置实现 flume

  • 1、增加启动 flume 时的配置文件

    # node01 执行以下命令开发 flume 的配置文件
    cd /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/
    ### 编辑配置文件
    vi flume-kafka.conf
    # 为咱们的 source channel  sink 起名
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #指定咱们的 source 数据收集策略
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /kkb/install/flumeData/files
    a1.sources.r1.inputCharset = utf-8
    
    #指定咱们的 source 收集到的数据发送到哪个管道
    a1.sources.r1.channels = c1
    
    #指定咱们的 channel 为 memory, 即示意所有的数据都装进 memory 当中
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    
    #指定咱们的 sink 为 kafka sink,并指定咱们的 sink 从哪个 channel 当中读取数据
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = kaikeba
    a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
  • 2、node01 执行以下命令创立 Topic

    cd /kkb/install/kafka_2.11-1.1.0/
    bin/kafka-topics.sh --create --topic kaikeba --partitions 3 --replication-factor 2  --zookeeper node01:2181,node02:2181,node03:2181
  • 3、启动 Flume

    ​ node01 执行以下命令启动 flume

    bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka.conf -Dflume.root.logger=info,console
  • 4、启动 kafka 控制台消费者,验证数据写入胜利

    node01 执行以下命令生产 kafka 当中的数据

    cd /kkb/install/kafka_2.11-1.1.0/
    
    bin/kafka-console-consumer.sh --topic kaikeba --bootstrap-server node01:9092,node02:9092,node03:9092  --from-beginning
正文完
 0