Spring Boot教程之利用ActiveMQ实现延迟消息

网友投稿 276 2022-12-22


Spring Boot教程之利用ActiveMQ实现延迟消息

一、安装activeMQ

linux环境ActiveMQ部署方法:https://jb51.net/article/162320.htm

安装步骤参照上面这篇文章,本文不做介绍

Windows下安装ActiveMQ:

到官网(http://activemq.apache.org/download-archives.html)下载最新发布的压缩包(我下的是5.15.9)到本地后解压(我解压到D盘Dev目录下)即可。进入解压后的bin目录,我是64位机器,再进入win64目录后,双击activemq.bat启动:

wrapper | --> Wrapper Started as Console

wrapper | Launching a JVM...

jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org

jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved.

jvm 1 |

jvm 1 | java Runtime: Oracle Corporation 1.8.0_181 C:\Program Files\Java\jre1.8.0_181

jvm 1 | Heap sizes: current=125952k free=115299k max=932352k

jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=mChNCWMZ2FoXhZ9g -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=3500 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1

jvm 1 | Extensions classpath:

jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]

jvm 1 | ACTIVEMQ_HOME: ..\..

jvm 1 | ACTIVEMQ_BASE: ..\..

jvm 1 | ACTIVEMQ_CONF: ..\..\conf

jvm 1 | ACTIVEMQ_DATA: ..\..\data

jvm 1 | Loading message broker from: xbean:activemq.xml

jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@f0ef68d: startup date [Fri May 24 15:16:21 CST 2019]; root of context hierarchy

jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb]

jvm 1 | INFO | PListStore:[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\localhost\tmp_storage] started

jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) is starting

jvm 1 | INFO | Listening for connections at: tcp://wulf00:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600

jvm 1 | INFO | Connector openwire started

jvm 1 | INFO | Listening for connections at: amqp://wulf00:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600

jvm 1 | INFO | Connector amqp started

jvm 1 | INFO | Listening for connections at: stomp://wulf00:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600

jvm 1 | INFO | Connector stomp started

jvm 1 | INFO | Listening for connections at: mqtt://wulf00:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600

jvm 1 | INFO | Connector mqtt started

jvm 1 | INFO | Starting Jetty server

jvm 1 | INFO | Creating Jetty connector

jvm 1 | WARN | ServletContext@o.e.j.s.ServletContextHandler@17bc7c8a{/,null,STARTING} has uncovered http methods for path: /

jvm 1 | INFO | Listening for connections at ws://wulf00:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600

jvm 1 | INFO | Connector ws started

jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) started

jvm 1 | INFO | For help or more information please see: http://activemq.apache.org

jvm 1 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb only has 92649 mb of usable space. - resetting to maximum available disk space: 92649 mb

jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath

jvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/

jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/

jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'

jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath

jvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

默认端口8161,访问下http://localhost:8161/admin,用户名密码都是admin,进入控制台页面:

我们用坐上方的Queues来创建一个叫vboxlog的队列:

二、修改activeMQ配置文件

broker新增配置信息 schedulerSupport="true"

三、创建SpringBoot工程

1、配置ActiveMQ工厂信息,信任包必须配置否则会报错

package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.RedeliveryPolicy;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;

import java.util.List;

/**

* @author shanks on 2019-11-12

*/

@Configuration

public class ActiveMqConfig {

@Bean

public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);

// 设置信任序列化包集合

List models = new ArrayList<>();

models.add("com.example.demoactivemq.domain");

factory.setTrustedPackages(models);

return factory;

}

}

消息实体类

package com.example.demoactivemq.domain;

import lombok.Builder;

import lombok.Data;

import java.io.Serializable;

/**

* @author shanks on 2019-11-12

*/

@Builder

@Data

public class MessageModel implements Serializable {

private String titile;

private String message;

}

生产者

package com.example.demoactivemq.producer;

import lombok.extern.slf4j.Slf4j;

import org.apache.activemq.ScheduledMessage;

import org.apache.activemq.command.ActiveMQQueue;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.autoconfigure.jms.JmsProperties;

import org.springframework.jms.core.JmsMessagingTemplate;

import org.springframework.stereotype.Service;

import javax.jms.*;

import java.io.Serializable;

/**

* 消息生产者

*

* @author shanks

*/

@Service

@Slf4j

public class Producer {

public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

@Autowired

private JmsMessagingTemplate template;

/**

* 发送消息

*

* @param destination destination是发送到的队列

* @param message message是待发送的消息

*/

public void send(Destination destination, T message) {

template.convertAndSend(destination, message);

}

/**

* 延时发送

*

* @param destination 发送的队列

* @param data 发送的消息

* @param time 延迟时间

*/

public void delaySend(Destination destination, T data, Long time) {

Connection connection = null;

Session session = null;

MessageProducer producer = null;

// 获取连接工厂

ConnectionFactory connectionFactory = template.getConnectionFactory();

try {

// 获取连接

connection = connectionFactory.createConnection();

connection.start();

// 获取session,true开启事务,false关闭事务

session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

// 创建一个消息队列

producer = session.createProducer(destination);

producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());

ObjectMessage message = session.createObjectMessage(data);

//设置延迟时间

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);

// 发送消息

producer.send(message);

log.info("发送消息:{}", data);

session.commit();

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

if (producer != null) {

producer.close();

}

if (session != null) {

session.close();

}

if (connection != null) {

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

消费者

package com.example.demoactivemq.producer;

import com.example.demoactivemq.domain.MessageModel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

/**

* 消费者

*/

@Component

@Slf4j

public class Consumer {

@JmsListener(destination = "delay.queue")

public void receiveQueue(MessageModel message) {

log.info("收到消息:{}", message);

}

}

application.yml

spring:

activemq:

broker-url: tcp://localhost:61616

测试类

package com.example.demoactivemq;

import com.example.demoactivemq.domain.MessageModel;

import com.example.demoactivemq.producer.Producer;

import org.junit.jupiter.api.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = DemoActivemqApplication.class)

@RunWith(SpringRunner.class)

class DemoActivemqApplicationTests {

/**

* 消息生产者

*/

@Autowired

private Producer producer;

/**

* 及时消息队列测试

*/

@Test

public void test() {

MessageModel messageModel = MessageModel.builder()

.message("测试消息")

.titile("消息000")

.build();

// 发送消息

producer.send(Producer.DEFAULT_QUEUE, messageModel);

}

/**

* 延时消息队列测试

*/

@Test

public void test2() {

for (int i = 0; i < 5; i++) {

MessageModel messageModel = MessageModel.builder()

.titile("延迟10秒执行")

.message("测试消息" + i)

.build();

// 发送延迟消息

producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);

}

try {

// 休眠100秒,等等消息执行

Thread.currentThread().sleep(100000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

执行结果

2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息0)

2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息1)

2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息2)

2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息3)

2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息4)

2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息0)

2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息1)

2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息2)

2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息3)

2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息4)

比你优秀的人比你还努力,你有什么资格不去奋斗!!!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java import导入及访问控制权限修饰符原理解析
下一篇:Java Junit单元测试实例详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~