public class ConditionWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try {
lock.lock(); //竞争锁
try {
System.out.println("begin - ConditionWait");
condition.await();//阻塞(1. 释放锁, 2.阻塞当前线程, FIFO(单向、双向))
System.out.println("end - ConditionWait");
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
lock.unlock();//释放锁
}
}
}
public class ConditionNotify implements Runnable{
private Lock lock;
private Condition condition;
public ConditionNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try{
lock.lock();//获得了锁.
System.out.println("begin - conditionNotify");
condition.signal();//唤醒阻塞状态的线程
//if(uncondition){
// condition.await();
// }
//condition.notify;
condition.await();
System.out.println("end - conditionNotify");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); //释放锁
}
}
}
await:把当前线程阻塞挂起
signal:唤醒阻塞的线程
调用 Condition 的 await()方法(或者以 await 开头的方法), 会使当前线程进入等待队列并释放锁,同时线程状态变为 等待状态。当从 await()方法返回时,当前线程一定获取了 Condition 相关联的锁
public final void await() throws InterruptedException {
if (Thread.interrupted()) //表示 await 允许被中断
throw new InterruptedException();
Node node = addConditionWaiter(); //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
int savedState = fullyRelease(node); //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
int interruptMode = 0; //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) { //判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
LockSupport.park(this); //通过 park 挂起当前线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT.
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如 果 lastWaiter不等于空并且waitStatus 不等于 CONDITION 时,把冲好这个节点从链表中移除
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,所以相比 AQS 来说会简单很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
执行完 addConditionWaiter 这个方法之后,就会产生一个 这样的 condition 队列
fullRelease,就是彻底的释放锁,什么叫彻底呢,就是如果 当前锁存在多次重入,那么在这个方法中只需要释放一次 就会把所有的重入次数归零。
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获得重入的次数
int savedState = getState();
//释放锁并且唤醒下一个同步队列中的线程
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
此时,同步队列会触发锁的释放和重新竞争。ThreadB 获 得了锁
判断当前节点是否在同步队列中,返回 false 表示不在,返 回 true 表示在
如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同 步锁,所以需要把当前线程阻塞起来,直到其他的线程调 用 signal 唤醒
如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得 执行程序执行权限
1.如果 ThreadA 的 waitStatus 的状态为 CONDITION,说 明它存在于 condition 队列中,不在 AQS 队列。因为 AQS 队列的状态一定不可能有 CONDITION
2.如果 node.prev 为空,说明也不存在于 AQS 队列,原因 是 prev=null 在 AQS 队列中只有一种可能性,就是它是 head 节点,head 节点意味着它是获得锁的节点。
3.如果 node.next 不等于空,说明一定存在于 AQS 队列 中,因为只有 AQS 队列才会存在 next 和 prev 的关系
4.findNodeFromTail,表示从 tail 节点往前扫描 AQS 队列, 一旦发现 AQS 队列的节点和当前节点相等,说明节点一 定存在于 AQS 队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
await 方法会阻塞 ThreadA,然后 ThreadB 抢占到了锁获 得了执行权限,这个时候在 ThreadB 中调用了 Condition 的 signal()方法,将会唤醒在等待队列中节点
public final void signal() {
if (!isHeldExclusively()) //先判断当前线程是否获得了锁,这个判断比较简单,直接用获得锁的线程和当前线程相比即可
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
对 condition 队列中从首部开始的第一个 condition 状态的 节点,执行 transferForSignal 操作,将 node 从 condition 队列中转换到 AQS 队列中,同时修改 AQS 队列中原先尾 节点的状态
private void doSignal(Node first) {
do {
//从 Condition 队列中删除 first 节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null; // 将 next 节点设置成 null
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
该方法先是 CAS 修改了节点状态,如果成功,就将这个节 点放到 AQS 队列中,然后唤醒这个节点上的线程。此时, 那个节点就会在 await 方法中苏醒
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
int ws = p.waitStatus;
// 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒节点上的线程.
return true; //如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
}
执行完 doSignal 以后,会把 condition 队列中的节点转移 到 aqs 队列上,逻辑结构图如下
这个时候会判断 ThreadA 的 prev 节点也就是 head 节点 的 waitStatus,如果大于 0 或者设置 SIGNAL 失败,表示 节点被设置成了 CANCELLED 状态。这个时候会唤醒 ThreadA 这个线程。否则就基于 AQS 队列的机制来唤 醒,也就是等到 ThreadB 释放锁之后来唤醒 ThreadA
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
///if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
1.Thread.interrupt(); // 1 2
2.condition.signal(); // 2 1
先执行2,在执行1 -> 重新发按期中断就可
先执行1,在执行2 -> 异常 interruptedException
这个方法在讲 aqs 的时候说过,是的当前被唤醒的节点 ThreadA 去抢占同步锁。并且要恢复到原本的重入次数状 态。调用完这个方法之后,AQS 队列的状态如下 将 head 节点的 waitStatus 设置为-1,Signal 状态。
根据 checkInterruptWhileWaiting 方法返回的中断标识来 进行中断上报。 如果是 THROW_IE,则抛出中断异常 如果是 REINTERRUPT,则重新响应中断
阻塞:await()方法中,在线程释放锁资源之后,如果节点 不在 AQS 等待队列,则阻塞当前线程,如果在等待队 列,则自旋等待尝试获取锁
释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程
使用场景:计数器
它允许一个或多个线 程一直等待,直到其他线程的操作执行完毕再执行
countdownlatch 提供了两个方法,一个是 countDown, 一个是 await,countdownlatch 初始化的时候需要传入一 个整数,在这个整数倒数到 0 之前,调用了 await 方法的 程序都必须要等待,然后通过 countDown 来倒数。
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch =new CountDownLatch(3);
new Thread(()->{
System.out.println("Thread1");
countDownLatch.countDown(); //3-1=2
}).start();
new Thread(()->{
System.out.println("Thread2");
countDownLatch.countDown();//2-1=1
}).start();
new Thread(()->{
System.out.println("Thread3");
countDownLatch.countDown();//1-1=0
}).start();
countDownLatch.await();
}
static CountDownLatch countDownLatch=new CountDownLatch(4);
public static void main(String[] args) {
for(int i=0;i<3;i++){
new CountDownLatchDemo().start();
}
countDownLatch.countDown();
}
@Override
public void run() {
try {
countDownLatch.await(); //阻塞 3个线程 Thread.currentThread
} catch (InterruptedException e) {
e.printStackTrace();
}
//TODO
System.out.println("ThreadName:"+Thread.currentThread().getName());
}
总的来说,凡事涉及到需要指定某个人物在执行之前,要 等到前置人物执行完毕之后才执行的场景,都可以使用 CountDownLatch
对于 CountDownLatch,我们仅仅需要关心两个方法,一 个是 countDown() 方法,另一个是 await() 方法。
countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;
而 await 是一个阻塞方法,当 state 减 为 0 的时候,await 方法才会返回。
await 可以被多个线 程调用,大家在这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件 满足(state == 0),将线程从队列中一个个唤醒过来。
countdownlatch 也用到了 AQS,在 CountDownLatch 内 部写了一个 Sync 并且继承了 AQS 这个抽象类重写了 AQS 中的共享锁方法。首先看到下面这个代码,这块代码主要 是 判 断 当 前 线 程 是 否 获 取 到 了 共 享 锁 ; ( 在 CountDownLatch 中 , 使 用 的 是 共 享 锁 机 制 ,因为 CountDownLatch 并不需要实现互斥的特性)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //state 如果不等于 0,说明当前线程需要加入到共享锁队列中
doAcquireSharedInterruptibly(arg);
}
1.addWaiter 设置为 shared 模式。
2.tryAcquire 和 tryAcquireShared 的返回值不同,因此会 多出一个判断过程
3.在 判 断 前 驱 节 点 是 头 节 点 后 , 调 用 了 setHeadAndPropagate 方法,而不是简单的更新一下头 节点。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//创建一个共享模式的节点添加到队列中
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取锁
if (r >= 0) { //r>=0 表示获取到了执行权限,这个时候因为 state!=0,所以不会执行这段代码
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
加入这个时候有 3 个线程调用了 await 方法,由于这个时 候 state 的值还不为 0,所以这三个线程都会加入到 AQS 队列中。并且三个线程都处于阻塞状态
1.只有当 state 减为 0 的时候,tryReleaseShared 才返 回 true, 否则只是简单的 state = state - 1
2.如果 state=0, 则调用 doReleaseShared 唤醒处于 await 状态下的线程
public final boolean releaseShared(int arg) {
//用自旋的方法实现 state 减 1
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
共享锁的释放和独占锁的释放有一定的差别前面唤醒锁的逻辑和独占锁是一样,先判断头结点是不是SIGNAL 状态,如果是,则修改为 0,并且唤醒头结点的下一个节点
PROPAGATE: 标识为 PROPAGATE 状态的节 点,是共享锁模式下的节点状态,处于这个状态 下的节点,会对线程的唤醒进行传播
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
// 通过检查头节点是否改变了,如果改变了就继续循环
if (h == head) // loop if head changed
break;
}
}
h == head:说明头节点还没有被刚刚用 unparkSuccessor 唤醒的线程(这里可以理解为 ThreadB)占有,此时 break 退出循环。
h != head:头节点被刚刚唤醒的线程(这里可以理解为 ThreadB)占有,那么这里重新进入下一轮循环,唤醒下 一个节点(这里是 ThreadB )。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
这个方法的主要作用是把被唤醒的节点,设置成 head 节 点。 然后继续唤醒队列中的其他线程。 由于现在队列中有 3 个线程处于阻塞状态,一旦 ThreadA 被唤醒,并且设置为 head 之后,会继续唤醒后续的 ThreadB
主要场景:限流
semaphore 也就是我们常说的信号灯,semaphore 可以控 制同时访问的线程个数,通过 acquire 获取一个许可,如 果没有就等待,通过 release 释放一个许可。
public class SemaphoreDemo {
//限流(AQS)
//permits; 令牌(5)
//公平和非公平
static class Car extends Thread{
private int num;
private Semaphore semaphore;
public Car(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
public void run(){
try {
semaphore.acquire(); //获得一个令牌, 如果拿不到令牌,就会阻塞
System.out.println("第"+num+" 抢占一个车位");
Thread.sleep(2000);
System.out.println("第"+num+" 开走喽");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Semaphore semaphore=new Semaphore(5);
for(int i=0;i<10;i++){
new Car(i,semaphore).start();
}
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
当存在需要所有的子任务都完成时,才执行主任务,这个 时候就可以选择使用 CyclicBarrier
public class DataImportThread extends Thread{
private CyclicBarrier cyclicBarrier;
private String path;
public DataImportThread(CyclicBarrier cyclicBarrier, String path) {
this.cyclicBarrier = cyclicBarrier;
this.path = path;
}
@Override
public void run() {
System.out.println("开始导入:"+path+" 数据");
//TODO
try {
cyclicBarrier.await(); //阻塞 condition.await()
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CycliBarrierDemo extends Thread{
@Override
public void run() {
System.out.println("开始进行数据分析");
}
//循环屏障
//可以使得一组线程达到一个同步点之前阻塞.
public static void main(String[] args) {
CyclicBarrier cyclicBarrier=new CyclicBarrier
(3,new CycliBarrierDemo());
new Thread(new DataImportThread(cyclicBarrier,"file1")).start();
new Thread(new DataImportThread(cyclicBarrier,"file2")).start();
new Thread(new DataImportThread(cyclicBarrier,"file3")).start();
}
}
注意点
1)对于指定计数值 parties,若由于某种原因,没有足够的 线程调用 CyclicBarrier 的 await,则所有调用 await 的线程 都会被阻塞;
2)同样的 CyclicBarrier 也可以调用 await(timeout, unit), 设置超时时间,在设定时间内,如果没有足够线程到达, 则解除阻塞状态,继续工作;
3)通过 reset 重置计数,会使得进入 await 的线程出现 BrokenBarrierException;
4 )如果采用是 CyclicBarrier(int parties, Runnable barrierAction) 构造方法,执行 barrierAction 操作的是最 后一个到达的线程