关于java:BlockingQueue

51次阅读

共计 10894 个字符,预计需要花费 28 分钟才能阅读完成。

引言

BlockingQueue 的性能以及常见应用场景是十分宽泛的,读者能够自行百度去理解 BlockingQueue的外围办法以及 BlockingQueue 家庭大抵有哪些成员,这里就不再班门弄斧。举荐材料:不怕难之 BlockingQueue 及其实现

案例一:实现 web 聊天性能

这里援用一个案例基于 BlockingQueue 实现 Web 中的长连贯聊天性能

BlockingQueueTest

package com.baba.bracelet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * @Author wulongbo
 * @Date 2020/12/1 9:16
 * @Version 1.0
 */
 public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {
        // 申明一个容量为 10 的缓存队列
 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 //new 了三个生产者和一个消费者
 Producer producer1 = new Producer(queue);
 Producer producer2 = new Producer(queue);
 Producer producer3 = new Producer(queue);
 Consumer consumer = new Consumer(queue);
 // 借助 Executors
 ExecutorService service = Executors.newCachedThreadPool();
 // 启动线程
 service.execute(producer1);
 service.execute(producer2);
 service.execute(producer3);
 service.execute(consumer);
 // 执行 10s
 Thread.sleep(10 * 1000);
 producer1.stop();
 producer2.stop();
 producer3.stop();
 Thread.sleep(2000);
 // 退出 Executor
 service.shutdown();}
}

Producer生产者

package com.spring.security.demo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @Author wulongbo
 * @Date 2020/12/1 9:17
 * @Version 1.0
 */
 public class Producer implements Runnable {
    private volatile boolean isRunning = true;// 是否在运行标记
 private BlockingQueue queue;// 阻塞队列
 private static AtomicInteger count = new AtomicInteger();// 自动更新的值
 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
 // 构造函数
 public Producer(BlockingQueue queue) {this.queue = queue;}
    public void run() {
        String data = null;
 Random r = new Random();
 System.out.println("启动生产者线程!");
 try {while (isRunning) {System.out.println("正在生产数据...");
 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));// 取 0~DEFAULT_RANGE_FOR_SLEEP 值的一个随机数
 data = "data:" + count.incrementAndGet();// 以原子形式将 count 以后值加 1
 System.out.println("将数据:" + data + "放入队列...");
 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {// 设定的等待时间为 2s,如果超过 2s 还没加进去返回 true
 System.out.println("放入数据失败:" + data);
 }
            }
        } catch (InterruptedException e) {e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {System.out.println("退出生产者线程!");
 }
    }
    public void stop() {isRunning = false;}
}

Consumer消费者

package com.spring.security.demo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * @Author wulongbo
 * @Date 2020/12/1 9:21
 * @Version 1.0
 */
 public class Consumer implements Runnable {
    private BlockingQueue<String> queue;
 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
 // 构造函数
 public Consumer(BlockingQueue<String> queue) {this.queue = queue;}
    public void run() {System.out.println("启动消费者线程!");
 Random r = new Random();
 boolean isRunning = true;
 try {while (isRunning) {System.out.println("正从队列获取数据...");
 String data = queue.poll(2, TimeUnit.SECONDS);// 有数据时间接从队列的队首取走,无数据时阻塞,在 2s 内有数据,取走,超过 2s 还没数据,返回失败
 if (null != data) {System.out.println("拿到数据:" + data);
 System.out.println("正在生产数据:" + data);
 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
 } else {
                    // 超过 2s 还没数据,认为所有生产线程都曾经退出,主动退出生产线程。isRunning = false;
 }
            }
        } catch (InterruptedException e) {e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {System.out.println("退出消费者线程!");
 }
    }
}

以上的简略代码演示了如何应用 BlockingQueue以及它的局部外围办法的应用,读者可能很好触类旁通。

案例二:批量插入 mysql

该案例基于 springboot 集成 Mybatis-Plus 的我的项目中来阐明:读者能够参考搭建 Springboot 我的项目并集成 Mybatis-Plus javaspringboot
当初来做后期的环境筹备。
这里为了不便测试咱们退出 swagger 依赖:

<!-- 引入 swagger2 依赖 -->
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
</dependency>
<dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
</dependency>

config 目录中增加 swagger 配置类
SwaggerConfig

package com.mybatis.plus.config;
import freemarker.ext.util.ModelCache;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.ParameterBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.schema.ModelRef;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.ArrayList;
import java.util.List;
/**
 * @Author wulongbo
 * @Date 2020/11/21 11:50
 * @Version 1.0
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {
    /**
 * 通过 createRestApi 函数来构建一个 DocketBean
 * 函数名, 能够随便命名, 喜爱什么命名就什么命名
 */
 @Bean
 public Docket createRestApi() {return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())// 调用 apiInfo 办法, 创立一个 ApiInfo 实例, 外面是展现在文档页面信息内容
 .select()
                // 管制裸露进来的门路下的实例
 // 如果某个接口不想裸露, 能够应用以下注解
 //@ApiIgnore 这样, 该接口就不会裸露在 swagger2 的页面下
 .apis(RequestHandlerSelectors.basePackage("com.mybatis.plus"))
                .paths(PathSelectors.any())
                .build();}
    // 构建 api 文档的详细信息函数
 private ApiInfo apiInfo() {return new ApiInfoBuilder()
                // 页面题目
 .title("Spring Boot Swagger2 构建把把智能")
                // 条款地址
 .termsOfServiceUrl("http://despairyoke.github.io/")
                .contact("zwd")
                .version("1.0")
                // 形容
 .description("API 形容")
                .build();}
}

