简单注解实现集群同步锁(spring+redis+注解)

网友投稿 390 2023-06-17


简单注解实现集群同步锁(spring+redis+注解)

互联网面试的时候,是不是面试官常问一个问题如何保证集群环境下数据操作并发问题,常用的synchronized肯定是无法满足了,或许你可以借助for update对数据加锁。本文的最终解决方式你只要在方法上加一个@P4jsyn注解就能保证集群环境下同synchronized的效果,且锁的key可以任意指定。本注解还支持了锁的超时机制。

本文需要对Redis、spring和spring-data-redis有一定的了解。当然你可以借助本文的思路对通过注解对方法返回数据进行缓存,类似com.google.code.simple-spring-memcached的@ReadThroughSingleCache。

第一步:  介绍两个自定义注解P4jSyn、P4jSynKey

P4jSyn:必选项,标记在方法上,表示需要对该方法加集群同步锁;

P4jSynKey:可选项,加在方法参数上,表示以方法某个参数作为锁的key,用来保证更多的坑,P4jSynKey并不是强制要添加的,当没有P4jSynKey标记的情况下只会以P4jSyn的synKey作为锁key。

package com.yaoguoyin.redis.lock;

import java.lang.annotation.ElementType;

import java.lang.annotation.Inherited;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

/**

* 同步锁:

* 主要作用是在服务器集群环境下保证方法的synchronize;

* 标记在方法上,使该方法的执行具有互斥性,并不保证并发执行方法的先后顺序;

* 如果原有“A任务”获取锁后任务执行时间超过最大允许持锁时间,且锁被“B任务”获取到,在“B任务”成功货物锁会并不会终止“A任务”的执行;

*

* 注意:

* 使用过程中需要注意keepMills、toWait、sleepMills、maxSleepMills等参数的场景使用;

* 需要安装redis,并使用spring和spring-data-redis等,借助redis NX等方法实现。

*

* @see com.yaoguoyin.redis.lock.P4jSynKey

* @see com.yaoguoyin.redis.lock.RedisLockAspect

*

* @author partner4java

*

*/

@Target({ ElementType.METHOD })

@Retention(RetentionPolicy.RUNTIME)

@Inherited

public @interface P4jSyn {

/**

* 锁的key

* 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项

* redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey

*

*/

String synKey();

/**

* 持锁时间,超时时间,持锁超过此时间自动丢弃锁

* 单位毫秒,默认20秒

* 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的

* 但是没有比较强的业务要求下,不建议设置为0

*/

long keepMills() default 20 * 1000;

/**

* 当获取锁失败,是继续等待还是放弃

* 默认为继续等待

*/

boolean toWait() default true;

/**

* 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间

* 默认为10毫秒

*

* @return

*/

long sleepMills() default 10;

/**

* 锁获取超时时间:

* 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出

* {@link java.util.concurrent.TimeoutException.TimeoutException}

* ,可捕获此异常做相应业务处理;

* 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去;

*

* @return

*/

long maxSleepMills() default 60 * 1000;

}

package com.yaoguoyin.redis.lock;

import java.lang.annotation.ElementType;

import java.lang.annotation.Inherited;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

/**

* 同步锁 key

* 加在方法的参数上,指定的参数会作为锁的key的一部分

*

* @author partner4java

*

*/

@Target({ ElementType.PARAMETER })

@Retention(RetentionPolicy.RUNTIME)

@Inherited

public @interface P4jSynKey {

/**

* key的拼接顺序

*

* @return

*/

int index() default 0;

}

这里就不再对两个注解进行使用上的解释了,因为注释已经说明的很详细了。

使用示例:

package com.yaoguoyin.redis.lock;

import org.springframework.stereotype.Component;

@Component

public class SysTest {

private static int i = 0;

@P4jSyn(synKey = "12345")

public void add(@P4jSynKey(index = 1) String key, @P4jSynKey(index = 0) int key1) {

i++;

System.out.println("i=-===========" + i);

}

}

第二步:切面编程

在不影响原有代码的前提下,保证执行同步,目前最直接的方式就是使用切面编程

package com.yaoguoyin.redis.lock;

import java.lang.annotation.Annotation;

import java.lang.reflect.Method;

import java.util.SortedMap;

import java.util.TreeMap;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;

import org.aspectj.lang.ProceedingJoinPoint;

import org.aspectj.lang.annotation.Around;

import org.aspectj.lang.annotation.Aspect;

