0.如何实现动静代理

Java实现动静代理的大抵步骤如下:

1.定义一个委托类和公共接口。

2.本人定义一个类(调用处理器类,即实现 InvocationHandler 接口),这个类的目标是指定运行时将生成的代理类须要实现的具体任务(包含Preprocess和Postprocess),即代理类调用任何办法都会通过这个调用处理器类(在本文最初一节对此进行解释)。

3.生成代理对象(当然也会生成代理类),须要为他指定(1)委托对象(2)实现的一系列接口(3)调用处理器类的实例。因而能够看出一个代理对象对应一个委托对象,对应一个调用处理器实例。

4.Java 实现动静代理次要波及以下几个类:

①java.lang.reflect.Proxy: 这是生成代理类的主类,通过 Proxy 类生成的代理类都继承了 Proxy 类,即 DynamicProxyClass extends Proxy。

②java.lang.reflect.InvocationHandler: 这里称他为"调用处理器",他是一个接口,咱们动静生成的代理类须要实现的具体内容须要本人定义一个类,而这个类必须实现 InvocationHandler 接口。

1.形象工厂模式

提供一个创立一系列相干或相互依赖对象的接口,而无须指定他们具体的类

2.工厂办法模式

是简略工厂模式的进一步形象和推广,是GoF设计模式的一种,因为应用了面向对象的多态性,工厂办法模式放弃了简略工厂模式的长处,而且克服了它的毛病

3.建造者模式

将一个简单对象的构建与它的示意拆散,使得同样的构建过程能够创立不同的示意

4.原型模式

是一种创立型设计模式,使你可能复制已有的对象,而无需使代码依赖它们所属的类

5.单例模式

是一种创立型设计模式,它能让你保障一个类只有一个实例,并提供一个拜访该实例的全局节点

「单例模式示例代码」

6.适配器模式

是一种结构型设计模式,它能使接口不兼容的对象可能互相合作

7.桥接模式

是一种结构型设计模式,可将一个大类或一系列严密相干的类拆分为形象和实现两个独立的层次结构,从而能在开发时别离应用

8.组合模式

是一种结构型设计模式,你能够应用它将对象组合成树状构造,并且能像应用独立对象一样应用他们

9.装璜者模式

是一种结构型设计模式,容许你通过将对象放入蕴含行为的非凡封装对象中来为原对象绑定新的行为

  • 几个角色:

    组件接口:

    组件接口是装璜者和被装璜者的超类或者接口,它定义了被装璜者的外围性能和装璜者须要增强的性能点

    具体组件:

    具体组件实现了组件接口的外围办法,实现某一个具体的业务逻辑,它也是被装璜的对象

    装璜者:

    实现组件接口,并持有一个具体的被装璜者对象

    具体装璜者:

    具体实现装璜的业务逻辑,即实现了被拆散的各个加强性能点。各个具体装璜者是能够互相叠加的,从而能够形成一个性能更弱小的组件对象

「装璜者模式示例代码」

10.外观模式

是一种结构型设计模式,能为程序库、框架或其余简单类提供一个简略的接口

11.享元模式

是一种结构型设计模式,它摈弃了在每个对象中保留所有数据的形式,通过共享多个对象所共有的雷同状态,让你能在无限的内存容量中载入更多对象

  • 几个角色:

    享元工厂:

    用于创立具体享元类,保护雷同的享元对象。它保障雷同的享元对象能够被零碎共享。其外部应用了相似单例模式的形式,当申请对象曾经存在时,间接返回对象,不存在时,再创建对象
    形象享元:

    定义须要共享的对象的业务接口。享元类被创立进去总是为了实现某些特定的业务逻辑,而形象享元便定义这些逻辑的语义行为

    具体享元类:

    实现形象享元类,实现某一具体逻辑

    调用类

    通过享元工厂获得享元对象

「享元模式示例代码」

12.代理模式

是一种结构型设计模式,让你可能提供对象的替代品或其占位符,代理管制着对于原对象的拜访,并容许在将申请提交给对象前后进行一些解决

「代理模式示例代码」

13.责任链模式

是一种行为型设计模式,容许你将申请沿着解决者链进行发送。收到申请后,每个解决者均可对申请进行解决,或将其传递给链上的下一个解决者

14.命令模式

