Spring事件机制

avatar
作者
猴君
阅读量:0

文章目录

一、Spring事件

Spring Event(Application Event)其实就是一个观察者设计模式,一个 Bean 处理完成任务后希望通知其它 Bean 或者说一个 Bean 想观察监听另一个Bean 的行为。

Spring的事件(Application Event)为Bean和Bean之间的消息同步提供了支持。当一个Bean处理完成一个任务之后,希望另外一个Bean知道并能做相应的处理,这时我们就需要让另外一个Bean监听当前Bean所发生的事件


Spring的事件需要遵循如下流程:

  1. 自定义事件,继承ApplicationEvent
  2. 定义事件监听器
  3. 使用容器发布事件

二、实现Spring事件

1、自定义事件

自定义一个事件类,继承ApplicationEvent。该事件可以被ApplicationContext通过publishEvent方法进行发送

public class DemoEvent extends ApplicationEvent {     private String msg;      public DemoEvent(Object source, String msg) {         super(source);         this.msg = msg;     }      public String getMsg() {         return msg;     }      public void setMsg(String msg) {         this.msg = msg;     } } 

2、事件监听器

监听器有三种实现方式

  • 实现ApplicationListener接口
  • 使用@EventListener注解
  • 使用@TransactionalEventListener注解

2.1 实现ApplicationListener接口

新建一个类实现 ApplicationListener 接口,并且重写 onApplicationEvent 方法,然后交给Spring管理

@Component public class DemoListener implements ApplicationListener<DemoEvent> {     //实现ApplicationListener接口,并指定监听的事件类型      @Override     public void onApplicationEvent(DemoEvent event) { //使用onApplicationEvent方法对消息进行接受处理          String msg = event.getMsg();         System.out.println("DemoListener获取到了监听消息:" + msg);     } } 

2.2 @EventListener

将处理事件的方法使用 @EventListener 注解标记,此时 Spring将创建一个ApplicationListener的bean对象,使用给定的方法处理事件,参数可以指定处理的事件类型,以及处理条件。

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface EventListener {     @AliasFor("classes") Class<?>[] value() default {};      @AliasFor("value") Class<?>[] classes() default {};      String condition() default ""; } 
@Component public class DemoListener {     @EventListener(DemoEvent.class)     public void sendMsg(DemoEvent event) {         String msg = event.getMsg();         System.out.println("DemoListener获取到了监听消息:" + msg);     } } 

2.3 @TransactionalEventListener

@EventListener@TransactionalEventListener 都是 Spring提供的注解,用于处理事件。主要区别在于处理事件的时间和事务的关联性。

  • @EventListener:可以应用于任何方法,使得该方法成为一个事件监听器。当一个事件被发布时,所有标记为 @EventListener 的方法都会被调用,无论当前是否存在一个活动的事务。 使用@EventListener 注解的方法可能在事务提交之前或之后被调用。
  • @TransactionalEventListener:该注解允许更精细地控制事件监听器在事务处理过程中的执行时机。@TransactionalEventListener 默认在当前事务提交后才处理事件(TransactionPhase.AFTER_COMMIT),这可以确保事件处理器只在事务成功提交后才被调用。也可以通过 phase 属性来改变事件处理的时机,例如在事务开始前、事务提交前、事务提交后或者事务回滚
@Component public class DemoListener {     @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, value = DemoEvent.class)     public void messageListener(DemoEvent event) {         String msg = event.getMsg();         System.out.println("DemoListener获取到了监听消息:" + msg);     } } 

3、事件发布

通过spring上下文对象ApplicationContextpublishEvent方法即可发布事件

@Component public class DemoPublisher {     @Autowired     private ApplicationContext applicationContext; //注入ApplicationContext用来发布事件           public void publish(String msg) {         applicationContext.publishEvent(new DemoEvent(this, msg));  // 使用ApplicationContext对象的publishEvent发布事件     } } 

4、异步使用

如果要开启异步,需要在启动类增加 @EnableAsync 注解,Listener 类需要开启异步的方法增加 @Async 注解

@Async的原理是通过 Spring AOP 动态代理 的方式来实现的。Spring 容器启动初始化bean时,判断类中是否使用了@Async注解,如果使用了则为其创建切入点和切入点处理器,根据切入点创建代理,在线程调用@Async注解标注的方法时,会调用代理,执行切入点处理器invoke方法,将方法的执行提交给线程池中的另外一个线程来处理,从而实现了异步执行。



三、EventBus

  1. EventBus是一个轻量级的发布/订阅模式的应用模式,相比于各种 MQ 中间件更加简洁、轻量,它可以在单体非分布式的小型应用模块内部使用。
  2. 我们也可以把它和 MQ 中间件结合起来使用,使用 EventBus 作为当前应用程序接收 MQ 消息的统一入口,然后应用内部基于 EventBus 进行分发订阅,以达到高内聚低耦合的目的(当应用内部需要消费多种不同 MQ 中间件消息时,不需要在当前应用的好多不同代码位置都编写 MQ 消费代码)

1、事件模式

EventBus 默认为同步调用,同一个 EventBus 中注册的多个订阅处理,再事件下发后是被总线串行逐个调用的,如果其中一个方法占用事件较长,则同一个 EventBus 中的其他事件处于等待状态,且发送消息事件的代码调用处也是同步调用等待的状态。同一个 EventBus 对象,不仅仅在同一个 post 调用中串行执行,在多次并发 post 调用时,多个 post 调用之间也是串行等待执行的关系

在这里插入图片描述

