大数据系列kafka学习笔记

10次阅读

共计 6405 个字符,预计需要花费 17 分钟才能阅读完成。

1. 大数据领域数据类型

1.1 有界数据

一般批处理(一个文件 或者一批文件), 不管文件多大,都是可以度量

mapreduce hive sparkcore sparksql

1.2 无界数据

源源不断的流水一样 (流数据)

Storm SparkStreaming

2. 消息队列(Message Queue)

  • 消息 Message

    • 网络中的两台计算机或者两个通讯设备之间传递的数据, 例如说:文本、音乐、视频等内容
  • 队列 Queue

    • 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部移除元素和在尾部追加元素。入队、出队。
  • 消息队列 MQ

    • 消息 + 队列
    • 保存消息的队列
    • 消息的传输过程中的容器
    • 主要提供生产、消费接口供外部调用做数据的存储和获取

3. 消息队列的分类

3.1 点对点(P2P)

  • 一个生产者生产的消息只能被一个消费者消费

3.2 发布订阅(Pub/Sub)

消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)

  • 消息的发布者
  • 消息的订阅者

    每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。

4. Kafka 的简介

  • 在大数据领域呢,为了满足日益增长的数据量,也有一款可以满足百万级别消息的生成和消费,分布式、持久稳定的产品——Kafka
  • Kafka 是分布式的发布—订阅消息系统(基于 PS 的一个消息队列)
  • 它最初由 LinkedIn(领英)公司发布, 使用 Scala 语言编写
  • Kafka 是一个高吞吐量的、持久性的、分布式发布订阅消息系统
  • 它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据

5. Kafka 的特点

  • 高吞吐量

    • 可以满足每秒 百万级 别消息的生产和消费(生产消费)
  • 持久性

    • 有一套完善的消息存储机制,确保数据的高效安全的持久化 (数据的存储)
  • 分布式

    • 基于分布式的扩展和容错机制;Kafka 的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性

6. Kafka 的组件

  • 一个消息队列需要哪些部分?

    • 生产
    • 消费
    • 消息类别
    • 存储等等
  • Topic(主题)

    • Kafka 处理的消息的不同分类
  • Broker (消息代理)

    • Kafka 集群中的一个 kafka 服务节点称为一个 broker,主要存储消息数据, 存在硬盘中。每个 topic 都是有分区的
  • Partition (物理上的分区)

    • 一个 topic 在 broker 中被分为 1 个或者多个 partition,分区在创建 topic 的时候指定
  • Message (消息)

    • 消息,是通信的基本单位,每个消息都属于一个 partition

7. Kafka 的服务

  • Producer : 消息和数据的生产者,向 Kafka 的一个 topic 发布消息
  • Consumer : 消息和数据的消费者,定于 topic 并处理其发布的消息
  • Zookeeper : 协调 kafka 的正常运行

8. Kafka 的安装

8.1 单机版的安装

  • 准备 kafka

    • kafka_2.10-0.10.0.1.tgz
  • 解压 kafka

    • tar -zxvf kafka_2.10-0.10.0.1.tgz -C /opt/
  • 重命名

    • mv kafka_2.10-0.10.0.1.tgz kafka
  • 配置环境变量

    export KAFKA_HOME=/opt/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
  • 编辑 server.properties

    broker.id=1
    log.dirs=/opt/kafka/logs
    zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
    listeners=PLAINTEXT://:9092          
  • 启动 kafka-server 服务

    kafka-server-start.sh [-daemon] server.properties
  • 停止 kafka 服务

     kafka-server-stop.sh

8.2 集群的安装

只需要在每个机器上修改对应的 ==broker.id=1== 即可

9. Kafka 中 Topic 的操作

  • 创建 topic

    kafka-topics.sh  --create --topic t1 --partitions 3 --replication-factor 1  --zookeeper uplooking03:2181,uplooking04:2181

    == 注意: 创建 topic 过程的问题,replication-factor 个数不能超过 brokerserver 的个数 ==

  • 查看 topic

    kafka-topics.sh  --list --zookeeper uplooking03
  • 查看具体 topic 的详情

    kafka-topics.sh  --describe --topic t1 --zookeeper uplooking04:2181
    PartitionCount:topic 对应的 partition 的个数
    ReplicationFactor:topic 对应的副本因子,说白就是副本个数
    Partition:partition 编号,从 0 开始递增
    Leader:当前 partition 起作用的 breaker.id
    Replicas: 当前副本数据存在的 breaker.id,是一个列表,排在最前面的其作用
    Isr:当前 kakfa 集群中可用的 breaker.id 列表    
  • 修改 topic(不能修改 replication-factor,以及只能对 partition 个数进行增加,不能减少)

    kafka-topics.sh --alter --topic t1 --partitions 4 --zookeeper uplooking03
  • 删除 Topic

    kafka-topics.sh --delete --topic t1 --zookeeper uplooking03

    ps: 这种删除只是标记删除, 要想彻底删除必须设置一个属性, 在 server.properties 中配置 delete.topic.enable=true,否则只是标记删除

    配置完成之后,需要重启 kafka 服务

