环境装置:

1、下载镜像这里应用了wurstmeister/kafka和wurstmeister/zookeeper这两个版本的镜像docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka在命令中运行docker images验证两个镜像曾经装置结束2.启动启动zookeeper容器docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper启动kafka容器docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest127.0.0.1 改为宿主机器的IP地址,如果不这么设置,可能会导致在别的机器上拜访不到kafka。3. 测试kafka进入kafka容器的命令行运行 docker ps,找到kafka的 CONTAINER ID,运行 docker exec -it ${CONTAINER ID} /bin/bash,进入kafka容器。进入kafka默认目录 /opt/kafka_2.11-0.10.1.0

生产者:

package mainimport (    "fmt"    "github.com/Shopify/sarama")//生产者模式func main() {    fmt.Printf("producer_test\n")    config := sarama.NewConfig()    config.Producer.RequiredAcks = sarama.WaitForAll    config.Producer.Partitioner = sarama.NewRandomPartitioner    config.Producer.Return.Successes = true    config.Producer.Return.Errors = true    config.Version = sarama.V0_11_0_2    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)    if err != nil {        fmt.Printf("producer_test create producer error :%s\n", err.Error())        return    }    defer producer.AsyncClose()    msg := &sarama.ProducerMessage{        Topic: "kafka_go_test",        Key:   sarama.StringEncoder("go_test"),    }    value := "this is message"    for {        fmt.Scanln(&value)        msg.Value = sarama.ByteEncoder(value)        fmt.Printf("input [%s]\n", value)        // send to chain        producer.Input() <- msg        select {        case suc := <-producer.Successes():            fmt.Printf("offset: %d,  timestamp: %s", suc.Offset, suc.Timestamp.String())        case fail := <-producer.Errors():            fmt.Printf("err: %s\n", fail.Err.Error())        }    }}

消费者:

package mainimport (    "fmt"    "github.com/Shopify/sarama")//消费者模式// docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181//--env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1//--env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latestfunc main() {    fmt.Printf("consumer_test")    config := sarama.NewConfig()    config.Consumer.Return.Errors = true    config.Version = sarama.V0_11_0_2    // consumer    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)    if err != nil {        fmt.Printf("consumer_test create consumer error %s\n", err.Error())        return    }    defer consumer.Close()    //默认生产第0个 Partition中的数据    partitionConsumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)    if err != nil {        fmt.Printf("try create partition_consumer error %s\n", err.Error())        return    }    defer partitionConsumer.Close()    for {        select {        case msg := <-partitionConsumer.Messages():            fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",                msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))        case err := <-partitionConsumer.Errors():            fmt.Printf("err :%s\n", err.Error())        }    }}