留神:swagger 的扫包门路,能力正确拜访 swagger
目录构造如下:

IUserService 中增加两个接口

void recordJob(User job);
@PostConstruct
void init();

实现类 UserServiceImpl中的代码如下:

package com.mybatis.plus.service.impl;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.mybatis.plus.entity.User;
import com.mybatis.plus.mapper.UserMapper;
import com.mybatis.plus.service.IUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * <p>
 * 服务实现类
 * </p>
 *
 * @author wulongbo
 * @since 2020-11-09
 */
@Service("iUserService")
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {private static Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
 // 定义一个容量为 10000 的阻塞队列,BlockingQueue 线程平安能够多个生产者同时 put
 private BlockingQueue<User> dataQueue = new LinkedBlockingQueue<User>(10000);
 private List<User> list = new ArrayList<User>();
 @Override
 public void recordJob(User job) {
        try {
            //put 工作的办法,供生产者调用
 dataQueue.put(job);
 } catch (InterruptedException e) {Thread.currentThread().interrupt();}
    }
    @PostConstruct
 @Override public void init() {Thread thread = new Thread(() -> {logger.info("启动批量守护线程,启动工夫{}", new Date(System.currentTimeMillis()));
 while (Boolean.TRUE) {
                User poll = null;
 boolean pollTimeOut = false;
 long startTime;
 long endTime;
 try {
                    // poll 时设置超时工夫为 2 秒
 poll = dataQueue.poll(2, TimeUnit.SECONDS);
 } catch (InterruptedException e) {logger.info("批量更新 Job 异样");
 Thread.currentThread().interrupt();
 }
                if (null != poll) {
                    // poll 到工作增加到 List 中
 list.add(poll);
 } else {
                    // poll 超时,设置超时标记位
 pollTimeOut = true;
 }
                // 如果工作 List 等于 300 或 poll 超时且 List 中还有工作就批量更新
 if (list.size() == 300||
                        (pollTimeOut && !CollectionUtils.isEmpty(list))){startTime = System.currentTimeMillis();
 saveOrUpdateBatch(list);
 logger.info("Job 工作批量更新 {} 条工作, 耗时 {} 毫秒", list.size(),
 System.currentTimeMillis()-startTime);
 list.clear();}
            }
        });
 thread.setName("job-batchUpdate-deamon");
 // 设置启动的线程为守护线程 直到 jvm 停了才进行
 thread.setDaemon(true);
 thread.start();}
}

管制页面模仿一下插入操作 UserController代码如下:

package com.mybatis.plus.controller;
import com.mybatis.plus.entity.User;
import com.mybatis.plus.service.IUserService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * <p>
 * 前端控制器
 * </p>
 *
 * @author wulongbo
 * @since 2020-11-09
 */
