Java分布式锁的三种实现方案

网友投稿 208 2023-06-17


Java分布式锁的三种实现方案

方案一:数据库乐观锁

乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。

异常实现流程

-- 可能会发生的异常情况

-- 线程1查询,当前left_count为1,则有记录

select * from t_bonus where id = 10001 and left_count > 0

-- 线程2查询,当前left_count为1,也有记录

select * from t_bonus where id = 10001 and left_count > 0

-- 线程1完成领取记录,修改left_count为0,

update t_bonus set left_count = left_count - 1 where id = 10001

-- 线程2完成领取记录,修改left_count为-1,产生脏数据

update t_bonus set left_count = left_count - 1 where id = 10001

通过乐观锁实现

-- 添加版本号控制字段

ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus;

-- 线程1查询,当前left_count为1,则有记录,当前版本号为1234

select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 线程2查询,当前left_count为1,有记录,当前版本号为1234

select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 线程1,更新完成后当前的version为1235,update状态为1,更新成功

update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

-- 线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理

update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

方案二:基于Redis的分布式锁

SETNX命令(SET if Not eXists)\

语法:SETNX key value\

功能:原子性操作,当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。\

Expire命令\

语法:expire(key, expireTime)\

功能:key设置过期时间\

GETSET命令\

语法:GETSET key value\

功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。\

GET命令\

语法:GET key\

功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。\

DEL命令\

语法:DEL key [KEY …]\

功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。

第一种:使用redis的setnx()、expire()方法,用于分布式锁

setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功

expire()命令对lockkey设置超时时间,为的是避免死锁问题。

执行完业务代码后,可以通过delete命令删除key。

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题

第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题

setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。

get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。

计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。

判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。

在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

import cn.com.tpig.cache.redis.RedisService;

import cn.com.tpig.utils.SpringUtils;

/**

* Created by IDEA

* User: shma1664

* Date: 2016-08-16 14:01

* Desc: redis分布式锁

*/

public final class RedisLockUtil {

private static final int defaultExpire = 60;

private RedisLockUtil() {

//

}

/**

* 加锁

* @param key redis key

* @param expire 过期时间,单位秒

* @return true:加锁成功,false,加锁失败

*/

public static boolean lock(String key, int expire) {

RedisService redisService = SpringUtils.getBean(RedisService.class);

long status = redisService.setnx(key, "1");

if(status == 1) {

redisService.expire(key, expire);

return true;

}

return false;

}

public static boolean lock(String key) {

return lock2(key, defaultExpire);

}

/**

* 加锁

* @param key redis key

* @param expire 过期时间,单位秒

* @return true:加锁成功,false,加锁失败

*/

public static boolean lock2(String key, int expire) {

RedisService redisService = SpringUtils.getBean(RedisService.class);

long value = System.currentTimeMillis() + expire;

long status = redisService.setnx(key, String.valueOf(value));

if(status == 1) {

return true;

}

long oldExpireTime = Long.parseLong(redisService.get(key, "0"));

if(oldExpireTime < System.currentTimeMillis()) {

//超时

long newExpireTime = System.currentTimeMillis() + expire;

long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));

if(currentExpireTime == oldExpireTime) {

return true;

}

}

return false;

}

public static void unLock1(String key) {

RedisService redisService = SpringUtils.getBean(RedisService.class);

redisService.del(key);

}

public static void unLock2(String key) {

RedisService redisService = SpringUtils.getBean(RedisService.class);

long oldExpireTime = Long.parseLong(redisService.get(key, "0"));

if(oldExpireTime > System.currentTimeMillis()) {

redisService.del(key);

}

}

}

public void drawRedPacket(long userId) {

String key = "draw.redpacket.userid:" + userId;

boolean lock = RedisLockUtil.lock2(key, 60);

if(lock) {

try {

//领取操作

} finally {

//释放锁

RedisLockUtil.unLock(key);

}

} else {

new RuntimeException("重复领取奖励");

}

}

Spring AOP基于注解方式和SpEL实现开箱即用的redis分布式锁策略

import java.lang.annotation.ElementType;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

/**

* RUNTIME

* 定义注解

* 编译器将把注释记录在类文件中,在运行时 VM 将保留注释,因此可以反射性地读取。

* @author shma1664

*

*/

@Retention(RetentionPolicy.RUNTIME)

@Target(ElementType.METHOD)

public @interface RedisLockable {

String[] key() default "";

long expiration() default 60;

}

http://

import javax.annotation.Resource;

import java.lang.reflect.Method;

import com.autohome.api.dealer.util.cache.RedisClient;

import com.google.common.base.Joiner;

import org.aspectj.lang.ProceedingJoinPoint;

import org.aspectj.lang.Signature;

import org.aspectj.lang.annotation.Around;

import org.aspectj.lang.annotation.Aspect;

import org.aspectj.lang.annotation.Pointcut;

import org.aspectj.lang.reflect.MethodSignature;

import org.springframework.expression.EvaluationContext;

import org.springframework.expression.Expression;

