关于后端:quarkus依赖注入之六发布和消费事件

7次阅读

共计 11459 个字符,预计需要花费 29 分钟才能阅读完成。

欢送拜访我的 GitHub

这里分类和汇总了欣宸的全副原创 (含配套源码):https://github.com/zq2599/blog_demos

本篇概览

  • 本文是《quarkus 依赖注入》系列的第六篇,次要内容是学习事件的公布和接管
  • 如果您用过 Kafka、RabbitMQ 等消息中间件,对音讯的作用应该不会生疏,通过音讯的订阅和公布能够升高零碎之间的耦合性,这种形式也能够用在利用外部的多个模块之间,在 quarkus 框架下就是 <font color=”blue”> 事件的公布和接管 </font>
  • 本篇会演示 quarkus 利用中如何公布事件、如何接管事件,全文由以下章节形成
  • 同步事件
  • 异步事件
  • 同一种事件类,用在不同的业务场景
  • 优化
  • 事件元数据

同步事件

  • 同步事件是指事件公布后,事件接受者会在同一个线程处理事件,对事件发布者来说,相当于公布之后的代码不会立刻执行,要等到事件处理的代码执行结束后
  • 同步事件公布和承受的开发流程如下图

<img src=”https://typora-pictures-1253575040.cos.ap-guangzhou.myqcloud.com/%E6%B5%81%E7%A8%8B%E5%9B%BE%20(20).jpg” alt=” 流程图 (20)” style=”zoom:50%;” />

  • 接下来编码实际,先定义事件类 MyEvent.java,如下所示,该类有两个字段,source 示意起源,consumeNum 作为计数器能够累加
public class MyEvent {
    /**
     * 事件源
     */
    private String source;

    /**
     * 事件被生产的总次数
     */
    private AtomicInteger consumeNum;

    public MyEvent(String source) {
        this.source = source;
        consumeNum = new AtomicInteger();}

    /**
     * 事件被生产次数加一
     * @return
     */
    public int addNum() {return consumeNum.incrementAndGet();
    }

    /**
     * 获取事件被生产次数
     * @return
     */
    public int getNum() {return consumeNum.get();
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "source='" + source + '\'' +
                ", consumeNum=" + getNum() +
                '}';
    }
}
  • 而后是公布事件类,有几处要留神的中央稍后会提到
package com.bolingcavalry.event.producer;

import com.bolingcavalry.event.bean.MyEvent;
import io.quarkus.logging.Log;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;

@ApplicationScoped
public class MyProducer {

    @Inject
    Event<MyEvent> event;

    /**
     * 发送同步音讯
     * @param source 音讯源
     * @return 被生产次数
     */
    public int syncProduce(String source) {MyEvent myEvent = new MyEvent("syncEvent");
        Log.infov("before sync fire, {0}", myEvent);
        event.fire(myEvent);
        Log.infov("after sync fire, {0}", myEvent);
        return myEvent.getNum();}
}
  • 上述代码有以下几点要留神:
  1. 注入 Event,用于公布事件,通过泛型指定事件类型是 <font color=”blue”>MyEvent</font>
  2. 公布同步事件很简略,调用 <font color=”blue”>fire</font> 即可
  3. 因为是同步事件,会期待事件的消费者将生产的代码执行结束后,fire 办法才会返回
  4. 如果消费者减少了 myEvent 的记数,那么 myEvent.getNum() 应该等于计数的调用次数
  • 接下来是生产事件的代码,如下所示,只有办法的入参是事件类 <font color=”blue”>MyEvent</font>,并且用 <font color=”red”>@Observes</font> 润饰该入参,即可成为 MyEvent 事件的同步消费者,这里用 sleep 来模仿执行了一个耗时的业务操作
package com.bolingcavalry.event.consumer;