是一种行为设计模式,它可将申请转换为一个蕴含在申请相干的所有信息的独立对象

15.迭代器模式

是一种行为设计模式,让你能在不裸露汇合低层表现形式(列表、栈、树等)的状况下遍历汇合中的所有元素

16.中介者模式

是一种行为设计模式,能让你缩小对象之间凌乱无序的依赖关系,该模式会限度对象之间的间接交互,迫使它们通过一个中介对象进行单干

17.备忘录模式

是一种行为设计模式,容许在不裸露对象实现细节的状况下保留和复原对象之间的状态

18.观察者模式

是一种行为设计模式,容许你定义一种订阅机制,可在对象事件产生时告诉多个“察看”该对象的其余对象

  • 几个角色

    主题接口:

    指被察看的对象。当其状态产生扭转或者某事件产生时,它会将这个变动告诉观察者。它保护了观察者所须要依赖的状态。

    具体主题:

    实现了主题接口中的办法。如新增观察者,删除观察者,告诉观察者。其外部保护一个观察者列表

    观察者接口:

    定义了观察者的根本办法。当依赖状态产生扭转时,主题接口就会调用观察者的update()办法

    具体观察者:

    实现了观察者接口的update()办法,具体解决当被观察者状态扭转或者某一个事件产生时的业务逻辑

观察者模式示例代码

19.状态模式

是一种行为设计模式,让你能在一个对象的外部状态变动时扭转其行为,使其看上去就像扭转了本身所属类一样

20.策略模式

是一种行为设计模式,能让你定义一系列算法,并将每种算法别离放入独立的类中,以使算法的对象可能相互替换

21.模版办法模式

是一种行为设计模式,它在超类中定义一个算法框架,容许子类在不批改构造的状况下重写算法的特定步骤

22.访问者模式

一种行为型设计模式,能将操作与其所作用的对象隔离开

23.单例模式代码

//1. 饿汉模式public class InstanceFactory {        // 利用动态变量来记录惟一实例,间接初始化动态变量    private static final Single instance = new Single();        //结构器私有化    private InstanceFactory() {}        public static Single getInstance() {        return this.instance;    }}//2. 懒汉模式,懒加载,双重校验锁public class InstanceFactory {        //volatile避免重排序    private volatile static Single instance;        private static final Object object = new Object();        //结构器私有化    private InstanceFactory(){}        public static Single getInstance() {        if(instance == null) {            synchronized(object) {                if (instance == null) {                    instance = new Single();                }            }        }        return instance;    }}//3. 动态外部类,懒加载public class InstanceFactory {    private static class InnerClass {        private static final Single INSTANCE = new Single();    }        private InstanceFacotry() {}        public static final Single getInstance() {        return InnerClass.INSTANCE;    } }//以上办法如果在思考反射的状况下,仍然是可能创立不同的实例的//4. 枚举实现,最佳单例实现,懒加载public enum InstanceFactory {    INSTANCE;    public InstanceFactory getInstance() {        return INSTANCE;    }}
//枚举实现单例demopublic class User {    //私有化构造函数    private User(){ }     //定义一个动态枚举类    static enum SingletonEnum{        //创立一个枚举对象,该对象天生为单例        INSTANCE;        private User user;        //私有化枚举的构造函数        private SingletonEnum(){            user=new User();        }        public User getInstnce(){            return user;        }    }     //对外裸露一个获取User对象的静态方法    public static User getInstance(){        return SingletonEnum.INSTANCE.getInstnce();    }}public class Test {    public static void main(String [] args){        System.out.println(User.getInstance());        System.out.println(User.getInstance());        System.out.println(User.getInstance()==User.getInstance());    }}//后果为true

24.代理模式示例代码

//定义一个数据库操作的接口public interface IDBQuery {    String request();}

动态代理

