Java并发编程Semaphore计数信号量详解

网友投稿 363 2023-03-23


Java并发编程Semaphore计数信号量详解

Semaphore 是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量)。

简单示例:

package me.socketthread;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

public class SemaphoreLearn {

//信号量总数

private static final int SEM_MAX = 12;

public static void main(String[] args) {

Semaphore sem = new Semaphore(SEM_MAX);

//创建线程池

ExecutorService threadPool = Executors.newFixedThreadPool(3);

//在线程池中执行任务

threadPool.execute(new MyThread(sem, 7));

threadPool.execute(new MyThread(sem, 4));

threadPool.execute(new MyThread(sem, 2));

//关闭池

threadPool.shutdown();

}

}

class MyThread extends Thread {

private volatile Semaphore sem; // 信号量

private int count; // 申请信号量的大小

MyThread(Semaphore sem, int count) {

this.sem = sem;

this.count = count;

}

public void run() {

try {

// 从信号量中获取count个许可

sem.acquire(count);

Thread.sleep(2000);

System.out.println(Thread.currentThread().getName() + " acquire count="+count);

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

// 释放给定数目的许可,将其返回到信号量。

sem.release(count);

System.out.println(Thread.currentThread().getName() + " release " + count + "");

}

}

}

执行结果:

pool-1-thread-2 acquire count=4

pool-1-thread-1 acquire count=7

pool-1-thread-1 release 7

pool-1-thread-2 release 4

pool-1-thread-3 acquire count=2

pool-1-thread-3 release 2

线程1和线程2会并发执行,因为两者的信号量和没有超过总信号量,当前两个线程释放掉信号量之后线程3才能继续执行。

源码分析:

1、构造函数

在构造函数中会初始化信号量值,这值最终是作为锁标志位state的值

Semaphore sem = new Semaphore(12);//简单来说就是给锁标识位state赋值为12

2、Semaphore.acquire(n);简单理解为获取锁资源,如果获取不到线程阻塞

Semaphore.acquire(n);//从锁标识位state中获取n个信号量,简单来说是state = state-n 此时state大于0表示可以获取信号量,如果小于0则将线程阻塞

public void acquire(int permits) throws InterruptedException {

if (permits < 0) throw new IllegalArgumentException();

//获取锁

sync.acquireSharedInterruptibly(permits);

}

acquireSharedInterruptibly中的操作是获取锁资源,如果可以获取则将state= state-permits,否则将线程阻塞

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)//tryAcquireShared中尝试获取锁资源

doAcquireSharedInterruptibly(arg); //将线程阻塞

}

tryAcquireShared中的操作是尝试获取信号量值,简单来说就是state=state-acquires ,如果此时小于0则返回负值,否则返回大于新值,再判断是否将当线程线程阻塞

protected int tryAcquireSjhjEThared(int acquires) {

for (;;) {

if (hasQueuedPredecessors())

return -1;

//获取state值

int available = getState();

//从state中获取信号量

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

//如果信号量小于0则直接返回,表示无法获取信号量,否则将state值修改为新值

return remaining;

}

}

doAcquireSharedInterruptibly中的操作简单来说是将当前线程添加到FIFO队列中并将当前线程阻塞。

/会将线程添加到FIFO队列中,并阻塞

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

//将线程添加到FIFO队列中

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

//parkAndCheckInterrupt完成线程的阻塞操作

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

3、Semaphore.release(int permits),这个函数的实现操作是将state = state+permits并唤起处于FIFO队列中的阻塞线程。

public void release(int permits) {

if (permits < 0) throw new IllegalArgumentException();

//state = state+permits,并将FIFO队列中的阻塞线程唤起

sync.releaseShared(permits);

}

releaseShared中的操作是将state = state+permits,并将FIFO队列中的阻塞线程唤起。

public final boolean releaseShared(int arg) {

//tryReleaseShared将state设置为state = state+arg

if (tryReleaseShared(arg)) {

//唤起FIFO队列中的阻塞线程

doReleaseShared();

return true;

}

return false;

}

tryReleaseShared将state设置为state = state+arg

projhjETtected final boolean tryReleaseShared(int releases) {

for (;;) {

int current = getState();

int next = current + releases;

if (next < current) // overflow

throw new Error("Maximum permit count exceeded");

//将state值设置为state=state+releases

if (compareAndSetState(current, next))

return true;

}

}

