乐趣区

kafka-入门详解

Kafka

Kafka 核心概念

什么是 Kafka

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布 / 订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka 可以通过 Kafka Connect 连接到外部系统(用于数据输入 / 输出),并提供了 Kafka Streams——一个 Java 流式处理库。该设计受事务日志的影响较大。

基本概念

Kafka 是一个分布式数据流平台,可以运行在单台服务器上,也可以在多台服务器上部署形成集群。它提供了发布和订阅功能,使用者可以发送数据到 Kafka 中,也可以从 Kafka 中读取数据(以便进行后续的处理)。Kafka 具有高吞吐、低延迟、高容错等特点。下面介绍一下 Kafka 中常用的基本概念:

  • Broker

消息队列中常用的概念,在 Kafka 中指部署了 Kafka 实例的服务器节点。

  • Topic

用来区分不同类型信息的主题。比如应用程序 A 订阅了主题 t1,应用程序 B 订阅了主题 t2 而没有订阅 t1,那么发送到主题 t1 中的数据将只能被应用程序 A 读到,而不会被应用程序 B 读到。

  • Partition

每个 topic 可以有一个或多个 partition(分区)。分区是在物理层面上的,不同的分区对应着不同的数据文件。Kafka 使用分区支持物理上的并发写入和读取,从而大大提高了吞吐量。

  • Record

实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp。

  • Producer

生产者,用来向 Kafka 中发送数据(record)。

  • Consumer

消费者,用来读取 Kafka 中的数据(record)。

  • Consumer Group

一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度。

kafka 核心名词解释

  • Topic(主题): 每一条发送到 kafka 集群的消息都可以有一个类别,这个类别叫做 topic, 不同的消息会进行分开存储,如果 topic 很大,可以分布到多个 broker 上,也可以这样理解:topic 被认为是一个队列,每一条消息都必须指定它的 topic,可以说我们需要明确把消息放入哪一个队列。对于传统的 message queue 而言,一般会删除已经被消费的消息,而 Kafka 集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此 Kafka 提供两种策略删除旧数据。一是基于时间,二是基于 Partition 文件大小。
  • Broker(代理): 一台 kafka 服务器就可以称之为 broker. 一个集群由多个 broker 组成,一个 broker 可以有多个 topic
  • Partition(分区): 为了使得 kafka 吞吐量线性提高,物理上把 topic 分成一个或者多个分区,每一个分区是一个有序的队列。且每一个分区在物理上都对应着一个文件夹,该文件夹下存储这个分区所有消息和索引文件。

分区的表示: topic 名字 - 分区的 id 每个日志文件都是一个 Log Entry 序列,每个 Log Entry 包含一个 4 字节整型数值(值为 M +5),1 个字节的 ”magic value”,4 个字节的 CRC 校验码,然后跟 M 个字节的消息这个 log entries 并非由一个文件构成,而是分成多个 segment,每个 segment 以该 segment 第一条消息的 offset 命名并以“.kafka”为后缀。另外会有一个索引文件,它标明了每个 segment 下包含的 log entry 的 offset 范围分区中每条消息都有一个当前 Partition 下唯一的 64 字节的 offset,它指明了这条消息的起始位置,Kafka 只保证一个分区的数据顺序发送给消费者,而不保证整个 topic 里多个分区之间的顺序

  • Replicas(副本): 试想:一旦某一个 Broker 宕机,则其上所有的 Partition 数据都不可被消费,所以需要对分区备份。其中一个宕机后其它 Replica 必须要能继续服务并且即不能造成数据重复也不能造成数据丢失。

