引言

BlockingQueue 的性能以及常见应用场景是十分宽泛的,读者能够自行百度去理解 BlockingQueue的外围办法以及BlockingQueue家庭大抵有哪些成员,这里就不再班门弄斧。举荐材料:不怕难之BlockingQueue及其实现

案例一:实现web聊天性能

这里援用一个案例基于BlockingQueue实现Web中的长连贯聊天性能

BlockingQueueTest

package com.baba.bracelet;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;/** * @Author wulongbo * @Date 2020/12/1 9:16 * @Version 1.0 */ public class BlockingQueueTest {    public static void main(String[] args) throws InterruptedException {        // 申明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); //new了三个生产者和一个消费者 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); }}

Producer生产者

package com.spring.security.demo;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * @Author wulongbo * @Date 2020/12/1 9:17 * @Version 1.0 */ public class Producer implements Runnable {    private volatile boolean isRunning = true;//是否在运行标记 private BlockingQueue queue;//阻塞队列 private static AtomicInteger count = new AtomicInteger();//自动更新的值 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //构造函数 public Producer(BlockingQueue queue) {        this.queue = queue; }    public void run() {        String data = null; Random r = new Random(); System.out.println("启动生产者线程!"); try {            while (isRunning) {                System.out.println("正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数 data = "data:" + count.incrementAndGet();//以原子形式将count以后值加1 System.out.println("将数据:" + data + "放入队列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true System.out.println("放入数据失败:" + data); }            }        } catch (InterruptedException e) {            e.printStackTrace(); Thread.currentThread().interrupt(); } finally {            System.out.println("退出生产者线程!"); }    }    public void stop() {        isRunning = false; }}

Consumer消费者

package com.spring.security.demo;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;/** * @Author wulongbo * @Date 2020/12/1 9:21 * @Version 1.0 */ public class Consumer implements Runnable {    private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //构造函数 public Consumer(BlockingQueue<String> queue) {        this.queue = queue; }    public void run() {        System.out.println("启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try {            while (isRunning) {                System.out.println("正从队列获取数据..."); String data = queue.poll(2, TimeUnit.SECONDS);//有数据时间接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败 if (null != data) {                    System.out.println("拿到数据:" + data); System.out.println("正在生产数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else {                    // 超过2s还没数据,认为所有生产线程都曾经退出,主动退出生产线程。 isRunning = false; }            }        } catch (InterruptedException e) {            e.printStackTrace(); Thread.currentThread().interrupt(); } finally {            System.out.println("退出消费者线程!"); }    }}

以上的简略代码演示了如何应用 BlockingQueue以及它的局部外围办法的应用,读者可能很好触类旁通。

案例二:批量插入mysql

该案例基于springboot集成Mybatis-Plus的我的项目中来阐明:读者能够参考搭建Springboot我的项目并集成Mybatis-Plus javaspringboot
当初来做后期的环境筹备。
这里为了不便测试咱们退出swagger依赖:

<!-- 引入swagger2依赖--><!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 --><dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version></dependency><dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version></dependency>

config目录中增加swagger配置类
SwaggerConfig

package com.mybatis.plus.config;import freemarker.ext.util.ModelCache;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import springfox.documentation.builders.ApiInfoBuilder;import springfox.documentation.builders.ParameterBuilder;import springfox.documentation.builders.PathSelectors;import springfox.documentation.builders.RequestHandlerSelectors;import springfox.documentation.schema.ModelRef;import springfox.documentation.service.ApiInfo;import springfox.documentation.service.Parameter;import springfox.documentation.spi.DocumentationType;import springfox.documentation.spring.web.plugins.Docket;import springfox.documentation.swagger2.annotations.EnableSwagger2;import java.util.ArrayList;import java.util.List;/** * @Author wulongbo * @Date 2020/11/21 11:50 * @Version 1.0 */@Configuration@EnableSwagger2public class SwaggerConfig {    /** * 通过 createRestApi函数来构建一个DocketBean * 函数名,能够随便命名,喜爱什么命名就什么命名 */ @Bean public Docket createRestApi() {        return new Docket(DocumentationType.SWAGGER_2)                .apiInfo(apiInfo())//调用apiInfo办法,创立一个ApiInfo实例,外面是展现在文档页面信息内容 .select()                //管制裸露进来的门路下的实例 //如果某个接口不想裸露,能够应用以下注解 //@ApiIgnore 这样,该接口就不会裸露在 swagger2 的页面下 .apis(RequestHandlerSelectors.basePackage("com.mybatis.plus"))                .paths(PathSelectors.any())                .build(); }    //构建 api文档的详细信息函数 private ApiInfo apiInfo() {        return new ApiInfoBuilder()                //页面题目 .title("Spring Boot Swagger2 构建把把智能")                //条款地址 .termsOfServiceUrl("http://despairyoke.github.io/")                .contact("zwd")                .version("1.0")                //形容 .description("API 形容")                .build(); }}

留神:swagger的扫包门路,能力正确拜访swagger
目录构造如下:

IUserService中增加两个接口

void recordJob(User job);@PostConstructvoid init();

实现类 UserServiceImpl中的代码如下:

package com.mybatis.plus.service.impl;import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;import com.mybatis.plus.entity.User;import com.mybatis.plus.mapper.UserMapper;import com.mybatis.plus.service.IUserService;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;/** * <p> * 服务实现类 * </p> * * @author wulongbo * @since 2020-11-09 */@Service("iUserService")public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {    private static Logger logger = LoggerFactory.getLogger(UserServiceImpl.class); //定义一个容量为10000的阻塞队列,BlockingQueue线程平安能够多个生产者同时put private BlockingQueue<User> dataQueue = new LinkedBlockingQueue<User>(10000); private List<User> list = new ArrayList<User>(); @Override public void recordJob(User job) {        try {            //put工作的办法,供生产者调用 dataQueue.put(job); } catch (InterruptedException e) {            Thread.currentThread().interrupt(); }    }    @PostConstruct @Override public void init() {        Thread thread = new Thread(() -> {            logger.info("启动批量守护线程,启动工夫{}", new Date(System.currentTimeMillis())); while (Boolean.TRUE) {                User poll = null; boolean pollTimeOut = false; long startTime; long endTime; try {                    // poll时设置超时工夫为2秒 poll = dataQueue.poll(2, TimeUnit.SECONDS); } catch (InterruptedException e) {                    logger.info("批量更新Job异样"); Thread.currentThread().interrupt(); }                if (null != poll) {                    // poll到工作增加到List中 list.add(poll); } else {                    // poll超时,设置超时标记位 pollTimeOut = true; }                // 如果工作List等于300或poll超时且List中还有工作就批量更新 if (list.size() == 300||                        (pollTimeOut && !CollectionUtils.isEmpty(list))){                    startTime = System.currentTimeMillis(); saveOrUpdateBatch(list); logger.info("Job工作批量更新{}条工作,耗时{}毫秒", list.size(), System.currentTimeMillis()-startTime); list.clear(); }            }        }); thread.setName("job-batchUpdate-deamon"); // 设置启动的线程为守护线程 直到jvm停了才进行 thread.setDaemon(true); thread.start(); }}

管制页面模仿一下插入操作 UserController代码如下:

package com.mybatis.plus.controller;import com.mybatis.plus.entity.User;import com.mybatis.plus.service.IUserService;import io.swagger.annotations.ApiImplicitParam;import io.swagger.annotations.ApiImplicitParams;import io.swagger.annotations.ApiOperation;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.CrossOrigin;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * <p> * 前端控制器 * </p> * * @author wulongbo * @since 2020-11-09 */@RestController@RequestMapping("/user")@CrossOriginpublic class UserController {    @Autowired private IUserService iUserService; @ApiOperation(value="增加用户信息", notes="增加用户信息")    @ApiImplicitParams({            @ApiImplicitParam(paramType = "query", name = "name", dataType = "String", required = true, value = "姓名"), @ApiImplicitParam(paramType = "query", name = "age", dataType = "int", required = true, value = "年龄"), @ApiImplicitParam(paramType = "query", name = "email", dataType = "String", required = true, value = "邮箱") })    @PostMapping("/addUser")    public boolean addUser(String name,Integer age,String email) {        try {            User user=new User(); user.setName(name); user.setAge(age); user.setEmail(email); iUserService.recordJob(user); }catch (Exception e){            return false; }        return true; }}

启动我的项目并拜访http://localhost:8080/swagger-ui.html#/

留神检查一下:

  1. User实体类中的主键id是否设置成主动递增
/** * 主键ID */@TableId(value = "id", type = IdType.AUTO)private Long id;

2.数据库User表主键是否设置主键主动递增

填写用户信息,并调用增加用户信息接口

连按多下(作者按了3次),测试 BlockingQueue是否把User对象都缓存到了阻塞队列中,再一次性生产掉。
能够看到控制台打印日志:

核查数据库表,查看是否胜利插入数据

OK,证实咱们的队列失效!这里便演示了应用 BlockingQueue 在并发状况下对mysql做的批量操作。

案例三:mysql之批量删除

这里读者曾经能够领会 BlockingQueue 给咱们带来的便当了,批量删除差别点就是在 对阻塞队列的定义下面 咱们应用

//定义一个容量为10000的阻塞队列,BlockingQueue线程平安能够多个生产者同时putprivate BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);

来给生产者调用,【BlockingQueue<Long>,或者BlockingQueue<Integer>根据理论的主键类型来创立就好】。

同理咱们在 IUserService 中增加删除的接口

void delJob(User job);@PostConstructvoid delInit();

再其实现类 UserServiceImpl 中增加实现办法

//定义一个容量为10000的阻塞队列,BlockingQueue线程平安能够多个生产者同时putprivate BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);private List<Long> delList = new ArrayList<Long>();@Overridepublic void delJob(User job) {    try {        //put工作的办法,供生产者调用 delDataQueue.put(job.getId()); } catch (InterruptedException e) {        Thread.currentThread().interrupt(); }}@PostConstruct@Overridepublic void delInit() {    Thread thread = new Thread(() -> {        logger.info("启动批量删除守护线程,启动工夫{}", new Date(System.currentTimeMillis())); while (Boolean.TRUE) {            Long poll = null; boolean pollTimeOut = false; long startTime; long endTime; try {                // poll时设置超时工夫为2秒 poll = delDataQueue.poll(2, TimeUnit.SECONDS); } catch (InterruptedException e) {                Thread.currentThread().interrupt(); }            if (null != poll) {                // poll到工作增加到List中 delList.add(poll); } else {                // poll超时,设置超时标记位 pollTimeOut = true; }            // 如果工作List等于5000或poll超时且List中还有工作就批量更新 if (delList.size() == 300||                    (pollTimeOut && !CollectionUtils.isEmpty(delList))){                startTime = System.currentTimeMillis(); removeByIds(delList); logger.info("Job工作批量删除{}条工作,耗时{}毫秒", delList.size(), System.currentTimeMillis()-startTime); delList.clear(); }        }    }); thread.setName("job-batchUpdate-deamon"); // 设置启动的线程为守护线程 直到jvm停了才进行 thread.setDaemon(true); thread.start();}

管制页面模仿删除操作 UserController代码如下:

@ApiOperation(value="删除用户信息", notes="删除用户信息")@ApiImplicitParams({        @ApiImplicitParam(paramType = "query", name = "id", dataType = "long", required = true, value = "用户id")})@PostMapping("/delUser")public boolean delUser(Long id) {    try {        User user=new User(); user.setId(id); iUserService.delJob(user); }catch (Exception e){        return false; }    return true;}

咱们在swagger参数中间断输出6,7,8并疾速执行删除接口

能够看到控制台输入后果

核查数据库表中记录是否曾经删除,发现的确曾经删除

至此便用 BlockingQueue 模仿了在并发状况下对数据库的批量删除操作。