Java利用Redis实现消息队列的示例代码

网友投稿 518 2023-04-25


Java利用Redis实现消息队列的示例代码

本文介绍了java利用Redis实现消息队列的示例代码,分享给大家,具体如下:

应用场景

为什么要用redis?

二进制存储、java序列化传输、IO连接数高、连接频繁

一、序列化

这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口;

其代码如下:

package Utils;

import java.io.*;

/**

* Created by Kinglf on 2016/10/17.

*/

public class ObjectUtil {

/**

* 对象转byte[]

* @param obj

* @return

* @throws IOException

*/

public static byte[] object2Bytes(Object obj) throws IOException{

ByteArrayOutputStream bo=new ByteArrayOutputStream();

ObjectOutputStream oo=new ObjectOutputStream(bo);

oo.writeObject(obj);

byte[] bytes=bo.toByteArray();

bo.close();

oo.close();

return bytes;

}

/**

* byte[]转对象

* @param bytes

* @return

* @throws Exception

*/

public static Object bytes2Object(byte[] bytes) throws Exception{

ByteArrayInputStream in=new ByteArrayInputStream(bytes);

ObjectInputStream sIn=new ObjectInputStream(in);

return sIn.readObject();

}

}

二、消息类(实现Serializable接口)

package Model;

import java.io.Serializable;

/**

* Created by Kinglf on 2016/10/17.

*/

public class Message implements Serializable {

private static final long serialVersionUID = -389326121047047723L;

private int id;

private String content;

public Message(int id, String content) {

this.id = id;

this.content = content;

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

publOKWoqMqrNic String getContent() {

return content;

}

public void setContent(String content) {

this.content = content;

}

}

三、Redis的操作

利用redis做队列,我们采用的是redis中list的push和pop操作;

结合队列的特点:

只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可

java采用Jedis进行Redis的存储和Redis的连接池设置

上代码:

package Utils;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

import java.util.List;

import java.util.Map;

import java.util.Set;

/**

* Created by Kinglf on 2016/10/17.

*/

public class JedisUtil {

private static String JEDIS_IP;

private static int JEDIS_PORT;

private static String JEDIS_PASSWORD;

private static JedisPool jedisPool;

static {

//Configuration自行写的配置文件解析类,继承自Properties

Configuration conf=Configuration.getInstance();

JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");

JEDIS_PORT=conf.getInt("jedis.port",6379);

JEDIS_PASSWORD=conf.getString("jedis.password",null);

JedisPoolConfig config=new JedisPoolConfig();

config.setMaxActive(5000);

config.setMaxIdle(256);

config.setMaxWait(5000L);

config.setTestOnBorrow(true);

config.setTestOnReturn(true);

config.setTestWhileIdle(true);

config.setMinEvictableIdleTimeMillis(60000L);

config.setTimeBetweenEvictionRunsMillis(3000L);

config.setNumTestsPerEvictionRun(-1);

jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);

}

/**

* 获取数据

* @param key

* @return

*/

public static String get(String key){

String value=null;

Jedis jedis=null;

try{

jedis=jedisPool.getResource();

value=jedis.get(key);

}catch (Exception e){

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

}finally {

close(jedis);

}

return value;

}

private static void close(Jedis jedis) {

try{

jedisPool.returnResource(jedis);

}catch (Exception e){

if(jedis.isConnected()){

jedis.quit();

jedis.disconnect();

}

}

}

public static byte[] get(byte[] key){

byte[] value = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

value = jedis.get(key);

} catch (Exception e) {

//释放redis对象

jedisPool.retuOKWoqMqrNrnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return value;

}

public static void set(byte[] key, byte[] value) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.set(key, value);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

public static void set(byte[] key, byte[] value, int time) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.set(key, valueOKWoqMqrN);

jedis.expire(key, time);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

public static void hset(byte[] key, byte[] field, byte[] value) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.hset(key, field, value);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

public static void hset(String key, String field, String value) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.hset(key, field, value);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

/**

* 获取数据

*

* @param key

* @return

*/

public static String hget(String key, String field) {

String value = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

value = jedis.hget(key, field);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return value;

}

/**

* 获取数据

*

* @param key

* @return

*/

public static byte[] hget(byte[] key, byte[] field) {

byte[] value = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

value = jedis.hget(key, field);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return value;

}

public static void hdel(byte[] key, byte[] field) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.hdel(key, field);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

/**

* 存储REDIS队列 顺序存储

* @param key reids键名

* @param value 键值

*/

public static void lpush(byte[] key, byte[] value) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.lpush(key, value);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

/**

* 存储REDIS队列 反向存储

* @param key reids键名

* @param value 键值

*/

public static void rpush(byte[] key, byte[] value) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.rpush(key, value);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

/**

* 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端

* @param key reids键名

* @param destination 键值

*/

public static void rpoplpush(byte[] key, byte[] destination) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.rpoplpush(key, destination);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

/**

* 获取队列数据

* @param key 键名

* @return

*/

public static List lpopList(byte[] key) {

List list = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

list = jedis.lrange(key, 0, -1);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return list;

}

/**

* 获取队列数据

* @param key 键名

* @return

*/

public static byte[] rpop(byte[] key) {

byte[] bytes = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

bytes = jedis.rpop(key);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return bytes;

}

public static void hmset(Object key, Map hash) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.hmset(key.toString(), hash);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

public static void hmset(Object key, Map hash, int time) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.hmset(key.toString(), hash);

jedis.expire(key.toString(), time);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} fhttp://inally {

