关于java:Spring-boot结合Kafka的Demo

欢送关注自己集体博客https://huangshubin.gitee.io,浏览体验更佳!
文章链接KafkaDemo

KafkaDemo参考博客:http://www.54tianzhisheng.cn/…

Spark装置:厦大数据库实验室Spark装置

Scala装置:厦大数据库实验室Scala装置

相应软件的装置

环境:
jdk 1.8.0_161

scala 2.12.12

spark 2.4.7

maven 3.6.3

zookeeper 3.6.1

kafka_2.12-2.5.0

apache-tomcat-8.5.61

nginx-1.17.10.tar.gz

jdk的装置

  1. 在 /usr/local/ 下创立 java ⽂件夹并进⼊。

cd /usr/local/

mkdir java

cd java

  1. 将上⾯筹备好的 JDK 安装包解压到 /usr/local/java 中即可

tar -zxvf /root/jdk-8u161-linux-x64.tar.gz -C ./

解压完之后, /usr/local/java ⽬录中会呈现⼀个 jdk1.8.0_161 的⽬录

  1. 编辑 /etc/profile ⽂件,在⽂件尾部加⼊如下 JDK 环境配置即可
JAVA_HOME=/usr/local/java/jdk1.8.0_161  
CLASSPATH=$JAVA_HOME/lib/  
PATH=$PATH:$JAVA_HOME/bin  
export PATH JAVA_HOME CLASSPATH

而后执⾏如下命令让环境变量⽣效

source /etc/profile

  1. 输⼊如下命令即可查看装置后果:
java -version  
javac

Maven装置

  1. 这⾥下载的是 apache-maven-3.6.3-bin.tar.gz 安装包,并将其搁置于提前创立好的 /opt/maven
    ⽬录下并解压。
tar zxvf apache-maven-3.6.3-bin.tar.gz
  1. 配置Maven阿里减速镜像源

编辑批改 /opt/maven/apache-maven-3.6.3/conf/settings.xml
⽂件,在 <mirrors></mirrors> 标签对⾥增加如下内容即可:

<mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
  1. 配置环境变量

    编辑批改 /etc/profile ⽂件,在⽂件尾部增加如下内容,配置 maven 的装置门路

    export MAVEN_HOME=/opt/maven/apache-maven-3.6.3
    export PATH=$MAVEN_HOME/bin:$PATH

    接下来执行 source /etc/profile 来刷新环境变量,让 maven 环境的门路配置失效

  2. 执⾏ mvn –v ,能打印出 maven 版本信息阐明装置、配置胜利

zookeeper的装置

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务

  1. 在 /usr/local/ 下创立zookeeper ⽂件夹并进⼊.

cd /usr/local/

  1. 将 ZooKeeper 安装包解压到 /usr/local/zookeeper 中即可

tar -zxvf /root/apache-zookeeper-3.6.1-bin.tar.gz -C ./
解压完之后, /usr/local/zookeerper ⽬录中会呈现⼀个 apache-zookeeper-3.6.1-bin 的⽬录

  1. 创立Data目录,这⾥间接在 /usr/local/zookeeper/apache-zookeeper-3.6.1-bin ⽬录中创立⼀个 data ⽬录,等下该 data ⽬录地址要配到 ZooKeeper 的配置⽂件中
  2. 创立配置⽂件并批改,进⼊到 zookeeper 的 conf ⽬录,复制 zoo_sample.cfg 失去 zoo.cfg :

cd conf
cp zoo_sample.cfg zoo.cfg

批改配置⽂件 zoo.cfg ,将其中的 dataDir 批改为上⾯刚创立的 data ⽬录,其余选项能够按需配置

ticiktime = 2000
initiLimit = 10
synLimit = 5
datadir = /home/zcb/zookeeper/data
clientPort = 2181

  1. 启动zooKeeper

./bin/zkServer.sh start

启动后能够通过如下命令来查看启动后的状态:

./bin/zkServer.sh status

  1. 编辑配置⽂件

vim /etc/profile

