博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java并发编程实战】——Semaphore源码分析
阅读量:4181 次
发布时间:2019-05-26

本文共 12180 字,大约阅读时间需要 40 分钟。

Semaphore 一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。

举个例子:初始化一个许可证为2,启动10个线程,每个线程模拟执行3秒。

/** * Created by Tangwz on 2019/6/28. */public class TestSemaphore {
public static void main(String[] args) {
Thread.currentThread().interrupt(); Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 10; i++) {
new Thread(new SemaphoreWorker(semaphore), "Thread" + i).start(); } } private static class SemaphoreWorker implements Runnable {
private Semaphore semaphore; SemaphoreWorker(Semaphore semaphore) {
this.semaphore = semaphore; } @Override public void run() {
try {
log("waiting for a permit"); semaphore.acquire(); log("get a permit"); Thread.sleep(3000L); // log("executed"); } catch (InterruptedException e) {
e.printStackTrace(); } log("release a permit"); semaphore.release(); } private void log(String message) {
String name = Thread.currentThread().getName(); System.out.println(name + " " + message); } }}

输出结果:每次只有两个线程可以执行,其他的线程都在等待。两个线程运行三秒,执行完释放许可证后,只有另外的两个线程能获取到许可证然后执行,如此依次进行。

看下面的打印结果,本次执行顺序为:2,6 -> 3,7 -> 1,5 -> 0,9 -> 4,8

Thread2 waiting for a permit

Thread6 waiting for a permit
Thread2 get a permit
Thread6 get a permit
Thread3 waiting for a permit
Thread7 waiting for a permit
Thread1 waiting for a permit
Thread5 waiting for a permit
Thread0 waiting for a permit
Thread9 waiting for a permit
Thread8 waiting for a permit
Thread4 waiting for a permit
Thread2 release a permit
Thread3 get a permit
Thread6 release a permit
Thread7 get a permit
Thread3 release a permit
Thread1 get a permit
Thread7 release a permit
Thread5 get a permit
Thread1 release a permit
Thread0 get a permit
Thread5 release a permit
Thread9 get a permit
Thread0 release a permit
Thread9 release a permit
Thread4 get a permit
Thread8 get a permit
Thread4 release a permit
Thread8 release a permit

初始化一个许可证数量,fair 参数可选,默认非公平

/** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. *        This value may be negative, in which case releases *        must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee *        first-in first-out granting of permits under contention, *        else {@code false} */public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);}

获取一个许可证,许可证集中如果还有,立即返回并减少一个许可证;获取不到则阻塞,等待其他线程释放一个许可证,或者被其他线程中断。