import com.bolingcavalry.event.bean.MyEvent;
import io.quarkus.logging.Log;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class MyConsumer {

    /**
     * 生产同步事件
     * @param myEvent
     */
    public void syncConsume(@Observes MyEvent myEvent) {Log.infov("receive sync event, {0}", myEvent);

        // 模仿业务执行,耗时 100 毫秒
        try {Thread.sleep(100);
        } catch (InterruptedException e) {e.printStackTrace();
        }

        // 计数加一
        myEvent.addNum();}
}
  • 最初,写单元测试类验证性能,在 MyProducer 的 syncProduce 办法中,因为是同步事件,MyConsumer.syncConsume 办法执行结束才会继续执行 event.fire 前面的代码,所以 syncProduce 的返回值应该等于 1
package com.bolingcavalry;

import com.bolingcavalry.event.consumer.MyConsumer;
import com.bolingcavalry.event.producer.MyProducer;
import com.bolingcavalry.service.HelloInstance;
import com.bolingcavalry.service.impl.HelloInstanceA;
import com.bolingcavalry.service.impl.HelloInstanceB;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import javax.enterprise.inject.Instance;
import javax.inject.Inject;

@QuarkusTest
public class EventTest {

    @Inject
    MyProducer myProducer;

    @Inject
    MyConsumer myConsumer;

    @Test
    public void testSync() {Assertions.assertEquals(1, myProducer.syncProduce("testSync"));
    }
}
  • 执行单元测试,如下所示,合乎预期,事件的发送和生产在同一线程内程序执行,另外请关注日志的工夫戳,可见 MyProducer 的第二条日志,是在 MyConsumer 日志之后的一百多毫秒,这也证实了程序执行的逻辑
  • 以上就是同步事件的相干代码,很多场景中,生产事件的操作是比拟耗时或者不太重要(例如写日志),这时候让发送事件的线程期待就不适合了,因为发送事件后可能还有其余重要的事件须要立刻去做,这就是接下来的异步事件

异步事件

  • 为了防止事件生产耗时过长对事件发送的线程造成影响,能够应用异步事件,还是用代码来阐明
  • 发送事件的代码还是写在 <font color=”blue”>MyPorducer.java</font>,如下,有两处要留神的中央稍后提到
    public int asyncProduce(String source) {MyEvent myEvent = new MyEvent(source);
        Log.infov("before async fire, {0}", myEvent);
        event.fireAsync(myEvent)
             .handleAsync((e, error) -> {if (null!=error) {Log.error("handle error", error);
                 } else {Log.infov("finish handle, {0}", myEvent);
                 }

                 return null;
             });
        Log.infov("after async fire, {0}", myEvent);
        return myEvent.getNum();}
  • 上述代码有以下两点要留神:
  1. 发送异步事件的 API 是 <font color=”red”>fireAsync</font>
  2. fireAsync 的返回值是 <font color=”red”>CompletionStage</font>,咱们能够调用其 <font color=”blue”>handleAsync</font> 办法,将响应逻辑(对事件生产后果的解决)传入,这段响应逻辑会在事件生产完结后被执行,上述代码中的响应逻辑是查看异样,若有就打印
  • 生产异步事件的代码写在 <font color=”blue”>MyConsumer</font>,与同步的相比惟一的变动就是润饰入参的注解改成了 <font color=”red”>ObservesAsync</font>
    public void aSyncConsume(@ObservesAsync MyEvent myEvent) {Log.infov("receive async event, {0}", myEvent);

        // 模仿业务执行,耗时 100 毫秒
        try {Thread.sleep(100);
        } catch (InterruptedException e) {e.printStackTrace();
        }

        // 计数加一
        myEvent.addNum();}
  • 单元测试代码,有两点须要留神,稍后会提到
    @Test
    public void testAsync() throws InterruptedException {Assertions.assertEquals(0, myProducer.asyncProduce("testAsync"));
        // 如果不期待的话,主线程完结的时候会中断正在生产事件的子线程,导致子线程报错
        Thread.sleep(150);
    }
  • 上述代码有以下两点须要留神
  1. 异步事件的时候,发送事件的线程不会期待,所以 myEvent 实例的计数器在生产线程还没来得及加一,myProducer.asyncProduce 办法就曾经执行完结了,返回值是 0,所以单元测试的 assertEquals 地位,期望值应该是 0
  2. testAsync 办法要期待 100 毫秒以上能力完结,否则过程会立刻完结,导致正在生产事件的子线程被打断,抛出异样
  • 执行单元测试,控制台输入如下图,测试通过,有三个重要信息稍后会提到
  • 上图中有三个要害信息
  1. 事件公布前后的两个日志是紧紧相连的,这证实发送事件之后不会期待生产,而是立刻继续执行发送线程的代码
  2. 生产事件的日志显示,生产逻辑是在一个新的线程中执行的
  3. 生产完结后的回调代码中也打印了日志,显示这端逻辑又在一个新的线程中执行,此线程与发送事件、生产事件都不在同一线程
  • 以上就是根底的异步音讯发送和承受操作,接下来去看略为简单的场景

