Versions
• JDK 14
• Zookeeper
• Kafka
Installing Zookeeper and Kafka
Kafka depends on Zookeeper, so we need a running Zookeeper before installing Kafka. Prepare the following docker-compose.yaml file, replacing the host address 192.168.1.100 with the host address of your own environment.
version: "3"
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
      - ./data/zookeeper/logs:/logs
    restart: always
  kafka_node_0:
    depends_on:
      - zookeeper
    container_name: kafka-node-0
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9092:9092
    volumes:
      - ./data/kafka/node_0:/kafka
    restart: unless-stopped
  kafka_node_1:
    depends_on:
      - kafka_node_0
    container_name: kafka-node-1
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9093:9093
    volumes:
      - ./data/kafka/node_1:/kafka
    restart: unless-stopped
  kafka_node_2:
    depends_on:
      - kafka_node_1
    container_name: kafka-node-2
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - 9094:9094
    volumes:
      - ./data/kafka/node_2:/kafka
    restart: unless-stopped
Run docker-compose up -d to build the cluster. After a short wait, output like the following (all four containers up) means the build succeeded.
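Once the containers are up, the cluster can be spot-checked from the host. This is a sketch assuming the container names from the compose file above and the standard Kafka scripts shipped in the wurstmeister/kafka image; replace 192.168.1.100 with your own host address:

```shell
# List the running containers defined in the compose file
docker ps --filter "name=kafka" --filter "name=zookeeper" \
  --format "{{.Names}}: {{.Status}}"

# Create a topic and describe it to confirm the replicas are
# spread across the three brokers (ids 0, 1, 2)
docker exec kafka-node-0 kafka-topics.sh --create --topic test \
  --bootstrap-server 192.168.1.100:9092 --partitions 3 --replication-factor 2
docker exec kafka-node-0 kafka-topics.sh --describe --topic test \
  --bootstrap-server 192.168.1.100:9092
```

The describe output should show each of the three partitions with a leader and one follower replica, matching KAFKA_NUM_PARTITIONS and KAFKA_DEFAULT_REPLICATION_FACTOR from the compose file.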
Integrating Spring Boot with the Kafka Cluster
Create a new Spring Boot project and add the following dependencies to build.gradle (Lombok is included for the logging and accessor annotations used in the code below).
dependencies {
    ...
    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'
    implementation 'com.alibaba:fastjson:1.2.71'
    compileOnly 'org.projectlombok:lombok:1.18.12'
    annotationProcessor 'org.projectlombok:lombok:1.18.12'
}
1. Configure the Kafka-related parameters in application.properties.
# Broker list for the three-node cluster
spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094
# Producer: no retries, 16 KB batches, 32 MB send buffer, String serializers
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer: start from the latest offset, auto-commit offsets every 100 ms
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
2. Create the message body class.
// Lombok's @Data generates the getters/setters used by the sender
@Data
public class Message {
    private Long id;
    private String message;
    private Date sendAt;
}
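As a side note, fastjson serializes java.util.Date fields as epoch milliseconds by default, so that is the shape of the payload the producer puts on the wire. The plain-JDK sketch below (the class and helper names are hypothetical, used only for illustration) approximates that output without the fastjson dependency:

```java
import java.util.Date;

// Minimal sketch: builds a JSON string similar to fastjson's default
// output for the Message class (Date rendered as epoch millis).
public class MessageJsonSketch {
    static String toJson(long id, String message, Date sendAt) {
        return String.format("{\"id\":%d,\"message\":\"%s\",\"sendAt\":%d}",
                id, message, sendAt.getTime());
    }

    public static void main(String[] args) {
        // A fixed Date keeps the output deterministic for inspection
        System.out.println(toJson(1L, "hello", new Date(0)));
    }
}
```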
3. Create the message sender.
@Slf4j
@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMessage(UUID.randomUUID().toString());
        message.setSendAt(new Date());
        log.info("message = {}", JSON.toJSONString(message));
        // Serialize to JSON and publish to the "test" topic
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}
4. Create the message receiver.
@Slf4j
@Component
public class Receiver {

    @KafkaListener(topics = {"test"}, groupId = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            log.info("receiver record = " + record);
            log.info("receiver message = " + message.get());
        }
    }
}
5. Test the message queue.
@RestController
public class QueueController {

    @Autowired
    private Sender sender;

    @PostMapping("/test")
    public void testQueue() {
        sender.send();
        sender.send();
        sender.send();
    }
}
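With the application running, the endpoint can be exercised from the command line (this assumes Spring Boot's default port 8080; adjust if your server.port differs):

```shell
# Each POST triggers three sends to the "test" topic
curl -X POST http://localhost:8080/test
```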
Log output like the following means the integration succeeded.
At this point we have successfully set up a Kafka pseudo-cluster (three brokers sharing one host) and integrated it with Spring Boot.