doReleaseShared()唤起FIFO队列中的阻塞线程

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

//完成阻塞线程的唤起操作

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

总结:Semaphore简单来说设置了一个信号量池state,当线程执行时会从state中获取值,如果可以获取则线程执行,并且在执行后将获取的资源返回到信号量池中,并唤起其他阻塞线程;如果信号量池中的资源无法满足某个线程的需求则将此线程阻塞。

Semaphore源码:

public class Semaphore implements java.io.Serializable {

private static final long serialVersionUID = -3222578661600680210L;

private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 1192457210091910933L;

//设置锁标识位state的初始值

Sync(int permits) {

setState(permits);

}

//获取锁标识位state的值,如果state值大于其需要的值则表示锁可以获取

final int getPermits() {

return getState();

}

//获取state值减去acquires后的值,如果大于等于0则表示锁可以获取

final int nonfairTryAcquireShared(int acquires) {

for (;;) {

int available = getState();

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

//释放锁

protected final boolean tryReleaseShared(int releases) {

for (;;) {

int current = getState();

//将state值加上release值

int next = current + releases;

if (next < current) // overflow

throw new Error("Maximum permit count exceeded");

if (compareAndSetState(current, next))

return true;

}

}

//将state的值减去reductions

final void reducePermits(int reductions) {

for (;;) {

int current = getState();

int next = current - reductions;

if (next > current) // underflow

throw new Error("Permit count underflow");

if (compareAndSetState(current, next))

return;

}

}

final int drainPermits() {

for (;;) {

int current = getState();

if (current == 0 || compareAndSetState(current, 0))

return current;

}

}

}

//非公平锁

static final class NonfairSync extends Sync {

private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {

super(permits);

}

protected int tryAcquireShared(int acquires) {

return nonfairTryAcquireShared(acquires);

}

}

//公平锁

static final class FairSync extends Sync {

private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {

super(permits);

}

protected int tryAcquireShared(int acquires) {

for (;;) {

if (hasQueuedPredecessors())

return -1;

int available = getState();

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

}

//设置信号量

public Semaphore(int permits) {

sync = new NonfairSync(permits);

}

public Semaphore(int permits, boolean fair) {

sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

//获取锁

public void acquire() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

public void acquireUninterruptibly() {

sync.acquireShared(1);

}

public boolean tryAcquire() {

return sync.nonfairTryAcquireShared(1) >= 0;

}

public boolean tryAcquire(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

public void release() {

sync.releaseShared(1);

}

//获取permits值锁

public void acquire(int permits) throws InterruptedException {

if (permits < 0) throw new IllegalArgumentException();

sync.acquireSharedInterruptibly(permits);

}

public void acquireUninterruptibly(int permits) {

if (permits < 0) throw new IllegalArgumentException();

sync.acquireShared(permits);

}

public boolean tryAcquire(int permits) {

if (permits < 0) throw new IllegalArgumentException();

return sync.nonfairTryAcquireShared(permits) >= 0;

}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)

throws InterruptedException {

if (permits < 0) throw new IllegalArgumentException();

return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

}

//释放

public void release(int permits) {

if (permits < 0) throw new IllegalArgumentException();

sync.releaseShared(permits);

}

public int availablePermits() {

return sync.getPermits();

}

public int drainPermits() {

return sync.drainPermits();

}

protected void reducePermits(int reduction) {

if (reduction < 0) throw new IllegalArgumentException();

sync.reducePermits(reduction);

}

public boolean isFair() {

return sync instanceof FairSync;

}

public final boolean hasQueuedThreads() {

return sync.hasQueuedThreads();

}

public final int getQueueLength() {

return sync.getQueueLength();

}

protected Collection getQueuedThreads() {

return sync.getQueuedThreads();

}

public String toString() {

return super.toString() + "[Permits = " + sync.getPermits() + "]";

}

}

总结

以上就是本文关于Java并发编程Semaphore计数信号量详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Java并发编程之重入锁与读写锁、Java系统的高并发解决方法详解、java高并发锁的3种实现示例代码等,有什么问题,可以留言交流讨论。感谢朋友们对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:使用Bootrap和Vue实现仿百度搜索功能
下一篇:vue+swiper实现组件化开发的实例代码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~