Springboot实现高吞吐量异步处理详解(适用于高并发场景)

网友投稿 565 2022-12-22


Springboot实现高吞吐量异步处理详解(适用于高并发场景)

技术要点

org.springframework.web.context.request.async.DeferredResult

示例如下:

1.   新建Maven项目  async

2.   pom.xml

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.java

async

1.0.0

org.springframework.boot

spring-boot-starter-parent

2.0.5.RELEASE

org.springframework.boot

spring-boot-starter-web

org.springframework

springloaded

1.2.8.RELEASE

provided

org.springframework.boot

spring-boot-devtools

provided

${project.artifactId}

org.apache.maven.plugins

maven-compiler-plugin

1.8

1.8

UTF-8

org.springframework.boot

spring-boot-maven-plugin

repackage

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.java

async

1.0.0

org.springframework.boot

spring-boot-starter-parent

2.0.5.RELEASE

org.springframework.boot

spring-boot-starter-web

org.springframework

springloaded

1.2.8.RELEASE

provided

org.springframework.boot

spring-boot-devtools

provided

${project.artifactId}

org.apache.maven.plugins

maven-compiler-plugin

1.8

1.8

UTF-8

org.springframework.boot

spring-boot-maven-plugin

repackage

3.   AsyncStarter.java

package com.java;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class AsyncStarter {

public static void main(String[] args) {

SpringApplication.run(AsyncStarter.class, args);

}

}

4.   AsyncVo.java

package com.java.vo;

import org.springframework.web.context.request.async.DeferredResult;

/**

* 存储异步处理信息

*

* @author Logen

*

* @param 接口输入参数

* @param 接口返回参数

*/

public class AsyncVo {

/**

* 请求参数

*/

private I params;

/**

* 响应结果

*/

private DeferredResult result;

public I getParams() {

return params;

}

public void setParams(I params) {

this.params = params;

}

public DeferredResult getResult() {

return result;

}

public void setResult(DeferredResult<O> result) {

this.result = result;

}

}

5.   RequestQueue.java

package com.java.queue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

import com.java.vo.AsyncVo;

/**

* 存放所有异步处理接口请求队列的对象,一个接口对应一个队列

*

* @author Logen

*

*/

@Component

public class RequestQueue {

/**

* 处理下订单接口的队列,设置缓冲容量为50

*/

private BlockingQueue> orderQueue = new LinkedBlockingQueue<>(50);

public BlockingQueue> getOrderQueue() {

return orderQueue;

}

}

6.   OrderTask.java

package com.java.task;

import java.util.HashMap;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import com.java.queue.RequestQueue;

import com.java.vo.AsyncVo;

/**

* 处理订单接口的任务,每个任务类处理一种接口

*

* @author Logen

*

*/

@Component

public class OrderTask extends Thread {

@Autowired

private RequestQueue queue;

private boolean running = true;

@Override

public void run() {

while (running) {

try {

AsyncVo vo = queue.getOrderQueue().take();

System.out.println("[ OrderTask ]开始处理订单");

String params = vo.getParams();

Thread.sleep(3000);

Map<String, Object> map = new HashMap<>();

map.put("params", params);

map.put("time", System.currentTimeMillis());

vo.getResult().setResult(map);

System.out.println("[ OrderTask ]订单处理完成");

} catch (InterruptedException e) {

e.printStackTrace();

running = false;

}

}

}

public void setRunning(boolean running) {

this.running = running;

}

}

7.   QueueListener.java

package com.java.listener;

import javax.annotation.PostConstruct;

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import com.java.task.OrderTask;

/**

* 队列监听器,初始化启动所有监听任务

*

* @author Logen

*

*/

@Component

public class QueueListener {

@Autowired

private OrderTask orderTask;

/**

* 初始化时启动监听请求队列

*/

@PostConstruct

public void init() {

orderTask.start();

}

/**

* 销毁容器时停止监听任务

*/

@PreDestroy

public void destory() {

orderTask.setRunning(false);

}

}

8.   OrderController.java

package com.java.controller;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import org.springframework.web.context.request.async.DeferredResult;

import com.java.queue.RequestQueue;

import com.java.vo.AsyncVo;

/**

*

*

*

*

* 模拟下单处理,实现高吞吐量异步处理请求

*

* 1、 Controller层接口只接收请求,不进行处理,而是把请求信息放入到对应该接口的请求队列中

* 2、 该接口对应的任务类监听对应接口的请求队列,从队列中顺序取出请求信息并进行处理

*

* 优点:接口几乎在收到请求的同时就已经返回,处理程序在后台异步进行处理,大大提高吞吐量

*

*

*

*

*

*

* @author Logen

*

*/

@RestController

public class OrderController {

@Autowired

private RequestQueue queue;

@GetMapping("/order")

public DeferredResult order(String number) throws InterruptedException {

System.out.println("[ OrderController ] 接到下单请求");

System.out.println("当前待处理订单数: " + queue.getOrderQueue().size());

AsyncVo vo = new AsyncVo<>();

DeferredResult result = new DeferredResult<>();

vo.setParams(number);

vo.setResult(result);

queue.getOrderQueue().put(vo);

System.out.println("[ OrderController ] 返回下单结果");

return result;

}

}

9.   运行 AsyncStarter.java ,启动测试

浏览器输入 http://localhost:8080/order?number=10001

正常情况处理3秒返回,返回结果如下

{"time":1548241500718,"params":"10001"}

观察控制台打印日志,如下所示:

[ OrderController ] 接到下单请求

当前待处理订单数: 0

[ OrderController ] 返回下单结果

[ OrderTask ]开始处理订单

[ OrderTask ]订单处理完成

结论:Controller层几乎在接收到请求的同时就已经返回,处理程序在后台异步处理任务。

快速多次刷新浏览器,目的为了高并发测试,观察控制台打印信息

现象:Controller层快速返回,待处理请求在队列中开始增加,异步处理程序在按顺序处理请求。

优点:对客户端响应时间不变,但提高了服务端的吞吐量。大大提升高并发处理性能!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java 数组复制clone方法实现详解
下一篇:springboot高并发下提高吞吐量的实现
相关文章

 发表评论

暂时没有评论,来抢沙发吧~