侧边栏壁纸
博主头像
憨憨大头个人博客博主等级

心存希冀,目有繁星

  • 累计撰写 110 篇文章
  • 累计创建 13 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Spring 订阅与发布的原理

Administrator
2024-09-02 / 0 评论 / 0 点赞 / 23 阅读 / 32297 字

简介

Spring框架提供了一种订阅与发布(Publish-Subscribe)的机制,用于解耦应用程序的各个组件。这种机制允许应用程序的不同部分之间通过事件进行通信,从而实现松耦合的架构。

spring源码地址:https://github.com/spring-projects/spring-framework/tree/3.0.x

small-spring源码地址:https://github.com/fuzhengwei/small-spring

原理

Spring的订阅与发布机制基于观察者模式(Observer Pattern)。在这种模式中,有一个主题(Subject)和多个观察者(Observer)。主题负责发布事件,而观察者订阅这些事件并在事件发生时做出响应。

在Spring中,主题是一个事件发布者(Event Publisher),而观察者是事件监听器(Event Listener)。主题发布一个事件后,所有订阅了该事件的监听器都会收到通知并执行相应的逻辑。

实现步骤

下面是使用Spring实现订阅与发布的一般步骤:

  1. 定义事件:创建一个继承自ApplicationEvent的事件类,表示需要发布的事件。可以在事件类中添加一些自定义的属性以便传递更多信息。
  2. 创建事件发布者:创建一个类,负责发布事件。可以使用ApplicationEventPublisher接口的publishEvent()方法来发布事件。
  3. 创建事件监听器:创建一个类,实现ApplicationListener接口,并指定要监听的事件类型。在监听器中实现事件发生时的逻辑。
  4. 注册监听器:在Spring配置文件中或使用@EventListener注解将监听器注册为Spring的Bean。
  5. 发布事件:通过调用事件发布者的方法来发布事件。所有订阅了该事件的监听器都会收到通知并执行相应的逻辑。

示例代码

下面是一个简单的示例代码,演示了如何使用Spring实现订阅与发布:

// 1. 定义事件
public class MyEvent extends ApplicationEvent {
    private String message;

    public MyEvent(Object source, String message) {
        super(source);
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}

// 2. 创建事件发布者
public class MyEventPublisher {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    public void publishEvent(String message) {
        MyEvent event = new MyEvent(this, message);
        eventPublisher.publishEvent(event);
    }
}

// 3. 创建事件监听器
public class MyEventListener implements ApplicationListener<MyEvent> {
    @Override
    public void onApplicationEvent(MyEvent event) {
        System.out.println("Received message: " + event.getMessage());
    }
}

// 4. 注册监听器(可以在Spring配置文件中配置)
@Configuration
public class EventConfig {
    @Bean
    public MyEventListener myEventListener() {
        return new MyEventListener();
    }
}

// 5. 发布事件
public class Main {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(EventConfig.class);
        MyEventPublisher publisher = context.getBean(MyEventPublisher.class);
        publisher.publishEvent("Hello, World!");
    }
}

在上述示例中,MyEvent表示一个自定义事件,MyEventPublisher是事件发布者,MyEventListener是事件监听器。通过调用MyEventPublisherpublishEvent()方法来发布事件,在MyEventListener中实现事件发生时的逻辑。

事件的传递与处理

在Spring的订阅与发布机制中,事件可以通过继承的方式进行传递。一个事件可以作为另一个事件的子类,这样可以在事件中传递更多的信息。

同时,可以定义多个事件监听器来处理同一类型的事件。当一个事件被发布时,所有订阅了该事件的监听器都会收到通知并执行相应的逻辑。Spring会根据事件的类型来确定应该调用哪些监听器。

异步事件处理

在某些场景下,事件的处理可能是一个耗时的操作,如果在事件的发布者线程中进行处理,可能会导致整个应用程序的性能下降。为了解决这个问题,Spring提供了异步事件处理的机制。

通过在监听器方法上添加@Async注解,可以指定该方法在一个新的线程中异步执行。这样可以将事件的处理与事件的发布解耦,提高应用程序的吞吐量和响应性能。

事件的顺序

在某些场景下,事件的处理顺序是非常重要的。Spring允许通过在监听器方法上添加@Order注解来指定监听器的执行顺序。具有较小@Order值的监听器将先于具有较大@Order值的监听器执行。

如果多个监听器具有相同的@Order值,它们的执行顺序将是不确定的。可以通过实现Ordered接口并重写getOrder()方法来更精确地控制监听器的执行顺序。