尾部加⼊ ZooKeeper 的 bin 门路配置即可

export ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.1-bin  
export PATH=$PATH:$ZOOKEEPER_HOME/bin

最初执⾏ source /etc/profile 使环境变量⽣效即可

⾸先进⼊ /etc/rc.d/init.d ⽬录,创立⼀个名为 zookeeper 的⽂件,并赋予执⾏权限

cd /etc/rc.d/init.d/
touch zookeeper
chmod +x zookeeper

接下来编辑 zookeeper ⽂件,并在其中加⼊如下内容:

    #!/bin/bash  
    #chkconfig:- 20 90
    #description:zookeeper
    #processname:zookeeper
    ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.1-bin
    export JAVA_HOME=/usr/local/java/jdk1.8.0_161 # 此处依据你的理论状况更换对应
    case $1 in
        start) su root $ZOOKEEPER_HOME/bin/zkServer.sh start;;
        stop) su root $ZOOKEEPER_HOME/bin/zkServer.sh stop;;
        status) su root $ZOOKEEPER_HOME/bin/zkServer.sh status;;
        restart) su root $ZOOKEEPER_HOME/bin/zkServer.sh restart;;
        *) echo "require start|stop|status|restart" ;;
    esac

Kafka装置部署

Kafka是最后由Linkedin公司开发,是一个分布式、分区的、多正本的、多订阅者,基于zookeeper协调的分布式日志零碎(也能够当做MQ零碎),常见能够用于web/nginx日志、拜访日志,音讯服务等等,Linkedin于2010年奉献给了Apache基金会并成为顶级开源我的项目。因为 Kafka 的运⾏环境依赖于 ZooKeeper ,所以⾸先得装置并运⾏ ZooKeeper 。

  1. 筹备KAFKA安装包,这⾥下载的是 2.5.0 版: kafka_2.12-2.5.0.tgz ,将下载后的安装包放在了 /root ⽬录下
  2. 解压并装置,在 /usr/local/ 下创立 kafka ⽂件夹并进⼊

cd /usr/local/
mkdir kafka
cd kafka

  1. 将Kafka安装包解压到 /usr/local/kafka 中即可

tar -zxvf /root/kafka_2.12-2.5.0.tgz -C ./

解压完之后, /usr/local/kafka ⽬录中会呈现⼀个 kafka_2.12-2.5.0 的⽬录

  1. 创立Logs目录,这⾥间接在 /usr/local/kafka/kafka_2.12-2.5.0 ⽬录中创立⼀个 logs ⽬录等下该 logs ⽬录地址要配到Kafka的配置⽂件中。
  2. 批改配置⽂件,进⼊到 Kafka 的 config ⽬录,编辑配置⽂件 server.properties

cd config/
vim server.properties

批改配置⽂件,⼀是将其中的 log.dirs 批改为上⾯刚创立的 logs ⽬录,其余选项能够按需配置

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/home/zcb/kafka_2.12-2.5.0/logs

另外关注⼀下连贯 ZooKeeper 的相干配置,依据理论状况进⾏配置:

zooKeeper.connect = localhost:2181

zooKeeper.connection = 18000

  1. 启动KAFKA,执行如下命令即可:

./bin/kafka-server-start.sh ./config/server.properties

如果须要后盾启动,则在./bin/kafka-server-start.sh后加上 -daemon 参数即可

  1. 试验验证,这里要留神要先敞开防火墙

而后⾸先创立⼀个名为 demo 的 topic(对应一个音讯队列) :

./bin/kafka-topics.sh –create –bootstrap-server localhost:9092 –replication-factor 1 –partitions 1 –topic demo

创立实现当前,能够使⽤命令来列出⽬前已有的 topic 列表

./bin/kafka-topics.sh –list –bootstrap-server localhost:9092

接下来创立⼀个⽣产者,⽤于在demo 这个 topic 上⽣产音讯:

./bin/kafka-console-producer.sh –bootstrap-server localhost:9092 –topic demo

⽽后接着创立⼀个消费者,⽤于在 demo 这个 topic 上获取音讯:

