引言
BlockingQueue
的性能以及常见应用场景是十分宽泛的,读者能够自行百度去理解 BlockingQueue
的外围办法以及BlockingQueue
家庭大抵有哪些成员,这里就不再班门弄斧。举荐材料:不怕难之BlockingQueue及其实现
案例一:实现web聊天性能
这里援用一个案例基于BlockingQueue
实现Web中的长连贯聊天性能
BlockingQueueTest
package com.baba.bracelet;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;/** * @Author wulongbo * @Date 2020/12/1 9:16 * @Version 1.0 */ public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 申明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); //new了三个生产者和一个消费者 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); }}
Producer
生产者
package com.spring.security.demo;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * @Author wulongbo * @Date 2020/12/1 9:17 * @Version 1.0 */ public class Producer implements Runnable { private volatile boolean isRunning = true;//是否在运行标记 private BlockingQueue queue;//阻塞队列 private static AtomicInteger count = new AtomicInteger();//自动更新的值 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //构造函数 public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { String data = null; Random r = new Random(); System.out.println("启动生产者线程!"); try { while (isRunning) { System.out.println("正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数 data = "data:" + count.incrementAndGet();//以原子形式将count以后值加1 System.out.println("将数据:" + data + "放入队列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true System.out.println("放入数据失败:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生产者线程!"); } } public void stop() { isRunning = false; }}
Consumer
消费者
package com.spring.security.demo;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;/** * @Author wulongbo * @Date 2020/12/1 9:21 * @Version 1.0 */ public class Consumer implements Runnable { private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //构造函数 public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { System.out.println("启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正从队列获取数据..."); String data = queue.poll(2, TimeUnit.SECONDS);//有数据时间接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败 if (null != data) { System.out.println("拿到数据:" + data); System.out.println("正在生产数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超过2s还没数据,认为所有生产线程都曾经退出,主动退出生产线程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者线程!"); } }}
以上的简略代码演示了如何应用 BlockingQueue
以及它的局部外围办法的应用,读者可能很好触类旁通。
案例二:批量插入mysql
该案例基于springboot
集成Mybatis-Plus
的我的项目中来阐明:读者能够参考搭建Springboot我的项目并集成Mybatis-Plus javaspringboot
当初来做后期的环境筹备。
这里为了不便测试咱们退出swagger依赖:
<!-- 引入swagger2依赖--><!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 --><dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version></dependency><dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version></dependency>
在config
目录中增加swagger配置类SwaggerConfig
package com.mybatis.plus.config;import freemarker.ext.util.ModelCache;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import springfox.documentation.builders.ApiInfoBuilder;import springfox.documentation.builders.ParameterBuilder;import springfox.documentation.builders.PathSelectors;import springfox.documentation.builders.RequestHandlerSelectors;import springfox.documentation.schema.ModelRef;import springfox.documentation.service.ApiInfo;import springfox.documentation.service.Parameter;import springfox.documentation.spi.DocumentationType;import springfox.documentation.spring.web.plugins.Docket;import springfox.documentation.swagger2.annotations.EnableSwagger2;import java.util.ArrayList;import java.util.List;/** * @Author wulongbo * @Date 2020/11/21 11:50 * @Version 1.0 */@Configuration@EnableSwagger2public class SwaggerConfig { /** * 通过 createRestApi函数来构建一个DocketBean * 函数名,能够随便命名,喜爱什么命名就什么命名 */ @Bean public Docket createRestApi() { return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo())//调用apiInfo办法,创立一个ApiInfo实例,外面是展现在文档页面信息内容 .select() //管制裸露进来的门路下的实例 //如果某个接口不想裸露,能够应用以下注解 //@ApiIgnore 这样,该接口就不会裸露在 swagger2 的页面下 .apis(RequestHandlerSelectors.basePackage("com.mybatis.plus")) .paths(PathSelectors.any()) .build(); } //构建 api文档的详细信息函数 private ApiInfo apiInfo() { return new ApiInfoBuilder() //页面题目 .title("Spring Boot Swagger2 构建把把智能") //条款地址 .termsOfServiceUrl("http://despairyoke.github.io/") .contact("zwd") .version("1.0") //形容 .description("API 形容") .build(); }}
留神:swagger的扫包门路,能力正确拜访swagger
目录构造如下:
在IUserService
中增加两个接口
void recordJob(User job);@PostConstructvoid init();
实现类 UserServiceImpl
中的代码如下:
package com.mybatis.plus.service.impl;import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;import com.mybatis.plus.entity.User;import com.mybatis.plus.mapper.UserMapper;import com.mybatis.plus.service.IUserService;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;/** * <p> * 服务实现类 * </p> * * @author wulongbo * @since 2020-11-09 */@Service("iUserService")public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService { private static Logger logger = LoggerFactory.getLogger(UserServiceImpl.class); //定义一个容量为10000的阻塞队列,BlockingQueue线程平安能够多个生产者同时put private BlockingQueue<User> dataQueue = new LinkedBlockingQueue<User>(10000); private List<User> list = new ArrayList<User>(); @Override public void recordJob(User job) { try { //put工作的办法,供生产者调用 dataQueue.put(job); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } @PostConstruct @Override public void init() { Thread thread = new Thread(() -> { logger.info("启动批量守护线程,启动工夫{}", new Date(System.currentTimeMillis())); while (Boolean.TRUE) { User poll = null; boolean pollTimeOut = false; long startTime; long endTime; try { // poll时设置超时工夫为2秒 poll = dataQueue.poll(2, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.info("批量更新Job异样"); Thread.currentThread().interrupt(); } if (null != poll) { // poll到工作增加到List中 list.add(poll); } else { // poll超时,设置超时标记位 pollTimeOut = true; } // 如果工作List等于300或poll超时且List中还有工作就批量更新 if (list.size() == 300|| (pollTimeOut && !CollectionUtils.isEmpty(list))){ startTime = System.currentTimeMillis(); saveOrUpdateBatch(list); logger.info("Job工作批量更新{}条工作,耗时{}毫秒", list.size(), System.currentTimeMillis()-startTime); list.clear(); } } }); thread.setName("job-batchUpdate-deamon"); // 设置启动的线程为守护线程 直到jvm停了才进行 thread.setDaemon(true); thread.start(); }}
管制页面模仿一下插入操作 UserController
代码如下:
package com.mybatis.plus.controller;import com.mybatis.plus.entity.User;import com.mybatis.plus.service.IUserService;import io.swagger.annotations.ApiImplicitParam;import io.swagger.annotations.ApiImplicitParams;import io.swagger.annotations.ApiOperation;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.CrossOrigin;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * <p> * 前端控制器 * </p> * * @author wulongbo * @since 2020-11-09 */@RestController@RequestMapping("/user")@CrossOriginpublic class UserController { @Autowired private IUserService iUserService; @ApiOperation(value="增加用户信息", notes="增加用户信息") @ApiImplicitParams({ @ApiImplicitParam(paramType = "query", name = "name", dataType = "String", required = true, value = "姓名"), @ApiImplicitParam(paramType = "query", name = "age", dataType = "int", required = true, value = "年龄"), @ApiImplicitParam(paramType = "query", name = "email", dataType = "String", required = true, value = "邮箱") }) @PostMapping("/addUser") public boolean addUser(String name,Integer age,String email) { try { User user=new User(); user.setName(name); user.setAge(age); user.setEmail(email); iUserService.recordJob(user); }catch (Exception e){ return false; } return true; }}
启动我的项目并拜访http://localhost:8080/swagger-ui.html#/
留神检查一下:
User
实体类中的主键id是否设置成主动递增
/** * 主键ID */@TableId(value = "id", type = IdType.AUTO)private Long id;
2.数据库User表主键是否设置主键主动递增
填写用户信息,并调用增加用户信息接口
连按多下(作者按了3次),测试 BlockingQueue
是否把User对象都缓存到了阻塞队列中,再一次性生产掉。
能够看到控制台打印日志:
核查数据库表,查看是否胜利插入数据
OK,证实咱们的队列失效!这里便演示了应用 BlockingQueue
在并发状况下对mysql做的批量操作。
案例三:mysql之批量删除
这里读者曾经能够领会 BlockingQueue
给咱们带来的便当了,批量删除差别点就是在 对阻塞队列的定义下面
咱们应用
//定义一个容量为10000的阻塞队列,BlockingQueue线程平安能够多个生产者同时putprivate BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);
来给生产者调用,【BlockingQueue<Long>,或者BlockingQueue<Integer>根据理论的主键类型来创立就好】。
同理咱们在 IUserService
中增加删除的接口
void delJob(User job);@PostConstructvoid delInit();
再其实现类 UserServiceImpl
中增加实现办法
//定义一个容量为10000的阻塞队列,BlockingQueue线程平安能够多个生产者同时putprivate BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);private List<Long> delList = new ArrayList<Long>();@Overridepublic void delJob(User job) { try { //put工作的办法,供生产者调用 delDataQueue.put(job.getId()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }}@PostConstruct@Overridepublic void delInit() { Thread thread = new Thread(() -> { logger.info("启动批量删除守护线程,启动工夫{}", new Date(System.currentTimeMillis())); while (Boolean.TRUE) { Long poll = null; boolean pollTimeOut = false; long startTime; long endTime; try { // poll时设置超时工夫为2秒 poll = delDataQueue.poll(2, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } if (null != poll) { // poll到工作增加到List中 delList.add(poll); } else { // poll超时,设置超时标记位 pollTimeOut = true; } // 如果工作List等于5000或poll超时且List中还有工作就批量更新 if (delList.size() == 300|| (pollTimeOut && !CollectionUtils.isEmpty(delList))){ startTime = System.currentTimeMillis(); removeByIds(delList); logger.info("Job工作批量删除{}条工作,耗时{}毫秒", delList.size(), System.currentTimeMillis()-startTime); delList.clear(); } } }); thread.setName("job-batchUpdate-deamon"); // 设置启动的线程为守护线程 直到jvm停了才进行 thread.setDaemon(true); thread.start();}
管制页面模仿删除操作 UserController
代码如下:
@ApiOperation(value="删除用户信息", notes="删除用户信息")@ApiImplicitParams({ @ApiImplicitParam(paramType = "query", name = "id", dataType = "long", required = true, value = "用户id")})@PostMapping("/delUser")public boolean delUser(Long id) { try { User user=new User(); user.setId(id); iUserService.delJob(user); }catch (Exception e){ return false; } return true;}
咱们在swagger参数中间断输出6,7,8并疾速执行删除接口
能够看到控制台输入后果
核查数据库表中记录是否曾经删除,发现的确曾经删除
至此便用 BlockingQueue
模仿了在并发状况下对数据库的批量删除操作。