import org.springframework.expression.ExpressionParser;

import org.springframework.expression.spel.standard.SpelExpressionParser;

import org.springframework.expression.spel.support.StandardEvaluationContext;

import org.springframework.stereotype.Component;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-09-28 18:08

* Desc:

*/

@Aspect

@Component

public class RedisLockAop {

@Resource

private RedisClient redisClient;

@Pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))")

public void pointcut(){}

@Around("pointcut()")

public Object doAround(ProceedingJoinPoint point) throws Throwable{

Signature signature = point.getSignature();

MethodSignature methodSignature = (MethodSignature) signature;

Method method = methodSignature.getMethod();

String targetName = point.getTarget().getClass().getName();

String methodName = point.getSignature().getName();

Object[] arguments = point.getArgs();

if (method != null && method.isAnnotationPresent(RedisLockable.class)) {

RedisLockable redisLock = method.getAnnotation(RedisLockable.class);

long expire = redisLock.expiration();

String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments);

boolean isLock = RedisLockUtil.lock2(redisKey, expire);

if(!isLock) {

try {

return point.proceed();

} finally {

unLock2(redisKey);

}

} else {

throw new RuntimeException("您的操作太频繁,请稍后再试");

}

}

return point.proceed();

}

private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) {

StringBuilder sb = new StringBuilder();

sb.append("lock.").append(targetName).append(".").append(methodName);

if(keys != null) {

String keyStr = Joiner.on(".").skipNulls().join(keys);

String[] parameters = ReflectParamNames.getNames(targetName, methodName);

ExpressionParser parser = new SpelExpressionParser();

Expression expression = parser.parseExpression(keyStr);

EvaluationContext context = new StandardEvaluationContext();

int length = parameters.length;

if (length > 0) {

for (int i = 0; i < length; i++) {

context.setVariable(parameters[i], arguments[i]);

}

}

String keysValue = expression.getValue(context, String.class);

sb.append("#").append(keysValue);

}

return sb.toString();

}

org.javassist

javassist

3.18.1-GA

import javassist.*;

import javassist.bytecode.CodeAttribute;

import javassist.bytecode.LocalVariableAttribute;

import javassist.bytecode.MethodInfo;

import org.apache.log4j.Logger;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-09-28 18:39

* Desc:

*/

public class ReflectParamNames {

private static Logger log = Logger.getLogger(ReflectParamNames.class);

private static ClassPool pool = ClassPool.getDefault();

static{

ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class);

pool.insertClassPath(classPath);

}

public static String[] getNames(String className,String methodName) {

CtClass cc = null;

try {

cc = pool.get(className);

CtMethod cm = cc.getDeclaredMethod(methodName);

// 使用javaassist的反射方法获取方法的参数名

MethodInfo methodInfo = cm.getMethodInfo();

CodeAttribute codeAttribute = methodInfo.getCodeAttribute();

LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);

if (attr == null) return new String[0];

int begin = 0;

String[] paramNames = new String[cm.getParameterTypes().length];

int count = 0;

int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;

for (int i = 0; i < attr.tableLength(); i++){

// 为什么 加这个判断,发现在windows 跟linux执行时,参数顺序不一致,通过观察,实际的参数是从this后面开始的

if (attr.variableName(i).equals("this")){

begin = i;

break;

}

}

for (int i = begin+1; i <= begin+paramNames.length; i++){

paramNames[count] = attr.variableName(i);

count++;

}

return paramNames;

} catch (Exception e) {

e.printStackTrace();

}finally{

try {

if(cc != null) cc.detach();

} catch (Exception e2) {

log.error(e2.getMessage());

}

}

return new String[0];

}

}

在需要使用分布式锁的地方添加注解

/**

* 抽奖接口

* 添加redis分布式锁保证一个订单只有一个请求处理,防止用户刷礼物,支持SpEL表达式

* redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId

* @param orderId 订单id

* @return 抽中的奖品信息

*/

@RedisLockable(key = {"#orderId"}, expiration = 120)

@Override

public BonusConvertBean drawBonus(Integer orderId) throws BonusException{

// 业务逻辑

}

第三种方案:基于Zookeeper的分布式锁

利用节点名称的唯一性来实现独占锁

ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。

package com.shma.example.zookeeper.lock;

import java.io.IOException;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-09-30 16:09

* Desc:

*/