import org.aspectj.lang.reflect.MethodSignature;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.data.redis.core.BoundValueOperations;

import org.springframework.data.redis.core.RedisTemplate;

/**

* 锁的切面编程

* 针对添加@RedisLock 注解的方法进行加锁

*

* @see com.yaoguoyin.redis.lock.P4jSyn

*

* @author partner4java

*

*/

@Aspect

public class RedisLockAspect {

@Autowired

@Qualifier("redisTemplate")

private RedisTeVBtmccmplate redisTemplate;

@Around("execution(* com.yaoguoyin..*(..)) && @annotation(com.yaoguoyin.redis.lock.P4jSyn)")

public Object lock(ProceedingJoinPoint pjp) throws Throwable {

P4jSyn lockInfo = getLockInfo(pjp);

if (lockInfo == null) {

throw new IllegalArgumentException("配置参数错误");

}

String synKey = getSynKey(pjp, lockInfo.synKey());

if (synKey == null || "".equals(synKey)) {

throw new IllegalArgumentException("配置参数synKey错误");

}

boolean lock = false;

Object obj = null;

try {

// 超时时间

long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();

while (!lock) {

long keepMills = System.currentTimeMillis() + lockInfo.keepMills();

lock = setIfAbsent(synKey, keepMills);

// 得到锁,没有人加过相同的锁

if (lock) {

obj = pjp.proceed();

}

// 锁设置了没有超时时间

else if (lockInfo.keepMills() <= 0) {

// 继续等待获取锁

if (lockInfo.toWait()) {

// 如果超过最大等待时间抛出异常

if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {

throw new TimeoutException("获取锁资源等待超时");

}

TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());

} else {

break;

}

}

// 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁

else if (System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills))) {

lock = true;

obj = pjp.proceed();

}

// 没有得到任何锁

else {

// 继续等待获取锁

if (lockInfo.toWait()) {

// 如果超过最大等待时间抛出异常

if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {

throw new TimeoutException("获取锁资源等待超时");

}

TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());

}

// 放弃等待

else {

break;

}

}

}

} catch (Exception e) {

e.printStackTrace();

throw e;

} finally {

// 如果获取到了锁,释放锁

if (lock) {

releaseLock(synKey);

}

}

return obj;

}

/**

* 获取包括方法参数上的key

* redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey

*

*/

private String getSynKey(ProceedingJoinPoint pjp, String synKey) {

try {

synKey = "RedisSyn+" + synKey;

Object[] args = pjp.getArgs();

if (args != null && args.length > 0) {

MethodSignature methodSignature = (MethodSignature) pjp.getSignature();

Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();

SortedMap keys = new TreeMap();

for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {

P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);

if (p4jSynKey != null) {

Object arg = args[ix];

if (arg != null) {

keys.put(p4jSynKey.index(), arg.toString());

}

}

}

if (keys != null && keys.size() > 0) {

for (String key : keys.values()) {

synKey = synKey + key;

}

}

}

return synKey;

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

@SuppressWarnings("unchecked")

private static T getAnnotation(final Class annotationClass, final Annotation[] annotations) {

if (annotations != null && annotations.length > 0) {

for (final Annotation annotation : annotations) {

if (annotationClass.equals(annotation.annotationType())) {

return (T) annotation;

}

}

}

return null;

}

/**

* 获取RedisLock注解信息

*/

private P4jSyn getLockInfo(ProceedingJoinPoint pjp) {

try {

MethodSignature methodSignature = (MethodSignature) pjp.getSignature();

Method method = methodSignature.getMethod();

P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);

return lockInfo;

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

public BoundValueOperations getOperations(String key) {

return redisTemplate.boundValueOps(key);

}

/**

* Set {@code value} for {@code key}, only if {@code key} does not exist.

*

* See http://redis.io/commands/setnx

*

* @param key

* must not be {@literal null}.

* @param value

* must not be {@literal null}.

* @return

*/

public boolean setIfAbsent(String key, Long value) {

return getOperations(key).setIfAbsent(value);

}

public long getLock(String key) {

Long time = getOperations(key).get();

if (time == null) {

return 0;

}

return time;

}

public long getSet(String key, Long value) {

Long time = getOperations(key).getAndSet(value);

if (time == null) {

return 0;

}

return time;

}

public void releaseLock(String key) {

redisTemplate.delete(key);

}

}

RedisLockAspect会对添加注解的方法进行特殊处理,具体可看lock方法。

大致思路就是:

1、首选借助redis本身支持对应的setIfAbsent方法,该方法的特点是如果redis中已有该数据不保存返回false,不存该数据保存返回true;

2、如果setIfAbsent返回true标识拿到同步锁,可进行操作,操作后并释放锁;

3、如果没有通过setIfAbsent拿到数据,判断是否对锁设置了超时机制,没有设置判断是否需要继续等待;

4、判断是否锁已经过期,需要对(System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills)))进行细细的揣摩一下,getSet可能会改变了其他人拥有锁的超时时间,但是几乎可以忽略;

