spring与disruptor集成的简单示例

网友投稿 296 2023-02-16


spring与disruptor集成的简单示例

这里不对 Disruptor 做过多介绍,只描述一下当前的业务场景:有 A、B 两个应用,应用 A 向应用 B 传递数据。数据传送比较快,如果用 HTTP 直接 push 数据然后入库,效率不高,还有可能给应用 A 带来比较大的压力。使用 MQ 又太重量级,所以选择了 Disruptor(也可以使用 Reactor)。

BaseQueueHelper.java

/**

* lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。

*

* 调用init()时才真正启动线程开始处理 系统退出自动清理资源.

*

* @author xielongwang

* @create 2018-01-18 下午3:49

* @email xielong.wang@nvr-china.com

* @description

*/

public abstract class BaseQueueHelper, H extends WorkHandler> {

/**

* 记录所有的队列,系统退出时统一清理资源

*/

private static List queueHelperList = new ArrayList();

/**

* Disruptor 对象

*/

private Disruptor disruptor;

/**

* RingBuffer

*/

private RingBuffer ringBuffer;

/**

* initQueue

*/

private List initQueue = new ArrayList();

/**

* 队列大小

*

* @return 队列长度,必须是2的幂

*/

protected abstract int getQueueSize();

/**

* 事件工厂

*

* @return EventFactory

*/

protected abstract EventFactory eventFactory();

/**

* 事件消费者

*

* @return WorkHandler[]

*/

protected abstract WorkHandler[] getHandler();

/**

* 初始化

*/

public void init() {

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();

disruptor = new Disruptor(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());

disruptor.setDefaultExceptionHandler(new MyHandlerException());

disruptor.handleEventsWithWorkerPool(getHandler());

ringBuffer = disruptor.start();

//初始化数据发布

for (D data : initQueue) {

ringBuffer.publishEvent(new EventTranslatorOneArg() {

@Override

public void translateTo(E event, long sequence, D data) {

event.setValue(data);

}

}, data);

}

//加入资源清理钩子

synchronized (queueHelperList) {

if (queueHelperList.isEmpty()) {

Runtime.getRuntime().addShutdownHook(new Thread() {

@Override

public void run() {

for (BaseQueueHelper baseQueueHelper : queueHelperList) {

baseQueueHelper.shutdown();

}

}

});

}

queueHelperList.add(this);

}

}

/**

* 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,

* 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.

*

* @return WaitStrategy

*/

protected abstract WaitStrategy getStrategy();

/**

* 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.

*/

public synchronized void publishEvent(D data) {

if (ringBuffer == null) {

initQueue.add(data);

return;

}

ringBuffer.publishEvent(new EventTranslatorOneArg() {

@Override

public void tranhttp://slateTo(E event, long sequence, D data) {

event.setValue(data);

}

}, data);

}

/**

* 关闭队列

*/

public void shutdown() {

disruptor.shutdown();

}

}

EventFactory.java

/**
 * Disruptor event factory that pre-allocates {@link SeriesDataEvent} instances for the
 * ring buffer.
 *
 * @author xielongwang (xielong.wang@nvr-china.com), created 2018-01-18
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {

    /**
     * Creates one empty event.
     *
     * @return a new {@link SeriesDataEvent} with no value set
     */
    @Override
    public SeriesDataEvent newInstance() {
        return new SeriesDataEvent();
    }
}

MyHandlerException.java

/**
 * Disruptor exception handler that logs failures instead of letting them propagate.
 */
public class MyHandlerException implements ExceptionHandler<Object> {

    private static final Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

    /**
     * Called when an exception is thrown while an event is being processed.
     *
     * @param ex       the exception thrown by the handler
     * @param sequence the sequence number of the event being processed
     * @param event    the event being processed
     */
    @Override
    public void handleEventException(Throwable ex, long sequence, Object event) {
        // Passing the throwable as the trailing argument makes the logger print the
        // stack trace; passing the event itself (not event.toString()) tolerates null.
        logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]",
                sequence, event, ex.getMessage(), ex);
    }

    /**
     * Called when an exception is thrown during start-up.
     *
     * @param ex the exception thrown
     */
    @Override
    public void handleOnStartException(Throwable ex) {
        logger.error("start disruptor error ==[{}]!", ex.getMessage(), ex);
    }

    /**
     * Called when an exception is thrown during shutdown.
     *
     * @param ex the exception thrown
     */
    @Override
    public void handleOnShutdownException(Throwable ex) {
        logger.error("shutdown disruptor error ==[{}]!", ex.getMessage(), ex);
    }
}

SeriesData.java (代表应用A发送给应用B的消息)

/**
 * Message that application A sends to application B; carries the raw device
 * information string.
 */
public class SeriesData {

    /** Raw device information payload; {@code null} until set. */
    private String deviceInfoStr;

    /** Creates a message with no payload. */
    public SeriesData() {
        this(null);
    }

    /**
     * Creates a message carrying the given payload.
     *
     * @param deviceInfoStr the device information string
     */
    public SeriesData(String deviceInfoStr) {
        setDeviceInfoStr(deviceInfoStr);
    }

    /**
     * @return the device information string, possibly {@code null}
     */
    public String getDeviceInfoStr() {
        return this.deviceInfoStr;
    }

    /**
     * @param deviceInfoStr the device information string to store
     */
    public final void setDeviceInfoStr(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
    }

