关于kafka:WSL-下-ubuntu1804-docker安装kafka

6次阅读

共计 2665 个字符,预计需要花费 7 分钟才能阅读完成。

环境安装:

1、下载镜像
这里使用了 wurstmeister/kafka 和 wurstmeister/zookeeper 这两个镜像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
在命令行中运行 docker images,验证两个镜像已经安装完毕

2. 启动
启动 zookeeper 容器
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

启动 kafka 容器

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest

请将 127.0.0.1 改为宿主机的 IP 地址;如果不这么设置,可能会导致在别的机器上访问不到 kafka。

3. 测试 kafka
进入 kafka 容器的命令行

运行 docker ps,找到 kafka 的 CONTAINER ID,然后运行 docker exec -it <CONTAINER ID> /bin/bash(将 <CONTAINER ID> 替换为实际的容器 ID),进入 kafka 容器。进入 kafka 默认目录 /opt/kafka_2.11-0.10.1.0

生产者:

package main

import (
    "fmt"
    "io"

    "github.com/Shopify/sarama"
)

// Producer example.
//
// main reads tokens from standard input with fmt.Scanln and publishes each one
// to the "kafka_go_test" topic on localhost:9092 through an asynchronous sarama
// producer. After every send it waits for the matching result on Successes()
// or Errors() before reading the next token. The program exits when standard
// input reaches EOF.
func main() {
    fmt.Printf("producer_test\n")

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // Both result channels are enabled, so each message put on Input() must be
    // paired with a receive from Successes() or Errors() (see the loop below).
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V0_11_0_2

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Printf("producer_test create producer error :%s\n", err.Error())
        return
    }
    defer producer.AsyncClose()

    // value keeps its previous content when a scan fails for a reason other
    // than EOF, so the text sent in that case is the default or the last token.
    value := "this is message"
    for {
        // Stop at end of input; without this check the loop would spin forever
        // once stdin is closed, re-sending the last value on every iteration.
        // Other scan errors stay non-fatal, as in the original best-effort loop.
        if _, scanErr := fmt.Scanln(&value); scanErr == io.EOF {
            return
        }
        fmt.Printf("input [%s]\n", value)

        // Build a fresh message per send instead of mutating a shared one.
        producer.Input() <- &sarama.ProducerMessage{
            Topic: "kafka_go_test",
            Key:   sarama.StringEncoder("go_test"),
            Value: sarama.ByteEncoder(value),
        }

        // Block until the broker acknowledges or rejects this message.
        select {
        case suc := <-producer.Successes():
            fmt.Printf("offset: %d,  timestamp: %s\n", suc.Offset, suc.Timestamp.String())
        case fail := <-producer.Errors():
            fmt.Printf("err: %s\n", fail.Err.Error())
        }
    }
}

消费者:

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

// Consumer example.
//
// main connects to the broker at localhost:9092 and prints every message found
// in partition 0 of the "kafka_go_test" topic, starting from the oldest offset.
// It loops forever, printing consumer errors as they arrive.
//
// The broker used with this example was started with:
//
//    docker run -d --name kafka --publish 9092:9092 --link zookeeper \
//        --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
//        --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 \
//        --env KAFKA_ADVERTISED_PORT=9092 \
//        --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
func main() {
    // Terminate the banner with a newline so the first consumed message does
    // not end up on the same output line.
    fmt.Printf("consumer_test\n")

    config := sarama.NewConfig()
    // Deliver consume errors on the Errors() channel read in the loop below.
    config.Consumer.Return.Errors = true
    config.Version = sarama.V0_11_0_2

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Printf("consumer_test create consumer error %s\n", err.Error())
        return
    }
    defer consumer.Close()

    // Only the data in partition 0 is consumed.
    partitionConsumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
    if err != nil {
        fmt.Printf("try create partition_consumer error %s\n", err.Error())
        return
    }
    // Deferred after consumer.Close, so it runs first (LIFO order).
    defer partitionConsumer.Close()

    for {
        select {
        case msg := <-partitionConsumer.Messages():
            fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
                msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
        case consumeErr := <-partitionConsumer.Errors():
            // Named distinctly to avoid shadowing the outer err.
            fmt.Printf("err :%s\n", consumeErr.Error())
        }
    }
}
正文完
 0