Java redisTemplate阻塞式处理消息队列

网友投稿 559 2022-09-11


Java redisTemplate阻塞式处理消息队列

目录Redis 消息队列redis五种数据结构队列生产者队列消费者测试类并发情况下使用increment递增补充

Redis 消息队列

redis五种数据结构

队列生产者

package cn.stylefeng.guns.knowledge.modular.knowledge.schedule;

import lombok.extern.slf4j.Slf4j;

import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;

import java.util.UUID;

/**

*

* 队列生产者

*

*

* @SINCE 2021/11/30 21:03

* @AUTHOR dispark

* @Date: 2021/11/30 21:03

*/

@Slf4j

public class QueueProducer implements Runnable {

/**

* 生产者队列 key

*/

public static final String QUEUE_PRODUCTER = "queue-producter";

private RedisTemplate redisTemplate;

public QueueProducer(RedisTemplate redisTemplate) {

this.redisTemplate = redisTemplate;

}

@Override

public void run() {

Random random = new Random();

while (true) {

try {

Thread.sleep(random.nextInt(600) + 600);

// 1.模拟生成一个任务

UUID queueProducerId = UUID.randomUUID();

// 2.将任务插入任务队列:queue-producter

redisTemplate.opsForList().leftPush(QUEUE_PRODUCTER, queueProducerId.toString());

log.info("生产一条数据 >>> {}", queueProducerId.toString());

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

队列消费者

package cn.stylefeng.guns.knowledge.modular.knowledge.schedule;

import lombok.extern.slf4j.Slf4j;

import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;

/**

*

* 队列消费者

*

*

* @SINCE 2021/11/30 21:14

* @AUTHOR dispark

* @Date: 2021/11/30 21:14

*/

@Slf4j

public class QueueConsumer implements Runnable {

public static final String QUEUE_PRODUCTER = "queue-producter";

public static final String TMP_QUEUE = "tmp-queue";

private RedisTemplate redisTemplate;

public QueueConsumer(RedisTemplate redisTemplate) {

this.redisTemplate = redisTemplate;

}

/**

* 功能描述: 取值 - - 推荐使用

*

* @author dispark

* @date 2021/11/30 21:17

*/

@Override

public void run() {

Random random = new Random();

while (true) {

// 1.从任务队列"queue-producter"中获取一个任务,并将该任务放入暂存队列"tmp-queue"

LojltBFNVng ququeConsumerId = redisTemplate.opsForList().rightPush(QUEUE_PRODUCTER, TMP_QUEUE);

// 2.处理任务----纯属业务逻辑,模拟一下:睡觉

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

// 3.模拟成功和失败的偶然现象,模拟失败的情况,概率为2/13

if (random.nextInt(13) % 7 == 0) {

// 4.将本次处理失败的任务从暂存队列"tmp-queue"中,弹回任务队列"queue-producter"

redisTemplate.opsForList().rightPush(TMP_QUEUE, QUEUE_PRODUCTER);

log.info(ququeConsumerId + "处理失败,被弹回任务队列");

} else {

// 5. 模拟成功的情况,将本次任务从暂存队列"tmp-queue"中清除

redisTemplate.opsForList().rightPop(TMP_QUEUE);

log.info(ququeConsumerId + "处理成功,被清除");

}

}

}

}

测试类

@Test

public void QueueThreadTotalEntry() throws Exception {

// 1.启动一个生产者线程,模拟任务的产生

new Thread(new QueueProducer(redisTemplate)).start();

Thread.sleep(15000);

// 2.启动一个线程者线程,模拟任务的处理

new Thread(new QueueConsumer(redisTemplate)).start();

// 3.主线程

Thread.sleep(Long.MAX_VALUE);

}

并发情况下使用increment递增

线程一:

Long increment = redisTemplate.opsForValue().increment("increment", 1L);

log.info("队列消费者 >> increment递增: {}", increment);

线程二:

Long increment = redisTemplate.opsForValue().increment("increment", 1L);

log.info("生产者队列 >> increment递增: {}", increment);

补充

redisTemplate处理/获取redis消息队列

(参考代码)

/**

* redis消息队列

*/

@Component

public class RedisQueue {

@Autowired

private RedisTemplate redisTemplate;

/** ---------------------------------- redis消息队列 ---------------------------------- */

/**

* 存值

* @param key 键

* @param value 值

* @return

*/

public boolean lpush(String key, Object value) {

try {

redisTemplate.opsForList().leftPush(key, value);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

/**

* 取值 -

* @param key 键

* @return

*/

public Object rpop(String key) {

try {

return redisTemplate.opsForList().rightPop(key);

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

/**

* 取值 - <brpop:阻塞式> - 推荐使用

* @param key 键

* @param timeout 超时时间

* @param timeUnit 给定单元粒度的时间段

* TimeUnit.DAYS //天

* TimeUnit.HOURS //小时

* TimeUnit.MINUTES //分钟

* TimeUnit.SECONDS //秒

* TimeUnit.MILLISECONDS //毫秒

* @return

*/

public Object brpop(String key, long timeout, TimeUnit timeUnit) {

try {

return redisTemplate.opsForList().rightPop(key, timeout, timeUnit);

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

/**

* 查看值

* @param key 键

* @param start 开始

* @param end 结束 0 到 -1代表所有值

* @return

*/

public List lrange(String key, long start, long end) {

try {

return redisTemplate.opsForList().range(key, start, end);

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

}

以上就是Java redisTemplate阻塞式处理消息队列的详细内容,更多关于Java redisTemplate 处理消息队列的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:网络状态码302和303的区别究竟是啥?面试官最满意这种回答(网络代码302)
下一篇:IRF链路聚合(链路聚合bfd)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~