环境装置:
1、下载镜像
这里应用了 wurstmeister/kafka 和 wurstmeister/zookeeper 这两个版本的镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
在命令中运行 docker images 验证两个镜像曾经装置结束
2. 启动
启动 zookeeper 容器
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
启动 kafka 容器
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
127.0.0.1 改为宿主机器的 IP 地址,如果不这么设置,可能会导致在别的机器上拜访不到 kafka。3. 测试 kafka
进入 kafka 容器的命令行
运行 docker ps,找到 kafka 的 CONTAINER ID,运行 docker exec -it ${CONTAINER ID} /bin/bash,进入 kafka 容器。进入 kafka 默认目录 /opt/kafka_2.11-0.10.1.0
生产者:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// 生产者模式
func main() {fmt.Printf("producer_test\n")
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Version = sarama.V0_11_0_2
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {fmt.Printf("producer_test create producer error :%s\n", err.Error())
return
}
defer producer.AsyncClose()
msg := &sarama.ProducerMessage{
Topic: "kafka_go_test",
Key: sarama.StringEncoder("go_test"),
}
value := "this is message"
for {fmt.Scanln(&value)
msg.Value = sarama.ByteEncoder(value)
fmt.Printf("input [%s]\n", value)
// send to chain
producer.Input() <- msg
select {case suc := <-producer.Successes():
fmt.Printf("offset: %d, timestamp: %s", suc.Offset, suc.Timestamp.String())
case fail := <-producer.Errors():
fmt.Printf("err: %s\n", fail.Err.Error())
}
}
}
消费者:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// 消费者模式
// docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
//--env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
//--env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
func main() {fmt.Printf("consumer_test")
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V0_11_0_2
// consumer
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {fmt.Printf("consumer_test create consumer error %s\n", err.Error())
return
}
defer consumer.Close()
// 默认生产第 0 个 Partition 中的数据
partitionConsumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
if err != nil {fmt.Printf("try create partition_consumer error %s\n", err.Error())
return
}
defer partitionConsumer.Close()
for {
select {case msg := <-partitionConsumer.Messages():
fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
case err := <-partitionConsumer.Errors():
fmt.Printf("err :%s\n", err.Error())
}
}
}