同一种事件类,用在不同的业务场景

  • 构想这样一个场景:管理员发送 XXX 类型的事件,消费者应该是解决管理员事件的办法,普通用户也发送 XXX 类型的事件,消费者应该是解决普通用户事件的办法,简略的说就是同一个数据结构的事件可能用在不同场景,如下图

<img src=”https://typora-pictures-1253575040.cos.ap-guangzhou.myqcloud.com/%E6%B5%81%E7%A8%8B%E5%9B%BE%20(21).jpg” alt=” 流程图 (21)” style=”zoom:50%;” />

  • 从技术上剖析,实现上述性能的关键点是:音讯的消费者要准确过滤掉不该本人生产的音讯
  • 此刻,您是否回忆起后面文章中的一个场景:依赖注入时,如何从多个 bean 中抉择本人所需的那个,这两个问题何其相似,而依赖注入的抉择问题是用 <font color=”blue”>Qualifier</font> 注解解决的,明天的音讯场景,仍旧能够用 Qualifier 来对音讯做准确过滤,接下来编码实战
  • 首先定义事件类 ChannelEvent.java,管理员和普通用户的音讯数据都用这个类(和后面的 MyEvent 事件类的代码一样)
public class TwoChannelEvent {
    /**
     * 事件源
     */
    private String source;

    /**
     * 事件被生产的总次数
     */
    private AtomicInteger consumeNum;

    public TwoChannelEvent(String source) {
        this.source = source;
        consumeNum = new AtomicInteger();}

    /**
     * 事件被生产次数加一
     * @return
     */
    public int addNum() {return consumeNum.incrementAndGet();
    }

    /**
     * 获取事件被生产次数
     * @return
     */
    public int getNum() {return consumeNum.get();
    }

    @Override
    public String toString() {
        return "TwoChannelEvent{" +
                "source='" + source + '\'' +
                ", consumeNum=" + getNum() +
                '}';
    }
}
  • 而后就是关键点:自定义注解 <font color=”blue”>Admin</font>,这是管理员事件的过滤器,要用 <font color=”red”>Qualifier</font> 润饰
package com.bolingcavalry.annonation;

import javax.inject.Qualifier;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Qualifier
@Retention(RUNTIME)
@Target({FIELD, PARAMETER})
public @interface Admin {}
  • 自定义注解 <font color=”blue”>Normal</font>,这是普通用户事件的过滤器,要用 <font color=”red”>Qualifier</font> 润饰
@Qualifier
@Retention(RUNTIME)
@Target({FIELD, PARAMETER})
public @interface Normal {}
  • Admin 和 Normal 先用在发送事件的代码中,再用在生产事件的代码中,这样就实现了匹配,先写发送代码,有几处要留神的中央稍后会提到
@ApplicationScoped
public class TwoChannelWithTwoEvent {

    @Inject
    @Admin
    Event<TwoChannelEvent> adminEvent;

    @Inject
    @Normal
    Event<TwoChannelEvent> normalEvent;

    /**
     * 管理员音讯
     * @param source
     * @return
     */
    public int produceAdmin(String source) {TwoChannelEvent event = new TwoChannelEvent(source);
        adminEvent.fire(event);
        return event.getNum();}

    /**
     * 一般音讯
     * @param source
     * @return
     */
    public int produceNormal(String source) {TwoChannelEvent event = new TwoChannelEvent(source);
        normalEvent.fire(event);
        return event.getNum();}
}
  • 上述代码有以下两点须要留神
  1. 注入了两个 Event 实例 adminEvent 和 normalEvent,它们的类型截然不同,然而别离用 <font color=”blue”>Admin</font> 和 <font color=”red”>Normal</font>

