带你快速搞定java并发库

网友投稿 191 2022-10-11


带你快速搞定java并发库

目录一、总览二、Executor总览三、继承结构四、怎么保证只有一个线程五、怎么保证时间可以定时执行六、使用总结

一、总览

计算机程序 = 数据 + 算法。

并发编程的一切根本原因是为了保证数据的正确性,线程的效率性。

java并发库共分为四个大的部分,如下图

Executor 和 future 是为了保证线程的效率性

Lock 和数据结构 是为了维持数据的一致性。

Java并发编程的时候,思考顺序为,

对自己的数据要么加锁。要么使用提供的数据结构,保证数据的安全性

调度线程的时候使用Executor更好的调度。

二、Executor总览

Executor 提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。

相当于manager,老板让manager去执行一件任务,具体的是谁执行,什么时候执行,就不管了。

看上图的继承关系,介绍几个

内置的线程池基本上都在这里

newScheduledThreadPool 定时执行的线程池

newCachedThreadPool 缓存使用过的线程

newFixedThreadPool 固定数量的线程池

newWorkStealingPool 将大任务分解为小任务的线程池

三、继承结构

构造函数

包含一个定时的service

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {

return new DelegatedScheduledExecutorService

(new ScheduledThreadPoolExecutor(1));

}

static class DelegatedScheduledExecutorService

extends DelegatedExecutorService

implements ScheduledExecutorService {

private final ScheduledExecutorService e;

DelegatedScheduledExecutorService(ScheduledExecutorService executor) {

super(executor);

e = executor;

}

四、怎么保证只有一个线程

定时执行的时候调用这个方法,调用过程如下,注意看其中的注释,由上往下的调用顺序

public ScheduledFuture> scheduleWithFixedDelay(Runnable command,

long initialDelay,

long delay,

TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

if (delay <= 0)

throw new IllegalArgumentException();

ScheduledFutureTask sft =

new ScheduledFutureTask(command,

null,

triggerTime(initialDelay, unit),

unit.toNanos(-delay));

RunnableScheduledFuture t = decorateTask(command, sft);

sft.outerTask = t;

// 延迟执行

delayedExecute(t);

return t;

}

private void delayedExecute(RunnableScheduledFuture> task) {

if (isShutdown())

reject(task);

else {

// 加入任务队列

super.getQueue().add(task);

if (isShutdown() &&

!canRunInCurrentRunState(task.isPeriodic()) &&

remove(task))

task.cancel(false);

else

// 确保执行

ensurePrestart();

}

}

// 如果worker数量小于corePoolSize,创建新的线程,其他情况不处理

void ensurePrestart() {

int wc = workerCountOf(ctl.get());

if (wc < corePoolSize)

addWorker(null, true);

else if (wc == 0)

addWorker(null, false);

}

五、怎么保证时间可以定时执行

public ScheduledFuture> schedule(Runnable command,

long delay,

TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

RunnableScheduledFuture> t = decorateTask(command,

new ScheduledFutureTask(command, null,

triggerTime(delay, unit)));

delayedExecute(t);

return t;

}

在每次执行的时候会把下一次执行的时间放进任务中

private long triggerTime(long delay, TimeUnit unit) {

return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));

}

/**

* Returns the trigger time of a delayed action.

*/

long triggerTime(long delay) {

return now() +

((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));

}

FutureTask 定时是通过LockSupport.parkNanos(this, nanos);LockSupport.park(this);

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

if (Thread.interrupted()) {

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

if (s > COMPLETING) {

if (q != null)

q.thread = null;

return s;

}

else if (s == COMPLETING) // cannot time out yet

Thread.yield();

else if (q == null)

q = new WaitNode();

else if (!queued)

queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

else if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

ihTtBJj }

//注意这里

LockSupport.parkNanos(this, nanos);

}

else //注意这里

LockSupport.park(this);

}

}

总结:Executor是通过将任务放在队列中,生成的futureTask。然后将生成的任务在队列中排序,将时间最近的需要出发的任务做检查。如果时间不到,就阻塞线程到下次出发时间。

注意:newSingleThreadScheduledExecutor只会有一个线程,不管你提交多少任务,这些任务会顺序执行,如果发生异常会取消下面的任务,线程池也不会关闭,注意捕捉异常

六、使用

ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();

Runnable runnable1 = () -> {

try {

Thread.sleep(4000);

System.out.println("11111111111111");

} catch (InterruptedException e) {

e.printStackTrace();

}

};

Runnable runnable2 = () -> {

try {

Thread.sleep(4000);

System.out.println("222");

} catch (InterruptedException e) {

e.printStackTrace();

}

};

single.scheduleWithFixedDelay(runnable1,0,1, TimeUnit.SECONDS);

single.scheduleWithFixedDelay(runnable2,0,2, TimeUnit.SECONDS);

11111111111111 222 11111111111111 222 11111111111111

在项目中要注意关闭线程池

actionService = Executors.newSingleThreadScheduledExecutor();

actionService.scheduleWithFixedDelay(() -> {

try {

Thread.currentThread().setName("robotActionService");

Integer robotId = robotQueue.poll();

if (robotId == null) {

// 关闭线程池

actionService.shutdown();

} else {

int aiLv = robots.get(robotId);

if (actionQueueMap.containsKey(aiLv)) {

ActionQueue actionQueue = actionQueueMap.get(aiLv);

actionQueue.doAihTtBJjction(robotId);

}

}

} catch (Exception e) {

// 捕捉异常

LOG.error("",e);

}

}, 1, 1, TimeUnit.SECONDS);

总结

本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注我们的更多内容!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于Netty,从零开发IM(三):编码实践篇(群聊功能)(netty实现im)
下一篇:AC管理(ac管理器)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~