关于java:Spring-boot结合Kafka的Demo

40次阅读

共计 14499 个字符,预计需要花费 37 分钟才能阅读完成。

欢送关注自己集体博客 https://huangshubin.gitee.io,浏览体验更佳!
文章链接 KafkaDemo

KafkaDemo 参考博客:http://www.54tianzhisheng.cn/…

Spark 装置:厦大数据库实验室 Spark 装置

Scala 装置:厦大数据库实验室 Scala 装置

相应软件的装置

环境:
jdk 1.8.0_161

scala 2.12.12

spark 2.4.7

maven 3.6.3

zookeeper 3.6.1

kafka_2.12-2.5.0

apache-tomcat-8.5.61

nginx-1.17.10.tar.gz

jdk 的装置

  1. 在 /usr/local/ 下创立 java ⽂件夹并进⼊。

cd /usr/local/

mkdir java

cd java

  1. 将上⾯筹备好的 JDK 安装包解压到 /usr/local/java 中即可

tar -zxvf /root/jdk-8u161-linux-x64.tar.gz -C ./

解压完之后,/usr/local/java ⽬录中会呈现⼀个 jdk1.8.0_161 的⽬录

  1. 编辑 /etc/profile ⽂件,在⽂件尾部加⼊如下 JDK 环境配置即可
JAVA_HOME=/usr/local/java/jdk1.8.0_161  
CLASSPATH=$JAVA_HOME/lib/  
PATH=$PATH:$JAVA_HOME/bin  
export PATH JAVA_HOME CLASSPATH

而后执⾏如下命令让环境变量⽣效

source /etc/profile

  1. 输⼊如下命令即可查看装置后果:
java -version  
javac

Maven 装置

  1. 这⾥下载的是 apache-maven-3.6.3-bin.tar.gz 安装包,并将其搁置于提前创立好的 /opt/maven
    ⽬录下并解压。
tar zxvf apache-maven-3.6.3-bin.tar.gz
  1. 配置 Maven 阿里减速镜像源

编辑批改 /opt/maven/apache-maven-3.6.3/conf/settings.xml
⽂件,在 <mirrors></mirrors> 标签对⾥增加如下内容即可:

<mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
  1. 配置环境变量

    编辑批改 /etc/profile ⽂件,在⽂件尾部增加如下内容,配置 maven 的装置门路

    export MAVEN_HOME=/opt/maven/apache-maven-3.6.3
    export PATH=$MAVEN_HOME/bin:$PATH

    接下来执行 source /etc/profile 来刷新环境变量,让 maven 环境的门路配置失效

  2. 执⾏ mvn –v,能打印出 maven 版本信息阐明装置、配置胜利

zookeeper 的装置

ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务

  1. 在 /usr/local/ 下创立 zookeeper ⽂件夹并进⼊.

cd /usr/local/

  1. 将 ZooKeeper 安装包解压到 /usr/local/zookeeper 中即可

tar -zxvf /root/apache-zookeeper-3.6.1-bin.tar.gz -C ./
解压完之后,/usr/local/zookeerper ⽬录中会呈现⼀个 apache-zookeeper-3.6.1-bin 的⽬录

  1. 创立 Data 目录,这⾥间接在 /usr/local/zookeeper/apache-zookeeper-3.6.1-bin ⽬录中创立⼀个 data ⽬录, 等下该 data ⽬录地址要配到 ZooKeeper 的配置⽂件中
  2. 创立配置⽂件并批改,进⼊到 zookeeper 的 conf ⽬录,复制 zoo_sample.cfg 失去 zoo.cfg:

cd conf
cp zoo_sample.cfg zoo.cfg

批改配置⽂件 zoo.cfg,将其中的 dataDir 批改为上⾯刚创立的 data ⽬录,其余选项能够按需配置

ticiktime = 2000
initiLimit = 10
synLimit = 5
datadir = /home/zcb/zookeeper/data
clientPort = 2181

  1. 启动 zooKeeper

./bin/zkServer.sh start

启动后能够通过如下命令来查看启动后的状态:

./bin/zkServer.sh status

  1. 编辑配置⽂件

vim /etc/profile

尾部加⼊ ZooKeeper 的 bin 门路配置即可

export ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.1-bin  
export PATH=$PATH:$ZOOKEEPER_HOME/bin

最初执⾏ source /etc/profile 使环境变量⽣效即可

⾸先进⼊ /etc/rc.d/init.d ⽬录,创立⼀个名为 zookeeper 的⽂件,并赋予执⾏权限

cd /etc/rc.d/init.d/
touch zookeeper
chmod +x zookeeper

接下来编辑 zookeeper ⽂件,并在其中加⼊如下内容:

    #!/bin/bash  
    #chkconfig:- 20 90
    #description:zookeeper
    #processname:zookeeper
    ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.1-bin
    export JAVA_HOME=/usr/local/java/jdk1.8.0_161 # 此处依据你的理论状况更换对应
    case $1 in
        start) su root $ZOOKEEPER_HOME/bin/zkServer.sh start;;
        stop) su root $ZOOKEEPER_HOME/bin/zkServer.sh stop;;
        status) su root $ZOOKEEPER_HOME/bin/zkServer.sh status;;
        restart) su root $ZOOKEEPER_HOME/bin/zkServer.sh restart;;
        *) echo "require start|stop|status|restart" ;;
    esac

Kafka 装置部署

Kafka 是最后由 Linkedin 公司开发,是一个分布式、分区的、多正本的、多订阅者,基于 zookeeper 协调的分布式日志零碎(也能够当做 MQ 零碎),常见能够用于 web/nginx 日志、拜访日志,音讯服务等等,Linkedin 于 2010 年奉献给了 Apache 基金会并成为顶级开源我的项目。因为 Kafka 的运⾏环境依赖于 ZooKeeper,所以⾸先得装置并运⾏ ZooKeeper。

  1. 筹备 KAFKA 安装包, 这⾥下载的是 2.5.0 版:kafka_2.12-2.5.0.tgz,将下载后的安装包放在了 /root ⽬录下
  2. 解压并装置,在 /usr/local/ 下创立 kafka ⽂件夹并进⼊

cd /usr/local/
mkdir kafka
cd kafka

  1. 将 Kafka 安装包解压到 /usr/local/kafka 中即可

tar -zxvf /root/kafka_2.12-2.5.0.tgz -C ./

解压完之后,/usr/local/kafka ⽬录中会呈现⼀个 kafka_2.12-2.5.0 的⽬录

  1. 创立 Logs 目录,这⾥间接在 /usr/local/kafka/kafka_2.12-2.5.0 ⽬录中创立⼀个 logs ⽬录等下该 logs ⽬录地址要配到 Kafka 的配置⽂件中。
  2. 批改配置⽂件,进⼊到 Kafka 的 config ⽬录,编辑配置⽂件 server.properties

cd config/
vim server.properties

批改配置⽂件,⼀是将其中的 log.dirs 批改为上⾯刚创立的 logs ⽬录,其余选项能够按需配置

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/home/zcb/kafka_2.12-2.5.0/logs

另外关注⼀下连贯 ZooKeeper 的相干配置,依据理论状况进⾏配置:

zooKeeper.connect = localhost:2181

zooKeeper.connection = 18000

  1. 启动 KAFKA,执行如下命令即可:

./bin/kafka-server-start.sh ./config/server.properties

如果须要后盾启动,则在./bin/kafka-server-start.sh 后加上 -daemon 参数即可

  1. 试验验证,这里要留神要先敞开防火墙

而后⾸先创立⼀个名为 demo 的 topic(对应一个音讯队列):

./bin/kafka-topics.sh –create –bootstrap-server localhost:9092 –replication-factor 1 –partitions 1 –topic demo

创立实现当前,能够使⽤命令来列出⽬前已有的 topic 列表

./bin/kafka-topics.sh –list –bootstrap-server localhost:9092

接下来创立⼀个⽣产者,⽤于在 demo 这个 topic 上⽣产音讯:

./bin/kafka-console-producer.sh –bootstrap-server localhost:9092 –topic demo

⽽后接着创立⼀个消费者,⽤于在 demo 这个 topic 上获取音讯:

./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic demo

此时在生产者中发送音讯, 能够察看是否在消费者中承受到, 若胜利承受到, 则代表装置实现