5、没有得到任何锁,判断继续等待还是退出。

第三步:spring的基本配置

#*****************jedis连接参数设置*********************#

#redis服务器ip #

redis.hostName=127.0.0.1

#redis服务器端口号#

redis.port=6379

#redis服务器外部访问密码

redis.password=XXXXXXXXXX

#************************jedis池参数设置*******************#

#jedis的最大分配对象#

jedis.pool.maxActive=1000

jedis.pool.minIdle=100

#jedis最大保存idel状态对象数 #

jedis.pool.maxIdle=1000

#jedis池没有对象返回时,最大等待时间 #

jedis.pool.maxWait=5000

#jedis调用borrowObject方法时,是否进行有效检查#

jedis.pool.testOnBorrow=true

#jedis调用returnObject方法时,是否进行有效检查 #

jedis.pool.testOnReturn=true

p:password="${redis.password}" p:pool-config-ref="poolConfig" />

redis的安装本文就不再说明。

测试

package com.yaoguoyin.redis;

import org.junit.runner.RunWith;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = { "classpath:META-INF/spring/redis.xml" })

public class BaseTest extends AbstractJUnit4SpringContextTests {

}

package com.yaoguoyin.redis.lock;

import java.util.concurrent.TimeUnit;

import org.junit.Test;

import org.springframework.beans.factory.annotation.Autowired;

import com.yaoguoyin.redis.BaseTest;

public class RedisTest extends BaseTest {

@Autowired

private SysTest sysTest;

@Test

public void testHello() throws InterruptedException {

for (int i = 0; i < 100; i++) {

new Thread(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

sysTest.add("xxxxx", 111111);

}

}).start();

}

TimeUnit.SECONDS.sleep(20);

}

@Test

public void testHello2() throws InterruptedException{

sysTest.add("xxxxx", 111111);

TimeUnit.SECONDS.sleep(10);

}

}

你可以对

void com.yaoguoyin.redis.lock.SysTest.add(@P4jSynKey(index=1) String key, @P4jSynKey(index=0) int key1)

去除注解@P4jSyn进行测试对比。

ps:本demo的执行性能取决于redis和Java交互距离;成千山万单锁并发建议不要使用这种形式,直接通过redis等解决,本demo只解决小并发不想耦合代码的形式。

p:password="${redis.password}" p:pool-config-ref="poolConfig" />

redis的安装本文就不再说明。

测试

package com.yaoguoyin.redis;

import org.junit.runner.RunWith;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = { "classpath:META-INF/spring/redis.xml" })

public class BaseTest extends AbstractJUnit4SpringContextTests {

}

package com.yaoguoyin.redis.lock;

import java.util.concurrent.TimeUnit;

import org.junit.Test;

import org.springframework.beans.factory.annotation.Autowired;

import com.yaoguoyin.redis.BaseTest;

public class RedisTest extends BaseTest {

@Autowired

private SysTest sysTest;

@Test

public void testHello() throws InterruptedException {

for (int i = 0; i < 100; i++) {

new Thread(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

sysTest.add("xxxxx", 111111);

}

}).start();

}

TimeUnit.SECONDS.sleep(20);

}

@Test

public void testHello2() throws InterruptedException{

sysTest.add("xxxxx", 111111);

TimeUnit.SECONDS.sleep(10);

}

}

你可以对

void com.yaoguoyin.redis.lock.SysTest.add(@P4jSynKey(index=1) String key, @P4jSynKey(index=0) int key1)

去除注解@P4jSyn进行测试对比。

ps:本demo的执行性能取决于redis和Java交互距离;成千山万单锁并发建议不要使用这种形式,直接通过redis等解决,本demo只解决小并发不想耦合代码的形式。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java校验银行卡是否正确的核心代码
下一篇:浅谈java Iterator.remove()方法的用法(详解)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~