基于Zookeeper实现分布式锁详解

网友投稿 223 2022-09-09


基于Zookeeper实现分布式锁详解

目录1、什么是Zookeeper?2、Zookeeper节点类型3、Zookeeper环境搭建4、Zookeeper基本使用5、Zookeeper应用场景6、Zookeeper分布式锁7、公平式Zookeeper分布式锁8、zookeeper和Redis锁对比?

1、什么是Zookeeper?

Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件。

引用官网的图例:

特征:

zookeeper的数据机构是一种节点树的数据结构,zNode是基本的单位,znode是一种和unix文件系统相似的节点,可以往这个节点存储或向这个节点获取数据

通过客户端可以对znode进行数据操作,还可以注册watcher监控znode的改变

2、Zookeeper节点类型

持久节点(Persistent)

持久顺序节点(Persistent_Sequential)

临时节点(Ephemeral)

临时顺序节点(Ephemeral_Sequential)

3、Zookeeper环境搭建

下载zookeeper,官网链接,https://zookeeper.apache.org/releases.html#download,去官网找到对应的软件下载到本地

修改配置文件,${ZOOKEEPER_HOME}\conf,找到zoo_sample.cfg文件,先备份一份,另外一份修改为zoo.cfg

解压后点击zkServer.cmd运行服务端:

4、Zookeeper基本使用

在cmd窗口或者直接在idea编辑器里的terminal输入命令:

zkCli.cmd -server 127.0.0.1:2181

输入命令help查看帮助信息:

ZooKeeper -server host:port -client-configuration properties-file cmd args

addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE

addauth scheme auth

close

config [-c] [-w] [-s]

connect host:port

create [-s] [-e] [-c] [-t ttl] path [data] [acl]

delete [-v version] path

deleteall path [-b batch size]

delquota [-n|-b|-N|-B] path

get [-s] [-w] path

getAcl [-s] path

getAllChildrenNumber path

getEphemerals path

history

listquota path

ls [-s] [-w] [-R] path

printwatches on|off

quit

reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]

redo cmdno

removewatches path [-c|-d|-a] [-l]

set [-s] [-v version] path data

setAcl [-s] [-v version] [-R] path acl

setquota -n|-b|-N|-B val path

stat [-w] path

sync path

version

whoami

create [-s] [-e] [-c] [-t ttl] path [data] [acl],-s表示顺序节点,-e表示临时节点,若不指定表示持久节点,acl是来进行权限控制的

[zk: 127.0.0.1:2181(CONNECTED) 1] create -s /zk-test 0

Created /zk-test0000000000

查看

[zk: 127.0.0.1:2181(CONNECTED) 4] ls /

[zk-test0000000000, zookeeper]

设置修改节点数据

set /zk-test 123

获取节点数据

get /zk-test

ps,zookeeper命令详情查看help帮助文档,也可以去官网看看文档

ok,然后java写个例子,进行watcher监听

package com.example.concurrent.zkSample;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

/**

*

* Zookeeper 例子

*

*

*

* @author mazq

* 修改记录

* 修改后版本: 修改人: 修改日期: 2021/12/09 16:57 修改内容:

*

*/