tomcat 装置

装置 tomcat 参考博客

版本为 apache-tomcat-8.5.61

简化 tomcat 启动参考博客

只看其中的第四局部:创立 SystemD 单元文件

配置文件如下:

[Unit]
Description=Tomcat Service
After=network.target

[Service]
Type=forking

Environment="JAVA_HOME=/usr/local/java/jdk1.8.0_161"
Environment="JAVA_OPTS=-Djava.security.egd=file:///dev/urandom -Djava.awt.headless=true"

Environment="CATALINA_BASE=/usr/local/apache-tomcat-8.5.61"
Environment="CATALINA_HOME=/usr/local/apache-tomcat-8.5.61"
Environment="CATALINA_PID=/usr/local/apache-tomcat-8.5.61/temp/tomcat.pid"
Environment="CATALINA_OPTS=-Xms512M -Xmx1024M -server -XX:+UseParallelGC"

ExecStart=/usr/local/apache-tomcat-8.5.61/bin/startup.sh
ExecStop=/usr/local/apache-tomcat-8.5.61/bin/shutdown.sh

[Install]
WantedBy=multi-user.target

将设置开机启动 tomcat

sudo systemctl enable tomcat

启动 tomcat

sudo systemctl start tomcat

在浏览器中拜访 localhost:8080

呈现 tomcat 页面示意 tomcat 启动胜利

查看的服务状态

sudo systemctl status tomcat

敞开 tomcat

sudo systemctl stop tomcat

nginx 装置

  1. 首先下载 nginx-1.17.10.tar.gz 安装包
  2. 解压缩安装包
cd /usr/local
mkdir nginx
cd nginx
tar zxvf /home/hsb/LinuxSoftware/nginx-1.17.10.tar.gz -C ./

之后将在 /usr/local/nginx 目录下呈现一个 nginx-1.17.10 的文件夹

  1. 装置环境依赖
apt-get install gcc
apt-get install libpcre3 libpcre3-dev
apt-get install zlib1g zlib1g-dev
# Ubuntu 的仓库中没有发现 openssl-dev,由上面 openssl 和 libssl-dev 代替
#apt-get install openssl openssl-dev
sudo apt-get install openssl 
sudo apt-get install libssl-dev
  1. 编译装置 nginx
cd nginx-1.17.10
./configure
make && make install
  1. 启动 nginx

/usr/local/nginx/sbin/nginx

进行 nginx 服务

/usr/local/nginx/sbin/nginx -s stop

批改了配置文件后想从新加载 Nginx

/usr/local/nginx/sbin/nginx -s reload

配置文件位于/usr/local/nginx/conf/nginx.conf

  1. 验证

在浏览器中输出本地 IP,呈现 Welcome to nginx! 示意装置胜利。


尝试 Spring Boot 联合 Kafka 的 Demo

通过之前的配置,相应的软件在如下地位:

Kafka    /usr/local/kafka/kafka_2.12-2.5.0
zookeeper    /usr/local/zookeeper/apache-zookeeper-3.6.1-bin
jdk        /usr/local/java/jdk1.8.0_161

我的项目配置

为了便于开发,我在 Ubuntu 虚拟机中装置了 IDEA,能够不便地创立 springboot 我的项目

我的项目配置文件 application.properties

对 kafka 和服务器端口进行配置

spring.kafka.bootstrap-servers 后是本机 IP 地址的 9092 端口

为了保障正确运行,应用 sudo ufw disable 命令 来敞开防火墙

#============== kafka ===================
# 指定 kafka 代理地址,能够多个
spring.kafka.bootstrap-servers=192.168.133.130:9092
#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量发送音讯的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定音讯 key 和音讯体的编解码形式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer  =======================
# 指定默认消费者 group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定音讯 key 和音讯体的编解码形式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 设置服务器端口号
server.port=8085

Maven 依赖

pom 文件次要引入了 spring-web、kafka、lombok、gson 的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.graduation</groupId>
    <artifactId>kafkademo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkademo</name>
    <description>KafakaDemo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
