关于java:Java基于LinkedList手写一个消息队列

32次阅读

共计 4425 个字符,预计需要花费 12 分钟才能阅读完成。

前言

日常开发中,咱们最罕用的汇合次要有两个,一个是 ArrayList,一个是 LinkedList

如果对 ArrayList 还有不明确的同学,能够看一下我之前写的一篇文章:Java 汇合,ArrayList 源码深刻解析

那么本篇文章,咱们次要是基于 LinkedList 写一个简略的队列。


设计思路:

首先咱们想要设计一个计划的时候,要先捋分明思路,想一下现有的,他人曾经实现的计划,而后思考本人如何能力实现。(比方 rabbitMq)

队列管理中心 :集中管理所有创立的队列

提供方 :往音讯队列中发送音讯

生产方 :监听音讯队列中的音讯,并进行生产(如果监听到队列中新放入了音讯,则主动生产解决)

第一步:实现音讯队列

咱们要明确队列所须要实现的性能,次要是发送音讯,接管音讯。

package com.dm.black.modules.myQuere;

import java.util.LinkedList;

/**
 * 基于 LinkedList 实现音讯队列
 * @author wjy
 * @date 2021/1/20
 */
public class MQueue extends QueueCenter {private LinkedList<Object> queue = new LinkedList<>();

    /**
     * 留神:这里加锁是为了避免并发操作,因为 LinkedList 自身是线程不平安的
     * @method 放入音讯
     * @param o
     * @return
     */
    public boolean putMessage(Object o) {synchronized (queue) {
            // 如果队列在期待,则执行唤醒
            if (queue.isEmpty()) {System.out.println("唤醒队列...");
                queue.notifyAll();}
            // 将音讯放入队列
            queue.push(o);
            return true;
        }
    }

    /**
     * @method 取得音讯 (获取首条音讯并删除)
     * @return
     */
    public Object pollFirst() {synchronized (queue) {
            // 如果队列中没有音讯,则处于梗塞状态,有音讯则进行生产
            if (queue.isEmpty()) {
                try {System.out.println("队列中没有数据,开始期待....");
                    queue.wait();
                    // 被唤醒后,持续往下执行
                    Object o = queue.pollFirst();
                    return o;
                } catch (InterruptedException e) {e.printStackTrace();
                }
            } else {Object o = queue.pollFirst();
                return o;
            }
        }
        return null;
    }

    /**
     * 取得音讯 (获取首条音讯但不删除)
     * @return
     */
    public Object getFrist(){synchronized (queue) {Object first = queue.getFirst();
            return first;
        }
    }

    /**
     * 队列中是否存在音讯
     * @return
     */
    public boolean isReady() {if (!queue.isEmpty()) {return true;}
        return false;
    }

} 

第二步:实现音讯队列管理中心

目标:为了将队列统一化治理,比方 N 个注册者,或者 N 个消费者应用雷同名称的队列时,保障操作的是同一个队列。

package com.dm.black.modules.myQuere;

import java.util.HashMap;
import java.util.Map;

/**
 * Queue-center
 * @author wjy
 * @date 2021/1/20
 */
public class QueueCenter {

    /**
     * @description 这里应用 Map 作为队列管理中心
     * 创立一个 queue 管理中心,所有创立的 Queue 在这里进行治理
     * Map -> key : queue 名称
     * Map -> value : 队列
     */
    private static Map<String, MQueue> queueCenter = new HashMap<>();

    /**
     * @method 从 Queue-center 获取 Queue
     * 加锁目标:避免同时创立雷同名称的 queue
     */
    public static MQueue getQueue(String queueName) {synchronized (queueName) {
            // 从 map 中依据名称获取队列,如果曾经存在,则返回 map 中的队列
            MQueue queue = queueCenter.get(queueName);
            // 如果是第一次创立队列,则新建队列并放入 map,而后将新建的队列返回
            if (queue == null) {queue = new MQueue();
                putQueue(queueName, queue);
                MQueue mQueue = queueCenter.get(queueName);
                return mQueue;
            }
            return queue;
        }
    }

    /**
     * @method 将 Queue 放入 Queue-center
     * @param queueName
     */
    private static void putQueue(String queueName, MQueue queue) {queueCenter.put(queueName, queue);
    }

} 

第三步:音讯注册

咱们要实现一个提供方,将音讯放入队列中,供生产方应用

package com.dm.black.modules.myQuere;

/**
 * 注册者
 * @author wjy
 * @date 2021/1/20
 */
public class MProvider {

    // 队列名称
    private final String queueName = "demo";

    /**
     * 发送音讯到 'demo' 队列
     * @param message
     */
    public void sendMessage(String message) {
        // 获取到 queue
        MQueue queue = QueueCenter.getQueue(queueName);
        // 放入音讯
        queue.putMessage(message);
        System.out.println("提供者:" + queueName + ": 发送音讯:" + message);
    }
} 

第四步:音讯者实现

其实到这里,音讯注册和音讯生产曾经实现了,然而咱们想要做到的是一个能够主动生产音讯的队列,所以思路是,咱们要在我的项目启动时,就将消费者处于就绪状态,提供者发送音讯后,消费者能够实时进行生产。

package com.dm.black.modules.myQuere;

import java.util.LinkedList;

/**
 * 消费者
 * @author wjy
 * @date 2021/1/20
 */
public class MConsumer {

    private final String queueName = "demo";

    /**
     * 接管音讯并删除
     */
    public void receiveMessageAndDelete() {MQueue queue = QueueCenter.getQueue(queueName);
        // 如果队列中存在音讯,则始终处于生产状态
        while (true) {
            // 生产音讯, 执行巴拉巴拉一大堆业务解决后删除
            Object o = queue.pollFirst();
            System.out.println("消费者:" + queueName + ": 接管到音讯:" + o.toString());
        }
    }

} 

如何让消费者就处于随时生产音讯的状态呢?答案就是在我的项目启动式,就初始化消费者,让消费者实时的去监听音讯。

package com.dm.black;

import com.dm.black.modules.myQuere.MConsumer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@MapperScan("com.dm.black.modules.*.mapper")
@SpringBootApplication
public class BlackApplication implements CommandLineRunner {public static void main(String[] args) {SpringApplication.run(BlackApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        // 启动消费者
        MConsumer mConsumer = new MConsumer();
        mConsumer.receiveMessageAndDelete();}
} 

测试

咱们提供一个 controller 来注册音讯

package com.dm.black.modules.myQuere;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author wjy
 * @date 2021/1/22
 */
@RestController
@RequestMapping("/mQueue")
public class MQueueController {@GetMapping("/sendMessage")
    public String sendMessage(String message) {MProvider mProvider = new MProvider();
        mProvider.sendMessage(message);
        return "success";
    }

} 

咱们看到,当我的项目启动的时候,消费者曾经处于就绪状态,队列中没有音讯,所以处于梗塞状态,当监听到音讯后,立马工作进行生产。

咱们调用一下

看一下控制台输入

能够看到,提供者第一次注册音讯时,将队列唤醒,并注册到队列中,消费者监听到音讯,立马开始工作。

到这里咱们曾经实现了一个简易版的音讯队列,如果对大家有帮忙,心愿多多反对。

因为没有新起我的项目去做这个 Demo,所以就不提供源码了,大家自行 copy 我贴出来的代码吧

正文完
 0