./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic demo

此时在生产者中发送音讯,能够察看是否在消费者中承受到,若胜利承受到,则代表装置实现

tomcat装置

装置tomcat参考博客

版本为 apache-tomcat-8.5.61

简化tomcat启动参考博客

只看其中的第四局部:创立SystemD单元文件

配置文件如下:

[Unit]
Description=Tomcat Service
After=network.target

[Service]
Type=forking

Environment="JAVA_HOME=/usr/local/java/jdk1.8.0_161"
Environment="JAVA_OPTS=-Djava.security.egd=file:///dev/urandom -Djava.awt.headless=true"

Environment="CATALINA_BASE=/usr/local/apache-tomcat-8.5.61"
Environment="CATALINA_HOME=/usr/local/apache-tomcat-8.5.61"
Environment="CATALINA_PID=/usr/local/apache-tomcat-8.5.61/temp/tomcat.pid"
Environment="CATALINA_OPTS=-Xms512M -Xmx1024M -server -XX:+UseParallelGC"

ExecStart=/usr/local/apache-tomcat-8.5.61/bin/startup.sh
ExecStop=/usr/local/apache-tomcat-8.5.61/bin/shutdown.sh

[Install]
WantedBy=multi-user.target

将设置开机启动tomcat

sudo systemctl enable tomcat

启动tomcat

sudo systemctl start tomcat

在浏览器中拜访localhost:8080

呈现tomcat页面示意tomcat启动胜利

查看的服务状态

sudo systemctl status tomcat

敞开tomcat

sudo systemctl stop tomcat

nginx装置

  1. 首先下载nginx-1.17.10.tar.gz安装包
  2. 解压缩安装包
cd /usr/local
mkdir nginx
cd nginx
tar zxvf /home/hsb/LinuxSoftware/nginx-1.17.10.tar.gz -C ./

之后将在/usr/local/nginx目录下呈现一个nginx-1.17.10的文件夹

  1. 装置环境依赖
apt-get install gcc
apt-get install libpcre3 libpcre3-dev
apt-get install zlib1g zlib1g-dev
# Ubuntu的仓库中没有发现openssl-dev,由上面openssl和libssl-dev代替
#apt-get install openssl openssl-dev
sudo apt-get install openssl 
sudo apt-get install libssl-dev
  1. 编译装置nginx
cd nginx-1.17.10
./configure
make && make install
  1. 启动nginx

/usr/local/nginx/sbin/nginx

进行nginx服务

/usr/local/nginx/sbin/nginx -s stop

批改了配置文件后想从新加载Nginx

/usr/local/nginx/sbin/nginx -s reload

配置文件位于/usr/local/nginx/conf/nginx.conf

  1. 验证

在浏览器中输出本地IP,呈现Welcome to nginx!示意装置胜利。


尝试Spring Boot联合Kafka的Demo

通过之前的配置,相应的软件在如下地位:

Kafka    /usr/local/kafka/kafka_2.12-2.5.0
zookeeper    /usr/local/zookeeper/apache-zookeeper-3.6.1-bin
jdk        /usr/local/java/jdk1.8.0_161

我的项目配置

为了便于开发,我在Ubuntu虚拟机中装置了IDEA,能够不便地创立springboot我的项目

我的项目配置文件 application.properties

对kafka和服务器端口进行配置

spring.kafka.bootstrap-servers后是本机IP地址的9092端口

为了保障正确运行,应用sudo ufw disable命令来敞开防火墙

#============== kafka ===================
# 指定kafka 代理地址,能够多个
spring.kafka.bootstrap-servers=192.168.133.130:9092
#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量发送音讯的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定音讯key和音讯体的编解码形式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定音讯key和音讯体的编解码形式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 设置服务器端口号
server.port=8085

Maven依赖

pom文件次要引入了spring-web、kafka、lombok、gson的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.graduation</groupId>
    <artifactId>kafkademo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkademo</name>
    <description>KafakaDemo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
