java并发编程_线程池的使用方法(详解)

网友投稿 267 2023-05-11


java并发编程_线程池的使用方法(详解)

一、任务和执行策略之间的隐性耦合

Executor可以将任务的提交和任务的执行策略解耦

只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题

1.线程饥饿死锁

类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁;表现为池不够

定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁

2.线程池大小

注意:线程池的大小还受其他的限制,如其他资源池:数据库连接池

如果每个任务都是一个连接,那么线程池的大小就受制于数据库连接池的大小

3.配置ThreadPoolExecutor线程池

实例:

1.通过Executors的工厂方法返回默认的一些实现

2.通过实例化ThreadPoolExecutor(.....)自定义实现

线程池的队列

1.无界队列:任务到达,线程池饱满,则任务在队列中等待,如果任务无限达到,则队列会无限扩张

如:单例和固定大小的线程池用的就是此种

2.有界队列:如果新任务到达,队列满则使用饱和策略

3.同步移交:如果线程池很大,将任务放入队列后在移交就会产生延时,如果任务生产者很快也会导致任务排队

SynchronousQueue直接将任务移交给工作线程

机制:将一个任务放入,必须有一个线程等待接受,如果没有,则新增线程,如果线程饱和,则拒绝任务

如:CacheThreadPool就是使用的这种策略

饱和策略:

setRejectedExecutionHandler来修改饱和策略

1.终止Abort(默认):抛出异常由调用者处理

2.抛弃Discard

3.抛弃DiscardOldest:抛弃最旧的任务,注意:如果是优先级队列将抛弃优先级最高的任务

4.CallerRuns:回退任务,有调用者线程自行处理

4.线程工厂ThreadFactoy

每当创建线程时:其实是调用了线程工厂来完成

自定义线程工厂:implements ThreadFactory

可以定制该线程工厂的行为:如UncaughtExceptionHandler等

public class MyAppThread extends Thread {

public static final String DEFAULT_NAME = "MyAppThread";

private static volatile boolean debugLifecycle = false;

private static final AtomicInteger created = new AtomicInteger();

private static final AtomicInteger alive = new AtomicInteger();

private static final Logger log = Logger.getAnonymousLogger();

public MyAppThread(Runnable r) {

this(r, DEFAULT_NAME);

}

public MyAppThread(Runnable runnable, String name) {

super(runnable, name + "-" + created.incrementAndGet());

//设置该线程工厂创建的线程的 未捕获异常的行为

setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

public void uncaughtException(Thread t,

Throwable e) {

log.log(Level.SEVERE,

"UNCAUGHT in thread " + t.getName(), e);

}

});

}

public void run() {

// Copy debug flag to ensure consistent value throughout.

boolean debug = debugLifecycle;

if (debug) log.log(Level.FINE, "Created " + getName());

try {

alive.incrementAndGet();

super.run();

} finally {

alive.decrementAndGet();

if (debug) log.log(Level.FINE, "Exiting " + getName());

}

}

public static int getThreadsCreated() {

return created.get();

}

public static int getThreadsAlive() {

return alive.get();

}

public static boolean getDebug() {

return debugLifecycle;

}

public static void setDebug(boolean b) {

debugLifecycle = b;

}

}

5.扩展ThreadPoolExecutor

可以被自定义子类覆盖的方法:

1.afterExecute:结束后,如果抛出RuntimeException则方法不会执行

2.beforeExecute:开始前,如果抛出RuntimeException则任务不会执行

3.terminated:在线程池关闭时,可以用来释放资源等

二、递归算法的并行化

1.循环

在循环中,每次循环操作都是独立的

//串行化

void processSequentially(List elements) {

for (Element e : elements)

process(e);

}

//并行化

void processInParallel(Executor exec, List elements) {

for (final Element e : elements)

exec.execute(new Runnable() {

public void run() {

process(e);

}

});

}

2.迭代

如果每个迭代操作是彼此独立的,则可以串行执行

如:深度mViYPHhHfR优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

//串行 计算compute 和串行迭代

public void sequentialRecursive(List> nodes, Collection results) {

for (Node n : nodes) {

results.add(n.compute());

sequentialRecursive(n.getChildren(), results);

}

}

//并行 计算compute 和串行迭代

public void parallelRecursive(final Executor exec, List> nodes, final Collection results) {

for (final Node n : nodes) {

exec.execute(() -> results.add(n.compute()));

parallelRecursive(exec, n.getChildren(), results);

}

}

//调用并行方法的操作

public Collection getParallelResults(List> nodes)

throws InterruptedException {

ExecutorService exec = Executors.newCachedThreadPool();

Queue resultQueue = new ConcurrentLinkedQueue();

parallelRecursive(exec, nodes, resultQueue);

exec.shutdown();

exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

return resultQueue;

}

实例:

public class ConcurrentPuzzleSolver

{

private final Puzzle

puzzle;

private final ExecutorService exec;

private final ConcurrentMap

seen;

protected final ValueLatch> solution = new ValueLatch>();

public ConcurrentPuzzleSolver(Puzzle

puzzle) {

this.puzzle = puzzle;

this.exec = initThreadPool();

this.seen = new ConcurrentHashMap

();

if (exec instanceof ThreadPoolExecutor) {

ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;

tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

}

}

private ExecutorService initThreadPool() {

return Executors.newCachedThreadPool();

}

public List solve() throws InterruptedException {

try {

P p = puzzle.initialPosition();

exec.execute(newTask(p, null, null));

// 等待ValueLatch中闭锁解开,则表示已经找到答案

PuzzleNode

solnPuzzleNode = solution.getValue();

return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();

} finally {

exec.shutdown();//最终主线程关闭线程池

}

}

protected Runnable newTask(P p, M m, PuzzleNode

n) {

return new SolverTask(p, m, n);

}

protected class SolverTask extends PuzzleNode

implements Runnable {

SolverTask(P pos, M move, PuzzleNode

prev) {

super(pos, move, prev);

}

public void run() {

//如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;

//为类避免死锁,将已经扫描http://的节点放入set集合中,避免继续扫描产生死循环

if (solution.isSet() || seen.putIfAbsent(pos, true) != null){

return; // already solved or seen this position

}

if (puzzle.isGoal(pos)) {

solution.setValue(this);

} else {

for (M m : puzzle.legalMoves(pos))

exec.execute(newTask(puzzle.move(pos, m), m, this));

}

}

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java 异步编程实践_动力节点Java学院整理
下一篇:Kotlin基础教程之操作符与操作符重载
相关文章

 发表评论

暂时没有评论,来抢沙发吧~