文章目录
一、Spring事件
Spring Event(Application Event)其实就是一个观察者设计模式,一个 Bean 处理完成任务后希望通知其它 Bean 或者说一个 Bean 想观察监听另一个Bean 的行为。
Spring的事件(Application Event)为Bean和Bean之间的消息同步提供了支持。当一个Bean处理完成一个任务之后,希望另外一个Bean知道并能做相应的处理,这时我们就需要让另外一个Bean监听当前Bean所发生的事件
Spring的事件需要遵循如下流程:
- 自定义事件,继承ApplicationEvent
- 定义事件监听器
- 使用容器发布事件
二、实现Spring事件
1、自定义事件
自定义一个事件类,继承ApplicationEvent。该事件可以被ApplicationContext通过publishEvent方法进行发送
public class DemoEvent extends ApplicationEvent { private String msg; public DemoEvent(Object source, String msg) { super(source); this.msg = msg; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }
2、事件监听器
监听器有三种实现方式
- 实现
ApplicationListener
接口 - 使用
@EventListener
注解 - 使用
@TransactionalEventListener
注解
2.1 实现ApplicationListener接口
新建一个类实现 ApplicationListener
接口,并且重写 onApplicationEvent
方法,然后交给Spring管理
@Component public class DemoListener implements ApplicationListener<DemoEvent> { //实现ApplicationListener接口,并指定监听的事件类型 @Override public void onApplicationEvent(DemoEvent event) { //使用onApplicationEvent方法对消息进行接受处理 String msg = event.getMsg(); System.out.println("DemoListener获取到了监听消息:" + msg); } }
2.2 @EventListener
将处理事件的方法使用 @EventListener
注解标记,此时 Spring将创建一个ApplicationListener
的bean对象,使用给定的方法处理事件,参数可以指定处理的事件类型,以及处理条件。
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface EventListener { @AliasFor("classes") Class<?>[] value() default {}; @AliasFor("value") Class<?>[] classes() default {}; String condition() default ""; }
@Component public class DemoListener { @EventListener(DemoEvent.class) public void sendMsg(DemoEvent event) { String msg = event.getMsg(); System.out.println("DemoListener获取到了监听消息:" + msg); } }
2.3 @TransactionalEventListener
@EventListener
和 @TransactionalEventListener
都是 Spring提供的注解,用于处理事件。主要区别在于处理事件的时间和事务的关联性。
@EventListener
:可以应用于任何方法,使得该方法成为一个事件监听器。当一个事件被发布时,所有标记为@EventListener
的方法都会被调用,无论当前是否存在一个活动的事务。 使用@EventListener
注解的方法可能在事务提交之前或之后被调用。@TransactionalEventListener
:该注解允许更精细地控制事件监听器在事务处理过程中的执行时机。@TransactionalEventListener
默认在当前事务提交后才处理事件(TransactionPhase.AFTER_COMMIT
),这可以确保事件处理器只在事务成功提交后才被调用。也可以通过 phase 属性来改变事件处理的时机,例如在事务开始前、事务提交前、事务提交后或者事务回滚
@Component public class DemoListener { @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, value = DemoEvent.class) public void messageListener(DemoEvent event) { String msg = event.getMsg(); System.out.println("DemoListener获取到了监听消息:" + msg); } }
3、事件发布
通过spring上下文对象ApplicationContext
的publishEvent
方法即可发布事件
@Component public class DemoPublisher { @Autowired private ApplicationContext applicationContext; //注入ApplicationContext用来发布事件 public void publish(String msg) { applicationContext.publishEvent(new DemoEvent(this, msg)); // 使用ApplicationContext对象的publishEvent发布事件 } }
4、异步使用
如果要开启异步,需要在启动类增加 @EnableAsync
注解,Listener 类需要开启异步的方法增加 @Async
注解
@Async
的原理是通过 Spring AOP 动态代理 的方式来实现的。Spring 容器启动初始化bean时,判断类中是否使用了@Async
注解,如果使用了则为其创建切入点和切入点处理器,根据切入点创建代理,在线程调用@Async
注解标注的方法时,会调用代理,执行切入点处理器invoke
方法,将方法的执行提交给线程池中的另外一个线程来处理,从而实现了异步执行。
三、EventBus
- EventBus是一个轻量级的发布/订阅模式的应用模式,相比于各种 MQ 中间件更加简洁、轻量,它可以在单体非分布式的小型应用模块内部使用。
- 我们也可以把它和 MQ 中间件结合起来使用,使用 EventBus 作为当前应用程序接收 MQ 消息的统一入口,然后应用内部基于 EventBus 进行分发订阅,以达到高内聚低耦合的目的(当应用内部需要消费多种不同 MQ 中间件消息时,不需要在当前应用的好多不同代码位置都编写 MQ 消费代码)
1、事件模式
EventBus
默认为同步调用,同一个 EventBus
中注册的多个订阅处理,再事件下发后是被总线串行逐个调用的,如果其中一个方法占用事件较长,则同一个 EventBus
中的其他事件处于等待状态,且发送消息事件的代码调用处也是同步调用等待的状态。同一个 EventBus
对象,不仅仅在同一个 post 调用中串行执行,在多次并发 post 调用时,多个 post 调用之间也是串行等待执行的关系
- 同步事件模式:同步事件模式下,事件的触发和事件的处理在同一个线程中同步处理
- 异步事件模式:异步事件模式下,事件的触发和事件的处理在不同的线程中,事件的处理在一个线程池中
2、EventBus三要素
Event
事件,它可以是任意类型Subscriber
事件订阅者,需要加上注解@Subscribe()
,并且指定线程模型,默认是POSTING
Publisher
事件的发布者。我们可以在任意线程里发布事件,调用post(Object)
方法即可
3、同步事件
3.1 定义事件类
public class MessageEvent { private String message; public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } @Override public String toString() { return "MessageEvent{" + "message='" + message + '\'' + '}'; } }
3.2 定义事件监听
import com.google.common.eventbus.Subscribe; public class MessageListener { @Subscribe public void listen(MessageEvent event) { System.out.println(Thread.currentThread().getName() + " : " + event); } }
3.3 测试
import com.google.common.eventbus.EventBus; public class SyncEventTest { public static void main(String[] args) { EventBus eventBus = new EventBus(); eventBus.register(new MessageListener()); MessageEvent messageEvent = new MessageEvent(); messageEvent.setMessage("你好!"); for (int i = 0; i < 100; i++) { eventBus.post(messageEvent); } } }
测试结果
从结果可以看出,事件的触发和事件的处理都是在同一个线程中。
main : MessageEvent{message='你好!'} main : MessageEvent{message='你好!'} main : MessageEvent{message='你好!'} main : MessageEvent{message='你好!'} main : MessageEvent{message='你好!'} main : MessageEvent{message='你好!'} main : MessageEvent{message='你好!'} main : MessageEvent{message='你好!'} ......
4、异步事件
4.1 定义事件
public class MessageEvent { private String message; public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } @Override public String toString() { return "MessageEvent{" + "message='" + message + '\'' + '}'; } }
4.2 定义事件监听
import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; public class MessageListener { /** * 注解@AllowConcurrentEvents是用来标识当前订阅者是线程安全的 * Guava会对listener对象,遍历其带有@Subscribe注解的所有方法,然后对针对每一个listener对象和method方法,标识唯一一个订阅者 * 找到唯一识别的观察者后,会对该观察者进行包装wrap,包装成一个EventSubscriber对象, * 对于没有@AllowConcurrentEvents注解的方法,会被包装成SynchronizedEventSubscriber,即同步订阅者对象。 * 同步订阅者对象在处理事件时是使用了synchronized,强同步锁! * 总结: * 如果当前观察者(method)是线程安全的thread-safe,建议增加注解@AllowConcurrentEvents,以减少同步开销。 * 对于使用的是非异步(AsyncEventBus),也建议增加@AllowConcurrentEvents,因为不需要进行同步。 */ @AllowConcurrentEvents @Subscribe public void listen(MessageEvent event) { System.out.println(Thread.currentThread().getName() + " : " + event); } }
4.3 测试
import com.google.common.eventbus.AsyncEventBus; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class AsyncEventTest { private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); private static final int ALIVE_TIME = 60; private static final int CACHE_SIZE = 1000; public static void main(String[] args) { AsyncEventBus eventBus = new AsyncEventBus(new ThreadPoolExecutor(CORE_SIZE, CACHE_SIZE << 1, ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(CACHE_SIZE))); eventBus.register(new MessageListener()); MessageEvent messageEvent = new MessageEvent(); messageEvent.setMessage("你好!"); for (int i = 0; i < 100; i++) { eventBus.post(messageEvent); } } }
测试结果
从结果可以看出,异步事件模式下,事件的触发和处理是在不同的线程中的,事件的处理是在单独的线程池中进行处理
pool-1-thread-2 : MessageEvent{message='你好!'} pool-1-thread-3 : MessageEvent{message='你好!'} pool-1-thread-4 : MessageEvent{message='你好!'} pool-1-thread-5 : MessageEvent{message='你好!'} pool-1-thread-6 : MessageEvent{message='你好!'} pool-1-thread-7 : MessageEvent{message='你好!'} pool-1-thread-8 : MessageEvent{message='你好!'} pool-1-thread-9 : MessageEvent{message='你好!'} pool-1-thread-10 : MessageEvent{message='你好!'} .......
四、EventBus和Spring Event区别
项目 | 事件 | 发布者 | 发布方法 | 是否异步 | 监听者 | 注册方式 |
---|---|---|---|---|---|---|
EventBus | 任意对象 | EventBus | EventBus#post | 是 | 注解Subscribe的方法 | 手动注册EventBus#register |
Spring Event | 任意对象 | ApplicationEventPublisher | ApplicationEventPublisher#publishEvent | 支持同步异步 | 注解EventListener的方法 | 系统注册 |