java基于mongodb实现分布式锁的示例代码

网友投稿 245 2022-10-08


java基于mongodb实现分布式锁的示例代码

目录原理 实现 使用

原理

通过线程安全findAndModify 实现锁

实现

定义锁存储对象:

/**

* mongodb 分布式锁

*/

@Data

@NoArgsConstructor

@AllArgsConstructor

@Document(collection = "distributed-lock-doc")

public class LockDocument {

@Id

private String id;

private long expireAt;

private String token;

}

定义Lock API:

public interface LockService {

String acquire(String key, long expiration);

boolean release(String key, String token);

boolean refresh(String key, String token, long expiration);

}

获取锁:

@Override

public String acquire(String key, long expiration) {

Query query = Query.query(Criteria.where("_id").is(key));

String token = this.generateToken();

Update update = new Update()

.setOnInsert("_id", key)

.setOnInsert("expireAt", System.currentTimeMillis() + expiration)

.setOnInsert("token", token);

FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)

.returnNew(true);

LockDocument doc = mongoTemplate.findAndModify(query, update, options,

LockDocument.class);

boolean locked = doc.getToken() != null && doc.getToken().equals(token);

// 如果已过期

if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {

DeleteResult deleted = this.mongoTemplate.remove(

Query.query(Criteria.where("_id").is(key)

.and("token").is(doc.getToken())

http:// .and("expireAt").is(doc.getExpireAt())),

LockDocument.class);

if (deleted.getDeletedCount() >= 1) {

// 成功释放锁, 再次尝试获取锁

return this.acquire(key, expiration);

}

}

log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",

key, token, locked);

return locked ? token : null;

}http://

原理:

先尝试upsert锁对象,如果成功且token一致,说明拿到锁

否则加锁失败

如果未拿到锁,但是锁已过期,尝试删除锁

如果删除成功,再次尝试拿锁

如果失败,说明锁可能已经续期了

释放和续期锁:

@Override

public boolean release(String key, String token) {

Query query = Query.query(Criteria.where("_id").is(key)

.and("token").is(token));

DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);

boolean released = deleted.getDeletedCount() == 1;

if (released) {

log.debug("Remove query successfully affected 1 record for key {} with token {}",

key, token);

} else if (deleted.getDeletedCount() > 0) {

log.error("Unexpected result from release for key {} with token {}, released {}",

key, token, deleted);

} else {

log.error("Remove query did not affect any records for key {} with token {}",

key, token);

}

return released;

}

@Override

public boolean refresh(String key, String token,

long expiration) {

Query query = Query.query(Criteria.where("_id").is(key)

.and("token").is(token));

Update update = Update.update("expireAt",

System.currentTimeMillis() + expiration);

UpdateResult updated =

mongoTemplate.updateFirst(query, update, LockDocument.class);

final boolean refreshed = updated.getModifiedCount() == 1;

if (refreshed) {

log.debug("Refresh query successfully affected 1 record for key {} " +

"with token {}", key, token);

} else if (updated.getModifiedCount() > 0) {

log.error("Unexpected result from refresh for key {} with token {}, " +

"released {}", key, token, updated);

} else {

log.warn("Refresh query did nozgqSfjTUt affect any records for key {} with token {}. " +

"This is possible when refresh interval fires for the final time " +

"after the lock has been released",

key, token);

}

return refreshed;

}

使用

private LockService lockService;

private void tryAcquireLockAndSchedule() {

while (!this.stopSchedule) {

// 尝试拿锁

this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);

if (this.token != null) {

// 拿到锁

} else {

// 等待LOCK_EXPIRATION, 再次尝试

Thread.sleep(LOCK_EXPIRATION);

}

}

}

先尝试拿锁,如果获取到token,说明拿锁成功

否则可以sleep一段时间后再拿锁

完整代码,可到github查看 https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SSG550防火墙调试(juniper ssg5防火墙配置)
下一篇:Android安全指南 之 Xposed Hook实战(android 安全)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~