10. Kafka 中的生产者和消费者接口

  • 自己写代码实现 kafka 提供的消息生产和消费的接口
  • kafka 自身也实现了自身的生产和消费的接口, 给出了两个工具(kafka-console-producer.sh , kafka-console-consumer.sh)

11. Kafka 自带的生产和消费消息的工具

11.1 kafka-console-producer.sh(生产工具)

kafka-console-producer.sh --topic t1  --broker-list uplooking03:9092,uploo
king04:9092,uplooking05:9092

11.2 kafka-console-consumer.sh(消费工具)

kafka-console-consumer.sh  --zookeeper uplooking03 --topic t1
--from-beginning: 从头开始消费
--blacklist: 黑名单过滤(kafka-console-consumer.sh  --zookeeper uplooking03   --blacklist t1,t3)
--whitelist: 白名单过滤(kafka-console-consumer.sh  --zookeeper uplooking03   --whitelist t2)    

ps:--topic|--blacklist|--whitelist 只能出现其中一个

12. ==Flume 与 Kafka 的整合 ==

  • 配置 flume 的 agent 配置文件

    touch flume-kafka.properties

    # 对各个组件的描述说明
    # 其中 a1 为 agent 的名字
    # r1 是 a1 的 source 的代号名字
    # c1 是 a1 的 channel 的代号名字
    # k1 是 a1 的 sink 的代号名字
    ############################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 用于描述 source 的,类型是 netcat 网络
    a1.sources.r1.type = netcat
    # source 监听的网络 ip 地址和端口号
    a1.sources.r1.bind = uplooking01
    a1.sources.r1.port = 44444
    
    
    
    # 用于描述 sink,类型是 kafka
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = hadoop
    a1.sinks.k1.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 2
    
    
    # 用于描述 channel,在内存中做数据的临时的存储
    a1.channels.c1.type = memory
    # 该内存中最大的存储容量,1000 个 events 事件
    a1.channels.c1.capacity = 1000
    # 能够同时对 100 个 events 事件监管事务
    a1.channels.c1.transactionCapacity = 100
    
    
    # 将 a1 中的各个组件建立关联关系,将 source 和 sink 都指向了同一个 channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • 启动 flume 开始采集数据

    [root@uplooking01:/opt/flume/conf]
        flume-ng agent --name a1 --conf-file flume-kafka.properties
  • 开启 Kafka 消息消费工具

    [root@uplooking03:/opt/flume/conf]
        kafka-console-consumer.sh  --zookeeper uplooking03 --topic hadoop
  • 给 flume 监听的 Source 发送数据

    [root@uplooking03:/]
        nc uplooking01 44444
  • 现在就可以到 kafka 的消费工具 (kafka-console-consumer.sh) 中区查看 nc 发送的数据

13. Kafka 的 API 操作(生产者和消费者)

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.0.1</version>
</dependency>

13.1 Kafka 的生产者

  • 创建生产者的配置文件 producer.properties

    bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • 创建生产者并且发送数据到 topic 中

    public class MyKafkaProducer {public static void main(String[] args) throws IOException {Properties prop = new Properties();
            prop.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(prop);
            kafkaProducer.send(new ProducerRecord<String, String>("hadoop", "name", "admin123"));
            kafkaProducer.close();}
    }

13.2 Kafka 的消费者

  • 创建消费者的配置文件 consumer.properties

    zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
    group.id=test-consumer-group
    bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 创建消息消费者消费 topic 中的数据

    public static void main(String[] args) throws Exception {Properties prop = new Properties();
        prop.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(prop);
        Collection topics = new ArrayList();
        topics.add("hadoop");
        kafkaConsumer.subscribe(topics);
        while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());
            }
        }
    }
  • 自定义分区(MyCustomPartition)

    package com.uplooking.bigdata.kafka.partition;
    public class MyCustomPartition implements Partitioner {public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)  {
    // 获取分区数,    分区编号一般都是从 0 开始
    int partitionSize = cluster.partitionCountForTopic(topic);
    int keyHash = Math.abs(key.hashCode());
    int valueHash = Math.abs(value.hashCode());
    return keyHash % partitionSize;
    }
    public void close() {}
    public void configure(Map<String, ?> configs) {}}

    配置自定义分区(producer.properties)

    partitioner.class=com.uplooking.bigdata.kafka.partition.MyCustomPartition
正文完
 0