本文共 12180 字,大约阅读时间需要 40 分钟。
Semaphore 一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
举个例子:初始化一个许可证为2,启动10个线程,每个线程模拟执行3秒。
/** * Created by Tangwz on 2019/6/28. */public class TestSemaphore { public static void main(String[] args) { Thread.currentThread().interrupt(); Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 10; i++) { new Thread(new SemaphoreWorker(semaphore), "Thread" + i).start(); } } private static class SemaphoreWorker implements Runnable { private Semaphore semaphore; SemaphoreWorker(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { try { log("waiting for a permit"); semaphore.acquire(); log("get a permit"); Thread.sleep(3000L); // log("executed"); } catch (InterruptedException e) { e.printStackTrace(); } log("release a permit"); semaphore.release(); } private void log(String message) { String name = Thread.currentThread().getName(); System.out.println(name + " " + message); } }}
输出结果:每次只有两个线程可以执行,其他的线程都在等待。两个线程运行三秒,执行完释放许可证后,只有另外的两个线程能获取到许可证然后执行,如此依次进行。
看下面的打印结果,本次执行顺序为:2,6 -> 3,7 -> 1,5 -> 0,9 -> 4,8Thread2 waiting for a permit
Thread6 waiting for a permit Thread2 get a permit Thread6 get a permit Thread3 waiting for a permit Thread7 waiting for a permit Thread1 waiting for a permit Thread5 waiting for a permit Thread0 waiting for a permit Thread9 waiting for a permit Thread8 waiting for a permit Thread4 waiting for a permit Thread2 release a permit Thread3 get a permit Thread6 release a permit Thread7 get a permit Thread3 release a permit Thread1 get a permit Thread7 release a permit Thread5 get a permit Thread1 release a permit Thread0 get a permit Thread5 release a permit Thread9 get a permit Thread0 release a permit Thread9 release a permit Thread4 get a permit Thread8 get a permit Thread4 release a permit Thread8 release a permit
初始化一个许可证数量,fair 参数可选,默认非公平
/** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
获取一个许可证,许可证集中如果还有,立即返回并减少一个许可证;获取不到则阻塞,等待其他线程释放一个许可证,或者被其他线程中断。
/** * Acquires a permit from this semaphore, blocking until one is * available, or the thread is {@linkplain Thread#interrupt interrupted}. */public void acquire() throws InterruptedException { //注意此方法响应中断 sync.acquireSharedInterruptibly(1);}/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //进入前检查中断 if (Thread.interrupted()) throw new InterruptedException(); //尝试获取一个许可证 //两种获取方式,公平还是非公平 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}//非公平,返回值剩余可用许可证 remaining >=0 代表获取许可证成功,remaining <0 代表失败final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) //两种情况会退出循环 //1.可用许可证 available 为 0 ,返回 remaining = -1,代表失败 //2.可用许可证 available > 0 , 返回 remaining >=0 ,并且CAS成功,代表成功 return remaining; }}/** * Fair version *///公平protected int tryAcquireShared(int acquires) { for (;;) { //比非公平获取多加了一个判断,判断同步队列中是否已经有节点在等待获取许可证 if (hasQueuedPredecessors()) return -1; //没有节点在排队,下面和非公平获取一样 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }}/** * Acquires in shared interruptible mode. * @param arg the acquire argument *///获取许可证失败,进入这里private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //创建一个共享节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //前驱为头节点,才可以尝试获取许可证 if (p == head) { //尝试获取许可证 int r = tryAcquireShared(arg); if (r >= 0) { //获取许可证成功,如果可用许可证 >0,或者头结点被设置为传递状态,需要唤醒后续节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //前驱不为头节点,或者获取许可证失败,被阻塞,等待其他线程释放一个许可证 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //等待期间被中断,抛异常 throw new InterruptedException(); } } finally { if (failed) //被中断会进入到这里,将此节点从同步队列中移除 cancelAcquire(node); }}/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //propagate > 0 需要唤醒等待节点可以理解,为啥还有后面这么多判断? //这些判断是否可以去掉呢?见后文分析 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //后驱节点类型是共享的才进行唤醒 if (s == null || s.isShared()) //需要唤醒后驱节点 doReleaseShared(); }}
分析 setHeadAndPropagate() 前,先看看怎么释放许可证。
任何一个线程调用 release() 就会释放一个许可证到 Semaphore 中,该线程不一定非要先获取一个许可证。/** * Releases a permit, returning it to the semaphore. * *Releases a permit, increasing the number of available permits by * one. If any threads are trying to acquire a permit, then one is * selected and given the permit that was just released. That thread * is (re)enabled for thread scheduling purposes. * *
There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link #acquire}. * Correct usage of a semaphore is established by programming convention * in the application. */public void release() {
sync.releaseShared(1);}/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */public final boolean releaseShared(int arg) { //不管是公平还是非公平,在 Semaphore 中调用的都是同一个方法 if (tryReleaseShared(arg)) { //释放 arg 个许可证成功后,唤醒在同步队列中可能存在等待许可证的节点 doReleaseShared(); return true; } return false;}//releases 大于 0 的情况下,一定是放入成功了才会返回protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; //保证释放的许可证数量大于 0 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //放许可证到 Semaphore 中可能会产生竞争 if (compareAndSetState(current, next)) return true; }}/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) *///为共享节点做的唤醒,确保传递信号private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; //头节点是一定不会被阻塞的,头尾相等即没有节点被阻塞 //头结点为空的情况,只发生在同步队列为空,一直没有线程获取许可证失败而阻塞 if (h != null && h != tail) { int ws = h.waitStatus; //如果同时有多个线程进行释放,这里会产生竞争 if (ws == Node.SIGNAL) { //步骤1 竞争先会发生在这里,成功的线程会将头设置成0,然后唤醒后驱 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //1.1 失败的线程会继续进行循环 continue; // loop to recheck cases unparkSuccessor(h); } //1.1失败的线程会读到在步骤1中已经修改为0的状态 else if (ws == 0 && //1.2 失败的线程会将头节点状态标记为传播 //1.3 在步骤1中失败的线程不止一个,这里也可能CAS失败 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //1.4 CAS失败的线程继续运行 continue; // loop on failed CAS } //若线程发现头结点变动,继续循环 if (h == head) // loop if head changed break; }}
有没有发现上面释放许可证的时候,设置头结点状态为0失败后,为啥还非要执行 compareAndSetWaitStatus(h, 0, Node.PROPAGATE) ,将头结点状态设置为 -3 共享式传递状态,为啥 setHeadAndPropagate() 中有那么多关于头结点的判断?
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}
判断和设置-3都不可少,考虑这样一种情况:初始化一个信号量为2,线程A和线程B先获取许可证再释放一个许可证,线程C再获取许可证然后进同步队列,线程D再获取许可证然后在线程C之后进同步队列。接着按时间顺序发生如下步骤:
步骤1:线程A设置头结点状态为 0 ,然后唤醒C后发现头节点引用没有变化然后结束; 步骤2:线程C恢复执行,可用许可证数量 propagate 为 0; 步骤3:线程B发现头状态不为-1不做修改,线程B也发现头节点引用没有变化然后结束; 步骤4:线程C这时才执行 setHead(node) ,发现 propagate = 0,h.waitStatus = 0,然后退出; 步骤5:线程C可能只获取不执行 release() ,那么线程D不会被唤醒,但是此时还有一个许可证。 设置了 Node.PROPAGATE 就不一样了,这时不管线程C执行 setHead(node) 在线程B修改头结点为-3之前还是之后都没有问题:在之前线程C发现头节点状态等于0后退出,线程B发现头节点引用变化,然后继续循环重新获取头结点C,将头结点C的状态改为-1,然后唤醒节点C之后的节点D;在之后线程C发现原头节点状态小于0,唤醒原头节点后驱。总结下:
setHeadAndPropagate() 作用:设置自己为头节点,传递头节点的共享状态唤醒后驱。当可用许可证大于0唤醒后驱;当原头节点变动说明有其他线程释放了许可证,也唤醒后驱;当自己成为头节点后,之后不管头节点再怎么变化,只要变化说明有其他释放许可证或者有新节点加入,唤醒后驱。并发竞争程度高的话,可能产生无用唤醒。 doReleaseShared() 作用:唤醒后驱,并确保释放的共享状态能传递下去。头节点为-1,唤醒后驱;头节点为0,设置头节点为-3传递释放状态。设置-3失败的情况可能是设置头节点为-1时失败的线程比较多,再次循环发现头节点已经是-3然后退出;或者有新节点加入,再次循环发现头节点为-1唤醒后驱。同步队列的分析请看
参考
转载地址:http://rmrai.baihongyu.com/