基于ZooKeeper实现队列源码

网友投稿 251 2023-04-02


基于ZooKeeper实现队列源码

实现原理

先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题。

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队cNLOkJDPZ列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

应用场景

Zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用Zookeeper又需要小规模的队列应用,这时可以使用Zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。

详细代码

ZookeeperClient工具类

package org.massive.common;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

/**

* Created by Massive on 2016/12/18.

*/

public class ZooKeeperClient {

private static String connectionString = "localhost:2181";

private static int sessionTimeout = 10000;

public static ZooKeeper getInstance() throws IOException, InterruptedException {

//--------------------------------------------------------------

// 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)

// 这里等Zookeeper的连接完成才返回实例

//--------------------------------------------------------------

final CountDownLatch connectedSignal = new CountDownLatch(1);

ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

if (event.getState() == Event.KeeperState.SyncConnected) {

connectedSignal.countDown();

} else if (event.getState() == Event.KeeperState.Expired) {

}

}

});

connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);

return zk;

}

public static int getSessionTimeout() {

return sessionTimeout;

}

public static void setSessionTimeout(int sessionTimeout) {

ZooKeeperClient.sessionTimeout = sessionTimeout;

}

}

ZooKeeperQueue

package org.massive.queue;

import org.apache.commons.lang3.RandomUtils;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import org.massive.common.ZooKeeperClient;

import java.io.IOException;

import java.io.UnsupportedEncodingException;

import java.util.List;

import java.util.SortedSet;

import java.util.TreeSet;

/**

* Created by Allen on 2016/12/22.

*/

public class ZooKeeperQueue {

private ZooKeeper zk;

private int sessionTimeout;

private static byte[] ROOT_QUEUE_DATA = {0x12,0x34};

private static String QUEUE_ROOT = "/QUEUE";

private String queueName;

private String queuePath;

private Object mutex = new Object();

public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {

this.queueName = queueName;

this.queuePath = QcNLOkJDPZUEUE_ROOT + "/" + queueName;

this.zk = ZooKeeperClient.getInstance();

this.sessionTimeout = zk.getSessionTimeout();

//----------------------------------------------------

// 确保队列根目录/QUEUE和当前队列的目录的存在

//----------------------------------------------------

ensureExists(QUEUE_ROOT);

ensureExists(queuePath);

}

public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {

List nodes = null;

byte[] returnVal = null;

Stat stat = null;

do {

synchronized (mutex) {

nodes = zk.getChildren(queuePath, new ProduceWatcher());

//----------------------------------------------------

// 如果没有消息节点,等待生产者的通知

//----------------------------------------------------

if (nodes == null || nodes.size() == 0) {

mutex.wait();

} else {

SortedSet sortedNode = new TreeSet();

for (String node : nodes) {

sortedNode.add(queuePath + "/" + node);

}

//----------------------------------------------------

// 消费队列里序列号最小的消息

//----------------------------------------------------

String first = sortedNode.first();

returnVal = zk.getData(first, false, stat);

zk.delete(first, -1);

System.out.print(Thread.currentThread().getName() + " ");

System.out.print("consume a message from queue:" + first);

System.out.println(", message data is: " + new String(returnVal,"UTF-8"));

return returnVal;

}

}

} while (true);

}

class ProduceWatcher implements Watcher {

@Override

public void process(WatchedEvent event) {

//----------------------------------------------------

// 生产一条消息成功后通知一个等待线程

//----------------------------------------------------

synchronized (mutex) {

mutex.notify();

}

}

}

public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {

//----------------------------------------------------

// 确保当前队列目录存在

// example: /QUEUE/queueName

//----------------------------------------------------

ensureExists(queuePath);

String node = zk.create(queuePath + "/", data,

ZooDefs.Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT_SEQUENTIAL);

System.out.print(Thread.currentThread().getName() + " ");

System.out.print("produce a message to queue:" + node);

System.out.println(" , message data is: " + new String(data,"UTF-8"));

}

public void ensureExists(String path) {

try {

Stat stat = zk.exists(path, false);

if (stat == null) {

zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

String queueName = "test";

final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);

for (int i = 0; i < 10; i++) {

new Thread(new Runnable() {

@Override

public void run() {

try {

queue.consume();

System.out.println("--------------------------------------------------------");

System.out.println();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

}).start();

}

new Thread(new Runnable() {

@Override

public void run() {

for (int i = 0; i < 10; i++) {

try {

Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));

queue.produce(("massive" + i).getBytes());

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

}

},"Produce-thread").start();

}

}

测试

运行main方法,本机器的某次输出结果

Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0

Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1

Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2

Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3

Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4

Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5

Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6

Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7

Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8

Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8

--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9

Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9

总结

以上就是本文有关于队列和基于ZooKeeper实现队列源码介绍的全部内容,希望对大家有所帮助。

感谢朋友们对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JavaWeb项目中dll文件动态加载方法解析(详细步骤)
下一篇:注册接口的测试用例设计(注册接口的测试用例设计要求)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~