Springboot详解线程池与多线程及阻塞队列的应用详解

网友投稿 563 2022-07-22


目录一、案例场景二、使用类三、本例说明1.接收web请求2.后台任务处理3.关系说明四、代码1.OrderController2.FlowStarter流程启动器3.FlowManager流程管理器4.StepContainer线程池容器5.StepExecutor线程执行器6.StepHandler业务处理handler7.阻塞队列7.1 FlowQueue7.2 QueueUtils7.3 ConstantUtils8.任务模型8.1 StepModel8.2 StepResult9.业务数据模型9.1 OrderInfo9.2 ResultObj10.测试10.1 web请求10.2 后台任务日志

版本:Spring Boot 2.6.3

一、案例场景

1>web端接收restful请求生成任务A,并把任务放入队列Queue_A。

2>线程池A的任务线程从队列Queue_A取出任务,处理完成后放入Queue_B。

3>线程池B的任务线程从Queue_B取出任务,处理完成后入库。

本例就使用两个任务步骤,按需扩展延长任务链。

二、使用类

java.util.LinkedHashMap,双向链表。

java.util.concurrent.BlockingQueue,阻塞队列接口。

java.util.concurrent.LinkedBlockingQueue,阻塞队列实现类。

java.util.concurrent.CountDownLatch,线程计数器。

java.util.concurrent.locks.ReentrantLock,可重入锁。

三、本例说明

1.接收web请求

OrderController接收web请求,业务数据封装成任务对象,并写入队列QUEUE_A。Web请求结束,立即返回。

2.后台任务处理

FlowStarter流程启动器

管理FlowManager,创建流程管理器和启动流程管理器。创建线程池容器StepContainer,指定队列、线程池线程数量,以及业务处理Handler。

FlowManager流程管理器

管理线程池容器StepContainer。创建线程池容器,启动线程池容器,关闭线程池容器,线程池容器之间数据传递。使用LinkedHashMap维护一个流程中的多个线程池容器。

StepContainer线程池容器

创建线程池,启动线程执行器(Executor),初始化业务处理Handler,读写队列。使用LinkedHashMap维护一个流程中的多个StepExecutor。

StepExecutor线程执行器

执行抽象公用业务逻辑。实现线程Runnable接口。调用StepHandler的实现类的execute执行具体业务逻辑。

StepHandler业务处理器handler

具体业务在StepHandler的实现类的execute中实现。

任务模型对象StepModel和执行结果对象StepResult

每个具体业务数据必须包装成任务模型对象StepModel,执行结果包装成执行结果对象StepResult,才能在线程池和队列中流转。

3.关系说明

一个FlowStarter可以启动一个或者多个FlowManager。支持一对多和一对一,按需扩展。

一个FlowManager对应一个业务流程。一个业务流程可以拆分为多个步骤。一个步骤对应一个线程池容器StepContainer。一个线程池容器StepContainer,启动多个线程执行器StepExecutor。效果就是并发执行任务。

一个业务流程拆分成若干个步骤,每个步骤之间数据流转,使用任务模型StepModel中的状态标识isFinished,isPutInQueueAgain,isPutInQueueNext 字段来分析任务流向。使用StepModel的StepResult的 nextStepName字段来识别具体流向的线程池容器。

四、代码

1.OrderController

OrderController,接收请求、封装任务、写队列。

@Slf4j

@RestController

@RequestMapping("/order")

public class OrderController {

@PostMapping("/f1")

public Object f1(@RequestBody Object obj) {

log.info("OrderController->f1,接收参数,obj = " + obj.toString());

Map objMap = (Map) obj;

OrderInfo orderInfo = new OrderInfo();

orderInfo.setUserName((String) objMap.get("userName"));

orderInfo.setTradeName((String) objMap.get("tradeName"));

orderInfo.setOrderTime(System.currentTimeMillis());

LinkedBlockingQueue queueA = FlowQueue.getBlockingQueue("QUEUE_A");

QueueUtils.putStepPutInQueue(queueA,orderInfo);

log.info("OrderController->f1,返回." );

return ResultObj.builder().code("200").message("成功").build();

}

}

