共计 4425 个字符,预计需要花费 12 分钟才能阅读完成。
前言
日常开发中,咱们最罕用的汇合次要有两个,一个是 ArrayList,一个是 LinkedList
如果对 ArrayList 还有不明确的同学,能够看一下我之前写的一篇文章:Java 汇合,ArrayList 源码深刻解析
那么本篇文章,咱们次要是基于 LinkedList 写一个简略的队列。
设计思路:
首先咱们想要设计一个计划的时候,要先捋分明思路,想一下现有的,他人曾经实现的计划,而后思考本人如何能力实现。(比方 rabbitMq)
队列管理中心 :集中管理所有创立的队列
提供方 :往音讯队列中发送音讯
生产方 :监听音讯队列中的音讯,并进行生产(如果监听到队列中新放入了音讯,则主动生产解决)
第一步:实现音讯队列
咱们要明确队列所须要实现的性能,次要是发送音讯,接管音讯。
package com.dm.black.modules.myQuere;
import java.util.LinkedList;
/**
* 基于 LinkedList 实现音讯队列
* @author wjy
* @date 2021/1/20
*/
public class MQueue extends QueueCenter {private LinkedList<Object> queue = new LinkedList<>();
/**
* 留神:这里加锁是为了避免并发操作,因为 LinkedList 自身是线程不平安的
* @method 放入音讯
* @param o
* @return
*/
public boolean putMessage(Object o) {synchronized (queue) {
// 如果队列在期待,则执行唤醒
if (queue.isEmpty()) {System.out.println("唤醒队列...");
queue.notifyAll();}
// 将音讯放入队列
queue.push(o);
return true;
}
}
/**
* @method 取得音讯 (获取首条音讯并删除)
* @return
*/
public Object pollFirst() {synchronized (queue) {
// 如果队列中没有音讯,则处于梗塞状态,有音讯则进行生产
if (queue.isEmpty()) {
try {System.out.println("队列中没有数据,开始期待....");
queue.wait();
// 被唤醒后,持续往下执行
Object o = queue.pollFirst();
return o;
} catch (InterruptedException e) {e.printStackTrace();
}
} else {Object o = queue.pollFirst();
return o;
}
}
return null;
}
/**
* 取得音讯 (获取首条音讯但不删除)
* @return
*/
public Object getFrist(){synchronized (queue) {Object first = queue.getFirst();
return first;
}
}
/**
* 队列中是否存在音讯
* @return
*/
public boolean isReady() {if (!queue.isEmpty()) {return true;}
return false;
}
}
第二步:实现音讯队列管理中心
目标:为了将队列统一化治理,比方 N 个注册者,或者 N 个消费者应用雷同名称的队列时,保障操作的是同一个队列。
package com.dm.black.modules.myQuere;
import java.util.HashMap;
import java.util.Map;
/**
* Queue-center
* @author wjy
* @date 2021/1/20
*/
public class QueueCenter {
/**
* @description 这里应用 Map 作为队列管理中心
* 创立一个 queue 管理中心,所有创立的 Queue 在这里进行治理
* Map -> key : queue 名称
* Map -> value : 队列
*/
private static Map<String, MQueue> queueCenter = new HashMap<>();
/**
* @method 从 Queue-center 获取 Queue
* 加锁目标:避免同时创立雷同名称的 queue
*/
public static MQueue getQueue(String queueName) {synchronized (queueName) {
// 从 map 中依据名称获取队列,如果曾经存在,则返回 map 中的队列
MQueue queue = queueCenter.get(queueName);
// 如果是第一次创立队列,则新建队列并放入 map,而后将新建的队列返回
if (queue == null) {queue = new MQueue();
putQueue(queueName, queue);
MQueue mQueue = queueCenter.get(queueName);
return mQueue;
}
return queue;
}
}
/**
* @method 将 Queue 放入 Queue-center
* @param queueName
*/
private static void putQueue(String queueName, MQueue queue) {queueCenter.put(queueName, queue);
}
}
第三步:音讯注册
咱们要实现一个提供方,将音讯放入队列中,供生产方应用
package com.dm.black.modules.myQuere;
/**
* 注册者
* @author wjy
* @date 2021/1/20
*/
public class MProvider {
// 队列名称
private final String queueName = "demo";
/**
* 发送音讯到 'demo' 队列
* @param message
*/
public void sendMessage(String message) {
// 获取到 queue
MQueue queue = QueueCenter.getQueue(queueName);
// 放入音讯
queue.putMessage(message);
System.out.println("提供者:" + queueName + ": 发送音讯:" + message);
}
}
第四步:音讯者实现
其实到这里,音讯注册和音讯生产曾经实现了,然而咱们想要做到的是一个能够主动生产音讯的队列,所以思路是,咱们要在我的项目启动时,就将消费者处于就绪状态,提供者发送音讯后,消费者能够实时进行生产。
package com.dm.black.modules.myQuere;
import java.util.LinkedList;
/**
* 消费者
* @author wjy
* @date 2021/1/20
*/
public class MConsumer {
private final String queueName = "demo";
/**
* 接管音讯并删除
*/
public void receiveMessageAndDelete() {MQueue queue = QueueCenter.getQueue(queueName);
// 如果队列中存在音讯,则始终处于生产状态
while (true) {
// 生产音讯, 执行巴拉巴拉一大堆业务解决后删除
Object o = queue.pollFirst();
System.out.println("消费者:" + queueName + ": 接管到音讯:" + o.toString());
}
}
}
如何让消费者就处于随时生产音讯的状态呢?答案就是在我的项目启动式,就初始化消费者,让消费者实时的去监听音讯。
package com.dm.black;
import com.dm.black.modules.myQuere.MConsumer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan("com.dm.black.modules.*.mapper")
@SpringBootApplication
public class BlackApplication implements CommandLineRunner {public static void main(String[] args) {SpringApplication.run(BlackApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
// 启动消费者
MConsumer mConsumer = new MConsumer();
mConsumer.receiveMessageAndDelete();}
}
测试
咱们提供一个 controller 来注册音讯
package com.dm.black.modules.myQuere;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author wjy
* @date 2021/1/22
*/
@RestController
@RequestMapping("/mQueue")
public class MQueueController {@GetMapping("/sendMessage")
public String sendMessage(String message) {MProvider mProvider = new MProvider();
mProvider.sendMessage(message);
return "success";
}
}
咱们看到,当我的项目启动的时候,消费者曾经处于就绪状态,队列中没有音讯,所以处于梗塞状态,当监听到音讯后,立马工作进行生产。
咱们调用一下
看一下控制台输入
能够看到,提供者第一次注册音讯时,将队列唤醒,并注册到队列中,消费者监听到音讯,立马开始工作。
到这里咱们曾经实现了一个简易版的音讯队列,如果对大家有帮忙,心愿多多反对。
因为没有新起我的项目去做这个 Demo,所以就不提供源码了,大家自行 copy 我贴出来的代码吧