Spring @Async: Usage and Implementation, with Example Code

Reader submission, 2023-04-16



Spring AOP is built on two basic interfaces, Advisor and PointcutAdvisor, declared as follows.

Declaration of the Advisor interface:

public interface Advisor {

Advice getAdvice();

boolean isPerInstance();

}

Declaration of the PointcutAdvisor interface:

public interface PointcutAdvisor extends Advisor {

/**

* Get the Pointcut that drives this advisor.

*/

Pointcut getPointcut();

}

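A PointcutAdvisor exposes a pointcut together with the handler (the Advice) that runs at that pointcut.

The @Async annotation is processed by AsyncAnnotationBeanPostProcessor, an implementation of the BeanPostProcessor interface.

AsyncAnnotationAdvisor implements the PointcutAdvisor interface. It is created in AsyncAnnotationBeanPostProcessor's setBeanFactory method, which comes from the BeanFactoryAware interface implemented by its parent class. Spring invokes this callback while it initializes the post-processor, once the bean factory is available, so the Advisor exists before any ordinary bean is post-processed: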

@Override

public void setBeanFactory(BeanFactory beanFactory) {

super.setBeanFactory(beanFactory);

AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);

if (this.asyncAnnotationType != null) {

advisor.setAsyncAnnotationType(this.asyncAnnotationType);

}

advisor.setBeanFactory(beanFactory);

this.advisor = advisor;

}

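The actual post-processing in AsyncAnnotationBeanPostProcessor is inherited from its ancestor class AbstractAdvisingBeanPostProcessor. The post-processing method that AbstractAdvisingBeanPostProcessor provides is generic, so it can be reused for any custom annotation. The code is as follows: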

@Override

public Object postProcessAfterInitialization(Object bean, String beanName) {

if (bean instanceof AopInfrastructureBean) {

// Ignore AOP infrastructure such as scoped proxies.

return bean;

}

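/*

* If the bean is already an AOP proxy, it implements the Advised interface (ProxyFactory extends AdvisedSupport, which implements Advised). In that case our Advisor is simply added to the existing proxy's Advisor chain.

*/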

if (bean instanceof Advised) {

Advised advised = (Advised) bean;

if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {

// Add our local Advisor to the existing proxy's Advisor chain...

if (this.beforeExistingAdvisors) {

advised.addAdvisor(0, this.advisor);

}

else {

advised.addAdvisor(this.advisor);

}

return bean;

}

}

if (isEligible(bean, beanName)) {

ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);

if (!proxyFactory.isProxyTargetClass()) {

evaluateProxyInterfaces(bean.getClass(), proxyFactory);

}

proxyFactory.addAdvisor(this.advisor);

customizeProxyFactory(proxyFactory);

return proxyFactory.getProxy(getProxyClassLoader());

}

// No proxy needed.

return bean;

}

As the code shows, isEligible decides whether the class, or any method in the class, carries the annotation. The call eventually reaches the canApply method of AopUtils:

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {

if (advisor instanceof IntroductionAdvisor) {

return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);

}

else if (advisor instanceof PointcutAdvisor) {

PointcutAdvisor pca = (PointcutAdvisor) advisor;

return canApply(pca.getPointcut(), targetClass, hasIntroductions);

}

else {

// It doesn't have a pointcut so we assume it applies.

return true;

}

}

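Here the advisor is the AsyncAnnotationAdvisor instance. Calling its getPointcut() method returns a Pointcut object, which in AOP terms identifies a concrete point where advice applies. Annotating a method with @Async therefore amounts to declaring a pointcut.

The Pointcut is then used to check whether the class or method carries the specified annotation.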
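The idea behind this check can be illustrated without Spring. The following is a minimal sketch that uses only JDK reflection. The @Marker annotation, the three service classes, and EligibilitySketch are hypothetical names invented for this example. It is not the logic of AopUtils.canApply, which delegates to the Pointcut's ClassFilter and MethodMatcher and also considers inherited methods and interfaces.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotation used only for this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface Marker {
}

// No annotation anywhere: not eligible.
class PlainService {
    public void run() {
    }
}

// Annotation on a method: eligible.
class MethodAnnotatedService {
    @Marker
    public void run() {
    }
}

// Annotation on the class: eligible.
@Marker
class TypeAnnotatedService {
    public void run() {
    }
}

final class EligibilitySketch {

    private EligibilitySketch() {
    }

    // True when the class itself, or at least one method it declares, carries @Marker.
    static boolean isEligible(Class<?> targetClass) {
        if (targetClass.isAnnotationPresent(Marker.class)) {
            return true;
        }
        for (Method method : targetClass.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Marker.class)) {
                return true;
            }
        }
        return false;
    }
}
```

Calling EligibilitySketch.isEligible(PlainService.class) returns false, while the other two classes return true. Note that getDeclaredMethods only sees methods declared directly in the class, which is one of the simplifications of this sketch.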

Execution at the pointcut

When the proxy is a JDK dynamic proxy, every method call on it goes through the invoke method of JdkDynamicAopProxy:

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

MethodInvocation invocation;

Object oldProxy = null;

boolean setProxyContext = false;

TargetSource targetSource = this.advised.targetSource;

Class<?> targetClass = null;

Object target = null;

try {

if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {

// The target does not implement the equals(Object) method itself.

return equals(args[0]);

}

else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {

// The target does not implement the hashCode() method itself.

return hashCode();

}

else if (method.getDeclaringClass() == DecoratingProxy.class) {

// There is only getDecoratedClass() declared -> dispatch to proxy config.

return AopProxyUtils.ultimateTargetClass(this.advised);

}

else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&

method.getDeclaringClass().isAssignableFrom(Advised.class)) {

// Service invocations on ProxyConfig with the proxy config...

return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);

}

Object retVal;

if (this.advised.exposeProxy) {

// Make invocation available if necessary.

oldProxy = AopContext.setCurrentProxy(proxy);

setProxyContext = true;

}

// May be null. Get as late as possible to minimize the time we "own" the target,

// in case it comes from a pool.

target = targetSource.getTarget();

if (target != null) {

targetClass = target.getClass();

}

// Get the interception chain for this method.

List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

// Check whether we have any advice. If we don't, we can fallback on direct

// reflective invocation of the target, and avoid creating a MethodInvocation.

if (chain.isEmpty()) {

// We can skip creating a MethodInvocation: just invoke the target directly

// Note that the final invoker must be an InvokerInterceptor so we know it does

// nothing but a reflective operation on the target, and no hot swapping or fancy proxying.

Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);

retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);

}

else {

// We need to create a method invocation...

invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);

// Proceed to the joinpoint through the interceptor chain.

retVal = invocation.proceed();

}

// Massage return value if necessary.

Class<?> returnType = method.getReturnType();

if (retVal != null && retVal == target && returnType.isInstance(proxy) &&

!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {

// Special case: it returned "this" and the return type of the method

// is type-compatible. Note that we can't help if the target sets

// a reference to itself in another returned object.

retVal = proxy;

}

else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {

throw new AopInvocationException(

"Null return value from advice does not match primitive return type for: " + method);

}

return retVal;

}

finally {

if (target != null && !targetSource.isStatic()) {

// Must have come from TargetSource.

targetSource.releaseTarget(target);

}

if (setProxyContext) {

// Restore old proxy.

AopContext.setCurrentProxy(oldProxy);

}

}

}

The key statements are:

// Get the interceptor chain for this method.

List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

// Check whether we have any advice. If we don't, we can fallback on direct

// reflective invocation of the target, and avoid creating a MethodInvocation.

if (chain.isEmpty()) {

// We can skip creating a MethodInvocation: just invoke the target directly

// Note that the final invoker must be an InvokerInterceptor so we know it does

// nothing but a reflective operation on the target, and no hot swapping or fancy proxying.

Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);

retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);

}

else {

// Invoke the method through the interceptor chain.

invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);

// Proceed to the joinpoint through the interceptor chain.

retVal = invocation.proceed();

}
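To make the chain mechanics concrete, here is a minimal sketch built only on java.lang.reflect.Proxy. SimpleInvocation, SimpleInterceptor, Calculator, PlainCalculator, and ChainSketch are hypothetical names for this example; they are simplified stand-ins and not Spring's ReflectiveMethodInvocation or MethodInterceptor. The sketch only shows the pattern: each interceptor calls proceed(), and the target is invoked reflectively once the chain is exhausted.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for MethodInvocation and MethodInterceptor.
interface SimpleInvocation {
    Object proceed() throws Throwable;
}

interface SimpleInterceptor {
    Object invoke(SimpleInvocation invocation) throws Throwable;
}

interface Calculator {
    int add(int a, int b);
}

class PlainCalculator implements Calculator {
    private final List<String> events;

    PlainCalculator(List<String> events) {
        this.events = events;
    }

    @Override
    public int add(int a, int b) {
        events.add("target");
        return a + b;
    }
}

final class ChainSketch {

    private ChainSketch() {
    }

    // Walks the interceptor list; once it is exhausted, calls the target reflectively.
    private static final class ChainInvocation implements SimpleInvocation {
        private final Object target;
        private final Method method;
        private final Object[] args;
        private final List<SimpleInterceptor> chain;
        private int index;

        ChainInvocation(Object target, Method method, Object[] args, List<SimpleInterceptor> chain) {
            this.target = target;
            this.method = method;
            this.args = args;
            this.chain = chain;
        }

        @Override
        public Object proceed() throws Throwable {
            if (index == chain.size()) {
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException ex) {
                    throw ex.getCause(); // surface the target's own exception
                }
            }
            return chain.get(index++).invoke(this);
        }
    }

    // Builds an interceptor that records an event before and after proceeding.
    static SimpleInterceptor recording(String name, List<String> events) {
        return invocation -> {
            events.add(name + ":before");
            Object result = invocation.proceed();
            events.add(name + ":after");
            return result;
        };
    }

    // Creates a JDK dynamic proxy whose handler runs every call through the chain.
    static Calculator proxy(Calculator target, List<SimpleInterceptor> chain) {
        InvocationHandler handler = (p, method, args) ->
                new ChainInvocation(target, method, args, chain).proceed();
        return (Calculator) Proxy.newProxyInstance(
                Calculator.class.getClassLoader(), new Class<?>[] {Calculator.class}, handler);
    }

    // Runs add(1, 2) through two recording interceptors and returns the event log.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Calculator proxied = proxy(new PlainCalculator(events),
                Arrays.asList(recording("outer", events), recording("inner", events)));
        int sum = proxied.add(1, 2);
        events.add("result=" + sum);
        return events;
    }
}
```

ChainSketch.demo() returns the events in nesting order: outer:before, inner:before, target, inner:after, outer:after, result=3. An interceptor that never calls proceed() would stop the call from reaching the target at all.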

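The interceptor for the @Async annotation is AsyncExecutionInterceptor, which implements the MethodInterceptor interface. MethodInterceptor is in turn an Advice in AOP terms, that is, the handler that runs at the pointcut.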
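What such an interceptor does, in essence, is hand the invocation to an executor instead of running it on the caller's thread. The following is a minimal JDK-only sketch of that idea. Greeter, DirectGreeter, AsyncSketch, and the thread name sketch-async-1 are hypothetical and exist only for this example. It assumes every business method returns a CompletableFuture, and it leaves out the executor lookup and exception handling of the real AsyncExecutionInterceptor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

interface Greeter {
    CompletableFuture<String> greet(String name);
}

class DirectGreeter implements Greeter {
    @Override
    public CompletableFuture<String> greet(String name) {
        // Reports the thread that actually executes the method body.
        return CompletableFuture.completedFuture(
                "hello " + name + " from " + Thread.currentThread().getName());
    }
}

final class AsyncSketch {

    private AsyncSketch() {
    }

    // Wraps the target in a proxy that runs future-returning methods on the executor.
    static Greeter asyncProxy(Greeter target, Executor executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!CompletableFuture.class.isAssignableFrom(method.getReturnType())) {
                // Object methods such as toString() stay synchronous in this sketch.
                return method.invoke(target, args);
            }
            Supplier<Object> task = () -> {
                try {
                    // Runs on the executor thread; unwrap the future the target returns.
                    return ((CompletableFuture<?>) method.invoke(target, args)).join();
                } catch (ReflectiveOperationException ex) {
                    throw new CompletionException(ex);
                }
            };
            return CompletableFuture.supplyAsync(task, executor);
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    // Calls greet through the proxy and waits for the asynchronous result.
    static String demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-async-1"));
        try {
            Greeter greeter = asyncProxy(new DirectGreeter(), executor);
            return greeter.greet("spring").get(5, TimeUnit.SECONDS);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }
}
```

AsyncSketch.demo() returns "hello spring from sketch-async-1", which shows that the method body ran on the executor's thread and not on the caller's. Calling new DirectGreeter().greet("spring") directly would report the caller's thread instead.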

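A custom annotation

Because the bean post-processor is generic, a custom annotation only needs its own PointcutAdvisor and its own handler. First define the annotation. Any method that carries it will have a message printed before it starts and after it finishes. The annotation is called @Log: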

@Target({ElementType.METHOD, ElementType.TYPE})

@Retention(RetentionPolicy.RUNTIME)

@Documented

public @interface Log {

}
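The RUNTIME retention on @Log matters, because the pointcut finds the annotation through reflection. The following sketch illustrates the difference with two hypothetical annotations, RuntimeVisible and ClassOnly, which are invented for this example and are not part of the article's code.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Kept in the class file and visible to reflection at run time.
@Retention(RetentionPolicy.RUNTIME)
@interface RuntimeVisible {
}

// Kept in the class file but not exposed through reflection.
@Retention(RetentionPolicy.CLASS)
@interface ClassOnly {
}

@RuntimeVisible
@ClassOnly
class RetentionTarget {
}

final class RetentionSketch {

    private RetentionSketch() {
    }

    // True: a RUNTIME annotation can be discovered reflectively.
    static boolean seesRuntimeVisible() {
        return RetentionTarget.class.isAnnotationPresent(RuntimeVisible.class);
    }

    // False: a CLASS-retention annotation is invisible to reflection.
    static boolean seesClassOnly() {
        return RetentionTarget.class.isAnnotationPresent(ClassOnly.class);
    }
}
```

An annotation declared without RUNTIME retention would never be matched by a reflection-based pointcut, so no proxy would be created for the bean.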

Define a simple service for testing:

public interface IDemoService {

void add(int a, int b);

String getName();

}

@Service

public class DemoServiceImpl implements IDemoService {

@Log

public void add(int a, int b) {

System.out.println(Thread.currentThread().getName());

System.out.println(a + b);

}

@Override

public String getName() {

System.out.println("DemoServiceImpl.getName");

return "DemoServiceImpl";

}

}

Define the Advisor:

public class LogAnnotationAdvisor extends AbstractPointcutAdvisor {

private Advice advice;

private Pointcut pointcut;

public LogAnnotationAdvisor() {

this.advice = new LogAnnotationInterceptor();

}

@Override

public Advice getAdvice() {

return this.advice;

}

@Override

public boolean isPerInstance() {

return false;

}

@Override

public Pointcut getPointcut() {

return this.pointcut;

}

public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {

Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");

Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<Class<? extends Annotation>>();

asyncAnnotationTypes.add(asyncAnnotationType);

this.pointcut = buildPointcut(asyncAnnotationTypes);

}

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {

ComposablePointcut result = null;

for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {

Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);

Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);

if (result == null) {

result = new ComposablePointcut(cpc).union(mpc);

} else {

result.union(cpc).union(mpc);

}

}

return result;

}

}

Define the handler itself:

public class LogAnnotationInterceptor implements MethodInterceptor, Ordered {

@Override

public int getOrder() {

return Ordered.HIGHEST_PRECEDENCE;

}

@Override

public Object invoke(MethodInvocation invocation) throws Throwable {

System.out.println("开始执行"); // prints "execution started"

Object result = invocation.proceed();

System.out.println("结束执行"); // prints "execution finished"

return result;

}

}

Define the BeanPostProcessor dedicated to @Log:

@SuppressWarnings("serial")

@Service

public class LogAnnotationBeanPostProcesser extends AbstractBeanFactoryAwareAdvisingPostProcessor {

@Override

public void setBeanFactory(BeanFactory beanFactory) {

super.setBeanFactory(beanFactory);

LogAnnotationAdvisor advisor = new LogAnnotationAdvisor();

advisor.setAsyncAnnotationType(Log.class);

this.advisor = advisor;

}

}

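The bean post-processing method is inherited unchanged from the parent class. You can also override it, in which case you must check for yourself whether any of the bean's methods carries the annotation, and create the proxy object: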

@Override

public Object postProcessAfterInitialization(Object bean, String beanName) {

Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());

for (Method method : methods) {

if (method.isAnnotationPresent(Log.class)) {

ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);

System.out.println(proxyFactory);

if (!proxyFactory.isProxyTargetClass()) {

evaluateProxyInterfaces(bean.getClass(), proxyFactory);

}

proxyFactory.addAdvisor(this.advisor);

customizeProxyFactory(proxyFactory);

return proxyFactory.getProxy(getProxyClassLoader());

}

}

return bean;

}

Test that the annotation works:

public class Main {

public static void main(String[] args) {

@SuppressWarnings("resource")

ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application-context.xml");

IDemoService demoService = context.getBean(IDemoService.class);

demoService.add(1, 2);

demoService.getName();

//// AsyncAnnotationAdvisor

// AsyncAnnotationBeanPostProcessor

}

}

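Output ("开始执行" is the interceptor's "execution started" message and "结束执行" is its "execution finished" message):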

开始执行

main

3

结束执行

DemoServiceImpl.getName

Everything works as expected.
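The output shows that only add is wrapped, even though the whole bean is proxied. The per-method filtering can be sketched with the JDK alone. Traced, AccountService, AccountServiceImpl, and PostProcessSketch below are hypothetical names for this example, and the code is a simplified stand-in, not Spring's post-processor. One detail it highlights: a JDK proxy passes the interface method to the handler, so the annotation has to be looked up on the implementation class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical counterpart of @Log for this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Traced {
}

interface AccountService {
    int add(int a, int b);

    String getName();
}

class AccountServiceImpl implements AccountService {
    @Traced
    @Override
    public int add(int a, int b) {
        return a + b;
    }

    @Override
    public String getName() {
        return "AccountServiceImpl";
    }
}

final class PostProcessSketch {

    private PostProcessSketch() {
    }

    // Returns a logging proxy when a declared method carries @Traced, else the bean itself.
    static Object postProcess(Object bean, List<String> log) {
        Class<?> targetClass = bean.getClass();
        boolean eligible = false;
        for (Method candidate : targetClass.getDeclaredMethods()) {
            eligible |= candidate.isAnnotationPresent(Traced.class);
        }
        if (!eligible) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            // The proxy hands over the interface method; the annotation is on the implementation.
            Method impl = targetClass.getMethod(method.getName(), method.getParameterTypes());
            boolean traced = impl.isAnnotationPresent(Traced.class);
            if (traced) {
                log.add("start " + method.getName());
            }
            try {
                return method.invoke(bean, args);
            } catch (InvocationTargetException ex) {
                throw ex.getCause();
            } finally {
                if (traced) {
                    log.add("end " + method.getName());
                }
            }
        };
        return Proxy.newProxyInstance(
                targetClass.getClassLoader(), targetClass.getInterfaces(), handler);
    }

    // Exercises an annotated method, a plain method, and a bean that needs no proxy.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        AccountService service = (AccountService) postProcess(new AccountServiceImpl(), log);
        int sum = service.add(1, 2);
        log.add("add=" + sum);
        log.add("name=" + service.getName());
        Object plain = "plain";
        log.add("untouched=" + (postProcess(plain, log) == plain));
        return log;
    }
}
```

PostProcessSketch.demo() returns start add, end add, add=3, name=AccountServiceImpl, untouched=true. The call to getName() goes through the same proxy but produces no start or end entries, which mirrors the output above.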