注解润饰,相当于为它们增加了不同的标签,在生产的时候也能够用这两个注解来过滤

  1. 发送代码并无特别之处,用 adminEvent.fire 收回的事件,在生产的时候不过滤、或者用 <font color=”blue”>Admin</font> 过滤,这两种形式都能收到
  • 接下来看生产事件的代码 TwoChannelConsumer.java,有几处要留神的中央稍后会提到
@ApplicationScoped
public class TwoChannelConsumer {

    /**
     * 生产管理员事件
     * @param event
     */
    public void adminEvent(@Observes @Admin TwoChannelEvent event) {Log.infov("receive admin event, {0}", event);
        // 管理员的计数加两次,不便单元测试验证
        event.addNum();
        event.addNum();}

    /**
     * 生产普通用户事件
     * @param event
     */
    public void normalEvent(@Observes @Normal TwoChannelEvent event) {Log.infov("receive normal event, {0}", event);
        // 计数加一
        event.addNum();}

    /**
     * 如果不必注解润饰,所有 TwoChannelEvent 类型的事件都会在此被生产
     * @param event
     */
    public void allEvent(@Observes TwoChannelEvent event) {Log.infov("receive event (no Qualifier), {0}", event);
        // 计数加一
        event.addNum();}
}
  • 上述代码有以下两处须要留神
  1. 生产事件的办法,除了 <font color=”blue”>Observes</font> 注解,再带上 <font color=”red”>Admin</font>,这样此办法只会生产 Admin 润饰的 Event 收回的事件
  2. <font color=”blue”>allEvent</font> 只有 <font color=”blue”>Observes</font> 注解,这就意味着此办法不做过滤,只有是 TwoChannelEvent 类型的同步事件,它都会生产
  3. 为了不便前面的验证,在生产 Admin 事件时,计数器执行了两次,而 Normal 事件只有一次,这样两种事件的生产后果就不一样了
  • 以上就是同一事件类在多个场景被同时应用的代码了,接下来写单元测试验证
@QuarkusTest
public class EventTest {
  
    @Inject
    TwoChannelWithTwoEvent twoChannelWithTwoEvent;

    @Test
    public void testTwoChnnelWithTwoEvent() {
        // 对管理员来说,// TwoChannelConsumer.adminEvent 生产时计数加 2,// TwoChannelConsumer.allEvent 生产时计数加 1,// 所以最终计数是 3
        Assertions.assertEquals(3, twoChannelWithTwoEvent.produceAdmin("admin"));

        // 对一般人员来说,// TwoChannelConsumer.normalEvent 生产时计数加 1,// TwoChannelConsumer.allEvent 生产时计数加 1,// 所以最终计数是 2
        Assertions.assertEquals(2, twoChannelWithTwoEvent.produceNormal("normal"));
    }
}
  • 执行单元测试顺利通过,如下图

小优化,不须要注入多个 Event 实例

  • 方才的代码尽管能够失常工作,然而有一点小瑕疵:为了发送不同事件,须要注入不同的 Event 实例,如下图红框,如果事件类型越来越多,注入的 Event 实例岂不是越来越多?

<img src=”https://typora-pictures-1253575040.cos.ap-guangzhou.myqcloud.com/image-20220403170857712.png” alt=”image-20220403170857712″ style=”zoom:50%;” />

  • quarkus 提供了一种缓解上述问题的形式,再写一个发送事件的类 TwoChannelWithSingleEvent.java,代码中有两处要留神的中央稍后会提到
/**
 * @author will
 * @email zq2599@gmail.com
 * @date 2022/4/3 10:16
 * @description 用同一个事件构造体 TwoChannelEvent,别离发送不同业务类型的事件
 */
@ApplicationScoped
public class TwoChannelWithSingleEvent {

    @Inject
    Event<TwoChannelEvent> singleEvent;
    
