前言

入手实际往往比看看更重要????

单机版 Docker 搭建

version: '2' services:  zookeeper:    image: wurstmeister/zookeeper    ports:      - "2181:2181"  kafka:    image: wurstmeister/kafka    depends_on: [ zookeeper ]    ports:      - "9092:9092"    environment:      KAFKA_ADVERTISED_HOST_NAME: kafka      KAFKA_ADVERTISED_PORT: 9092      KAFKA_CREATE_TOPICS: "test:1:1"      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

注意事项:

  • 如果想要 java 客户端可能失常连贯上 kafka, 须要配置宿主机的 host
sudo vim /etc/hosts172.20.10.6 kafka
  • 如何应用 kafka 自带的 kafka-console-producer 测试发送音讯?
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test

集群版 + kafka manager

kafka 集群 docker-compose

version: '2'services:  zookeeper:    image: wurstmeister/zookeeper    ports:      - "2181:2181"  kafka1:    restart: always    image: wurstmeister/kafka    depends_on: [ zookeeper ]    ports:      - "9092:9092"    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"      KAFKA_LISTENERS: "PLAINTEXT://kafka1:9092"      KAFKA_PORT: 9092  kafka2:    restart: always    image: wurstmeister/kafka    depends_on: [ zookeeper ]    ports:      - "9093:9093"    environment:      KAFKA_BROKER_ID: 2      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9093"      KAFKA_LISTENERS: "PLAINTEXT://kafka2:9093"      KAFKA_PORT: 9093  kafka3:    restart: always    image: wurstmeister/kafka    depends_on: [ zookeeper ]    ports:      - "9094:9094"    environment:      KAFKA_BROKER_ID: 3      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9094"      KAFKA_LISTENERS: "PLAINTEXT://kafka3:9094"      KAFKA_PORT: 9094

注意事项:

  • 如果想要 java 客户端可能失常连贯上 kafka, 须要配置宿主机的 host
sudo vim /etc/hosts172.20.10.6 kafka1172.20.10.6 kafka2172.20.10.6 kafka3
  • 如何应用 kafka 自带的 kafka-console-producer 测试发送音讯?这里假如是进入到 kafka3 容器中
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 3 --topic test2
kafka-console-producer.sh --bootstrap-server kafka3:9094 --topic test

kafka-manager docker-compose

version: "2"services:  kafka-manager:    image: kafkamanager/kafka-manager    container_name: kafka-manager    ports:      - "9000:9000"    external_links:  # 连贯本compose文件以外的container      - kafka_kafka1_1      - kafka_kafka2_1      - kafka_kafka3_1    environment:      ZK_HOSTS: kafka_zookeeper_1:2181networks:  default:    external:      name: kafka_default

注意事项
kafka-manager、与 kafka 集群不在同一个 compose 中。因而这里须要应用 networks 连贯到 kafka 集群的网络中

基本操作

以下均在 docker 内操作

cd /opt/kafka/bin
  1. 创立主题

创立了 1 个正本 1 个分区的主题

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test2
  1. 查看主题
kafka-topics.sh  --zookeeper zookeeper:2181 --list
  1. 查看主题详情
kafka-topics.sh  --zookeeper zookeeper:2181  --describe  --topic test2
  1. 发送音讯
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2
  1. 生产音讯
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning

springboot 连贯 kafka

maven

<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.1.10.RELEASE</version></parent><dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.kafka</groupId>        <artifactId>spring-kafka</artifactId>    </dependency></dependencies>

yaml

server:  port: 9009spring:  kafka:    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer      group-id: goup1 # 生产组

生产者

@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;kafkaTemplate.send("test2","qweqwe");

消费者

@KafkaListener(topics = "test2")public void onMsg(String msg) {    log.error("kafka {}" ,msg);    System.out.println(msg);}

相干文档

  • Kafka 官网配置参数
  • kafka中文文档broker配置参数