环境安装:
1. 下载镜像
这里使用了 wurstmeister/kafka 和 wurstmeister/zookeeper 这两个镜像:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
在命令行中运行 docker images,验证两个镜像已经安装完毕。
2. 启动
启动 zookeeper 容器:
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
启动 kafka 容器:
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
注意:需要将 127.0.0.1 改为宿主机器的 IP 地址,如果不这么设置,可能会导致在别的机器上访问不到 kafka。
3. 测试 kafka
进入 kafka 容器的命令行:运行 docker ps,找到 kafka 的 CONTAINER ID,运行 docker exec -it ${CONTAINER ID} /bin/bash,进入 kafka 容器。进入 kafka 默认目录 /opt/kafka_2.11-0.10.1.0
生产者:
package main

import (
	"bufio"
	"fmt"
	"os"

	"github.com/Shopify/sarama"
)

// Producer mode.
//
// main reads lines from standard input and publishes each one to the
// "kafka_go_test" topic through an async producer connected to
// localhost:9092. After every send it waits for that message's result on the
// Successes or Errors channel and prints it. An empty input line re-sends the
// previous value (initially "this is message"). The program exits when
// standard input reaches EOF or cannot be read.
func main() {
	fmt.Printf("producer_test\n")

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Both result channels are enabled; the loop below reads exactly one
	// result per message sent, so neither channel is left undrained.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Printf("producer_test create producer error :%s\n", err.Error())
		return
	}
	// Close, unlike AsyncClose, blocks until the producer has shut down, so
	// main does not return while the producer is still winding down.
	defer func() {
		if err := producer.Close(); err != nil {
			fmt.Printf("producer_test close producer error :%s\n", err.Error())
		}
	}()

	value := "this is message"
	// Read whole lines so that input containing spaces is sent as a single
	// message, and so that EOF ends the loop instead of spinning forever.
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		if line := scanner.Text(); line != "" {
			value = line
		}
		fmt.Printf("input [%s]\n", value)

		// Build a fresh message per send: the producer owns the pointer once
		// it has been placed on the input channel.
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "kafka_go_test",
			Key:   sarama.StringEncoder("go_test"),
			Value: sarama.StringEncoder(value),
		}

		select {
		case suc := <-producer.Successes():
			fmt.Printf("offset: %d, timestamp: %s\n", suc.Offset, suc.Timestamp.String())
		case fail := <-producer.Errors():
			fmt.Printf("err: %s\n", fail.Err.Error())
		}
	}
	if err := scanner.Err(); err != nil {
		fmt.Printf("producer_test read input error :%s\n", err.Error())
	}
}
消费者:
package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

// Consumer mode.
//
// The broker used while writing this example was started with:
//
//	docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
//	--env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
//	--env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
//
// main connects to localhost:9092, consumes partition 0 of the
// "kafka_go_test" topic starting from the oldest available offset, and prints
// every message and consumer error it receives. It runs until the process
// receives an interrupt signal (Ctrl-C) or one of the consumer channels is
// closed, then closes the partition consumer and the consumer.
func main() {
	fmt.Printf("consumer_test\n")

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	// consumer
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Printf("consumer_test create consumer error %s\n", err.Error())
		return
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Printf("consumer_test close consumer error %s\n", err.Error())
		}
	}()

	// Consume the data in partition 0 by default.
	partitionConsumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Printf("try create partition_consumer error %s\n", err.Error())
		return
	}
	// Deferred calls run in reverse order, so the partition consumer is
	// closed before the consumer that created it.
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			fmt.Printf("close partition_consumer error %s\n", err.Error())
		}
	}()

	// Without an exit path the deferred Close calls above could never run;
	// an interrupt lets the loop return so they do.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	for {
		select {
		case msg, ok := <-partitionConsumer.Messages():
			if !ok {
				// Channel closed: a nil message would panic below.
				return
			}
			fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
				msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
		case consumeErr, ok := <-partitionConsumer.Errors():
			if !ok {
				return
			}
			fmt.Printf("err :%s\n", consumeErr.Error())
		case <-signals:
			return
		}
	}
}