/** * Acquires a permit from this semaphore, blocking until one is * available, or the thread is {@linkplain Thread#interrupt interrupted}. */public void acquire() throws InterruptedException {
//注意此方法响应中断 sync.acquireSharedInterruptibly(1);}/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//进入前检查中断 if (Thread.interrupted()) throw new InterruptedException(); //尝试获取一个许可证 //两种获取方式,公平还是非公平 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}//非公平,返回值剩余可用许可证 remaining >=0 代表获取许可证成功,remaining <0 代表失败final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) //两种情况会退出循环 //1.可用许可证 available 为 0 ,返回 remaining = -1,代表失败 //2.可用许可证 available > 0 , 返回 remaining >=0 ,并且CAS成功,代表成功 return remaining; }}/** * Fair version *///公平protected int tryAcquireShared(int acquires) {
for (;;) {
//比非公平获取多加了一个判断,判断同步队列中是否已经有节点在等待获取许可证 if (hasQueuedPredecessors()) return -1; //没有节点在排队,下面和非公平获取一样 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }}/** * Acquires in shared interruptible mode. * @param arg the acquire argument *///获取许可证失败,进入这里private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//创建一个共享节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try {
for (;;) {
final Node p = node.predecessor(); //前驱为头节点,才可以尝试获取许可证 if (p == head) {
//尝试获取许可证 int r = tryAcquireShared(arg); if (r >= 0) {
//获取许可证成功,如果可用许可证 >0,或者头结点被设置为传递状态,需要唤醒后续节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //前驱不为头节点,或者获取许可证失败,被阻塞,等待其他线程释放一个许可证 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //等待期间被中断,抛异常 throw new InterruptedException(); } } finally {
if (failed) //被中断会进入到这里,将此节点从同步队列中移除 cancelAcquire(node); }}/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //propagate > 0 需要唤醒等待节点可以理解,为啥还有后面这么多判断? //这些判断是否可以去掉呢?见后文分析 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next; //后驱节点类型是共享的才进行唤醒 if (s == null || s.isShared()) //需要唤醒后驱节点 doReleaseShared(); }}

分析 setHeadAndPropagate() 前,先看看怎么释放许可证。

任何一个线程调用 release() 就会释放一个许可证到 Semaphore 中,该线程不一定非要先获取一个许可证。

/** * Releases a permit, returning it to the semaphore. * * 

Releases a permit, increasing the number of available permits by * one. If any threads are trying to acquire a permit, then one is * selected and given the permit that was just released. That thread * is (re)enabled for thread scheduling purposes. * *

There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link #acquire}. * Correct usage of a semaphore is established by programming convention * in the application. */public void release() {

sync.releaseShared(1);}/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */public final boolean releaseShared(int arg) {
//不管是公平还是非公平,在 Semaphore 中调用的都是同一个方法 if (tryReleaseShared(arg)) {
//释放 arg 个许可证成功后,唤醒在同步队列中可能存在等待许可证的节点 doReleaseShared(); return true; } return false;}//releases 大于 0 的情况下,一定是放入成功了才会返回protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState(); int next = current + releases; //保证释放的许可证数量大于 0 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //放许可证到 Semaphore 中可能会产生竞争 if (compareAndSetState(current, next)) return true; }}/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) *///为共享节点做的唤醒,确保传递信号private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) {
Node h = head; //头节点是一定不会被阻塞的,头尾相等即没有节点被阻塞 //头结点为空的情况,只发生在同步队列为空,一直没有线程获取许可证失败而阻塞 if (h != null && h != tail) {
int ws = h.waitStatus; //如果同时有多个线程进行释放,这里会产生竞争 if (ws == Node.SIGNAL) {
//步骤1 竞争先会发生在这里,成功的线程会将头设置成0,然后唤醒后驱 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //1.1 失败的线程会继续进行循环 continue; // loop to recheck cases unparkSuccessor(h); } //1.1失败的线程会读到在步骤1中已经修改为0的状态 else if (ws == 0 && //1.2 失败的线程会将头节点状态标记为传播 //1.3 在步骤1中失败的线程不止一个,这里也可能CAS失败 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //1.4 CAS失败的线程继续运行 continue; // loop on failed CAS } //若线程发现头结点变动,继续循环 if (h == head) // loop if head changed break; }}

有没有发现上面释放许可证的时候,设置头结点状态为0失败后,为啥还非要执行 compareAndSetWaitStatus(h, 0, Node.PROPAGATE) ,将头结点状态设置为 -3 共享式传递状态,为啥 setHeadAndPropagate() 中有那么多关于头结点的判断?

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}

判断和设置-3都不可少,考虑这样一种情况:初始化一个信号量为2,线程A和线程B先获取许可证再释放一个许可证,线程C再获取许可证然后进同步队列,线程D再获取许可证然后在线程C之后进同步队列。接着按时间顺序发生如下步骤:

步骤1:线程A设置头结点状态为 0 ,然后唤醒C后发现头节点引用没有变化然后结束;
步骤2:线程C恢复执行,可用许可证数量 propagate 为 0;
步骤3:线程B发现头状态不为-1不做修改,线程B也发现头节点引用没有变化然后结束;
步骤4:线程C这时才执行 setHead(node) ,发现 propagate = 0,h.waitStatus = 0,然后退出;
步骤5:线程C可能只获取不执行 release() ,那么线程D不会被唤醒,但是此时还有一个许可证。
流程图
设置了 Node.PROPAGATE 就不一样了,这时不管线程C执行 setHead(node) 在线程B修改头结点为-3之前还是之后都没有问题:在之前线程C发现头节点状态等于0后退出,线程B发现头节点引用变化,然后继续循环重新获取头结点C,将头结点C的状态改为-1,然后唤醒节点C之后的节点D;在之后线程C发现原头节点状态小于0,唤醒原头节点后驱。

总结下:

setHeadAndPropagate() 作用:设置自己为头节点,传递头节点的共享状态唤醒后驱。当可用许可证大于0唤醒后驱;当原头节点变动说明有其他线程释放了许可证,也唤醒后驱;当自己成为头节点后,之后不管头节点再怎么变化,只要变化说明有其他释放许可证或者有新节点加入,唤醒后驱。并发竞争程度高的话,可能产生无用唤醒。
doReleaseShared() 作用:唤醒后驱,并确保释放的共享状态能传递下去。头节点为-1,唤醒后驱;头节点为0,设置头节点为-3传递释放状态。设置-3失败的情况可能是设置头节点为-1时失败的线程比较多,再次循环发现头节点已经是-3然后退出;或者有新节点加入,再次循环发现头节点为-1唤醒后驱。

同步队列的分析请看

参考

转载地址:http://rmrai.baihongyu.com/

你可能感兴趣的文章
linux 两个装逼的命令
查看>>
C语言中的static
查看>>
open系统调用
查看>>
man 命令
查看>>
linux 特权级 用户态 内核态
查看>>
【C++】入门基础概念
查看>>
【C++】面向对象程序设计
查看>>
【C++】类与对象
查看>>
【C++】类和对象二
查看>>
【C++】构造函数&&析构函数
查看>>
【C++】继承语法&&继承方式
查看>>
【C++】继承时的名字遮蔽&&派生类的构造函数
查看>>
【C++】智能指针的设计与实现
查看>>
栈的简介与C++模板实现
查看>>
队列的简介与C++模板实现
查看>>
哈夫曼树与哈夫曼编码详解及C++模板实现
查看>>
二叉查找树及C++模板实现
查看>>
大顶堆的C++模板实现及二叉堆的简介
查看>>
linux网络通信工具
查看>>
【C++】顺序容器
查看>>