<!--        Lombok 是一个能够通过简略的注解模式来帮忙咱们简化打消一些必须有但显得很臃肿的 Java 代码的工具,通过应用对应的注解,能够在编译源码的时候生成对应的办法。-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
<!--        GSON 是 Google 提供的用来在 Java 对象和 JSON 数据之间进行映射的 Java 类库。能够将一个 Json 字符转成一个 Java 对象,或者将一个 Java 转化为 Json 字符串。-->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!--spark 相干依赖 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.7</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.7</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-configuration-processor</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

gson 能够将一个 Json 字符转成一个 Java 对象,或者将一个 Java 转化为 Json 字符串,在 Demo 中用来对要发送的音讯进行解决。

代码

我的项目构造如下:

./src/
├── main

 ├── java
 │   └── com
 │       └── graduation
 │           └── kafkademo
 │               ├── **beans**
 │               │   └── Message.java
 │               ├── **consumer**
 │                │   └── KafkaConsumer.java
 │                ├── KafkademoApplication.java
 │                └── **producer**
 │                   └── KafkaProducer.java
 └── resources

创立音讯类 Message.java,音讯生产者类 KafkaProducer.java,音讯消费者类 KafkaConsumer.java,用来尝试 kafka 的 Producer API 和 Consumer API。

Message.java 定义了要发送音讯的格局

package com.graduation.kafkademo.beans;

import lombok.Data;

import java.util.Date;

@Data
public class Message {
    private Long id;    //id

    private String msg; // 音讯

    private Date sendTime;  // 工夫戳
}

KafkaProducer.java

package com.graduation.kafkademo.producer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.graduation.kafkademo.beans.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class KafkaProducer{

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    // 发送音讯办法
    public void send() {Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        kafkaTemplate.send("kafkademo", gson.toJson(message));
    }
}

Demo 发送的音讯是转换为 json 字符串的 Message 对象。

kafka 发送音讯应用 kafkaTemplate.send(“kafkademo”, gson.toJson(message))

其中 kafkademo 是 topic,在发送音讯时会主动创立该 topic,gson.toJson(message)即是要发送的音讯。

KafkaConsumer.java

package com.graduation.kafkademo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = {"kafkademo"})
    public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();

            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }

    }
}

应用注解 @KafkaListener(topics = {“kafkademo”})即可让 consumer 接管音讯,其中的参数是要承受音讯的 topic,要和 KafkaProducer.java 中的 topic 雷同。

启动类 KafkademoApplication.java

package com.graduation.kafkademo;

import com.graduation.kafkademo.producer.KafkaProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

/*
参考博客:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/
 */

@SpringBootApplication
public class KafkademoApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);

        KafkaProducer sender = context.getBean(KafkaProducer.class);

        for (int i = 0; i < 3; i++) {
            // 调用音讯发送类中的音讯发送办法
            sender.send();

            try {Thread.sleep(3000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }

}

在启动类中调用 KafkaProducer.java 中的办法进行测试,Demo 中发送了三次音讯。

发送音讯和生产音讯的内容将在日志中展示。

运行后果

启动 Demo

  1. 启动 zookeeper

    在 kafka 装置目录下(/usr/local/kafka/kafka_2.12-2.5.0)

    应用

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

    启动 zookeeper 服务

  2. 启动 kafka

    在 kafka 装置目录下(/usr/local/kafka/kafka_2.12-2.5.0)

    应用

    bin/kafka-server-start.sh  config/server.properties

    启动 kafka 服务

在 IDEA 中点击运行我的项目,可在 Console 中的日志查看 Producer 和 Consumer 发送、接管音讯的过程。

2021-01-13 19:08:15.249  INFO 8461 --- [restartedMain] c.g.kafkademo.producer.KafkaProducer     : +++++++++++++++++++++  message = {"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"}
2021-01-13 19:08:15.257  INFO 8461 --- [ntainer#0-0-C-1] c.g.kafkademo.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = kafkademo, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1610536095250, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"})
2021-01-13 19:08:15.258  INFO 8461 --- [ntainer#0-0-C-1] c.g.kafkademo.consumer.KafkaConsumer     : ------------------ message ={"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"}

其中一条音讯的产生和发送如上。

在 kafka 装置目录下运行命令

bin/kafka-topics.sh --list --zookeeper localhost:2181

可发现 topic kafkademo 曾经被主动创立。

正文完
 0