引言
BlockingQueue
的性能以及常见应用场景是十分宽泛的,读者能够自行百度去理解 BlockingQueue
的外围办法以及 BlockingQueue
家庭大抵有哪些成员,这里就不再班门弄斧。举荐材料:不怕难之 BlockingQueue 及其实现
案例一:实现 web 聊天性能
这里援用一个案例基于 BlockingQueue
实现 Web 中的长连贯聊天性能
BlockingQueueTest
package com.baba.bracelet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @Author wulongbo
* @Date 2020/12/1 9:16
* @Version 1.0
*/
public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {
// 申明一个容量为 10 的缓存队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
//new 了三个生产者和一个消费者
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助 Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 执行 10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出 Executor
service.shutdown();}
}
Producer
生产者
package com.spring.security.demo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author wulongbo
* @Date 2020/12/1 9:17
* @Version 1.0
*/
public class Producer implements Runnable {
private volatile boolean isRunning = true;// 是否在运行标记
private BlockingQueue queue;// 阻塞队列
private static AtomicInteger count = new AtomicInteger();// 自动更新的值
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
// 构造函数
public Producer(BlockingQueue queue) {this.queue = queue;}
public void run() {
String data = null;
Random r = new Random();
System.out.println("启动生产者线程!");
try {while (isRunning) {System.out.println("正在生产数据...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));// 取 0~DEFAULT_RANGE_FOR_SLEEP 值的一个随机数
data = "data:" + count.incrementAndGet();// 以原子形式将 count 以后值加 1
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {// 设定的等待时间为 2s,如果超过 2s 还没加进去返回 true
System.out.println("放入数据失败:" + data);
}
}
} catch (InterruptedException e) {e.printStackTrace();
Thread.currentThread().interrupt();
} finally {System.out.println("退出生产者线程!");
}
}
public void stop() {isRunning = false;}
}
Consumer
消费者
package com.spring.security.demo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @Author wulongbo
* @Date 2020/12/1 9:21
* @Version 1.0
*/
public class Consumer implements Runnable {
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
// 构造函数
public Consumer(BlockingQueue<String> queue) {this.queue = queue;}
public void run() {System.out.println("启动消费者线程!");
Random r = new Random();
boolean isRunning = true;
try {while (isRunning) {System.out.println("正从队列获取数据...");
String data = queue.poll(2, TimeUnit.SECONDS);// 有数据时间接从队列的队首取走,无数据时阻塞,在 2s 内有数据,取走,超过 2s 还没数据,返回失败
if (null != data) {System.out.println("拿到数据:" + data);
System.out.println("正在生产数据:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超过 2s 还没数据,认为所有生产线程都曾经退出,主动退出生产线程。isRunning = false;
}
}
} catch (InterruptedException e) {e.printStackTrace();
Thread.currentThread().interrupt();
} finally {System.out.println("退出消费者线程!");
}
}
}
以上的简略代码演示了如何应用 BlockingQueue
以及它的局部外围办法的应用,读者可能很好触类旁通。
案例二:批量插入 mysql
该案例基于 springboot
集成 Mybatis-Plus
的我的项目中来阐明:读者能够参考搭建 Springboot 我的项目并集成 Mybatis-Plus javaspringboot
当初来做后期的环境筹备。
这里为了不便测试咱们退出 swagger 依赖:
<!-- 引入 swagger2 依赖 -->
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
在 config
目录中增加 swagger 配置类SwaggerConfig
package com.mybatis.plus.config;
import freemarker.ext.util.ModelCache;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.ParameterBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.schema.ModelRef;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.ArrayList;
import java.util.List;
/**
* @Author wulongbo
* @Date 2020/11/21 11:50
* @Version 1.0
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
/**
* 通过 createRestApi 函数来构建一个 DocketBean
* 函数名, 能够随便命名, 喜爱什么命名就什么命名
*/
@Bean
public Docket createRestApi() {return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())// 调用 apiInfo 办法, 创立一个 ApiInfo 实例, 外面是展现在文档页面信息内容
.select()
// 管制裸露进来的门路下的实例
// 如果某个接口不想裸露, 能够应用以下注解
//@ApiIgnore 这样, 该接口就不会裸露在 swagger2 的页面下
.apis(RequestHandlerSelectors.basePackage("com.mybatis.plus"))
.paths(PathSelectors.any())
.build();}
// 构建 api 文档的详细信息函数
private ApiInfo apiInfo() {return new ApiInfoBuilder()
// 页面题目
.title("Spring Boot Swagger2 构建把把智能")
// 条款地址
.termsOfServiceUrl("http://despairyoke.github.io/")
.contact("zwd")
.version("1.0")
// 形容
.description("API 形容")
.build();}
}
留神:swagger 的扫包门路,能力正确拜访 swagger
目录构造如下:
在 IUserService
中增加两个接口
void recordJob(User job);
@PostConstruct
void init();
实现类 UserServiceImpl
中的代码如下:
package com.mybatis.plus.service.impl;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.mybatis.plus.entity.User;
import com.mybatis.plus.mapper.UserMapper;
import com.mybatis.plus.service.IUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* <p>
* 服务实现类
* </p>
*
* @author wulongbo
* @since 2020-11-09
*/
@Service("iUserService")
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {private static Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
// 定义一个容量为 10000 的阻塞队列,BlockingQueue 线程平安能够多个生产者同时 put
private BlockingQueue<User> dataQueue = new LinkedBlockingQueue<User>(10000);
private List<User> list = new ArrayList<User>();
@Override
public void recordJob(User job) {
try {
//put 工作的办法,供生产者调用
dataQueue.put(job);
} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}
@PostConstruct
@Override public void init() {Thread thread = new Thread(() -> {logger.info("启动批量守护线程,启动工夫{}", new Date(System.currentTimeMillis()));
while (Boolean.TRUE) {
User poll = null;
boolean pollTimeOut = false;
long startTime;
long endTime;
try {
// poll 时设置超时工夫为 2 秒
poll = dataQueue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {logger.info("批量更新 Job 异样");
Thread.currentThread().interrupt();
}
if (null != poll) {
// poll 到工作增加到 List 中
list.add(poll);
} else {
// poll 超时,设置超时标记位
pollTimeOut = true;
}
// 如果工作 List 等于 300 或 poll 超时且 List 中还有工作就批量更新
if (list.size() == 300||
(pollTimeOut && !CollectionUtils.isEmpty(list))){startTime = System.currentTimeMillis();
saveOrUpdateBatch(list);
logger.info("Job 工作批量更新 {} 条工作, 耗时 {} 毫秒", list.size(),
System.currentTimeMillis()-startTime);
list.clear();}
}
});
thread.setName("job-batchUpdate-deamon");
// 设置启动的线程为守护线程 直到 jvm 停了才进行
thread.setDaemon(true);
thread.start();}
}
管制页面模仿一下插入操作 UserController
代码如下:
package com.mybatis.plus.controller;
import com.mybatis.plus.entity.User;
import com.mybatis.plus.service.IUserService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <p>
* 前端控制器
* </p>
*
* @author wulongbo
* @since 2020-11-09
*/
@RestController
@RequestMapping("/user")
@CrossOrigin
public class UserController {
@Autowired
private IUserService iUserService;
@ApiOperation(value="增加用户信息", notes="增加用户信息")
@ApiImplicitParams({@ApiImplicitParam(paramType = "query", name = "name", dataType = "String", required = true, value = "姓名"),
@ApiImplicitParam(paramType = "query", name = "age", dataType = "int", required = true, value = "年龄"),
@ApiImplicitParam(paramType = "query", name = "email", dataType = "String", required = true, value = "邮箱") })
@PostMapping("/addUser")
public boolean addUser(String name,Integer age,String email) {
try {User user=new User();
user.setName(name);
user.setAge(age);
user.setEmail(email);
iUserService.recordJob(user);
}catch (Exception e){return false;}
return true;
}
}
启动我的项目并拜访 http://localhost:8080/swagger-ui.html#/
留神检查一下:
User
实体类中的主键 id 是否设置成主动递增
/**
* 主键 ID
*/@TableId(value = "id", type = IdType.AUTO)
private Long id;
2. 数据库 User 表主键是否设置主键主动递增
填写用户信息,并调用增加用户信息接口
连按多下 (作者按了 3 次),测试 BlockingQueue
是否把 User 对象都缓存到了阻塞队列中,再一次性生产掉。
能够看到控制台打印日志:
核查数据库表,查看是否胜利插入数据
OK, 证实咱们的队列失效!这里便演示了应用 BlockingQueue
在并发状况下对 mysql 做的批量操作。
案例三:mysql 之批量删除
这里读者曾经能够领会 BlockingQueue
给咱们带来的便当了,批量删除差别点就是在 对阻塞队列的定义下面
咱们应用
// 定义一个容量为 10000 的阻塞队列,BlockingQueue 线程平安能够多个生产者同时 put
private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);
来给生产者调用,【BlockingQueue<Long>,或者 BlockingQueue<Integer> 根据理论的主键类型来创立就好】。
同理咱们在 IUserService
中增加删除的接口
void delJob(User job);
@PostConstruct
void delInit();
再其实现类 UserServiceImpl
中增加实现办法
// 定义一个容量为 10000 的阻塞队列,BlockingQueue 线程平安能够多个生产者同时 put
private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);
private List<Long> delList = new ArrayList<Long>();
@Override
public void delJob(User job) {
try {
//put 工作的办法,供生产者调用
delDataQueue.put(job.getId());
} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}
@PostConstruct
@Override
public void delInit() {Thread thread = new Thread(() -> {logger.info("启动批量删除守护线程,启动工夫{}", new Date(System.currentTimeMillis()));
while (Boolean.TRUE) {
Long poll = null;
boolean pollTimeOut = false;
long startTime;
long endTime;
try {
// poll 时设置超时工夫为 2 秒
poll = delDataQueue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {Thread.currentThread().interrupt();}
if (null != poll) {
// poll 到工作增加到 List 中
delList.add(poll);
} else {
// poll 超时,设置超时标记位
pollTimeOut = true;
}
// 如果工作 List 等于 5000 或 poll 超时且 List 中还有工作就批量更新
if (delList.size() == 300||
(pollTimeOut && !CollectionUtils.isEmpty(delList))){startTime = System.currentTimeMillis();
removeByIds(delList);
logger.info("Job 工作批量删除 {} 条工作, 耗时 {} 毫秒", delList.size(),
System.currentTimeMillis()-startTime);
delList.clear();}
}
});
thread.setName("job-batchUpdate-deamon");
// 设置启动的线程为守护线程 直到 jvm 停了才进行
thread.setDaemon(true);
thread.start();}
管制页面模仿删除操作 UserController
代码如下:
@ApiOperation(value="删除用户信息", notes="删除用户信息")
@ApiImplicitParams({@ApiImplicitParam(paramType = "query", name = "id", dataType = "long", required = true, value = "用户 id")})
@PostMapping("/delUser")
public boolean delUser(Long id) {
try {User user=new User();
user.setId(id);
iUserService.delJob(user);
}catch (Exception e){return false;}
return true;
}
咱们在 swagger 参数中间断输出 6,7,8 并疾速执行删除接口
能够看到控制台输入后果
核查数据库表中记录是否曾经删除,发现的确曾经删除
至此便用 BlockingQueue
模仿了在并发状况下对数据库的批量删除操作。