    /**
     * 管理员音讯
     * @param source
     * @return
     */
    public int produceAdmin(String source) {TwoChannelEvent event = new TwoChannelEvent(source);

        singleEvent.select(new AnnotationLiteral<Admin>() {})
                   .fire(event);

        return event.getNum();}

    /**
     * 一般音讯
     * @param source
     * @return
     */
    public int produceNormal(String source) {TwoChannelEvent event = new TwoChannelEvent(source);

        singleEvent.select(new AnnotationLiteral<Normal>() {})
                .fire(event);

        return event.getNum();}
}
  • 上述发送音讯的代码,有以下两处须要留神
  1. 不论是 Admin 事件还是 Normal 事件,都是用 <font color=”blue”>singleEvent</font> 发送的,如此防止了事件类型越多 Event 实例越多的状况产生
  2. 执行 fire 办法发送事件前,先执行 <font color=”red”>select</font> 办法,入参是 AnnotationLiteral 的匿名子类,并且 <font color=”red”> 通过泛型指定事件类型 </font>,这和后面 TwoChannelWithTwoEvent 类发送两种类型音讯的成果是一样的
  • 既然用 select 办法过滤和后面两个 Event 实例的成果一样,那么生产事件的类就不改变了
  • 写个单元测试来验证成果
@QuarkusTest
public class EventTest {
    @Inject
    TwoChannelWithSingleEvent twoChannelWithSingleEvent;

    @Test
    public void testTwoChnnelWithSingleEvent() {
        // 对管理员来说,// TwoChannelConsumer.adminEvent 生产时计数加 2,// TwoChannelConsumer.allEvent 生产时计数加 1,// 所以最终计数是 3
        Assertions.assertEquals(3, twoChannelWithSingleEvent.produceAdmin("admin"));

        // 对一般人员来说,// TwoChannelConsumer.normalEvent 生产时计数加 1,// TwoChannelConsumer.allEvent 生产时计数加 1,// 所以最终计数是 2
        Assertions.assertEquals(2, twoChannelWithSingleEvent.produceNormal("normal"));
    }
}
  • 如下图所示,单元测试通过,也就说从消费者的视角来看,两种音讯发送形式并无区别

事件元数据

  • 在生产事件时,除了从事件对象中获得业务数据(例如 MyEvent 的 source 和 consumeNum 字段),有时还可能须要用到事件自身的信息,例如类型是 Admin 还是 Normal、Event 对象的注入点在哪里等,这些都算是事件的 <font color=”blue”> 元数据 </font>
  • 为了演示消费者如何获得事件元数据,将 TwoChannelConsumer.java 的 <font color=”blue”>allEvent</font> 办法改成上面的样子,须要留神的中央稍后会提到
public void allEvent(@Observes TwoChannelEvent event, EventMetadata eventMetadata) {Log.infov("receive event (no Qualifier), {0}", event);

        // 打印事件类型
        Log.infov("event type : {0}", eventMetadata.getType());

        // 获取该事件的所有注解
        Set<Annotation> qualifiers = eventMetadata.getQualifiers();

        // 将事件的所有注解一一打印
        if (null!=qualifiers) {qualifiers.forEach(annotation -> Log.infov("qualify : {0}", annotation));
        }

        // 计数加一
        event.addNum();}
  • 上述代码中,以下几处须要留神
  1. 给 <font color=”blue”>allEvent</font> 办法减少一个入参,类型是 EventMetadata,bean 容器会将事件的元数据设置到此参数
  2. EventMetadata 的 getType 办法能获得事件类型
  3. EventMetadata 的 getType 办法能获得事件的所有润饰注解,包含 Admin 或者 Normal
  • 运行方才的单元测试,看批改后的 allEvent 办法执行会有什么输入,如下图,红框 1 打印出事件是 TwoChannelEvent 实例,红框 2 将润饰事件的注解打印进去了,包含发送时润饰的 Admin
  • 至此,事件相干的学习和实战就实现了,过程内用事件能够无效地解除模块间的耦合,心愿本文能给您一些参考

欢送关注思否:程序员欣宸

学习路上,你不孤独,欣宸原创一路相伴 …

正文完
 0