//返还到连接池

close(jedis);

}

}

public static List hmget(Object key, String... fields) {

List result = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

result = jedis.hmget(key.toString(), fields);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return result;

}

public static Set hkeys(String key) {

Set result = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

result = jedis.hkeys(key);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return result;

}

public static List lrange(byte[] key, int from, int to) {

List result = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

result = jedis.lrange(key, from, to);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return result;

}

public static Map hgetAll(byte[] key) {

Map result = null;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

result = jedis.hgetAll(key);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return result;

}

public static void del(byte[] key) {

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.del(key);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

}

public static long llen(byte[] key) {

long len = 0;

Jedis jedis = null;

try {

jedis = jedisPool.getResource();

jedis.llen(key);

} catch (Exception e) {

//释放redis对象

jedisPool.returnBrokenResource(jedis);

e.printStackTrace();

} finally {

//返还到连接池

close(jedis);

}

return len;

}

}

四、Configuration主要用于读取Redis的配置信息

package Utils;

import java.io.IOException;

import java.io.InputStream;

import java.util.Properties;

/**

* Created by Kinglf on 2016/10/17.

*/

public class Configuration extends Properties {

private static final long serialVersionUID = -2296275030489943706L;

private static Configuration instance = null;

public static synchronized Configuration getInstance() {

if (instance == null) {

instance = new Configuration();

}

return instance;

}

public String getProperty(String key, String defaultValue) {

String val = getProperty(key);

return (val == null || val.isEmpty()) ? defaultValue : val;

}

public String getString(String name, String defaultValue) {

return this.getProperty(name, defaultValue);

}

public int getInt(String name, int defaultValue) {

String val = this.getProperty(name);

return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);

}

public long getLong(String name, long defaultValue) {

String val = this.getProperty(name);

return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);

}

public float getFloat(String name, float defaultValue) {

String val = this.getProperty(name);

return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);

}

public double getDouble(String name, double defaultValue) {

String val = this.getProperty(name);

return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);

}

public byte getByte(String name, byte defaultValue) {

String val = this.getProperty(name);

return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);

}

public Configuration() {

InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");

try {

this.loadFromXML(in);

in.close();

} catch (IOException ioe) {

}

}

}

五、测试

import Model.Message;

import Utils.JedisUtil;

import Utils.ObjectUtil;

import redis.clients.jedis.Jedis;

import java.io.IOException;

/**

* Created by Kinglf on 2016/10/17.

*/

public class TestRedisQueue {

public static byte[] redisKey = "key".getBytes();

static {

try {

init();

} catch (IOException e) {

e.printStackTrace();

}

}

private static void init() throws IOException {

for (int i = 0; i < 1000000; i++) {

Message message = new Message(i, "这是第" + i + "个内容");

JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));

}

}

public static void main(String[] args) {

try {

pop();

} catch (Exception e) {

e.printStackTrace();

}

}

private static void pop() throws Exception {

byte[] bytes = JedisUtil.rpop(redisKey);

Message msg = (Message) ObjectUtil.bytes2Object(bytes);

if (msg != null) {

System.out.println(msg.getId() + "----" + msg.getContent());

}

}

}

每执行一次pop()方法,结果如下:


1----这是第1个内容


2----这是第2个内容


3----这是第3个内容


4----这是第4个内容

总结

至此,整个Redis消息队列的生产者和消费者代码已经完成

1.Message 需要传送的实体类(需实现Serializable接口)

2.Configuration Redis的配置读取类,继承自Properties

3.ObjectUtil 将对象和byte数组双向转换的工具类

4.Jedis 通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Vue列表页渲染优化详解
下一篇:在React中如何优雅的处理事件响应详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~