    /**
     * @return a debug representation including the payload in single quotes
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SeriesData{");
        text.append("deviceInfoStr='").append(deviceInfoStr).append('\'');
        return text.append('}').toString();
    }
}

SeriesDataEvent.java

public clashttp://s SeriesDataEvent extends ValueWrapper {

}

SeriesDataEventHandler.java

/**
 * Disruptor consumer that hands the device information of each event to
 * {@link DeviceInfoService}.
 */
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {

    private final Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);

    @Autowired
    private DeviceInfoService deviceInfoService;

    /**
     * Processes one event. Events without a payload, or with an empty device information
     * string, are logged and skipped.
     *
     * @param event the event taken from the ring buffer
     */
    @Override
    public void onEvent(SeriesDataEvent event) {
        SeriesData data = event.getValue();
        if (data == null || StringUtils.isEmpty(data.getDeviceInfoStr())) {
            logger.warn("receiver series data is empty!");
            // Without this return a null payload falls through to the business call
            // below and fails with a NullPointerException.
            return;
        }

        // Business processing.
        deviceInfoService.processData(data.getDeviceInfoStr());
    }
}

SeriesDataEventQueueHelper.java

/**
 * Spring-managed queue helper for {@link SeriesData} messages. The queue is started once
 * Spring has injected all handler beans.
 */
@Component
public class SeriesDataEventQueueHelper
        extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler>
        implements InitializingBean {

    /** Ring buffer size. */
    private static final int QUEUE_SIZE = 1024;

    /** All {@link SeriesDataEventHandler} beans found in the application context. */
    @Autowired
    private List<SeriesDataEventHandler> seriesDataEventHandler;

    /**
     * @return the ring buffer size, {@value #QUEUE_SIZE}
     */
    @Override
    protected int getQueueSize() {
        return QUEUE_SIZE;
    }

    /**
     * @return a factory producing empty {@link SeriesDataEvent} instances
     */
    @Override
    protected com.lmax.disruptor.EventFactory<SeriesDataEvent> eventFactory() {
        return new EventFactory();
    }

    /**
     * @return the injected handlers as an array, one worker per handler bean
     */
    @Override
    protected WorkHandler[] getHandler() {
        // The typed list makes the unchecked array cast unnecessary.
        return seriesDataEventHandler.toArray(new SeriesDataEventHandler[0]);
    }

    /**
     * @return a blocking wait strategy (YieldingWaitStrategy is a possible alternative)
     */
    @Override
    protected WaitStrategy getStrategy() {
        return new BlockingWaitStrategy();
    }

    /**
     * Starts the queue after dependency injection has completed.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.init();
    }
}

ValueWrapper.java

/**
 * Base class for objects that hold a single mutable value.
 *
 * @param <T> type of the wrapped value
 */
public abstract class ValueWrapper<T> {

    /** The wrapped value; {@code null} until set. */
    private T value;

    /** Creates a wrapper with no value. */
    public ValueWrapper() {}

    /**
     * Creates a wrapper holding the given value.
     *
     * @param value the initial value
     */
    public ValueWrapper(T value) {
        this.value = value;
    }

    /**
     * @return the wrapped value, possibly {@code null}
     */
    public T getValue() {
        return value;
    }

    /**
     * @param value the value to store
     */
    public void setValue(T value) {
        this.value = value;
    }
}

DisruptorConfig.java

/**
 * Spring configuration for the Disruptor integration. Declares several
 * {@link SeriesDataEventHandler} beans so that multiple consumer instances exist.
 */
@Configuration
@ComponentScan("com.portal.disruptor")
public class DisruptorConfig {

    /** Builds one consumer instance; each bean method below calls this once. */
    private static SeriesDataEventHandler newHandler() {
        return new SeriesDataEventHandler();
    }

    /**
     * First consumer bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler1() {
        return newHandler();
    }

    /**
     * Second consumer bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler2() {
        return newHandler();
    }

    /**
     * Third consumer bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler3() {
        return newHandler();
    }

    /**
     * Fourth consumer bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler4() {
        return newHandler();
    }

    /**
     * Fifth consumer bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler5() {
        return newHandler();
    }
}

测试

// Message producer: publishes incoming payloads to the Disruptor queue.
@Autowired
private SeriesDataEventQueueHelper seriesDataEventQueueHelper;

/**
 * Receives device data and publishes it to the Disruptor queue.
 *
 * @param deviceData the raw request body
 * @return a 400 "failed" response when the body is empty, otherwise 200 "success"
 */
@RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public DataResponseVo receiverDeviceData(@RequestBody String deviceData) {
    long receivedAt = System.currentTimeMillis();

    if (StringUtils.isEmpty(deviceData)) {
        logger.info("receiver data is empty !");
        return new DataResponseVo(400, "failed");
    }

    seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));

    // Log how long the request spent handing the payload over to the queue.
    long publishedAt = System.currentTimeMillis();
    logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, publishedAt - receivedAt);

    return new DataResponseVo(200, "success");
}

应用 A 通过 /data 接口把数据发送到应用 B,应用 B 再通过 seriesDataEventQueueHelper 把消息发布到 Disruptor 队列,由消费者去消费,整个过程不会阻塞应用 A。该方案可接受消息丢失;可以通过扩展 SeriesDataEventQueueHelper 来实现对 Disruptor 队列的监控。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Vue 中的compile操作方法
下一篇:关于接口测试培训ppt的信息
相关文章

 发表评论

暂时没有评论,来抢沙发吧~