2.FlowStarter流程启动器

FlowStarter,后台任务线程池和线程启动。实现InitializingBean了接口。那么在spring初始化化bean完成后,就能触发启动线程池和线程。

@Slf4j

@Service

public class FlowStarter implements InitializingBean {

@Override

public void afterPropertiesSet() throws Exception {

log.info("FlowWorker创建流程.");

FlowManager flowManager = new FlowManager();

flowManager.buildContainer(ConstantUtils.STEP_01,5,

FlowQueue.getBlockingQueue("QUEUE_A"), Step01Handler.class

);

flowManager.buildContainer(ConstantUtils.STEP_02,5,

FlowQueue.getBlockingQueue("QUEUE_B"), Step02Handler.class

);

flowManager.startContainers();

log.info("FlowWorker启动流程完成.");

}

}

3.FlowManager流程管理器

一个FlowManager流程管理器,维护多个线程池容器StepContainer,共同完成一个流程的多个步骤。

public class FlowManager {

// 管理器名称

private String name;

// 管理线程池容器

private Map stepContainerMap = new LinkedHashMap<>();

public FlowManager() {}

// 创建线程池容器

public void buildContainer(String name, int poolSize, BlockingQueue queue,

Class extends StepHandler> handlerClazz) {

StepContainer stepWorker = new StepContainer();

stepWorker.createThreadPool(poolSize, queue, handlerClazz);

stepWorker.setName(name);

stepWorker.setFlowManager(this);

this.stepContainerMap.put(name, stepWorker);

}

// 启动线程池容器

public void startContainers() {

for (StepContainer stepContainer : this.stepContainerMap.values()) {

stepContainer.startRunExecutor();

}

}

// 关闭线程池容器

public void stopContainers() {

for (StepContainer stepContainer : this.stepContainerMap.values()) {

stepContainer.stopRunExecutor();

}

this.stepContainerMap.clear();

}

// 任务放入下一个线程池

public boolean sendToNextContainer(String nextStepName, Object obj) {

if (nextStepName != null && !StringUtils.equals(nextStepName, "")) {

if (this.stepContainerMap.containsKey(nextStepName)) {

this.stepContainerMap.get(nextStepName).putStepInQueue(obj);

return true;

} else {

return false;

}

} else {

return false;

}

}

public String getName() {

return name;

}

}

4.StepContainer线程池容器

StepContainer线程池容器,维护多个线程执行器StepExecutor,实现多线程异步完成每个独立任务。

@Slf4j