<!--        Lombok是一个能够通过简略的注解模式来帮忙咱们简化打消一些必须有但显得很臃肿的Java代码的工具,通过应用对应的注解,能够在编译源码的时候生成对应的办法。-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
<!--        GSON是Google提供的用来在Java对象和JSON数据之间进行映射的Java类库。能够将一个Json字符转成一个Java对象,或者将一个Java转化为Json字符串。-->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!--spark相干依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.7</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.7</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-configuration-processor</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

gson能够将一个Json字符转成一个Java对象,或者将一个Java转化为Json字符串,在Demo中用来对要发送的音讯进行解决。

代码

我的项目构造如下:

./src/
├── main

 ├── java
 │   └── com
 │       └── graduation
 │           └── kafkademo
 │               ├── **beans**
 │               │   └── Message.java
 │               ├── **consumer**
 │                │   └── KafkaConsumer.java
 │                ├── KafkademoApplication.java
 │                └── **producer**
 │                   └── KafkaProducer.java
 └── resources

创立音讯类Message.java,音讯生产者类KafkaProducer.java,音讯消费者类KafkaConsumer.java,用来尝试kafka的Producer API和Consumer API。

Message.java定义了要发送音讯的格局

package com.graduation.kafkademo.beans;

import lombok.Data;

import java.util.Date;

@Data
public class Message {
    private Long id;    //id

    private String msg; //音讯

    private Date sendTime;  //工夫戳
}

KafkaProducer.java

package com.graduation.kafkademo.producer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.graduation.kafkademo.beans.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class KafkaProducer{

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    //发送音讯办法
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        kafkaTemplate.send("kafkademo", gson.toJson(message));
    }
}

Demo发送的音讯是转换为json字符串的Message对象。

kafka发送音讯应用kafkaTemplate.send(“kafkademo”, gson.toJson(message))

其中kafkademo是topic,在发送音讯时会主动创立该topic,gson.toJson(message)即是要发送的音讯。

KafkaConsumer.java

package com.graduation.kafkademo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = {"kafkademo"})
    public void listen(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();

            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }

    }
}

应用注解@KafkaListener(topics = {“kafkademo”})即可让consumer接管音讯,其中的参数是要承受音讯的topic,要和KafkaProducer.java 中的topic雷同。

启动类 KafkademoApplication.java

package com.graduation.kafkademo;

import com.graduation.kafkademo.producer.KafkaProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

/*
参考博客:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/
 */

@SpringBootApplication
public class KafkademoApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);

        KafkaProducer sender = context.getBean(KafkaProducer.class);

        for (int i = 0; i < 3; i++) {
            //调用音讯发送类中的音讯发送办法
            sender.send();

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

在启动类中调用KafkaProducer.java中的办法进行测试,Demo中发送了三次音讯。

发送音讯和生产音讯的内容将在日志中展示。

运行后果

启动Demo

  1. 启动zookeeper

    在kafka装置目录下(/usr/local/kafka/kafka_2.12-2.5.0)

    应用

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

    启动zookeeper服务

  2. 启动kafka

    在kafka装置目录下(/usr/local/kafka/kafka_2.12-2.5.0)

    应用

    bin/kafka-server-start.sh  config/server.properties

    启动kafka服务

在IDEA中点击运行我的项目,可在Console中的日志查看Producer和Consumer发送、接管音讯的过程。

2021-01-13 19:08:15.249  INFO 8461 --- [  restartedMain] c.g.kafkademo.producer.KafkaProducer     : +++++++++++++++++++++  message = {"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"}
2021-01-13 19:08:15.257  INFO 8461 --- [ntainer#0-0-C-1] c.g.kafkademo.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = kafkademo, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1610536095250, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"})
2021-01-13 19:08:15.258  INFO 8461 --- [ntainer#0-0-C-1] c.g.kafkademo.consumer.KafkaConsumer     : ------------------ message ={"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"}

其中一条音讯的产生和发送如上。

在kafka装置目录下运行命令

bin/kafka-topics.sh --list --zookeeper localhost:2181

可发现topic kafkademo曾经被主动创立。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理