@RestController
@RequestMapping("/user")
@CrossOrigin
public class UserController {
    @Autowired
 private IUserService iUserService;
 @ApiOperation(value="增加用户信息", notes="增加用户信息")
    @ApiImplicitParams({@ApiImplicitParam(paramType = "query", name = "name", dataType = "String", required = true, value = "姓名"),
 @ApiImplicitParam(paramType = "query", name = "age", dataType = "int", required = true, value = "年龄"),
 @ApiImplicitParam(paramType = "query", name = "email", dataType = "String", required = true, value = "邮箱") })
    @PostMapping("/addUser")
    public boolean addUser(String name,Integer age,String email) {
        try {User user=new User();
 user.setName(name);
 user.setAge(age);
 user.setEmail(email);
 iUserService.recordJob(user);
 }catch (Exception e){return false;}
        return true;
 }
}

启动我的项目并拜访 http://localhost:8080/swagger-ui.html#/

留神检查一下:

  1. User实体类中的主键 id 是否设置成主动递增
/**
 * 主键 ID
 */@TableId(value = "id", type = IdType.AUTO)
private Long id;

2. 数据库 User 表主键是否设置主键主动递增

填写用户信息,并调用增加用户信息接口

连按多下 (作者按了 3 次),测试 BlockingQueue 是否把 User 对象都缓存到了阻塞队列中,再一次性生产掉。
能够看到控制台打印日志:

核查数据库表,查看是否胜利插入数据

OK, 证实咱们的队列失效!这里便演示了应用 BlockingQueue 在并发状况下对 mysql 做的批量操作。

案例三:mysql 之批量删除

这里读者曾经能够领会 BlockingQueue 给咱们带来的便当了,批量删除差别点就是在 对阻塞队列的定义下面 咱们应用

// 定义一个容量为 10000 的阻塞队列,BlockingQueue 线程平安能够多个生产者同时 put
private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);

来给生产者调用,【BlockingQueue<Long>,或者 BlockingQueue<Integer> 根据理论的主键类型来创立就好】。

同理咱们在 IUserService 中增加删除的接口

void delJob(User job);
@PostConstruct
void delInit();

再其实现类 UserServiceImpl 中增加实现办法

// 定义一个容量为 10000 的阻塞队列,BlockingQueue 线程平安能够多个生产者同时 put
private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);
private List<Long> delList = new ArrayList<Long>();

@Override
public void delJob(User job) {
    try {
        //put 工作的办法,供生产者调用
 delDataQueue.put(job.getId());
 } catch (InterruptedException e) {Thread.currentThread().interrupt();}
}
@PostConstruct
@Override
public void delInit() {Thread thread = new Thread(() -> {logger.info("启动批量删除守护线程,启动工夫{}", new Date(System.currentTimeMillis()));
 while (Boolean.TRUE) {
            Long poll = null;
 boolean pollTimeOut = false;
 long startTime;
 long endTime;
 try {
                // poll 时设置超时工夫为 2 秒
 poll = delDataQueue.poll(2, TimeUnit.SECONDS);
 } catch (InterruptedException e) {Thread.currentThread().interrupt();}
            if (null != poll) {
                // poll 到工作增加到 List 中
 delList.add(poll);
 } else {
                // poll 超时,设置超时标记位
 pollTimeOut = true;
 }
            // 如果工作 List 等于 5000 或 poll 超时且 List 中还有工作就批量更新
 if (delList.size() == 300||
                    (pollTimeOut && !CollectionUtils.isEmpty(delList))){startTime = System.currentTimeMillis();
 removeByIds(delList);
 logger.info("Job 工作批量删除 {} 条工作, 耗时 {} 毫秒", delList.size(),
 System.currentTimeMillis()-startTime);
 delList.clear();}
        }
    });
 thread.setName("job-batchUpdate-deamon");
 // 设置启动的线程为守护线程 直到 jvm 停了才进行
 thread.setDaemon(true);
 thread.start();}

管制页面模仿删除操作 UserController代码如下:

@ApiOperation(value="删除用户信息", notes="删除用户信息")
@ApiImplicitParams({@ApiImplicitParam(paramType = "query", name = "id", dataType = "long", required = true, value = "用户 id")})
@PostMapping("/delUser")
public boolean delUser(Long id) {
    try {User user=new User();
 user.setId(id);
 iUserService.delJob(user);
 }catch (Exception e){return false;}
    return true;
}

咱们在 swagger 参数中间断输出 6,7,8 并疾速执行删除接口

能够看到控制台输入后果

核查数据库表中记录是否曾经删除,发现的确曾经删除

至此便用 BlockingQueue 模仿了在并发状况下对数据库的批量删除操作。

正文完
 0