/** * 重量级对象,假如构建会比较慢 * @author jujun chen * @date 2020/07/25 */public class DBQuery implements IDBQuery {    public DBQuery() {        try {            //假如蕴含耗时操作            TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    @Override    public String request() {        return "request string";    }}/** * 轻量级代理对象,只有在须要的时候才会创立实在对象 * @author jujun chen * @date 2020/07/25 */public class DBQueryProxy implements IDBQuery{    private DBQuery real;    @Override    public String request() {        if (real == null)            real = new DBQuery();        return real.request();    }}//动态代理测试public class StaticProxyTest {    public static void main(String[] args) {        IDBQuery query = new DBQueryProxy();        query.request();    }}

JDK代理

//实现办法解决类public class JdkDbQueryHandler implements InvocationHandler {    IDBQuery real = null;    @Override    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        if (real == null)            real = new DBQuery();        return real.request();    }}//创立JDK动静代理public static IDBQuery createJdkProxy() {    IDBQuery jdkProxy = (IDBQuery) Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(),                                                          new Class[]{IDBQuery.class}, new JdkDbQueryHandler());    return jdkProxy;}

**CGLIB代理

public class CglibDbQueryInterceptor implements MethodInterceptor {    IDBQuery real = null;    @Override    public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {        if (real == null)            real = new DBQuery();        return real.request();    }}//创立办法//cglib创立动静代理public static IDBQuery createCglibProxy() {    Enhancer enhancer = new Enhancer();    enhancer.setCallback(new CglibDbQueryInterceptor());    enhancer.setInterfaces(new Class[]{IDBQuery.class});    IDBQuery cglibProxy = (IDBQuery) enhancer.create();    return cglibProxy;}

Javassist代理

工厂形式创立动静代理

public class JavassistDynDbQueryHandler implements MethodHandler {    IDBQuery real = null;    @Override    public Object invoke(Object o, Method method, Method method1, Object[] objects) throws Throwable {        if (real == null)            real = new DBQuery();        return real.request();    }}//创立办法//Javassist创立动静代理,工厂形式创立public static IDBQuery createJavassistDynProxy() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {    ProxyFactory proxyFactory = new ProxyFactory();    //指定接口    proxyFactory.setInterfaces(new Class[]{IDBQuery.class});    Class proxyClass = proxyFactory.createClass();    IDBQuery javassistProxy = (IDBQuery) proxyClass.getDeclaredConstructor().newInstance();    //设置Handler处理器    ((ProxyObject)javassistProxy).setHandler(new JavassistDynDbQueryHandler());    return javassistProxy;}

动静代码形式创立代理

//Javassist应用动静Java代码创立public static IDBQuery createJavassistBytecodeDynamicProxy() throws NotFoundException, CannotCompileException,        NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {    ClassPool mPool = new ClassPool(true);    //定义类名    CtClass mCtc = mPool.makeClass(IDBQuery.class.getName() + "JavassistBytecodeProxy");    //须要实现的接口    mCtc.addInterface(mPool.get(IDBQuery.class.getName()));    //增加构造函数    mCtc.addConstructor(CtNewConstructor.defaultConstructor(mCtc));    //增加类的字段信息,应用动静java代码    mCtc.addField(CtField.make("public " + IDBQuery.class.getName() + " real;", mCtc));    String dbqueryname = DBQuery.class.getName();    //增加办法,这里应用动静Java代码指定外部逻辑    mCtc.addMethod(CtNewMethod.make("public String request() {if(real==null)real=new " + dbqueryname + "();return real.request();}", mCtc));    //基于以上信息,生成动静类    Class pc = mCtc.toClass();    //生成动静类的实例    IDBQuery bytecodeProxy = (IDBQuery) pc.getDeclaredConstructor().newInstance();    return bytecodeProxy;}

动静代理性能比拟

//几种创立办法性能比拟//创立工夫:CGLIB > Javassist动静代码创立 > Javassist工厂类创立 > Jdk//调用工夫:Javassist工厂类创立 > Jdk > CGLIB > Javassist动静代码创立public static final int CIRCLE = 3000000;public static void main(String[] args) throws InvocationTargetException, NoSuchMethodException,InstantiationException, IllegalAccessException, NotFoundException, CannotCompileException {    IDBQuery d = null;    long begin = System.currentTimeMillis();    d = createJdkProxy(); //测试JDK动静代理    System.out.println("createJdkProxy:" + (System.currentTimeMillis() - begin));    System.out.println("JdkProxy class:" + d.getClass().getName());    begin = System.currentTimeMillis();    for (int i = 0; i < CIRCLE; i++) {        d.request();    }    System.out.println("callJdkProxy:" + (System.currentTimeMillis() - begin));    //测试CGLIB动静代理    begin = System.currentTimeMillis();    d = createCglibProxy();    System.out.println("createCGlibProxy:" + (System.currentTimeMillis() - begin));    System.out.println("CglibProxy class:" + d.getClass().getName());    begin=System.currentTimeMillis();    for (int i = 0; i < CIRCLE; i++) {        d.request();    }    System.out.println("callCglibProxy:" + (System.currentTimeMillis()-begin));    //测试Javassist动静代理    begin = System.currentTimeMillis();    d = createJavassistDynProxy();    System.out.println("createJavassistDynProxy:" + (System.currentTimeMillis() - begin));    System.out.println("JavassistDynProxy class:" + d.getClass().getName());    begin = System.currentTimeMillis();    for (int i = 0; i < CIRCLE; i++) {        d.request();    }    System.out.println("callJavassistDynProxy:" + (System.currentTimeMillis() - begin));    //测试Javassist动静java代码创立    begin = System.currentTimeMillis();    d = createJavassistBytecodeDynamicProxy();    System.out.println("createJavassistBytecodeDynamicProxy:" + (System.currentTimeMillis() - begin));    System.out.println("JavassistBytecodeDynamicProxy class:" + d.getClass().getName());    begin = System.currentTimeMillis();    for (int i = 0; i < CIRCLE; i++) {        d.request();    }    System.out.println("callJavassistBytecodeDynamicProxy:" + (System.currentTimeMillis() - begin));}

25.享元模式示例代码

/** * 享元接口,创立报表 * @author jujun chen * @date 2020/07/25 */public interface IReportManager {    public String createReport();}
/** * 具体享元类,能够继承形象享元,也能够间接实现,财务报表 * @author jujun chen * @date 2020/07/25 */public class FinancialReportManager implements IReportManager {    protected String tenantId = null;    public FinancialReportManager(String tenantId) {        this.tenantId = tenantId;    }    @Override    public String createReport() {        return "This is a financial report";    }}
/** * 具体享元类,能够继承形象享元,也能够间接实现,员工报表 * @author jujun chen * @date 2020/07/25 */public class EmployeeReportManager implements IReportManager {    protected String tenantId = null;    public EmployeeReportManager(String tenantId) {        this.tenantId = tenantId;    }    @Override    public String createReport() {        return "This is a employee report";    }}
/** * 享元工厂类,报表工厂类 * @author jujun chen * @date 2020/07/25 */public class ReportManagerFactory {    Map<String, IReportManager> financialReportManager = new HashMap<>();    Map<String, IReportManager> employeeReportManager = new HashMap<>();    //获取财务报表治理类    public IReportManager getFinancialReportManager(String tenantId) {        IReportManager r = financialReportManager.get(tenantId);        if (r == null) {            r = new FinancialReportManager(tenantId);            financialReportManager.put(tenantId, r);        }        return r;    }    //获取员工报表治理类    public IReportManager getEmployeeReportReportManager(String tenantId) {        IReportManager r = employeeReportManager.get(tenantId);        if (r == null) {            r = new EmployeeReportManager(tenantId);            employeeReportManager.put(tenantId, r);        }        return r;    }}
/** * 具体实现类 * @author jujun chen * @date 2020/07/25 */public class FlyWeightTest {    public static void main(String[] args) {        ReportManagerFactory rmf = new ReportManagerFactory();        IReportManager rm = rmf.getFinancialReportManager("A");        System.out.println(rm.createReport());    }}

26.装璜者模式示例代码

示例:当初须要将某一个后果通过HTML公布,那么首先须要将内容转化为一个HTML文本,同时因为须要在网络上传输,还须要为其减少HTTP头,我当初临时先减少这两个局部。

public interface IPacketCreator {    /**     * 用于解决内容     * @return     */    public String handleContent();}
/** * 解决HTTP主体内容 * @author jujun chen * @date 2020/07/26 */public class PacketBodyCreator implements IPacketCreator {    @Override    public String handleContent() {        //结构外围数据,但不包含格局        return "Content of Packet";    }}
/** * 保护外围组件component,负责通知子类, * 其外围业务逻辑应该全权委托component实现,本人仅做加强解决 * * @author jujun chen * @date 2020/07/26 */public abstract class PacketDecorator implements IPacketCreator {    IPacketCreator component;    public PacketDecorator(IPacketCreator c) {        component = c;    }}
/** * 解决HTML的具体装璜器 * @author jujun chen * @date 2020/07/26 */public class PacketHTMLHeaderCreator extends PacketDecorator {    public PacketHTMLHeaderCreator(IPacketCreator c) {        super(c);    }    /**     * 用于解决内容     *     * @return     */    @Override    public String handleContent() {        StringBuffer sb = new StringBuffer();        sb.append("<html>");        sb.append("<body>");        sb.append(component.handleContent());        sb.append("</body>");        sb.append("</html>\n");        return sb.toString();    }}
/** * 实现数据包HTTP头部解决 * @author jujun chen * @date 2020/07/26 */public class PacketHTTPHeaderCreator extends PacketDecorator {    public PacketHTTPHeaderCreator(IPacketCreator c) {        super(c);    }    /**     * 用于解决内容     *     * @return     */    @Override    public String handleContent() {        StringBuffer sb = new StringBuffer();        sb.append("Cache-Control:no-cache\n");        sb.append("Date:Mon,31Dec202004:25:58GMT\n");        sb.append(component.handleContent());        return sb.toString();    }}
/** * 通过层层结构,将装璜者组装到一起,加强被装璜者 * @author jujun chen * @date 2020/07/26 */public class DecoratorTest {    public static void main(String[] args) {        IPacketCreator pc = new PacketHTTPHeaderCreator((new PacketHTMLHeaderCreator(new PacketBodyCreator())));        System.out.println(pc.handleContent());    }}

27.观察者模式示例代码

应用自定义

//主题接口public interface ISubject {    //增加观察者    void attach(IObserver observer);    //删除观察者    void detach(IObserver observer);    //告诉所有观察者    void inform();}
/** * 观察者接口 * @author jujun chen * @date 2020/07/26 */public interface IObserver {    void update();}
/** * 被观察者 * @author jujun chen * @date 2020/07/26 */public class ConcreteSubject implements ISubject{    CopyOnWriteArrayList<IObserver> observers = new CopyOnWriteArrayList<>();    private int state;    public int getState() {        return state;    }    public void setState(int state) {        this.state = state;        inform();    }    @Override    public void attach(IObserver observer) {        observers.add(observer);    }    @Override    public void detach(IObserver observer) {        observers.remove(observer);    }    @Override    public void inform() {        for (IObserver observer : observers) {            observer.update();        }    }}
/** * 具体观察者 * @author jujun chen * @date 2020/07/26 */public class ConcreteObserver implements IObserver {    @Override    public void update() {        System.out.println("observer receives information");    }}
public class ObserverTest {    public static void main(String[] args) {        ConcreteSubject subject = new ConcreteSubject();        subject.attach(new ConcreteObserver());        //扭转被观察者的状态        subject.setState(1);    }}

JDK自带

java9中曾经过期

代替办法:应用PropertyChangeEvent,PropertyChangeListener

Flow类

/** * 观察者,实现Observer接口 * @author jujun chen * @date 2020/07/26 */public class ConcreteObserver implements Observer {    @Override    public void update(Observable o, Object arg) {        if (o instanceof ConcreteSubject) {            System.out.println("observer recevice info: " + arg.toString());        }    }}
/** * 被观察者,继承Observable * @author jujun chen * @date 2020/07/26 */public class ConcreteSubject extends Observable {    public void changeState(int state) {        this.setChanged();        this.notifyObservers(state);    }}
/** * 应用JDK自带观察者 * @author jujun chen * @date 2020/07/26 */public class ObserverTest {    public static void main(String[] args) {        ConcreteSubject subject = new ConcreteSubject();        //告诉程序跟注册程序相同        subject.addObserver(new ConcreteObserver());        subject.changeState(1);    }}

应用PropertyChangeListener 和 PropertyChangeSupport

/** * 观察者 * @author jujun chen * @date 2020/07/26 */public class DeviceChangeListener implements PropertyChangeListener {    @Override    public void propertyChange(PropertyChangeEvent evt) {        System.out.println(evt);    }}
/** * 被察看对象 * @author jujun chen * @date 2020/07/26 */public class DeviceInfo {    private final PropertyChangeSupport changeSupport = new PropertyChangeSupport(this);    private int state;    //减少监听    public void addPropertyChangeListener(PropertyChangeListener listener) {        changeSupport.addPropertyChangeListener(listener);    }    //移除监听    public void removePropertyChangeListener(PropertyChangeListener listener) {        changeSupport.removePropertyChangeListener(listener);    }    public int getState() {        return state;    }    public void setState(int newState) {        int oldState = this.state;        this.state = newState;        changeSupport.firePropertyChange("state", oldState, state);    }}
public class DeviceTest {    public static void main(String[] args) {        DeviceInfo d = new DeviceInfo();        d.addPropertyChangeListener(new DeviceChangeListener());        d.setState(1);    }}

应用FLow

Java9新增,详细分析见源码剖析

package design.design.flow;import java.util.concurrent.ExecutorService;import java.util.concurrent.Flow;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.Future;/** * 生产者 * @author jujun chen * @date 2020/07/27 */class OneShotPublisher implements Flow.Publisher<Boolean> {    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based    private boolean subscribed; // true after first subscribe    @Override    public synchronized void subscribe(Flow.Subscriber<? super Boolean> subscriber) {        if (subscribed)            subscriber.onError(new IllegalStateException()); // only one allowed        else {            subscribed = true;            //调用消费者            subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));        }    }    static class OneShotSubscription implements Flow.Subscription {        private final Flow.Subscriber<? super Boolean> subscriber;        private final ExecutorService executor;        private Future<?> future; // to allow cancellation        private boolean completed;        OneShotSubscription(Flow.Subscriber<? super Boolean> subscriber,                            ExecutorService executor) {            this.subscriber = subscriber;            this.executor = executor;        }        @Override        public synchronized void request(long n) {            if (!completed) {                completed = true;                if (n <= 0) {                    IllegalArgumentException ex = new IllegalArgumentException();                    executor.execute(() -> subscriber.onError(ex));                } else {                    future = executor.submit(() -> {                        //发送数据                        subscriber.onNext(Boolean.TRUE);                        subscriber.onComplete();                    });                }            }        }        //勾销数据发送        @Override        public synchronized void cancel() {            completed = true;            if (future != null) future.cancel(false);        }    }}
package design.design.flow;import java.util.concurrent.Flow;import java.util.function.Consumer;/** * 消费者 * @author jujun chen * @date 2020/07/27 */class SampleSubscriber<T> implements Flow.Subscriber<T> {    final Consumer<? super T> consumer;    Flow.Subscription subscription;    final long bufferSize;    long count;    SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {        this.bufferSize = bufferSize;        this.consumer = consumer;    }    /**     * Publisher在被指定一个新的Subscriber时调用此办法。     * 一般来说你须要在subscriber外部保留这个subscription实例,因为前面会须要通过她向publisher     * 发送信号来实现:申请更多数据,或者勾销订阅。     */    @Override    public void onSubscribe(Flow.Subscription subscription) {        long initialRequestSize = bufferSize;        count = bufferSize - bufferSize / 2; // re-request when half consumed        (this.subscription = subscription).request(initialRequestSize);    }    /**     * 每当新的数据产生,这个办法会被调用     * @param item     */    @Override    public void onNext(T item) {        if (--count <= 0)            subscription.request(count = bufferSize - bufferSize / 2);        consumer.accept(item);    }    /**     * 当publisher出现异常时会调用subscriber的这个办法     * @param ex     */    @Override    public void onError(Throwable ex) { ex.printStackTrace(); }    /**     * 当publisher数据推送结束时会调用此办法,于是整个订阅过程完结。     */    @Override    public void onComplete() {        System.out.println("has finished!");    }}
package design.design.flow;/** * @author jujun chen * @date 2020/07/27 */public class FlowTest {    public static void main(String[] args) {        OneShotPublisher oneShotPublisher = new OneShotPublisher();        oneShotPublisher.subscribe(new SampleSubscriber<>(Integer.MAX_VALUE,System.out::println));    }}

应用SubmissionPublisher

@Testpublic void test1() {    SubmissionPublisher<Integer> submissionPublisher = new SubmissionPublisher<>();    submissionPublisher.consume(System.out::println);    submissionPublisher.submit(1);    submissionPublisher.submit(2);    submissionPublisher.offer(3, (x, y) -> {        System.out.println("xxx");        return false;    });}