简介
Spring框架提供了一种订阅与发布(Publish-Subscribe)的机制,用于解耦应用程序的各个组件。这种机制允许应用程序的不同部分之间通过事件进行通信,从而实现松耦合的架构。
spring源码地址:https://github.com/spring-projects/spring-framework/tree/3.0.x
small-spring源码地址:https://github.com/fuzhengwei/small-spring
原理
Spring的订阅与发布机制基于观察者模式(Observer Pattern)。在这种模式中,有一个主题(Subject)和多个观察者(Observer)。主题负责发布事件,而观察者订阅这些事件并在事件发生时做出响应。
在Spring中,主题是一个事件发布者(Event Publisher),而观察者是事件监听器(Event Listener)。主题发布一个事件后,所有订阅了该事件的监听器都会收到通知并执行相应的逻辑。
实现步骤
下面是使用Spring实现订阅与发布的一般步骤:
- 定义事件:创建一个继承自
ApplicationEvent
的事件类,表示需要发布的事件。可以在事件类中添加一些自定义的属性以便传递更多信息。 - 创建事件发布者:创建一个类,负责发布事件。可以使用
ApplicationEventPublisher
接口的publishEvent()
方法来发布事件。 - 创建事件监听器:创建一个类,实现
ApplicationListener
接口,并指定要监听的事件类型。在监听器中实现事件发生时的逻辑。 - 注册监听器:在Spring配置文件中或使用
@EventListener
注解将监听器注册为Spring的Bean。 - 发布事件:通过调用事件发布者的方法来发布事件。所有订阅了该事件的监听器都会收到通知并执行相应的逻辑。
示例代码
下面是一个简单的示例代码,演示了如何使用Spring实现订阅与发布:
// 1. 定义事件
public class MyEvent extends ApplicationEvent {
private String message;
public MyEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() {
return message;
}
}
// 2. 创建事件发布者
public class MyEventPublisher {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void publishEvent(String message) {
MyEvent event = new MyEvent(this, message);
eventPublisher.publishEvent(event);
}
}
// 3. 创建事件监听器
public class MyEventListener implements ApplicationListener<MyEvent> {
@Override
public void onApplicationEvent(MyEvent event) {
System.out.println("Received message: " + event.getMessage());
}
}
// 4. 注册监听器(可以在Spring配置文件中配置)
@Configuration
public class EventConfig {
@Bean
public MyEventListener myEventListener() {
return new MyEventListener();
}
}
// 5. 发布事件
public class Main {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(EventConfig.class);
MyEventPublisher publisher = context.getBean(MyEventPublisher.class);
publisher.publishEvent("Hello, World!");
}
}
在上述示例中,MyEvent
表示一个自定义事件,MyEventPublisher
是事件发布者,MyEventListener
是事件监听器。通过调用MyEventPublisher
的publishEvent()
方法来发布事件,在MyEventListener
中实现事件发生时的逻辑。
事件的传递与处理
在Spring的订阅与发布机制中,事件可以通过继承的方式进行传递。一个事件可以作为另一个事件的子类,这样可以在事件中传递更多的信息。
同时,可以定义多个事件监听器来处理同一类型的事件。当一个事件被发布时,所有订阅了该事件的监听器都会收到通知并执行相应的逻辑。Spring会根据事件的类型来确定应该调用哪些监听器。
异步事件处理
在某些场景下,事件的处理可能是一个耗时的操作,如果在事件的发布者线程中进行处理,可能会导致整个应用程序的性能下降。为了解决这个问题,Spring提供了异步事件处理的机制。
通过在监听器方法上添加@Async
注解,可以指定该方法在一个新的线程中异步执行。这样可以将事件的处理与事件的发布解耦,提高应用程序的吞吐量和响应性能。
事件的顺序
在某些场景下,事件的处理顺序是非常重要的。Spring允许通过在监听器方法上添加@Order
注解来指定监听器的执行顺序。具有较小@Order
值的监听器将先于具有较大@Order
值的监听器执行。
如果多个监听器具有相同的@Order
值,它们的执行顺序将是不确定的。可以通过实现Ordered
接口并重写getOrder()
方法来更精确地控制监听器的执行顺序。
源码解析
注册事件广播器
Spring在注册事件广播器时,会调用AbstractApplicationContext#refresh
方法。这个方法是用来初始化容器的,这个方法会注册Bean对象、初始化Bean对象等等,这里只需要关注 initApplicationEventMulticaster()
方法,这个方法就是用来注册事件广播器的。来看一下这个方法的实现。
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
}
} else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster);
}
}
}
这个方法做两件事:
-
当前容器已经注册了名为
applicationEventMulticaster
的对象,直接获取这个已经注册好的事件广播器。例如,我们通过配置类去提前配置事件广播器对象。@Configuration public class AsynchronousSpringEventsConfig { @Bean(name = "applicationEventMulticaster") public ApplicationEventMulticaster applicationEventMulticaster() { SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster(); multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor()); return multicaster; } }
-
如果当前容器没有提前配置,那么当前容器获取不到这个事件广播器对象,需要由Spring自己创建一个
SimpleApplicationEventMulticaster
对象,并把它注册到容器中。
注册事件监听器
加载事件监听器
Spring是如何加载我们自定义的监听器的呢?自定义的事件监听器是由Spring的 ApplicationListenerDetector
后置处理器的 postProcessAfterInitialization
方法加载的。具体源码如下所示。
class ApplicationListenerDetector implements DestructionAwareBeanPostProcessor, MergedBeanDefinitionPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof ApplicationListener) {
Boolean flag = this.singletonNames.get(beanName);
if (Boolean.TRUE.equals(flag)) {
this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
}
else if (Boolean.FALSE.equals(flag)) {
this.singletonNames.remove(beanName);
}
}
return bean;
}
}
从 bean instanceof ApplicationListener
这里,我们就知道为什么自定义事件监听器需要实现 ApplicationListener
接口了。
在来看一下 applicationContext.addApplicationListener
的源码。
public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {
public void addApplicationListener(ApplicationListener<?> listener) {
Assert.notNull(listener, "ApplicationListener must not be null");
if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.addApplicationListener(listener);
}
this.applicationListeners.add(listener);
}
}
this.registerListeners()
方法有什么用了?这个方法可以用来通过 context#addApplicationListener
来提前注册监听器。具体源码如下所示。
protected void registerListeners() {
// 注册静态指定的监听器
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
// 这里只是注册了监听器的BeanName
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// 提前发布事件
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
还需要注册@EventListener的监听者
EventListenerMethodProcessor
是 Spring
事件机制中非常重要的一个组件。它管理了一组EventListenerFactory
组件,用来将应用中每个使用@EventListener
注解定义的事件监听方法变成一个ApplicationListener
实例注册到容器。换句话讲,框架开发者,或者应用开发者使用注解@EventListener
定义的事件处理方法,如果没有EventListenerMethodProcessor
的发现和注册,是不会被容器看到和使用的。
EventListenerMethodProcessor
实现了如下三个接口 :
ApplicationContextAware
BeanFactoryPostProcessor
SmartInitializingSingleton
通过实现接口ApplicationContextAware
,容器会将当前应用上下文ApplicationContext
告诉EventListenerMethodProcessor
,这是EventListenerMethodProcessor
用于检测发现@EventListener
注解方法的来源,生成的ApplicationListener
也放到该应用上下文。
通过实现接口BeanFactoryPostProcessor
,EventListenerMethodProcessor
变成了一个BeanFactory
的后置处理器,也就是说,在容器启动过程中的后置处理阶段,启动过程会调用EventListenerMethodProcessor
的方法postProcessBeanFactory
。在这个方法中,EventListenerMethodProcessor
会找到容器中所有类型为EventListenerFactory
的bean
,最终@EventListener
注解方法的检测发现,以及ApplicationListener
实例的生成和注册,靠的是这些EventListenerFactory
组件。
而通过实现接口SmartInitializingSingleton
,在容器启动过程中所有单例bean
创建阶段(此阶段完成前,这些bean
并不会供外部使用)的末尾,EventListenerMethodProcessor
的方法afterSingletonsInstantiated
会被调用。在这里,EventListenerMethodProcessor
会便利容器中所有的bean
,进行@EventListener
注解方法的检测发现,以及ApplicationListener
实例的生成和注册。
源代码
代码版本 : Spring Context 5.2.0.RELEASE
package org.springframework.context.event;
//... 省略 import
/**
* Registers {@link EventListener} methods as individual {@link ApplicationListener} instances.
* Implements {@link BeanFactoryPostProcessor} (as of 5.1) primarily for early retrieval,
* avoiding AOP checks for this processor bean and its {@link EventListenerFactory} delegates.
*
* @author Stephane Nicoll
* @author Juergen Hoeller
* @since 4.2
* @see EventListenerFactory
* @see DefaultEventListenerFactory
*/
public class EventListenerMethodProcessor
implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {
protected final Log logger = LogFactory.getLog(getClass());
// 用于记录检测发现`@EventListener`注解方法,生成和注册`ApplicationListener`实例的应用上下文
@Nullable
private ConfigurableApplicationContext applicationContext;
// 记录当前 BeanFactory, 实际上这个变量可用可不用,因为通过 applicationContext 也可以找到
// 当前 BeanFactory
@Nullable
private ConfigurableListableBeanFactory beanFactory;
// 记录从容器中找到的所有 EventListenerFactory
@Nullable
private List<EventListenerFactory> eventListenerFactories;
private final EventExpressionEvaluator evaluator = new EventExpressionEvaluator();
// 缓存机制,记住那些根本任何方法上没有使用注解 @EventListener 的类,避免处理过程中二次处理
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext,
"ApplicationContext does not implement ConfigurableApplicationContext");
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
this.beanFactory = beanFactory;
// 从容器中找到所有的 EventListenerFactory 组件
// 常见的一些 EventListenerFactory :
// TransactionalEventListenerFactory --
// 用于支持使用 @TransactionalEventListener 注解的事件监听器, @TransactionalEventListener 是一种特殊的
// @EventListener,它定义的事件监听器应用于事务提交或者回滚的某些特殊时机,
// 由 ProxyTransactionManagementConfiguration 注册到容器
// 注册到容器
// DefaultEventListenerFactory -- 系统缺省, 最低优先级,如果其他 EventListenerFactory 都不支持的时候使用
Map<String, EventListenerFactory> beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false);
List<EventListenerFactory> factories = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(factories);
this.eventListenerFactories = factories;
}
@Override
public void afterSingletonsInstantiated() {
ConfigurableListableBeanFactory beanFactory = this.beanFactory;
Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
// 这里获取容器中所有bean组件的名称,
String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
for (String beanName : beanNames) {
// 遍历每个bean组件,检测其中`@EventListener`注解方法,生成和注册`ApplicationListener`实例
if (!ScopedProxyUtils.isScopedTarget(beanName)) {
Class<?> type = null;
try {
type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
}
catch (Throwable ex) {
// An unresolvable bean type, probably from a lazy bean - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
}
}
if (type != null) {
if (ScopedObject.class.isAssignableFrom(type)) {
try {
Class<?> targetClass = AutoProxyUtils.determineTargetClass(
beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
if (targetClass != null) {
type = targetClass;
}
}
catch (Throwable ex) {
// An invalid scoped proxy arrangement - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
}
}
}
try {
// 注意这一行,针对一个bean的真正的`@EventListener`注解方法检测,
// `ApplicationListener`实例生成注册发生在这里
processBean(beanName, type);
}
catch (Throwable ex) {
throw new BeanInitializationException("Failed to process @EventListener " +
"annotation on bean with name '" + beanName + "'", ex);
}
}
}
}
}
// 该方法拿到某个bean的名称和它的目标类,在这个范围上检测`@EventListener`注解方法,
// 生成和注册`ApplicationListener`实例
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) &&
AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
!isSpringContainerClass(targetType)) {
Map<Method, EventListener> annotatedMethods = null;
try {
// *** 注意这里, 这里检测当前类targetType上使用了注解 @EventListener 的方法
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
}
catch (Throwable ex) {
// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
}
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
// 如果当前类 targetType 中没有任何使用了 注解 @EventListener 的方法,则将该类保存到
// 缓存 nonAnnotatedClasses, 从而避免当前处理方法重入该类,其目的应该是为了提高效率,
this.nonAnnotatedClasses.add(targetType);
if (logger.isTraceEnabled()) {
logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
}
}
else {
// 发现当前类 targetType 中有些方法使用了注解 @EventListener,现在根据这些方法上的信息
// 对应地创建和注册ApplicationListener实例
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
// 注意,这里使用到了 this.eventListenerFactories, 这些 EventListenerFactory 是在
// 该类 postProcessBeanFactory 方法调用时被记录的
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
// 如果当前 EventListenerFactory factory 支持处理该 @EventListener 注解的方法,
// 则使用它创建 ApplicationListener
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
// 将所生成的 ApplicationListener 实例注册到容器
context.addApplicationListener(applicationListener);
break;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
beanName + "': " + annotatedMethods);
}
}
}
}
/**
* Determine whether the given class is an {@code org.springframework}
* bean class that is not annotated as a user or test {@link Component}...
* which indicates that there is no {@link EventListener} to be found there.
* @since 5.1
*/
private static boolean isSpringContainerClass(Class<?> clazz) {
return (clazz.getName().startsWith("org.springframework.") &&
!AnnotatedElementUtils.isAnnotated(ClassUtils.getUserClass(clazz), Component.class));
}
}
处理事件
之前说过Spring默认处理事件的同步的方式,如果是要异步执行的话,我们需要单独配置一个线程池。事件广播器处理事件的逻辑:首先获取 executor
线程池,
如果没有就同步地执行;如果有,就异步地执行;获取事件 event
所匹配的一组监听器,然后遍历每个监听器,执行事件处理。具体源码如下。
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 获取线程池
Executor executor = getTaskExecutor();
// 获取event所匹配的监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 有,异步执行
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
// 无,同步执行
invokeListener(listener, event);
}
}
}
看一下是 invokeListener
方法是怎么处理的。首先,看是否配置 ErrorHandler
,这个是用来处理异常的;如果没有配置的话,异常会直接抛出中断后面监听器处理 (没有开启异步执行事件处理)。
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
// 真正执行监听器的处理事件的方法
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
// 真正执行监听器的处理事件的方法
doInvokeListener(listener, event);
}
}
评论区