源码解析

注册事件广播器

Spring在注册事件广播器时,会调用AbstractApplicationContext#refresh方法。这个方法是用来初始化容器的,这个方法会注册Bean对象、初始化Bean对象等等,这里只需要关注 initApplicationEventMulticaster() 方法,这个方法就是用来注册事件广播器的。来看一下这个方法的实现。

protected void initApplicationEventMulticaster() {
        ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
        if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
            this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
            }
        } else {
            this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster);
            }
        }
    }

这个方法做两件事:

  • 当前容器已经注册了名为 applicationEventMulticaster 的对象,直接获取这个已经注册好的事件广播器。例如,我们通过配置类去提前配置事件广播器对象。

    @Configuration
    public class AsynchronousSpringEventsConfig {
    
        @Bean(name = "applicationEventMulticaster")
        public ApplicationEventMulticaster applicationEventMulticaster() {
            SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
            multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
            return multicaster;
        }
    }
    
  • 如果当前容器没有提前配置,那么当前容器获取不到这个事件广播器对象,需要由Spring自己创建一个 SimpleApplicationEventMulticaster 对象,并把它注册到容器中。

注册事件监听器

加载事件监听器

Spring是如何加载我们自定义的监听器的呢?自定义的事件监听器是由Spring的 ApplicationListenerDetector 后置处理器的 postProcessAfterInitialization 方法加载的。具体源码如下所示。

class ApplicationListenerDetector implements DestructionAwareBeanPostProcessor, MergedBeanDefinitionPostProcessor {
    
    @Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof ApplicationListener) {
			Boolean flag = this.singletonNames.get(beanName);
			if (Boolean.TRUE.equals(flag)) {
				this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
			}
			else if (Boolean.FALSE.equals(flag)) {
				this.singletonNames.remove(beanName);
			}
		}
		return bean;
	}
}

bean instanceof ApplicationListener 这里,我们就知道为什么自定义事件监听器需要实现 ApplicationListener 接口了。

在来看一下 applicationContext.addApplicationListener 的源码。

public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {
    
    public void addApplicationListener(ApplicationListener<?> listener) {
        Assert.notNull(listener, "ApplicationListener must not be null");
        if (this.applicationEventMulticaster != null) {
            this.applicationEventMulticaster.addApplicationListener(listener);
        }

        this.applicationListeners.add(listener);
    }
}

this.registerListeners() 方法有什么用了?这个方法可以用来通过 context#addApplicationListener 来提前注册监听器。具体源码如下所示。

protected void registerListeners() {
	// 注册静态指定的监听器
    for (ApplicationListener<?> listener : getApplicationListeners()) {
        getApplicationEventMulticaster().addApplicationListener(listener);
    }

    // Do not initialize FactoryBeans here: We need to leave all regular beans
    // uninitialized to let post-processors apply to them!
    String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
    for (String listenerBeanName : listenerBeanNames) {
        // 这里只是注册了监听器的BeanName
        getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
    }

    // 提前发布事件
    Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
    this.earlyApplicationEvents = null;
    if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
        for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
            getApplicationEventMulticaster().multicastEvent(earlyEvent);
        }
    }
}
还需要注册@EventListener的监听者

EventListenerMethodProcessorSpring 事件机制中非常重要的一个组件。它管理了一组EventListenerFactory组件,用来将应用中每个使用@EventListener注解定义的事件监听方法变成一个ApplicationListener实例注册到容器。换句话讲,框架开发者,或者应用开发者使用注解@EventListener定义的事件处理方法,如果没有EventListenerMethodProcessor的发现和注册,是不会被容器看到和使用的。

EventListenerMethodProcessor实现了如下三个接口 :

  • ApplicationContextAware
  • BeanFactoryPostProcessor
  • SmartInitializingSingleton

通过实现接口ApplicationContextAware,容器会将当前应用上下文ApplicationContext告诉EventListenerMethodProcessor,这是EventListenerMethodProcessor用于检测发现@EventListener注解方法的来源,生成的ApplicationListener也放到该应用上下文。

通过实现接口BeanFactoryPostProcessor,EventListenerMethodProcessor变成了一个BeanFactory的后置处理器,也就是说,在容器启动过程中的后置处理阶段,启动过程会调用EventListenerMethodProcessor的方法postProcessBeanFactory。在这个方法中,EventListenerMethodProcessor会找到容器中所有类型为EventListenerFactorybean,最终@EventListener注解方法的检测发现,以及ApplicationListener实例的生成和注册,靠的是这些EventListenerFactory组件。