public class ZookeeperSample {

public static void main(String[] args) {

ZkClient client = new ZkClient("localhost:2181");

client.setZkSerializer(new MyZkSerializer());

client.subscribeDataChanges("/zk-test", new IZkDataListener() {

@Override

public void handleDataChange(String dataPath, Object data) throws Exception {

System.out.println("监听到节点数据改变!");

}

@Override

public void handleDataDeleted(String dataPath) throws Exception {

System.out.println("监听到节点数据被删除了");

}

});

try {

Thread.sleep(1000 * 60 * 2);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

5、Zookeeper应用场景

Zookeeper有什么典型的应用场景:

注册中心(Dubbo)

命名服务

Master选举

集群管理

分布式队列

分布式锁

6、Zookeeper分布式锁

Zookeeper适合用来做分布式锁,然后具体实现是利用什么原理?我们知道zookeeper是类似于unix的文件系统,文件系统我们也知道在一个文件夹下面,会有文件名称不能一致的特性的,也就是互斥的特性。同样zookeeper也有这个特性,在同个znode节点下面,子节点命名不能重复。所以利用这个特性可以来实现分布式锁

业务场景:在高并发的情况下面进行订单场景,这是一个典型的电商场景

自定义的Zookeeper序列化类:

package com.example.concurrent.zkSample;

import org.I0Itec.zkclient.exception.ZkMarshallingError;

import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.UnsupportedEncodingException;

public class MyZkSerializer implements ZkSerializer {

private String charset = "UTF-8";

@Override

public byte[] serialize(Object o) throws ZkMarshallingError {

return String.valueOf(o).getBytes();

}

@Override

public Object deserialize(byte[] bytes) throws ZkMarshallingError {

try {

return new String(bytes , charset);

} catch (UnsupportedEncodingException e) {

throw new ZkMarshallingError();

}

}

}

订单编号生成器类,因为SimpleDateFormat是线程不安全的,所以还是要加上ThreadLocal

package com.example.concurrent.zkSample;

import java.text.DateFormat;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.concurrent.atomic.AtomicInteger;

public class OrderCodeGenerator {

private static final String DATE_FORMAT = "yyyyMMddHHmmss";

private static AtomicInteger ai = new AtomicInteger(0);

private static int i = 0;

private static ThreadLocal threadLocal = new ThreadLocal() {

@Override

protected SimpleDateFormat initialValue() {

return new SimpleDateFormat(DATE_FORMAT);

}

};

public static DateFormat getDateFormat() {

return (DateFormat) threadLocal.get();

}

public static String generatorOrderCode() {

try {

return getDateFormat().format(new Date(System.currentTimeMillis()))

+ i++;

} finally {

threadLocal.remove();

}

}

}

pom.xml加上zookeeper客户端的配置:

com.101tec

zkclient

0.10

实现一个zookeeper分布式锁,思路是获取节点,这个是多线程竞争的,能获取到锁,也就是创建节点成功,就执行业务,其它抢不到锁的线程,阻塞等待,注册watcher监听锁是否释放了,释放了,取消注册watcher,继续抢锁

package com.example.concurrent.zkSample;

import lombok.extern.slf4j.Slf4j;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

@Slf4j

public class ZKDistributeLock implements Lock {

private String localPath;

private ZkClient zkClient;

ZKDistributeLock(String localPath) {

super();

this.localPath = localPath;

zkClient = new ZkClient("localhost:2181");

zkClient.setZkSerializer(new MyZkSerializer());

}

@Override

public void lock() {

while (!tryLock()) {

waitForLock();

}

}

private void waitForLock() {

// 创建countdownLatch协同

CountDownLatch countDownLatch = new CountDownLatch(1);

// 注册watcher监听

IZkDataListener listener = new IZkDataListener() {

@Override

public void handleDataChange(String path, Object o) throws Exception {

//System.out.println("zookeeper data has change!!!");

}

@Override

public void handleDataDeleted(String s) throws Exception {

// System.out.println("zookeeper data has delete!!!");

// 监听到锁释放了,释放线程

countDownLatch.countDown();

}

};

zkClient.subscribeDataChanges(localPath , listener);

// 线程等待

if (zkClient.exists(localPath)) {

try {

countDownLatch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

// 取消注册

zkClient.unsubscribeDataChanges(localPath , listener);

}

@Override

public void unlock() {

zkClient.delete(localPath);

}

@Override

public boolean tryLock() {

try {

zkClient.createEphemeral(localPath);

} catch (ZkNodeExistsException e) {

return false;

}

return true;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

return false;

}

@Override

public void lockInterruptibly() throws InterruptedException {

}

@Override

public Condition newCondition() {

return null;

}

}

订单服务api

package com.example.concurrent.zkSample;

public interface OrderService {

void createOrder();

}

订单服务实现类,加上zookeeper分布式锁

package com.example.concurrent.zkSample;

import java.util.concurrent.locks.Lock;

public class OrderServiceInvoker implements OrderService{

@Override

public void createOrder() {

Lock zkLock = new ZKDistributeLock("/zk-test");

//Lock zkLock = new ZKDistributeImproveLock("/zk-test");

String orderCode = null;

try {

zkLock.lock();

orderCode = OrderCodeGenerator.generatorOrderCode();

} finally {

zkLock.unlock();

}

System.out.println(String.format("thread name : %s , orderCode : %s" ,

Thread.currentThread().getName(),

orderCode));

}

}

因为搭建分布式环境比较繁琐,所以这里使用juc里的并发协同工具类,CyclicBarrier模拟多线程并发的场景,模拟分布式环境的高并发场景

package com.example.concurrent.zkSample;

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

public class ConcurrentDistributeTest {

public static void main(String[] args) {

// 多线程数

int threadSize = 30;

// 创建多线程循环屏障

CyclicBarrier cyclicBarrier = new CyclicBarrier(threadSize , ()->{

System.out.println("准备完成!");

}) ;

// 模拟分布式集群的场景

for (int i = 0 ; i < threadSize ; i ++) {

new Thread(()->{

OrderService orderService = new OrderServiceInvoker();

// 所有线程都等待

try {

cyclicBarrier.await();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

// 模拟并发请求

orderService.createOrder();

}).start();

}

}

}

跑多几次,没有发现订单号重复的情况,分布式锁还是有点效果的

thread name : Thread-6 , orderCode : 202112100945110

thread name : Thread-1 , orderCode : 202112100945111

thread name : Thread-13 , orderCode : 202112100945112

thread name : Thread-11 , orderCode : 202112100945113

thread name : Thread-14 , orderCode : 202112100945114

thread name : Thread-0 , orderCode : 202112100945115

thread name : Thread-8 , orderCode : 202112100945116

thread name : Thread-17 , orderCode : 202112100945117

thread name : Thread-10 , orderCode : 202112100945118

thread name : Thread-5 , orderCode : 202112100945119

thread name : Thread-2 , orderCode : 2021121009451110

thread name : Thread-16 , orderCode : 2021121009451111

thread name : Thread-19 , orderCode : 2021121009451112

thread name : Thread-4 , orderCode : 2021121009451113

thread name : Thread-18 , orderCode : 2021121009451114

thread name : Thread-3 , orderCode : 2021121009451115

thread name : Thread-9 , orderCode : 2021121009451116

thread name : Thread-12 , orderCode : 2021121009451117

thread name : Thread-15 , orderCode : 2021121009451118

thread name : Thread-7 , orderCode : 2021121009451219

注释加锁的代码,再加大并发数,模拟一下

package com.example.concurrent.zkSample;

import java.util.concurrent.locks.Lock;

public class OrderServiceInvoker implements OrderService{

@Override

public void createOrder() {

//Lock zkLock = new ZKDistributeLock("/zk-test");

//Lock zkLock = new ZKDistributeImproveLock("/zk-test");

String orderCode = null;

try {

//zkLock.lock();

orderCode = OrderCodeGenerator.generatorOrderCode();

} finally {

//zkLock.unlock();

}

System.out.println(String.format("thread name : %s , orderCode : %s" ,

Thread.currentThread().getName(),

orderCode));

}

}

跑多几次,发现出现订单号重复的情况,所以分布式锁是可以保证分布式环境的线程安全的

7、公平式Zookeeper分布式锁

上面例子是一种非公平锁的方式,一旦监听到锁释放了,所有线程都会去抢锁,所以容易出现“惊群效应”:

巨大的服务器性能损耗

网络冲击

可能造成宕机

所以,需要改进分布式锁,改成一种GEEQefZS公平锁的模式

公平锁:多个线程按照申请锁的顺序去获取锁,线程会在队列里排队,按照顺序去获取锁。只有队列第1个线程才能获取到锁,获取到锁之后,其它线程都会阻塞等待,等到持有锁的线程释放锁,其它线程才会被唤醒。

非公平锁:多个线程都会去竞争获取锁,获取不到就进入队列等待,竞争得到就直接获取锁;然后持有锁的线程释放锁之后,所有等待的线程就都会去竞争锁。

流程图:

代码改进:

package com.example.concurrent.zkSample;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.Collections;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class ZKDistributeImproveLock implements Lock {

private String localPath;

private ZkClient zkClient;

private String currentPath;

private String beforePath;

ZKDistributeImproveLock(String localPath) {

super();

this.localPath = localPath;

zkClient = new ZkClient("localhost:2181");

zkClient.setZkSerializer(new MyZkSerializer());

if (!zkClient.exists(localPath)) {

try {

this.zkClient.createPersistent(localPath);

} catch (ZkNodeExistsException e) {

}

}

}

@Override

public void lock() {

while (!tryLock()) {

waitForLock();

}

}

private void waitForLock() {

CountDownLatch countDownLatch = new CountDownLatch(1);

// 注册watcher

IZkDataListener listener = new IZkDataListener() {

@Override

public void handleDataChange(String dataPath, Object data) throws Exception {

}

@Override

public void handleDataDeleted(String dataPath) throws Exception {

// 监听到锁释放,唤醒线程

countDownLatch.countDown();

}

};

zkClient.subscribeDataChanges(beforePath, listener);

// 线程等待

if (zkClient.exists(beforePath)) {

try {

countDownLatch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

// 取消注册

zkClient.unsubscribeDataChanges(beforePath , listener);

}

@Override

public void unlock() {

zkClient.delete(this.currentPath);

}

@Override

public boolean tryLock() {

if (this.currentPath == null) {

currentPath = zkClient.createEphemeralSequential(localPath +"/" , "123");

}

// 获取Znode节点下面的所有子节点

List children = zkClient.getChildren(localPath);

// 列表排序

Collections.sort(children);

if (currentPath.equals(localPath + "/" + children.get(0))) { // 当前节点是第1个节点

return true;

} else {

//得到当前的索引号

int index = children.indexOf(currentPath.substring(localPath.length() + 1));

//取到前一个

beforePath = localPath + "/" + children.get(index - 1);

}

return false;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

return false;

}

@Override

public void lockInterruptibly() throws InterruptedException {

}

@Override

public Condition newCondition() {

return null;

}

}

thread name : Thread-13 , orderCode : 202112100936140

thread name : Thread-3 , orderCode : 202112100936141

thread name : Thread-14 , orderCode : 202112100936142

thread name : Thread-16 , orderCode : 202112100936143

thread name : Thread-1 , orderCode : 202112100936144

thread name : Thread-9 , orderCode : 202112100936145

thread name : Thread-4 , orderCode : 202112100936146

thread name : Thread-5 , orderCode : 202112100936147

thread name : Thread-7 , orderCode : 202112100936148

thread name : Thread-2 , orderCode : 202112100936149

thread name : Thread-17 , orderCode : 2021121009361410

thread name : Thread-15 , orderCode : 2021121009361411

thread name : Thread-0 , orderCode : 2021121009361412

thread name : Thread-10 , orderCode : 2021121009361413

thread name : Thread-18 , orderCode : 2021121009361414

thread name : Thread-19 , orderCode : 2021121009361415

thread name : Thread-8 , orderCode : 2021121009361416

thread name : Thread-12 , orderCode : 2021121009361417

thread name : Thread-11 , orderCode : 2021121009361418

thread name : Thread-6 , orderCode : 2021121009361419

8、zookeeper和Redis锁对比?

Redis和Zookeeper都可以用来实现分布式锁,两者可以进行对比:

基于Redis实现分布式锁

实现比较复杂

存在死锁的可能

性能比较好,基于内存 ,而且保证的是高可用,redis优先保证的是AP(分布式CAP理论)

基于Zookeeper实现分布式锁

实现相对简单

可靠性高,因为zookeeper保证的是CP(分布式CAP理论)

性能相对较好 并发1~2万左右,并发太高,还是redis性能好

本博客代码可以在github找到下载链接


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:综合实验1(综合实验室英文)
下一篇:实验案例1(实验案例竞赛)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~