如果没有一个 Leader,所有 Replica 都可同时读 / 写数据,那就需要保证多个 Replica 之间互相(N×N 条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了 Replication 实现的复杂性,同时也增加了出现异常的几率。而引入 Leader 后,只有 Leader 负责数据读写,Follower 只向 Leader 顺序 Fetch 数据(N 条通路),系统更加简单且高效。
每一个分区,根据复制因子 N,会有 N 个副本,比如在 broker1 上有一个 topic,分区为 topic-1, 复制因子为 2,那么在两个 broker 的数据目录里,就都有一个 topic-1, 其中一个是 leader,一个 replicas 同一个 Partition 可能会有多个 Replica,而这时需要在这些 Replication 之间选出一个 Leader,Producer 和 Consumer 只与这个 Leader 交互,其它 Replica 作为 Follower 从 Leader 中复制数据

  • Producer: Producer 将消息发布到指定的 topic 中,同时,producer 还需要指定该消息属于哪个 partition
  • Consumer: 本质上 kafka 只支持 topic,每一个 consumer 属于一个 consumer group,每个 consumer group 可以包含多个 consumer。发送到 topic 的消息只会被订阅该 topic 的每个 group 中的一个 consumer 消费。如果所有的 consumer 都具有相同的 group,这种情况和 queue 很相似,消息将会在 consumer 之间均衡分配;如果所有的 consumer 都在不同的 group 中,这种情况就是广播模式,消息会被发送到所有订阅该 topic 的 group 中,那么所有的 consumer 都会消费到该消息。kafka 的设计原理决定,对于同一个 topic,同一个 group 中 consumer 的数量不能多于 partition 的数量,否则就会有 consumer 无法获取到消息。
  • Offset: Offset 专指 Partition 以及 User Group 而言,记录某个 user group 在某个 partiton 中当前已经消费到达的位置。

kafka 使用场景

目前主流使用场景基本如下:

  • 消息队列(MQ)

在系统架构设计中,经常会使用消息队列(Message Queue)——MQ。MQ 是一种跨进程的通信机制,用于上下游的消息传递,使用 MQ 可以使上下游解耦,消息发送上游只需要依赖 MQ,逻辑上和物理上都不需要依赖其他下游服务。MQ 的常见使用场景如流量削峰、数据驱动的任务依赖等等。在 MQ 领域,除了 Kafka 外还有传统的消息队列如 ActiveMQ 和 RabbitMQ 等。

  • 追踪网站活动

Kafka 最出就是被设计用来进行网站活动(比如 PV、UV、搜索记录等)的追踪。可以将不同的活动放入不同的主题,供后续的实时计算、实时监控等程序使用,也可以将数据导入到数据仓库中进行后续的离线处理和生成报表等。

  • Metrics

Kafka 经常被用来传输监控数据。主要用来聚合分布式应用程序的统计数据,将数据集中后进行统一的分析和展示等。

  • 日志聚合

很多人使用 Kafka 作为日志聚合的解决方案。日志聚合通常指将不同服务器上的日志收集起来并放入一个日志中心,比如一台文件服务器或者 HDFS 中的一个目录,供后续进行分析处理。相比于 Flume 和 Scribe 等日志聚合工具,Kafka 具有更出色的性能。

kafka 集群搭建

安装 kefka 集群

由于 kafka 依赖 zookeeper 环境所以先安装 zookeeper,zk 安装

安装环境


linux: CentSO-7.5_x64
java: jdk1.8.0_191
zookeeper: zookeeper3.4.10
kafka: kafka_2.11-2.0.1
# 下载
$ wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

# 解压
$ tar -zxvf kafka_2.11-2.1.0.tgz

# 编辑配置文件修改一下几个配置
$ vim $KAFKA_HOME/config/server.properties

# 每台服务器的 broker.id 都不能相同只能是数字
broker.id=1

# 修改为你的服务器的 ip 或主机名
advertised.listeners=PLAINTEXT://node-1:9092

# 设置 zookeeper 的连接端口, 将下面的 ip 修改为你的 IP 称或主机名
zookeeper.connect=node-1:2181,node-2:2181,node-3:2181

启动 Kafka 集群并测试

$ cd $KAFKA_HOME

# 分别在每个节点启动 kafka 服务 (-daemon 表示在后台运行)
$ bin/kafka-server-start.sh -daemon config/server.properties

# 创建一个名词为 test-topic 的 Topic,partitions 表示分区数量为 3 --replication-factor 表示副本数量为 2
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test-topic

# 查看 topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

# 查看 topic 状态
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

# 查看 topic 详细信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

# 修改 topic 信息
$ bin/kafka-topics.sh --alter --topic test-topic --zookeeper localhost:2181 --partitions 5

# 删除 topic( 简单的删除,只是标记删除)$ bin/kafka-topics.sh --delete --topic test-topic --zookeeper localhost:2181

# 在一台服务器上创建一个 producer (生产者)
$ bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092 --topic test-topic

# 在一台服务器上创建一个 consumer (消费者)
$ bin/kafka-console-consumer.sh --bootstrap-server node-2:9092,node-3:9092,node-4:9092 --topic test-topic --from-beginning

# 现在可以在生产者的控制台输入任意字符就可以看到消费者端有消费消息。

java 客户端连接 kafka

普通 java 形式

  • pom.xml
<dependencies>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>
  • JavaKafkaConsumer.java 消费者

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaConsumer {private static Logger logger = LoggerFactory.getLogger(JavaKafkaConsumer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
    }

    public static void main(String[] args) {final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {public void onPartitionsRevoked(Collection<TopicPartition> collection) { }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // 将偏移设置到最开始
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {logger.info("offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value());
            }
        }
    }

}
  • JavaKafkaProducer.java 生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.UUID;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaProducer {private static Logger logger = LoggerFactory.getLogger(JavaKafkaProducer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
    }

    public static void main(String[] args) {Producer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 200; i++) {
            try {Thread.sleep(1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            String uuid = UUID.randomUUID().toString();
            producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), uuid));
            logger.info("send message success key: {}, value: {}", i, uuid);
        }
        producer.close();}

}
  • KafkaClient.java

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;

