Integrating Kafka with Flume
Prerequisite: Flume is already installed.
1. Create the configuration file used to start Flume

# Run the following on node01 to create the Flume configuration file
cd /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/
# Edit the configuration file
vi flume-kafka.conf

# Name our source, channel, and sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Specify the source's data collection strategy: watch a spool directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /kkb/install/flumeData/files
a1.sources.r1.inputCharset = utf-8

# Specify which channel the source sends its collected data to
a1.sources.r1.channels = c1

# Use a memory channel, i.e. all events are buffered in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a Kafka sink and specify which channel it reads from
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kaikeba
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
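The spooldir source expects the watched directory to exist before the agent starts. A minimal preparation step, assuming node01 and the spoolDir path from the config above:

# Run on node01: create the directory the spooldir source watches
mkdir -p /kkb/install/flumeData/files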
2. Run the following on node01 to create the topic
cd /kkb/install/kafka_2.11-1.1.0/
bin/kafka-topics.sh --create --topic kaikeba --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
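Optionally, verify that the topic was created with the expected partition and replica layout. A quick check using the same ZooKeeper quorum:

# Run on node01: show the partition/replica assignment of the new topic
cd /kkb/install/kafka_2.11-1.1.0/
bin/kafka-topics.sh --describe --topic kaikeba --zookeeper node01:2181,node02:2181,node03:2181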
3. Start Flume

Run the following on node01 to start Flume (from the Flume install directory, since the command uses relative paths):

cd /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/
bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka.conf -Dflume.root.logger=info,console
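With the agent running, drop a file into the spool directory to generate some test traffic. Writing the file elsewhere first and then moving it in avoids the spooldir source picking up a half-written file; the filename test.txt and its contents below are just placeholders:

# Run on node01 in a second terminal: stage a test file, then move it
# into the watched directory so Flume ships its lines to Kafka
echo "hello kafka from flume" > /tmp/test.txt
mv /tmp/test.txt /kkb/install/flumeData/files/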
4. Start a Kafka console consumer to verify the data was written successfully

Run the following on node01 to consume the data from Kafka:
cd /kkb/install/kafka_2.11-1.1.0/
bin/kafka-console-consumer.sh --topic kaikeba --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning
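If the pipeline works end to end, the consumer prints the lines of the test file. On the Flume side, the spooldir source renames each fully ingested file with a .COMPLETED suffix (Flume's default fileSuffix), which gives a second way to confirm delivery:

# Run on node01: files already shipped by Flume carry the .COMPLETED suffix
ls /kkb/install/flumeData/files/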