Java多线程并发执行demo代码实例

网友投稿 579 2022-12-03


Java多线程并发执行demo代码实例

主类:MultiThread,执行并发类

package java8test;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.Callable;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.LinkedBlockingQueue;

/**

* @param 为被处理的数据类型

* @param 返回数据类型

* 知识点1:X,T为泛型,为什么要用泛型,泛型和Object的区别请看:https://cnblogs.com/xiaoxiong2015/p/12705815.html

*/

public abstract class MultiThread {

public static int i = 0;

// 知识点2:线程池:https://cnblogs.com/xiaoxiong2015/p/12706153.html

private final ExecutorService exec; // 线程池

// 知识点3:@author Doung Lea 队列:https://cnblogs.com/xiaoxiong2015/p/12825636.html

private final BlockingQueue> queue = new LinkedBlockingQueue<>();

// 知识点4:计数器,还是并发包大神 @author Doug Lea 编写。是一个原子安全的计数器,可以利用它实现发令枪

private final CountDownLatch startLock = new CountDownLatch(1); // 启动门,当所有线程就绪时调用countDown

private final CountDownLatch endLock; // 结束门

private final List listData;// 被处理的数据

/**

* @param list list.size()为多少个线程处理,list里面的H为被处理的数据

*/

public MultiThread(List list) {

if (list != null && list.size() > 0) {

this.listData = list;

exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 创建线程池,线程池共有nThread个线程

endLock = new CountDownLatch(list.size()); // 设置结束门计数器,当一个线程结束时调用countDown

} else {

listData = null;

exec = null;

endLock = null;

}

}

/**

*

* @return 获取每个线程处理结速的数组

* @throws InterruptedException

* @throws ExecutionException

*/

public List getResult() throws InterruptedException, ExecutionException {

List resultList = new ArrayList<>();

if (listData != null && listData.size() > 0) {

int nThread = listData.size(); // 线程数量

for (int i = 0; i < nThread; i++) {

X data = listData.get(i);

Future future = exec.submit(new Task(i, data) {

@Override

public T execute(int currentThread, X data) {

returhttp://n outExecute(currentThread, data);

}

}); // 将任务提交到线程池

queue.add(future); // 将Future实例添加至队列

}

startLock.countDown(); // 所有任务添加完毕,启动门计数器减1,这时计数器为0,所有添加的任务开始执行

endLock.await(); // 主线程阻塞,直到所有线程执行完成

for (Future future : queue) {

resultList.add(future.get());

}

exec.shutdown(); // 关闭线程池

}

return resultList;

}

/**

* 每一个线程执行的功能,需要调用者来实现

* @param currentThread 线程号

* @param data 每个线程被处理的数据

* @return T返回对象

*/

public abstract T outExecute(int currentThread, X data);

/**

* 线程类

*/

private abstract class Task implements Callable {

private int currentThread;// 当前线程号

private X data;

public Task(int currentThread, X data) {

this.currentThread = currentThread;

this.data = data;

}

@Override

public T call() throws Exception {

// startLock.await(); // 线程启动后调用await,当前线程阻塞,只有启动门计数器为0时当前线程才会往下执行

T t = null;

try {

t = execute(currentThread, data);

} finally {

endLock.countDown(); // 线程执行完毕,结束门计数器减1

xzzni }

return t;

}

/**

* 每一个线程执行的功能

* @param currentThread 线程号

* @param data 每个线程被处理的数据

* @return T返回对象

*/

public abstract T execute(int currentThread, X data);

}

}

结果类:ResultVO,保存返回结果,根据实际情况替换成自己的

package java8test;

public class ResultVo {

int i;

public ResultVo(int i) {

this.i = i;

}

public ResultVo() {

// TODO Auto-generated constructor stub

}

}

参数类:ParamVO,传入参数类,根据实际情况替换成自己的

package java8test;

public class ParamVo {

private int i;

ParamVo(int i) {

this.i = i;

}

public int getI() {

return i;

}

@Override

public String toString() {

return String.valueOf(i) + " " + hashCode();

}

}

测试类:new两个MultiThread,可以看到MultiThread这个类不存在线程安全问题。

package java8test;

import java.util.ArrayList;

import java.util.List;

public class Test {

public static void main(String[] args) {

try {

List splitList = new ArrayList();

for (int i = 0; i < 100; i++) {

splitList.add(new ParamVo(i));

}

List splitList1 = new ArrayList();

for (int i = 200; i < 300; i++) {

splitList1.add(new ParamVo(i));

}

MultiThread multiThread = new MultiThread(splitList) {

@Override

public ResultVo outExecute(int currentThread, ParamVo data) {

System.out.println("当前线程名称:" + Thread.currentThread().getName() + "当前线程号=" + currentThread

+ " data=" + data);

i--;

return new ResultVo(data.getI());

}

};

MultiThread multiThread1 = new MultiThread(splitList1) {

@Override

public ResultVo outExecute(int currentThread, ParamVo data) {

System.out.println("当前线程名称:" + Thread.currentThread().getName() + "当前线程号=" + currentThread

+ " data=" + data);

i--;

return new ResultVo(data.getI());

}

};

List list = multiThread.getResult();

List list1 = multiThread1.getResult();

// 获取每一批次处理结果

System.out.println("获取处理结果........................");

for (ResultVo vo : list) {

System.out.println(vo.i);

}

System.out.println("获取1处理结果........................");

for (ResultVo vo : list1) {

System.out.println(vo.i);

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

这个类也用在了生产当中,用来并发插入数据。但是事务不能被管控,需要自己保证最终事务一致。需要注意。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IDEA2020.1常用配置说明
下一篇:IDEA搭建SpringBoot离线工程的方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~