首頁技術(shù)文章正文

Java培訓(xùn):Spring的Async注解線程池擴(kuò)展方案

更新時間:2022-08-25 來源:黑馬程序員 瀏覽量:

  目錄

  - [Spring的Async注解線程池擴(kuò)展方案]

  - [目錄]

  - [1. 擴(kuò)展目的]

  - [2. 擴(kuò)展實現(xiàn)]

  - [2.1 擴(kuò)展Async注解的執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`]

  - [2.2 擴(kuò)展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`]

  - [2.3 擴(kuò)展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`]

  - [2.4 擴(kuò)展代理異步配置類`ProxyAsyncConfiguration`]

  - [2.5 擴(kuò)展異步代理配置選擇器`AsyncConfigurationSelector`]

  - [2.6 擴(kuò)展異步啟動注解`@EnableAsync`]

  - [3. 額外擴(kuò)展:給`@Async`注解代理指定線程池]

  1. 擴(kuò)展目的


  1. 異步調(diào)用,改用Spring提供的`@Aysnc`注解實現(xiàn),代替手寫線程池執(zhí)行。

  2. 在實際場景中,可能會遇到需要將主線程的一些個性化參數(shù)、變量、數(shù)據(jù)傳遞到子線程中使用的需求。

  3. `InheritableThreadLocal`可以解決子線程繼承父線程值的需求,但是它存在一些問題。

  1. `SessionUser.SESSION_USER`是中臺提供,無法修改。

  2. `InheritableThreadLocal`在線程池機制應(yīng)用中并不友好,不及時在子線程中清除的話,會造成線程安全問題。

  實現(xiàn)思路有兩種:

  1. 針對`ThreadLocal`進(jìn)行擴(kuò)展,并說服中臺統(tǒng)一改用擴(kuò)展后的`ThreadLocal`。

  2. 針對`@EnableAsync`和`@Async`注解進(jìn)行擴(kuò)展,將手動copy的代碼寫入到Spring代理類中。

  第一種要跟中臺打交道,就很煩,能夠天平自己獨立解決,就自己解決。第二種會是一個不錯的選擇,擴(kuò)展實現(xiàn)也并不困難。

  2. 擴(kuò)展實現(xiàn)

  2.1 擴(kuò)展Async注解的執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`

  類全名:`org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor`

  從調(diào)試記錄可以分析得出`AnnotationAsyncExecutionInterceptor#invoke`方法,正是創(chuàng)建異步任務(wù)并且執(zhí)行異步任務(wù)的核心代碼所在,我們要做的就是重寫這個方法,將父線程的運行參數(shù)手動copy到子線程任務(wù)體中。

1661406736004_1.jpg

  2.2 擴(kuò)展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`

  我們依靠追蹤`AnnotationAsyncExecutionInterceptor`的構(gòu)造方法調(diào)用,定位到了它。

  全類名:`org.springframework.scheduling.annotation.AsyncAnnotationAdvisor`

  > 補充說明:代理顧問(`Advisor`)、建議(`Advice`)以及Spring代理實現(xiàn)原理

  >

  > Spring `@EnableAsync`默認(rèn)的代理模式是 JDK 代理,代理機制如下:

  >

  > Spring 一個 Bean 會在 `BeanPostProcessor#postProcessAfterInitialization()`這個生命周期環(huán)節(jié),遍歷所有的`BeanPostProcessor`實例,判斷Bean是否符合代理條件,如果符合代理條件,就給 Bean 代理對象中追加建議(`Advice`)對象,這樣就完成了代理。

  >

  > 而建議(`Advice`)對象是由顧問(`Advisor`)對象創(chuàng)建和提供。

  >

  > 上一小節(jié)提到的異步執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`就是實現(xiàn)了`Advice`接口的類。

  在`@Async`注解的代理過程中,異步執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`就是通過`AsyncAnnotationAdvisor#buildAdvice`方法創(chuàng)建的。

  所以,當(dāng)我們想要將擴(kuò)展的新的異步執(zhí)行攔截器`LibraAnnotationAsyncExecutionInterceptor`用起來,則需要相應(yīng)的,還要把`AsyncAnnotationAdvisor#buildAdvice`方法重寫。

  2.3 擴(kuò)展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`

  我們依靠追蹤`AsyncAnnotationAdvisor`的構(gòu)造方法調(diào)用,定位到了它。

  類全名:`org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor`

  這個沒什么好說的,Spring Bean 的生命周期其中一環(huán)。是 Spring Bean 實現(xiàn)代理的起點。

  開發(fā)人員可以自定義一個`BeanPostProcessor`類,把它注冊到 Bean 容器中,它就會自動生效,并將后續(xù)的每一個 Bean 實例進(jìn)行條件判斷以及進(jìn)行代理。

  我們要重寫的方法是:`AsyncAnnotationBeanPostProcessor#setBeanFactory`。這個方法構(gòu)造了異步代理顧問`AsyncAnnotationAdvisor`對象。

  2.4 擴(kuò)展代理異步配置類`ProxyAsyncConfiguration`

  `AsyncAnnotationBeanPostProcessor`不是一般的 Spring Bean。它有幾個限制,導(dǎo)致它不能直接通過`@Component`或者`@Configuration`來創(chuàng)建實例。

  1. `AsyncAnnotationBeanPostProcessor`僅僅是實現(xiàn)了基于 JDK 代理,如果開發(fā)決定另外一種(基于ASPECTJ編織),那么它就應(yīng)該受到某種條件判斷來進(jìn)行 Bean 實例化。

  2. `AsyncAnnotationBeanPostProcessor`還需要配置指定的線程池、排序等等屬性,所以無法直接使用`@Component`注解注冊為 Bean。

  我們閱讀一下`@EnableAsync`注解源碼:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    Class<? extends Annotation> annotation() default Annotation.class;
    boolean proxyTargetClass() default false;
    AdviceMode mode() default AdviceMode.PROXY;
    int order() default Ordered.LOWEST_PRECEDENCE;
}
```

  進(jìn)一步閱讀`AsyncConfigurationSelector`的源碼:

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


    /**
   * 分別為EnableAsync.mode()的PROXY和ASPECTJ值返回{@link ProxyAsyncConfiguration}或{@code AspectJAsyncConfiguration} 。
     */
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }

}
```

  謎底揭曉,`ProxyAsyncConfiguration`原來是在這里開始注冊到 Spring 容器中的。

  Spring Boot 啟動后,會根據(jù)`@EnableAsync`注解的`mode()`方法的具體值,來決定整個Spring的 Bean 代理機制。

  既然 Spring 代理機制只會有一種,所以,也就只會在兩種機制的配置類中選擇其中一個來進(jìn)行實例化。

  而默認(rèn)`EnableAsync$mode()`默認(rèn)值是`AdviceMode.PROXY`,所以默認(rèn)采用 JDK 代理機制。

  2.5 擴(kuò)展異步代理配置選擇器`AsyncConfigurationSelector`

  類全名:`org.springframework.scheduling.annotation.AsyncConfigurationSelector`

  2.6 擴(kuò)展異步啟動注解`@EnableAsync`

  類全名:`org.springframework.scheduling.annotation.EnableAsync`

  3. 額外擴(kuò)展:給`@Async`注解代理指定線程池

  `@Async`會自動根據(jù)類型`TaskExecutor.class`從 Spring Bean 容器中找一個已經(jīng)實例化的異步任務(wù)執(zhí)行器(線程池)。如果找不到,則另尋他路,嘗試從 Spring Bean 容器中查找名稱為`taskExecutor`的`Executor.class`實例。最后都還是未找到呢,就默認(rèn)自動`new`一個`SimpleAsyncTaskExecutor`來用。

  > 補充說明:`TaskExecutor.class`是Spring定義的,而`Executor.class`JDK定義的。

  場景:其他小伙伴、或者舊代碼已經(jīng)實現(xiàn)過了一個線程池,但是這個線程池,是個`Executor.class`類型,且 Bean 實例名稱不是`taskExecutor`(假設(shè)是`libraThreadPool`),正常情況下`@Async`根本無法找到它。

  需求:通過配置,將`@Async`的默認(rèn)線程池,指定為名為`libraThreadPool`的`Executor.class`類型線程池。

  我們只需要注冊一個實現(xiàn)`AsyncConfigurer`接口的配置類

  `org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers`:

/**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer::getAsyncExecutor;
        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    }
```


分享到:
在線咨詢 我要報名
和我們在線交談!