public class ZookeeperLock implements Lock, Watcher {

private ZooKeeper zk;

private String root = "/locks";//根

private String lockName;//竞争资源的标志

private String myZnode;//当前锁

private int sessionTimeout = 30000;

private List exception = new ArrayList();

/**

* 创建分布式锁,使用前请确认config配置的zookeeper服务可用

* @param config 127.0.0.1:2181

* @param lockName 竞争资源标志,lockName中不能包含单词lock

*/

public ZookeeperLock(String config, String lockName){

this.lockName = lockName;

// 创建一个与服务器的连接

try {

zk = new ZooKeeper(config, sessionTimeout, this);

Stat stat = zk.exists(root, false);

if(stat == null){

// 创建根节点

zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

} catch (IOException e) {

exception.add(e);

} catch (KeeperException e) {

exception.add(e);

} catch (InterruptedException e) {

exception.add(e);

}

}

@Override

public void lock() {

if(exception.size() > 0){

throw new LockException(exception.get(0));

}

if(!tryLock()) {

throw new LockException("您的操作太频繁,请稍后再试");

}

}

@Override

public void lockInterruptibly() throws InterruptedException {

this.lock();

}

@Override

public boolean tryLock() {

try {

myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

return true;

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

return false;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

return tryLock();

}

@Override

public void unlock() {

try {

zk.delete(myZnode, -1);

myZnode = null;

zk.close();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

}

@Override

public Condition newCondition() {

return null;

}

@Override

public void process(WatchedEvent watchedEvent) {

//

}

}

ZookeeperLock lock = null;

try {

lock = new ZookeeperLock("127.0.0.1:2182","test1");

lock.lock();

//业务逻辑处理

} catch (LockException e) {

throw e;

} finally {

if(lock != null)

lock.unlock();

}

利用临时顺序节点控制时序实现

/lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。\

算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。

对于解锁操作,只需要将自身创建的节点删除即可。

package com.shma.example.zookeeper.lock;

import java.io.IOException;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-09-30 16:09

* Desc:

*/

public class DistributedLock implements Lock, Watcher{

private ZooKeeper zk;

private String root = "/locks";//根

private String lockName;//竞争资源的标志

private String waitNode;//等待前一个锁

private String myZnode;//当前锁

private CountDownLatch latch;//计数器

private int sessionTimeout = 30000;

private List exception = new ArrayList();

/**

* 创建分布式锁,使用前请确认config配置的zookeeper服务可用

* @param config 127.0.0.1:2181

* @param lockName 竞争资源标志,lockName中不能包含单词lock

*/

public DistributedLock(String config, String lockName){

this.lockName = lockName;

// 创建一个与服务器的连接

try {

zk = new ZooKeeper(config, sessionTimeout, this);

Stat stat = zk.exists(root, false);

if(stat == null){

// 创建根节点

zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

}

} catch (IOException e) {

exception.add(e);

} catch (KeeperException e) {

exception.add(e);

} catch (InterruptedException e) {

exception.add(e);

}

}

/**

* zookeeper节点的监视器

*/

public void process(WatchedEvent event) {

if(this.latch != null) {

this.latch.countDown();

}

}

public void lock() {

if(exception.size() > 0){

throw new LockException(exception.get(0));

}

try {

if(this.tryLock()){

System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");

return;

}

else{

waitForLock(waitNode, sessionTimeout);//等待锁

}

} catch (KeeperException e) {

throw new LockException(e);

} catch (InterruptedException e) {

throw new LockException(e);

}

}

public boolean tryLock() {

try {

String splitStr = "_lock_";

if(lockName.contains(splitStr))

throw new LockException("lockName can not contains \\u000B");

//创建临时子节点

myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(myZnode + " is created ");

//取出所有子节点

List subNodes = zk.getChildren(root, false);

//取出所有lockName的锁

List lockObjNodes = new ArrayList();

for (String node : subNodes) {

String _node = node.split(splitStr)[0];

if(_node.equals(lockName)){

lockObjNodes.add(node);

}

}

Collections.sort(lockObjNodes);

System.out.println(myZnode + "==" + lockObjNodes.getwOERnFBUo(0));

if(myZnode.equals(root+"/"+lockObjNodes.get(0))){

//如果是最小的节点,则表示取得锁

return true;

}

//如果不是最小的节点,找到比自己小1的节点

String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);

waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);

} catch (KeeperException e) {

throw new LockException(e);

} catch (InterruptedException e) {

throw new LockException(e);

}

return false;

}

public boolean tryLock(long time, TimeUnit unit) {

try {

if(this.tryLock()){

return true;

}

return waitForLock(waitNode,time);

} catch (Exception e) {

e.printStackTrace();

}

return false;

}

private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {

Stat stat = zk.exists(root + "/" + lower,true);

//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听

if(stat != null){

System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);

this.latch = new CountDownLatch(1);

this.latch.await(waitTime, TimeUnit.MILLISECONDS);

this.latch = null;

}

return true;

}

public void unlock() {

try {

System.out.println("unlock " + myZnode);

zk.delete(myZnode,-1);

myZnode = null;

zk.close();

} catch (InterruptedExceptionhttp:// e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

}

public void lockInterruptibly() throws InterruptedException {

this.lock();

}

public Condition newCondition() {

return null;

}

public class LockException extends RuntimeException {

private static final long serialVersionUID = 1L;

public LockException(String e){

super(e);

}

public LockException(Exception e){

super(e);

}

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring+MyBatis实现数据库读写分离方案
下一篇:详解微信小程序入门五: wxml文件引用、模版、生命周期
相关文章

 发表评论

暂时没有评论,来抢沙发吧~