而通过实现接口SmartInitializingSingleton,在容器启动过程中所有单例bean创建阶段(此阶段完成前,这些bean并不会供外部使用)的末尾,EventListenerMethodProcessor的方法afterSingletonsInstantiated会被调用。在这里,EventListenerMethodProcessor会便利容器中所有的bean,进行@EventListener注解方法的检测发现,以及ApplicationListener实例的生成和注册。

源代码

代码版本 : Spring Context 5.2.0.RELEASE

package org.springframework.context.event;

//... 省略 import

/**
 * Registers {@link EventListener} methods as individual {@link ApplicationListener} instances.
 * Implements {@link BeanFactoryPostProcessor} (as of 5.1) primarily for early retrieval,
 * avoiding AOP checks for this processor bean and its {@link EventListenerFactory} delegates.
 *
 * @author Stephane Nicoll
 * @author Juergen Hoeller
 * @since 4.2
 * @see EventListenerFactory
 * @see DefaultEventListenerFactory
 */
public class EventListenerMethodProcessor
		implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {

	protected final Log logger = LogFactory.getLog(getClass());

    // 用于记录检测发现`@EventListener`注解方法,生成和注册`ApplicationListener`实例的应用上下文
	@Nullable
	private ConfigurableApplicationContext applicationContext;

   // 记录当前 BeanFactory, 实际上这个变量可用可不用,因为通过 applicationContext 也可以找到
   // 当前 BeanFactory
	@Nullable
	private ConfigurableListableBeanFactory beanFactory;

   // 记录从容器中找到的所有 EventListenerFactory
	@Nullable
	private List<EventListenerFactory> eventListenerFactories;

	private final EventExpressionEvaluator evaluator = new EventExpressionEvaluator();

    // 缓存机制,记住那些根本任何方法上没有使用注解 @EventListener 的类,避免处理过程中二次处理 
	private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) {
		Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext,
				"ApplicationContext does not implement ConfigurableApplicationContext");
		this.applicationContext = (ConfigurableApplicationContext) applicationContext;
	}

	@Override
	public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
		this.beanFactory = beanFactory;

       // 从容器中找到所有的  EventListenerFactory 组件
       // 常见的一些 EventListenerFactory :
       // TransactionalEventListenerFactory --
       // 用于支持使用 @TransactionalEventListener 注解的事件监听器, @TransactionalEventListener 是一种特殊的
       // @EventListener,它定义的事件监听器应用于事务提交或者回滚的某些特殊时机,
       // 由 ProxyTransactionManagementConfiguration 注册到容器
       // 注册到容器
       // DefaultEventListenerFactory -- 系统缺省, 最低优先级,如果其他 EventListenerFactory 都不支持的时候使用
		Map<String, EventListenerFactory> beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false);
		List<EventListenerFactory> factories = new ArrayList<>(beans.values());
		AnnotationAwareOrderComparator.sort(factories);
		this.eventListenerFactories = factories;
	}

	@Override
	public void afterSingletonsInstantiated() {
		ConfigurableListableBeanFactory beanFactory = this.beanFactory;
		Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
       // 这里获取容器中所有bean组件的名称, 
		String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
		for (String beanName : beanNames) {
          // 遍历每个bean组件,检测其中`@EventListener`注解方法,生成和注册`ApplicationListener`实例
			if (!ScopedProxyUtils.isScopedTarget(beanName)) {
				Class<?> type = null;
				try {
					type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
				}
				catch (Throwable ex) {
					// An unresolvable bean type, probably from a lazy bean - let's ignore it.
					if (logger.isDebugEnabled()) {
						logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
					}
				}
				if (type != null) {
					if (ScopedObject.class.isAssignableFrom(type)) {
						try {
							Class<?> targetClass = AutoProxyUtils.determineTargetClass(
									beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
							if (targetClass != null) {
								type = targetClass;
							}
						}
						catch (Throwable ex) {
							// An invalid scoped proxy arrangement - let's ignore it.
							if (logger.isDebugEnabled()) {
								logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
							}
						}
					}
					try {
                     // 注意这一行,针对一个bean的真正的`@EventListener`注解方法检测,
                     // `ApplicationListener`实例生成注册发生在这里
						processBean(beanName, type);
					}
					catch (Throwable ex) {
						throw new BeanInitializationException("Failed to process @EventListener " +
								"annotation on bean with name '" + beanName + "'", ex);
					}
				}
			}
		}
	}

   // 该方法拿到某个bean的名称和它的目标类,在这个范围上检测`@EventListener`注解方法,
   // 生成和注册`ApplicationListener`实例
	private void processBean(final String beanName, final Class<?> targetType) {
		if (!this.nonAnnotatedClasses.contains(targetType) &&
				AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
				!isSpringContainerClass(targetType)) {

			Map<Method, EventListener> annotatedMethods = null;
			try {
             // *** 注意这里, 这里检测当前类targetType上使用了注解 @EventListener 的方法
				annotatedMethods = MethodIntrospector.selectMethods(targetType,
						(MethodIntrospector.MetadataLookup<EventListener>) method ->
								AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
			}
			catch (Throwable ex) {
				// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
				if (logger.isDebugEnabled()) {
					logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
				}
			}

			if (CollectionUtils.isEmpty(annotatedMethods)) {
              // 如果当前类 targetType 中没有任何使用了 注解 @EventListener 的方法,则将该类保存到
              // 缓存 nonAnnotatedClasses, 从而避免当前处理方法重入该类,其目的应该是为了提高效率,             
				this.nonAnnotatedClasses.add(targetType);
				if (logger.isTraceEnabled()) {
					logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
				}
			}
			else {
              // 发现当前类 targetType 中有些方法使用了注解 @EventListener,现在根据这些方法上的信息
              // 对应地创建和注册ApplicationListener实例
				// Non-empty set of methods
				ConfigurableApplicationContext context = this.applicationContext;
				Assert.state(context != null, "No ApplicationContext set");
              // 注意,这里使用到了  this.eventListenerFactories, 这些 EventListenerFactory 是在 
              // 该类 postProcessBeanFactory 方法调用时被记录的
				List<EventListenerFactory> factories = this.eventListenerFactories;
				Assert.state(factories != null, "EventListenerFactory List not initialized");
				for (Method method : annotatedMethods.keySet()) {
					for (EventListenerFactory factory : factories) {
						if (factory.supportsMethod(method)) {
                        // 如果当前 EventListenerFactory factory 支持处理该 @EventListener 注解的方法,
                        // 则使用它创建 ApplicationListener
							Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
							ApplicationListener<?> applicationListener =
									factory.createApplicationListener(beanName, targetType, methodToUse);
							if (applicationListener instanceof ApplicationListenerMethodAdapter) {
								((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
							}
                        // 将所生成的  ApplicationListener 实例注册到容器   
							context.addApplicationListener(applicationListener);
							break;
						}
					}
				}
				if (logger.isDebugEnabled()) {
					logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
							beanName + "': " + annotatedMethods);
				}
			}
		}
	}

	/**
	 * Determine whether the given class is an {@code org.springframework}
	 * bean class that is not annotated as a user or test {@link Component}...
	 * which indicates that there is no {@link EventListener} to be found there.
	 * @since 5.1
	 */
	private static boolean isSpringContainerClass(Class<?> clazz) {
		return (clazz.getName().startsWith("org.springframework.") &&
				!AnnotatedElementUtils.isAnnotated(ClassUtils.getUserClass(clazz), Component.class));
	}

}


处理事件

之前说过Spring默认处理事件的同步的方式,如果是要异步执行的话,我们需要单独配置一个线程池。事件广播器处理事件的逻辑:首先获取 executor 线程池,

如果没有就同步地执行;如果有,就异步地执行;获取事件 event 所匹配的一组监听器,然后遍历每个监听器,执行事件处理。具体源码如下。

@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    // 获取线程池
    Executor executor = getTaskExecutor();
    // 获取event所匹配的监听器
    for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
        // 有,异步执行
        if (executor != null) {
            executor.execute(() -> invokeListener(listener, event));
        }
        else {
            // 无,同步执行
            invokeListener(listener, event);
        }
    }
}

看一下是 invokeListener 方法是怎么处理的。首先,看是否配置 ErrorHandler ,这个是用来处理异常的;如果没有配置的话,异常会直接抛出中断后面监听器处理 (没有开启异步执行事件处理)。

protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
    ErrorHandler errorHandler = getErrorHandler();
    if (errorHandler != null) {
        try {
            // 真正执行监听器的处理事件的方法
            doInvokeListener(listener, event);
        }
        catch (Throwable err) {
            errorHandler.handleError(err);
        }
    }
    else {
        // 真正执行监听器的处理事件的方法
        doInvokeListener(listener, event);
    }
}

0

评论区