  • 同步事件模式:同步事件模式下,事件的触发和事件的处理在同一个线程中同步处理
  • 异步事件模式:异步事件模式下,事件的触发和事件的处理在不同的线程中,事件的处理在一个线程池中

2、EventBus三要素

  • Event 事件,它可以是任意类型
  • Subscriber 事件订阅者,需要加上注解@Subscribe(),并且指定线程模型,默认是POSTING
  • Publisher 事件的发布者。我们可以在任意线程里发布事件,调用post(Object)方法即可

3、同步事件

3.1 定义事件类

public class MessageEvent {     private String message;      public String getMessage() {         return message;     }      public void setMessage(String message) {         this.message = message;     }      @Override     public String toString() {         return "MessageEvent{" + "message='" + message + '\'' + '}';     } } 

3.2 定义事件监听

import com.google.common.eventbus.Subscribe;  public class MessageListener {     @Subscribe     public void listen(MessageEvent event) {         System.out.println(Thread.currentThread().getName() + " : " + event);     } } 

3.3 测试

import com.google.common.eventbus.EventBus;  public class SyncEventTest {     public static void main(String[] args) {         EventBus eventBus = new EventBus();         eventBus.register(new MessageListener());         MessageEvent messageEvent = new MessageEvent();         messageEvent.setMessage("你好!");         for (int i = 0; i < 100; i++) {             eventBus.post(messageEvent);         }     } } 

测试结果
从结果可以看出,事件的触发和事件的处理都是在同一个线程中。

main : MessageEvent{message='你好!'}  main : MessageEvent{message='你好!'}  main : MessageEvent{message='你好!'}  main : MessageEvent{message='你好!'}  main : MessageEvent{message='你好!'}  main : MessageEvent{message='你好!'}  main : MessageEvent{message='你好!'}  main : MessageEvent{message='你好!'}  ...... 


4、异步事件

4.1 定义事件

public class MessageEvent {     private String message;      public String getMessage() {         return message;     }      public void setMessage(String message) {         this.message = message;     }      @Override     public String toString() {         return "MessageEvent{" + "message='" + message + '\'' + '}';     } } 

4.2 定义事件监听

import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe;  public class MessageListener {     /**      * 注解@AllowConcurrentEvents是用来标识当前订阅者是线程安全的       * Guava会对listener对象,遍历其带有@Subscribe注解的所有方法,然后对针对每一个listener对象和method方法,标识唯一一个订阅者      * 找到唯一识别的观察者后,会对该观察者进行包装wrap,包装成一个EventSubscriber对象,      * 对于没有@AllowConcurrentEvents注解的方法,会被包装成SynchronizedEventSubscriber,即同步订阅者对象。       * 同步订阅者对象在处理事件时是使用了synchronized,强同步锁!       * 总结:       * 如果当前观察者(method)是线程安全的thread-safe,建议增加注解@AllowConcurrentEvents,以减少同步开销。       * 对于使用的是非异步(AsyncEventBus),也建议增加@AllowConcurrentEvents,因为不需要进行同步。      */     @AllowConcurrentEvents     @Subscribe     public void listen(MessageEvent event) {         System.out.println(Thread.currentThread().getName() + " : " + event);     } } 

4.3 测试

import com.google.common.eventbus.AsyncEventBus;  import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;  public class AsyncEventTest {     private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors();     private static final int ALIVE_TIME = 60;     private static final int CACHE_SIZE = 1000;      public static void main(String[] args) {         AsyncEventBus eventBus = new AsyncEventBus(new ThreadPoolExecutor(CORE_SIZE, CACHE_SIZE << 1, ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(CACHE_SIZE)));         eventBus.register(new MessageListener());         MessageEvent messageEvent = new MessageEvent();         messageEvent.setMessage("你好!");         for (int i = 0; i < 100; i++) {             eventBus.post(messageEvent);         }     } } 

测试结果
从结果可以看出,异步事件模式下,事件的触发和处理是在不同的线程中的,事件的处理是在单独的线程池中进行处理

pool-1-thread-2 : MessageEvent{message='你好!'}  pool-1-thread-3 : MessageEvent{message='你好!'}  pool-1-thread-4 : MessageEvent{message='你好!'}  pool-1-thread-5 : MessageEvent{message='你好!'}  pool-1-thread-6 : MessageEvent{message='你好!'}  pool-1-thread-7 : MessageEvent{message='你好!'}  pool-1-thread-8 : MessageEvent{message='你好!'}  pool-1-thread-9 : MessageEvent{message='你好!'}  pool-1-thread-10 : MessageEvent{message='你好!'}  ....... 


四、EventBus和Spring Event区别

项目事件发布者发布方法是否异步监听者注册方式
EventBus任意对象EventBusEventBus#post注解Subscribe的方法手动注册EventBus#register
Spring Event任意对象ApplicationEventPublisherApplicationEventPublisher#publishEvent支持同步异步注解EventListener的方法系统注册

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!