public class StepContainer {

// 线程池名称

private String name;

// 线程池

private ExecutorService threadPool;

// 线程数目

private int nThreads = 0;

// 线程处理业务handler类

private Class handlerClazz;

// 线程处理业务队列

private BlockingQueue queue = null;

// 线程池内线程管理

private Map stepExecutorMap = new LinkedHashMap<>();

// 线程池运行状态

private boolean isRun = false;

// 线程池管理器

private FlowManager flowManager = null;

// 构造函数

public StepContainer() {}

// 创建线程池

public boolean createThreadPool(int nThreads, BlockingQueue queue,

Class extends StepHandler> handlerClazz) {

try {

this.nThreads = nThreads;

this.queue = queue;

this.handlerClazz = handlerClazz;

this.threadPool = Executors.newFixedThreadPool(this.nThreads, new ThreadFactory() {

@Override

public Thread newThread(Runnable runnable) {

return new Thread(runnable);

}

});

} catch (Exception e) {

e.printStackTrace();

return false;

}

return true;

}

// 启动线程

public void startRunExecutor() {

if (!this.isRun) {

if (this.handlerClazz != null) {

log.info("线程池: " + this.name + ",启动,加载线程Executor.");

StepExecutor stepExecutor;

String executorName = "";

for (int num = 0; num < this.nThreads; num++) {

try {

executorName = this.name + "_" + (num + 1);

StepHandler stepHandler = (StepHandler) createStepHandler(this.handlerClazz);

stepExecutor = new StepExecutor(executorName, this.queue, stepHandler, this);

this.threadPool.execute(stepExecutor);

this.stepExecutorMap.put(executorName, stepExecutor);

} catch (Exception e) {

e.printStackTrace();

}

}

this.isRun = true;

}

}

}

// 关闭线程

public void stopRunExecutor() {

if (isRun) {

Iterator iterator = this.stepExecutorMap.values().iterator();

while (iterator.hasNext()) {

StepExecutor stepExecutor = (StepExecutor) iterator.next();

stepExecutor.stop();

}

this.stepExecutorMap.clear();

this.isRun = false;

}

}

// 从队列获取任务

public StepModel getStepFromQueue() {

StepModel stepModel = null;

synchronized (this.queue) {

try {

if (this.queue.size() > 0) {

stepModel = this.queue.take();

}

} catch (Exception e) {

log.info("从队列获取任务异常.");

e.printStackTrace();

}

}

return stepModel;

}

// 任务放入队列

public void putStepInQueue(Object obj) {

try {

StepModel stepModel = new StepModel(obj);

stepModel.setPutInQueueTime(System.currentTimeMillis());

this.queue.put(stepModel);

} catch (InterruptedException e) {

log.info("任务放入队列异常.");

e.printStackTrace();

}

}

// 重新放入

public void putStepInQueueAgain(StepModel stepModel) {

stepModel.setFinished(false);

stepModel.setPutInQueueNext(false);

stepModel.setPutInQueueAgain(false);

try {

this.queue.put(stepModel);

} catch (InterruptedException e) {

log.info("任务重新放入队列异常.");

e.printStackTrace();

}

}

// 清空队列

public void clearQueue() {

if (this.queue != null) {

this.queue.clear();

}

}

// 初始化实例对象

public Object createStepHandler(Class clazz)

throws InstantiationException, IllegalAccessException {

Object object = clazz.newInstance();

return object;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public FlowManager getFlowManager() {

return flowManager;

}

public void setFlowManager(FlowManager fhttp://lowManager) {

this.flowManager = flowManager;

}

}

5.StepExecutor线程执行器

StepExecutor线程执行器,实现Runnable接口。线程执行单元通用逻辑,具体业务逻辑通过调用StepHandler的execute方法实现。

@Slf4j

public class StepExecutor implements Runnable {

// 执行器名称

private String name;

// 线程执行的任务

private StepModel stepModel;

// 线程执行的队列

private BlockingQueue queue;

// 线程执行的业务处理逻辑

private Object stepHandler;

// 线程运行状态

private volatile boolean isRun = false;

// 线程开启(True)和关闭(False)

private volatile boolean isClose = false;

// 线程隶属容器

private StepContainer stepContainer;

// 线程计数器(关闭线程使用)

private CountDownLatch countDownLatch = null;

public StepExecutor() {}

public StepExecutor(String name, BlockingQueue queue,

StepHandler stepHandler, StepContainer stepContainer) {

this.name = name;

this.queue = queue;

this.stepHandler = stepHandler;

this.stepContainer = stepContainer;

}

@Override

public void run() {

this.isRun = true;

this.countDownLatch = new CountDownLatch(1);

// 没收到关闭信号,则循环运行

while (!this.isClose) {

this.stepModel = null;

String threadName = "【线程池:" + this.stepContainer.getName()

+ ",线程:" + Thread.currentThread().getName() + "】";

// 循环运行,为防止中断和卡主,需捕获异常

try {

StepHandler stepHandler = (StepHandler) this.stepHandler;

this.stepModel = this.stepContainer.getStepFromQueue();

if (this.stepModel != null) {

log.info(threadName + ",处理任务.");

this.stepModel.getStepResultList().clear();

stepHandler.execute(this.stepModel);

// 执行完成后结果数据

List stepResultList = this.stepModel.getStepResultList();

boolean isFinished = this.stepModel.isFinished();

boolean isPutInQueueAgain = this.stepModel.isPutInQueueAgain();

boolean isPutInQueueNext = this.stepModel.isPutInQueueNext();

if (isFinished && !isPutInQueueAgain && !isPutInQueueNext) {

log.info(threadName + ",任务结束.");

}

if (!isFinished && isPutInQueueAgain && !isPutInQueueNext) {

log.info(threadName + ",任务在本步骤未完成,重新放队列.");

this.stepContainer.putStepInQueueAgain(this.stepModel);

}

if (!isFinished && !isPutInQueueAgain && isPutInQueueNext) {

int resultNum = stepResultList.size();

if (resultNum > 0) {

for (StepResult stepResult : stepResultList) {

log.info(threadName + ",任务在本步骤已经完成,发送给下一个线程池: "

+ stepResult.getNextStepName() + ",执行.");

this.stepContainer.getFlowManager().sendToNextContainer(

stepResult.getNextStepName(),

stepResult.getResult());

}

}

}

} else {

threadToSleep(1000 * 3L);

}

} catch (Exception e) {

log.info("执行器异常.");

e.printStackTrace();

this.stepContainer.putStepInQueueAgain(this.stepModel);

}

}

// 跳出循环后,线程计数减1

this.countDownLatch.countDown();

this.isRun = false;

}

public void stop() {

this.isClose = true;

if (this.countDownLatch != null) {

while (this.countDownLatch.getCount() > 0L) {

try {

this.countDownLatch.await();

} catch (InterruptedException e) {

log.info("线程关闭异常.");

e.printStackTrace();

}

}

}

this.isClose = false;

}

public void threadToSleep(long time) {

try {

Thread.sleep(time);

} catch (Exception e) {

log.info("线程休眠异常.");

e.printStackTrace();

}

}

}

6.StepHandler业务处理handler

StepHandler是StepExecutor线程执行器,具体执行业务逻辑的入口。

StepHandler抽象类

每个具体的实现类都继承抽象的StepHandler。

public abstract class StepHandler {

public StepHandler() {}

public abstract void execute(StepModel stepModel);

}

Step01Handler

Step01Handler是StepHandler实现类,从队列中取任务执行,执行完成后放入下一个业务处理器Step02Handler。

@Slf4j

public class Step01Handler extends StepHandler {

@Override

public void execute(StepModel stepModel) {

log.info("Step01Handler执行开始,stepModel: " + stepModel.toString());

OrderInfo orderInfo = (OrderInfo) stepModel.getObj();

List stepResultList = stepModel.getStepResultList();

try {

log.info("Step01Handler执行,处理订单.");

String orderNo = UUID.randomUUID().toString()

.replace("-", "").toUpperCase();

orderInfo.setOrderNo(orderNo);

orderInfo.setPlatformType("线上");

orderInfo.setOrderSource("Web");

stepModel.setFinished(false);

stepModel.setPutInQueueNext(true);

stepModel.setPutInQueueAgain(false);

stepResultList.add(new StepResult(ConstantUtils.STEP_02, orderInfo));

} catch (Exception e) {

stepModel.setFinished(false);

stepModel.setPutInQueueNext(false);

stepModel.setPutInQueueAgain(true);

stepResultList.add(new StepResult(ConstantUtils.STEP_01, orderInfo));

}

log.info("Step01Handler执行完成,stepModel: " + stepModel.toString());

}

}

Step02Handler

Step02Handler是StepHandler实现类,从队列中取任务执行。

@Slf4j

public class Step02Handler extends StepHandler{

@Override

public void execute(StepModel stepModel) {

log.info("Step02Handler执行开始,stepModel: " + stepModel.toString());

OrderInfo orderInfo = (OrderInfo) stepModel.getObj();

List stepResultList = stepModel.getStepResultList();

try {

orderInfo.setEndTime(System.currentTimeMillis());

stepModel.setFinished(true);

stepModel.setPutInQueueNext(false);

stepModel.setPutInQueueAgain(false);

log.info("Step02Handler执行,入库.");

} catch (Exception e) {

stepModel.setFinished(true);

stepModel.setPutInQueueNext(false);

stepModel.setPutInQueueAgain(false);

}

log.info("Step02Handler执行完成,stepModel: " + stepModel.toString());

}

}

7.阻塞队列

BlockingQueue是线程安全的阻塞队列。

7.1 FlowQueue

FlowQueue,管理本例使用的两个阻塞队列。

public class FlowQueue {

private static final LinkedBlockingQueue queueA = new LinkedBlockingQueue();

private static final LinkedBlockingQueue queueB = new LinkedBlockingQueue();

public static LinkedBlockingQueue getBlockingQueue(String queueName) {

LinkedBlockingQueue queue = null;

switch (queueName) {

case "QUEUE_A":

queue = queueA;

break;

case "QUEUE_B":

queue = queueB;

break;

}

return queue;

}

}

7.2 QueueUtils

QueueUtils,队列简易工具。

@Slf4j

public class QueueUtils {

public static StepModel getStepFromQueue(

LinkedBlockingQueue queue) {

StepModel stepModel = null;

try {

if (queue.size() > 0) {

stepModel = queue.take();

}

} catch (Exception e) {

log.info("读队列异常.");

e.printStackTrace();

}

return stepModel;

}

public static void putStepPutInQueue(

LinkedBlockingQueue queue, Object obj) {

try {

StepModel stepModel = new StepModel(obj);

stepModel.setPutInQueueTime(System.currentTimeMillis());

queue.put(stepModel);

} catch (Exception e) {

log.info("写队列异常.");

e.printStackTrace();

}

}

public static int getQueueSize(

LinkedBlockingQueue queue) {

int size = 0;

try {

size = queue.size();

} catch (Exception e) {

log.info("获取队列Size异常.");

e.printStackTrace();

}

return size;

}

}

7.3 ConstantUtils

ConstantUtils,管理常量,即线程池名称。

public class ConstantUtils {

public static final String STEP_01 = "STEP_01_THREAD_POOL";

public static final String STEP_02 = "STEP_02_THREAD_POOL";

}

8.任务模型

任务模型,即具体需要处理对象,封装成线程使用的任务模型,这样可以把业务和流程框架解耦。

8.1 StepModel

StepModel,任务模型封装。

@Data

public class StepModel {

// 任务对象

private Object obj;

// 任务执行结果

private List stepResultList;

// 任务接收时间

private long putInQueueTime;

// 任务完成标识

private boolean isFinished = falsehttp://;

// 任务重新放入队列标识

private boolean isPutInQueueAgain = false;

// 任务放入下一个队列标识

private boolean isPutInQueueNext = false;

public StepModel(Object object) {

this.obj = object;

this.stepResultList = new ArrayList<>();

}

}

8.2 StepResult

StepResult,执行结果模型封装。

@Data

public class StepResult {

// 目标线程池名

private String nextStepName;

// 执行结果

private Object result;

public StepResult(String nextStepName,Object result){

this.nextStepName = nextStepName;

this.result = result;

}

}

9.业务数据模型

业务数据模型,即生成具体需要处理的数据,在传入给线程池的线程执行前,需要封装成任务模型。

9.1 OrderInfo

OrderInfo,本例要处理的业务数据模型。

@Data

@NoArgsConstructor

public class OrderInfo {

private String userName;

private String orderNo;

private String tradeName;

private String platformType;

private String orderSource;

private long orderTime;

private long endTime;

}

9.2 ResultObj

ResultObj,web请求返回的统一封装对象。

@Data

@NoArgsConstructor

@AllArgsConstructor

@Builder

public class ResultObj {

private String code;

private String message;

}

10.测试

包括web请求和后台任务

10.1 web请求

请求URL: http://127.0.0.1:8080/server/order/f1

入参:

{    "userName": "HangZhou0614",    "tradeName": "vue进阶教程"}

返回值:

{    "code": "200",    "message": "成功"}

10.2 后台任务日志

日志输出:


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:使用Mybatis
下一篇:深入介绍Spring框架及故障排除
相关文章

 发表评论

暂时没有评论,来抢沙发吧~