欢送关注自己集体博客https://huangshubin.gitee.io,浏览体验更佳!
文章链接KafkaDemo
KafkaDemo参考博客:http://www.54tianzhisheng.cn/...
Spark装置:厦大数据库实验室Spark装置
Scala装置:厦大数据库实验室Scala装置
相应软件的装置
环境:
jdk 1.8.0_161scala 2.12.12
spark 2.4.7
maven 3.6.3
zookeeper 3.6.1
kafka_2.12-2.5.0
apache-tomcat-8.5.61
nginx-1.17.10.tar.gz
jdk的装置
- 在 /usr/local/ 下创立 java ⽂件夹并进⼊。
cd /usr/local/mkdir java
cd java
- 将上⾯筹备好的 JDK 安装包解压到 /usr/local/java 中即可
tar -zxvf /root/jdk-8u161-linux-x64.tar.gz -C ./
解压完之后, /usr/local/java ⽬录中会呈现⼀个 jdk1.8.0_161 的⽬录
- 编辑 /etc/profile ⽂件,在⽂件尾部加⼊如下 JDK 环境配置即可
JAVA_HOME=/usr/local/java/jdk1.8.0_161 CLASSPATH=$JAVA_HOME/lib/ PATH=$PATH:$JAVA_HOME/bin export PATH JAVA_HOME CLASSPATH
而后执⾏如下命令让环境变量⽣效
source /etc/profile
- 输⼊如下命令即可查看装置后果:
java -version javac
Maven装置
- 这⾥下载的是
apache-maven-3.6.3-bin.tar.gz
安装包,并将其搁置于提前创立好的/opt/maven
⽬录下并解压。
tar zxvf apache-maven-3.6.3-bin.tar.gz
- 配置Maven阿里减速镜像源
编辑批改 /opt/maven/apache-maven-3.6.3/conf/settings.xml
⽂件,在 <mirrors></mirrors>
标签对⾥增加如下内容即可:
<mirror> <id>alimaven</id> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <mirrorOf>central</mirrorOf></mirror>
配置环境变量
编辑批改
/etc/profile
⽂件,在⽂件尾部增加如下内容,配置 maven 的装置门路export MAVEN_HOME=/opt/maven/apache-maven-3.6.3export PATH=$MAVEN_HOME/bin:$PATH
接下来执行
source /etc/profile
来刷新环境变量,让 maven 环境的门路配置失效- 执⾏
mvn –v
,能打印出 maven 版本信息阐明装置、配置胜利
zookeeper的装置
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务
- 在 /usr/local/ 下创立zookeeper ⽂件夹并进⼊.
cd /usr/local/
- 将 ZooKeeper 安装包解压到 /usr/local/zookeeper 中即可
tar -zxvf /root/apache-zookeeper-3.6.1-bin.tar.gz -C ./
解压完之后, /usr/local/zookeerper ⽬录中会呈现⼀个 apache-zookeeper-3.6.1-bin 的⽬录
- 创立Data目录,这⾥间接在 /usr/local/zookeeper/apache-zookeeper-3.6.1-bin ⽬录中创立⼀个 data ⽬录,等下该 data ⽬录地址要配到 ZooKeeper 的配置⽂件中
- 创立配置⽂件并批改,进⼊到 zookeeper 的 conf ⽬录,复制 zoo_sample.cfg 失去 zoo.cfg :
cd conf
cp zoo_sample.cfg zoo.cfg
批改配置⽂件 zoo.cfg ,将其中的 dataDir 批改为上⾯刚创立的 data ⽬录,其余选项能够按需配置
ticiktime = 2000
initiLimit = 10
synLimit = 5
datadir = /home/zcb/zookeeper/data
clientPort = 2181
- 启动zooKeeper
./bin/zkServer.sh start
启动后能够通过如下命令来查看启动后的状态:
./bin/zkServer.sh status
- 编辑配置⽂件
vim /etc/profile
尾部加⼊ ZooKeeper 的 bin 门路配置即可
export ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.1-bin export PATH=$PATH:$ZOOKEEPER_HOME/bin
最初执⾏ source /etc/profile 使环境变量⽣效即可
⾸先进⼊ /etc/rc.d/init.d ⽬录,创立⼀个名为 zookeeper 的⽂件,并赋予执⾏权限
cd /etc/rc.d/init.d/
touch zookeeper
chmod +x zookeeper
接下来编辑 zookeeper ⽂件,并在其中加⼊如下内容:
#!/bin/bash #chkconfig:- 20 90 #description:zookeeper #processname:zookeeper ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.1-bin export JAVA_HOME=/usr/local/java/jdk1.8.0_161 # 此处依据你的理论状况更换对应 case $1 in start) su root $ZOOKEEPER_HOME/bin/zkServer.sh start;; stop) su root $ZOOKEEPER_HOME/bin/zkServer.sh stop;; status) su root $ZOOKEEPER_HOME/bin/zkServer.sh status;; restart) su root $ZOOKEEPER_HOME/bin/zkServer.sh restart;; *) echo "require start|stop|status|restart" ;; esac
Kafka装置部署
Kafka是最后由Linkedin公司开发,是一个分布式、分区的、多正本的、多订阅者,基于zookeeper协调的分布式日志零碎(也能够当做MQ零碎),常见能够用于web/nginx日志、拜访日志,音讯服务等等,Linkedin于2010年奉献给了Apache基金会并成为顶级开源我的项目。因为 Kafka 的运⾏环境依赖于 ZooKeeper ,所以⾸先得装置并运⾏ ZooKeeper 。
- 筹备KAFKA安装包,这⾥下载的是 2.5.0 版: kafka_2.12-2.5.0.tgz ,将下载后的安装包放在了 /root ⽬录下
- 解压并装置,在 /usr/local/ 下创立 kafka ⽂件夹并进⼊
cd /usr/local/
mkdir kafka
cd kafka
- 将Kafka安装包解压到 /usr/local/kafka 中即可
tar -zxvf /root/kafka_2.12-2.5.0.tgz -C ./
解压完之后, /usr/local/kafka ⽬录中会呈现⼀个 kafka_2.12-2.5.0 的⽬录
- 创立Logs目录,这⾥间接在 /usr/local/kafka/kafka_2.12-2.5.0 ⽬录中创立⼀个 logs ⽬录等下该 logs ⽬录地址要配到Kafka的配置⽂件中。
- 批改配置⽂件,进⼊到 Kafka 的 config ⽬录,编辑配置⽂件 server.properties
cd config/
vim server.properties
批改配置⽂件,⼀是将其中的 log.dirs 批改为上⾯刚创立的 logs ⽬录,其余选项能够按需配置
############################# Log Basics ############################## A comma separated list of directories under which to store log fileslog.dirs=/home/zcb/kafka_2.12-2.5.0/logs
另外关注⼀下连贯 ZooKeeper 的相干配置,依据理论状况进⾏配置:
zooKeeper.connect = localhost:2181zooKeeper.connection = 18000
- 启动KAFKA,执行如下命令即可:
./bin/kafka-server-start.sh ./config/server.properties如果须要后盾启动,则在./bin/kafka-server-start.sh后加上 -daemon 参数即可
- 试验验证,这里要留神要先敞开防火墙
而后⾸先创立⼀个名为 demo 的 topic(对应一个音讯队列) :
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic demo
创立实现当前,能够使⽤命令来列出⽬前已有的 topic 列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
接下来创立⼀个⽣产者,⽤于在demo 这个 topic 上⽣产音讯:
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo
⽽后接着创立⼀个消费者,⽤于在 demo 这个 topic 上获取音讯:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo
此时在生产者中发送音讯,能够察看是否在消费者中承受到,若胜利承受到,则代表装置实现
tomcat装置
装置tomcat参考博客
版本为 apache-tomcat-8.5.61
简化tomcat启动参考博客
只看其中的第四局部:创立SystemD单元文件
配置文件如下:
[Unit]Description=Tomcat ServiceAfter=network.target[Service]Type=forkingEnvironment="JAVA_HOME=/usr/local/java/jdk1.8.0_161"Environment="JAVA_OPTS=-Djava.security.egd=file:///dev/urandom -Djava.awt.headless=true"Environment="CATALINA_BASE=/usr/local/apache-tomcat-8.5.61"Environment="CATALINA_HOME=/usr/local/apache-tomcat-8.5.61"Environment="CATALINA_PID=/usr/local/apache-tomcat-8.5.61/temp/tomcat.pid"Environment="CATALINA_OPTS=-Xms512M -Xmx1024M -server -XX:+UseParallelGC"ExecStart=/usr/local/apache-tomcat-8.5.61/bin/startup.shExecStop=/usr/local/apache-tomcat-8.5.61/bin/shutdown.sh[Install]WantedBy=multi-user.target
将设置开机启动tomcat
sudo systemctl enable tomcat
启动tomcat
sudo systemctl start tomcat
在浏览器中拜访localhost:8080
呈现tomcat页面示意tomcat启动胜利
查看的服务状态
sudo systemctl status tomcat
敞开tomcat
sudo systemctl stop tomcat
nginx装置
- 首先下载nginx-1.17.10.tar.gz安装包
- 解压缩安装包
cd /usr/localmkdir nginxcd nginxtar zxvf /home/hsb/LinuxSoftware/nginx-1.17.10.tar.gz -C ./
之后将在/usr/local/nginx目录下呈现一个nginx-1.17.10的文件夹
- 装置环境依赖
apt-get install gccapt-get install libpcre3 libpcre3-devapt-get install zlib1g zlib1g-dev# Ubuntu的仓库中没有发现openssl-dev,由上面openssl和libssl-dev代替#apt-get install openssl openssl-devsudo apt-get install openssl sudo apt-get install libssl-dev
- 编译装置nginx
cd nginx-1.17.10./configuremake && make install
- 启动nginx
/usr/local/nginx/sbin/nginx
进行nginx服务
/usr/local/nginx/sbin/nginx -s stop
批改了配置文件后想从新加载Nginx
/usr/local/nginx/sbin/nginx -s reload
配置文件位于/usr/local/nginx/conf/nginx.conf
- 验证
在浏览器中输出本地IP,呈现Welcome to nginx!
示意装置胜利。
尝试Spring Boot联合Kafka的Demo
通过之前的配置,相应的软件在如下地位:
Kafka /usr/local/kafka/kafka_2.12-2.5.0zookeeper /usr/local/zookeeper/apache-zookeeper-3.6.1-binjdk /usr/local/java/jdk1.8.0_161
我的项目配置
为了便于开发,我在Ubuntu虚拟机中装置了IDEA,能够不便地创立springboot我的项目
我的项目配置文件 application.properties
对kafka和服务器端口进行配置
spring.kafka.bootstrap-servers后是本机IP地址的9092端口
为了保障正确运行,应用sudo ufw disable命令来敞开防火墙
#============== kafka ===================# 指定kafka 代理地址,能够多个spring.kafka.bootstrap-servers=192.168.133.130:9092#=============== provider =======================spring.kafka.producer.retries=0# 每次批量发送音讯的数量spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432# 指定音讯key和音讯体的编解码形式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer =======================# 指定默认消费者group idspring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# 指定音讯key和音讯体的编解码形式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 设置服务器端口号server.port=8085
Maven依赖
pom文件次要引入了spring-web、kafka、lombok、gson的依赖
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.7.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.graduation</groupId> <artifactId>kafkademo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafkademo</name> <description>KafakaDemo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency><!-- Lombok是一个能够通过简略的注解模式来帮忙咱们简化打消一些必须有但显得很臃肿的Java代码的工具,通过应用对应的注解,能够在编译源码的时候生成对应的办法。--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency><!-- GSON是Google提供的用来在Java对象和JSON数据之间进行映射的Java类库。能够将一个Json字符转成一个Java对象,或者将一个Java转化为Json字符串。--> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> <!--spark相干依赖--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.5.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>2.4.7</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>2.4.7</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>2.4.7</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build></project>
gson能够将一个Json字符转成一个Java对象,或者将一个Java转化为Json字符串,在Demo中用来对要发送的音讯进行解决。
代码
我的项目构造如下:
./src/
├── main
├── java │ └── com │ └── graduation │ └── kafkademo │ ├── **beans** │ │ └── Message.java │ ├── **consumer** │ │ └── KafkaConsumer.java │ ├── KafkademoApplication.java │ └── **producer** │ └── KafkaProducer.java └── resources
创立音讯类Message.java,音讯生产者类KafkaProducer.java,音讯消费者类KafkaConsumer.java,用来尝试kafka的Producer API和Consumer API。
Message.java定义了要发送音讯的格局
package com.graduation.kafkademo.beans;import lombok.Data;import java.util.Date;@Datapublic class Message { private Long id; //id private String msg; //音讯 private Date sendTime; //工夫戳}
KafkaProducer.java
package com.graduation.kafkademo.producer;import com.google.gson.Gson;import com.google.gson.GsonBuilder;import com.graduation.kafkademo.beans.Message;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;import java.util.Date;import java.util.UUID;@Component@Slf4jpublic class KafkaProducer{ @Autowired private KafkaTemplate<String, String> kafkaTemplate; private Gson gson = new GsonBuilder().create(); //发送音讯办法 public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); kafkaTemplate.send("kafkademo", gson.toJson(message)); }}
Demo发送的音讯是转换为json字符串的Message对象。
kafka发送音讯应用kafkaTemplate.send("kafkademo", gson.toJson(message))
其中kafkademo是topic,在发送音讯时会主动创立该topic,gson.toJson(message)即是要发送的音讯。
KafkaConsumer.java
package com.graduation.kafkademo.consumer;import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;import java.util.Optional;@Component@Slf4jpublic class KafkaConsumer { @KafkaListener(topics = {"kafkademo"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info("----------------- record =" + record); log.info("------------------ message =" + message); } }}
应用注解@KafkaListener(topics = {"kafkademo"})即可让consumer接管音讯,其中的参数是要承受音讯的topic,要和KafkaProducer.java 中的topic雷同。
启动类 KafkademoApplication.java
package com.graduation.kafkademo;import com.graduation.kafkademo.producer.KafkaProducer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ConfigurableApplicationContext;/*参考博客:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/ */@SpringBootApplicationpublic class KafkademoApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args); KafkaProducer sender = context.getBean(KafkaProducer.class); for (int i = 0; i < 3; i++) { //调用音讯发送类中的音讯发送办法 sender.send(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }}
在启动类中调用KafkaProducer.java中的办法进行测试,Demo中发送了三次音讯。
发送音讯和生产音讯的内容将在日志中展示。
运行后果
启动Demo
启动zookeeper
在kafka装置目录下(/usr/local/kafka/kafka_2.12-2.5.0)
应用
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动zookeeper服务
启动kafka
在kafka装置目录下(/usr/local/kafka/kafka_2.12-2.5.0)
应用
bin/kafka-server-start.sh config/server.properties
启动kafka服务
在IDEA中点击运行我的项目,可在Console中的日志查看Producer和Consumer发送、接管音讯的过程。
2021-01-13 19:08:15.249 INFO 8461 --- [ restartedMain] c.g.kafkademo.producer.KafkaProducer : +++++++++++++++++++++ message = {"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"}2021-01-13 19:08:15.257 INFO 8461 --- [ntainer#0-0-C-1] c.g.kafkademo.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = kafkademo, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1610536095250, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"})2021-01-13 19:08:15.258 INFO 8461 --- [ntainer#0-0-C-1] c.g.kafkademo.consumer.KafkaConsumer : ------------------ message ={"id":1610536095248,"msg":"82e876d8-7b87-43c6-903c-4413b12f0f04","sendTime":"Jan 13, 2021 7:08:15 PM"}
其中一条音讯的产生和发送如上。
在kafka装置目录下运行命令
bin/kafka-topics.sh --list --zookeeper localhost:2181
可发现topic kafkademo曾经被主动创立。