import java.util.*;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class KafkaClient {

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties = new Properties();

    static {properties.put("bootstrap.servers", KAFKA_BROKER);
    }

    /**
     * 创建 topic
     */
    @Test
    public void createTopic() {AdminClient adminClient = AdminClient.create(properties);
        List<NewTopic> newTopics = Arrays.asList(new NewTopic(TOPIC, 1, (short) 1));
        CreateTopicsResult result = adminClient.createTopics(newTopics);
        try {result.all().get();} catch (Exception e) {e.printStackTrace();
        }
    }


    /**
     * 创建 topic
     */
    @Test
    public void create() {ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // 创建一个 3 个分区 2 个副本名为 t1 的 topic
        AdminUtils.createTopic(zkUtils, "t1", 3, 2, new Properties(), RackAwareMode.Enforced$.MODULE$);
        zkUtils.close();}

    /**
     * 查询 topic
     */
    @Test
    public void listTopic() {ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // 获取 topic 所有属性
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "streaming-topic");

        Iterator it = props.entrySet().iterator();
        while (it.hasNext()) {Map.Entry entry = (Map.Entry) it.next();
            System.err.println(entry.getKey() + "=" + entry.getValue());
        }
        zkUtils.close();}

    /**
     * 修改 topic
     */
    @Test
    public void updateTopic() {ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "log-test");
        // 增加 topic 级别属性
        props.put("min.cleanable.dirty.ratio", "0.4");
        // 删除 topic 级别属性
        props.remove("max.message.bytes");
        // 修改 topic 'test' 的属性
        AdminUtils.changeTopicConfig(zkUtils, "log-test", props);
        zkUtils.close();}

    /**
     * 删除 topic 't1'
     */
    @Test
    public void deleteTopic() {ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        AdminUtils.deleteTopic(zkUtils, "t1");
        zkUtils.close();}


}
  • log4j.properties 日志配置
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

基于 spirngboot 整合 kafka

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <artifactId>spring-boot-kafka</artifactId>
    <groupId>com.andy</groupId>
    <version>1.0.7.RELEASE</version>
    
    <packaging>jar</packaging>
    <modelVersion>4.0.0</modelVersion>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.spring.platform</groupId>
                <artifactId>platform-bom</artifactId>
                <version>Cairo-SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.3.RELEASE</version>
                <configuration>
                    <!--<mainClass>${start-class}</mainClass>-->
                    <layout>ZIP</layout>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • application.yml
spring:
  application:
    name: spring-jms
  kafka:
    bootstrap-servers: node-2:9092,node-3:9092,node-4:9092
    producer:
      retries:
      batch-size: 16384
      buffer-memory: 33554432
      compressionType: snappy
      acks: all
    consumer:
      group-id: 0
      auto-offset-reset: earliest
      enable-auto-commit: true
  • Message.java 消息

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@ToString
public class Message<T> {

    private Long id;

    private T message;

    private Date time;

    public Long getId() {return id;}

    public void setId(Long id) {this.id = id;}

    public T getMessage() {return message;}

    public void setMessage(T message) {this.message = message;}

    public Date getTime() {return time;}

    public void setTime(Date time) {this.time = time;}
}
  • KafkaController.java 控制器
import com.andy.jms.kafka.service.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/kafka/{topic}")
    public String send(@PathVariable("topic") String topic, @RequestParam String message) {kafkaSender.send(topic, message);
        return "success";
    }

}
  • KafkaReceiver.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaReceiver {@KafkaListener(topics = {"order"})
    public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();
            log.info("record:{}", record);
            log.info("message:{}", message);
        }
    }
}
  • KafkaSender.java
import com.andy.jms.kafka.commen.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     *
     * @param topic
     * @param body
     */
    public void send(String topic, Object body) {Message<String> message = new Message<>();
        message.setId(System.currentTimeMillis());
        message.setMessage(body.toString());
        message.setTime(new Date());
        String content = null;
        try {content = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {e.printStackTrace();
        }
        kafkaTemplate.send(topic, content);
        log.info("send {} to {} success!", message, topic);
    }
}
  • 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Leone
 * @since 2018-04-10
 **/
@SpringBootApplication
public class JmsApplication {public static void main(String[] args) {SpringApplication.run(JmsApplication.class, args);
    }
}

github 地址

退出移动版