ReentrantLock:表示重入锁,它是唯一一个实现了 Lock 接口的类。重入锁指的是 线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入 次数
ReentrantReadWriteLock:重入读写锁,它实现了 ReadWriteLock 接口,在这个 类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock 接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则 是: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的 操作都会存在互斥。
StampedLock: stampedLock 是 JDK8 引入的新的锁机制,可以简单认为是读写 锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全 并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。 stampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程
重入锁,表示支持重新进入的锁,也就是说,如果当前线程 t1 通过调用 lock 方 法获取了锁之后,再次调用 lock,是不会再阻塞去获取锁的,直接增加重试次数 就行了。synchronized 和 ReentrantLock 都是可重入锁。
public class App {
public synchronized void demo(){ //main获得对象锁
System.out.println("demo");
demo2();
}
public void demo2(){
synchronized (this) { //增加重入次数就行
System.out.println("demo2");
}//减少重入次数
}
public static void main(String[] args) {
App app=new App();
app.demo();
}
}
public class ReentrantLockDemo {
static Lock lock=new ReentrantLock();
//synchronized的原子操作改造成Lock
public void demo() throws InterruptedException { //N线程来访问
lock.lock(); //获得一个锁
lock.unlock();// 释放锁
}
public static void main(String[] args) {
}
}
读写锁维护了一对锁,一个读锁、一个写锁; 一般情况下,**读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。**在读 多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量.
public class RWLock {
static ReentrantReadWriteLock wrl=new ReentrantReadWriteLock();
static Map<String,Object> cacheMap=new HashMap<>();
static Lock read=wrl.readLock();
static Lock write=wrl.writeLock();
//线程B/C/D
public static final Object get(String key){
System.out.println("begin read data:"+key);
read.lock(); //获得读锁-> 阻塞
try {
return cacheMap.get(key);
}finally {
read.unlock();
}
}
//线程A
public static final Object put(String key,Object val){
write.lock();//获得了写锁
try{
return cacheMap.put(key,val);
}finally {
write.unlock();
}
}
public static void main(String[] args) {
wrl.readLock();//B线程 ->阻塞
wrl.writeLock(); //A线程
//读->读是可以共享
//读->写 互斥
//写->写 互斥
//读多写少的场景
}
}
读锁与读锁可以共享
读锁与写锁不可以共享(排他)
写锁与写锁不可以共享(排他)
多个线程竞争锁的时候,其他线程怎么办?->阻塞
同步工具
AQS分为两种功能:
- 1.独占 => 互斥
- 2.共享 => 读写锁
AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构 都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任 意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线 程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去;当获取锁的线程释放锁以 后,会从队列中唤醒一个阻塞的节点(线程)
public void lock() {
sync.lock();
}
sync 实际上是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑,我们 前面说过 AQS 是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备 业务功能,所以在不同的同步场景中,会继承 AQS 来实现对应场景的功能 Sync 有两个具体的实现类,分别是
NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁
FailSync: 表示所有线程严格按照 FIFO 来获取锁
锁的基本要素
1.一个共享的苏剧来记录锁的状态
State:锁标记
0:无锁
大于等于1
是有锁状态cas -> 实现原子性
2.非公平锁
3.公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
1.非公平锁和公平锁最大的区别在于,在非公平锁中我抢占锁的逻辑是,不管有没有线程排队,我先上来 cas 去抢占一下
2.CAS 成功,就表示成功获得了锁
3.CAS 失败,调用 acquire(1)走锁竞争逻辑
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
如果当前内存中的 state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返 回 false.
state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入 锁的实现来说,表示一个同步状态。它有两个含义的表示
1.当 state=0 时,表示无锁状态
2.当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为 ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增, 比如重入 5 次,那么 state=5。而在释放锁的时候,同样需要释放 5 次直到 state=0 其他线程才有资格获得锁
#### Unsafe类
1.通过 tryAcquire 尝试获取独占锁,如果成功返回 true,失败返回 false
2.如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加 到 AQS 队列尾部
3.acquireQueued,将 Node 作为参数,通过自旋去尝试获取锁。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
1.获取当前线程,判断当前的锁的状态
2.如果 state=0 表示当前是无锁状态,通过 cas 更新 state 状态的值
3.当前线程是属于重入,则增加重入次数
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { //cas 替换 state 的值,cas 成功表示获取锁成功
setExclusiveOwnerThread(current); //保存当前获得锁的线程,下次再来的时候不要再尝试竞争锁
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //如果同一个线程来获得锁,直接增加重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成 Node.
入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状 态。意味着重入锁用到了 AQS 的独占锁功能
1.将当前线程封装成 Node
2.当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的 node 添加到 AQS 队列
3.如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//把当前线程封装为 Node
Node pred = tail; //tail 是 AQS 中表示同比队列队尾的属性,默认是 null
if (pred != null) {//tail 不为空的情况下,说明队列中存在节点
node.prev = pred;//把当前线程的 Node 的 prev 指向 tail
if (compareAndSetTail(pred, node)) {//通过 cas 把 node加入到 AQS 队列,也就是设置为 tail
pred.next = node;//设置成功以后,把原 tail 节点的 next指向当前 node
return node;
}
}
enq(node);//tail=null,把 node 添加到同步队列
return node;
}
enq 就是通过自旋操作把当前节点加入到队列中
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给 acquireQueued 方法,去竞争锁
1.获取当前节点的 prev 节点
2.如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占 锁
3.抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head 节点
4.如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程
5.最后,通过 cancelAcquire 取消获得锁的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // //获取当前节点的 prev 节点
if (p == head && tryAcquire(arg)) { //如果是 head 节点,说明有资格去争抢锁
setHead(node); ;//获取锁成功,也就是ThreadA 已经释放了锁,然后设置 head 为 ThreadB 获得执行权限
p.next = null; // help GC //把原 head 节点从链表中移除
failed = false;
return interrupted;
}
//ThreadA 可能还没释放锁,使得 ThreadB 在执行 tryAcquire 时会返回 false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; //并且返回当前线程在等待过程中有没有中断过。
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果 ThreadA 的锁还没有释放的情况下,ThreadB 和 ThreadC 来争抢锁肯定是会 失败,那么失败以后会调用 shouldParkAfterFailedAcquire 方法
Node 有 5 中状态,分别是:CANCELLED(1),SIGNAL(-1)、CONDITION(- 2)、PROPAGATE(-3)、默认状态(0)
CANCELLED: 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取 消该 Node 的结点, 其结点的 waitStatus 为 CANCELLED,即结束状态,进入该状 态后的结点将不会再变化
SIGNAL: 只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
CONDITION: 和 Condition 有关系,后续会讲解 PROPAGATE:共享模式下,
PROPAGATE 状态的线程处于可运行状态 0:初始状态 这个方法的主要作用是,通过 Node 的状态来判断,ThreadA 竞争锁失败以后是否应该被挂起。
1.如果 ThreadA 的 pred 节点状态为 SIGNAL,那就表示可以放心挂起当前线程
2.通过循环扫描链表把 CANCELLED 状态的节点移除
3.修改 pred 节点的状态为 SIGNAL,
返回 false. 返回 false 时,也就是不需要挂起,返回 true,则需要调用 parkAndCheckInterrupt 挂起当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前置节点的waitStatus
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; //返回 true,意味着可以直接放心的挂起了
if (ws > 0) { //ws 大于 0,意味着 prev 节点取消了排队,直接移除这个节点就行
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//相当于: pred=pred.prev; node.prev=pred;
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//这里采用循环,从双向列表中移除 CANCELLED 的节点
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//利用 cas 设置 prev 节点的状态为 SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
使用 LockSupport.park 挂起当前线程编程 WATING 状态
Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是 thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识 true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味 着在 acquire 方法中会执行 selfInterrupt()。
在unlock中会调用release来释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //释放锁成功
Node h = head; //得到 aqs 中 head 节点
if (h != null && h.waitStatus != 0) //如果 head 节点不为空并且状态!=0.调用 unparkSuccessor(h)唤醒后续节点
unparkSuccessor(h);
return true;
}
return false;
}
这个方法可以认为是一个设置锁状态的操作,通过将 state 状态减掉传入的参数值 (参数是 1),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它 的线程有机会进行执行。
在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时 候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock() 的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下 才会返回 true。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; ;//获得 head 节点的状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 设置 head 节点状态为 0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; //得到 head 节点的下一个节点
if (s == null || s.waitStatus > 0) {
//如果下一个节点为 null 或者 status>0 表示 cancelled 状态.
//通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //next 节点不为空,直接唤醒这个线程即可
LockSupport.unpark(s.thread);
}
1.将新的节点的 prev 指向 tail
2.通过 cas 将 tail 设置为新的节点,因为 cas 是原子操作所以能够保证线程安全性
3.t.next=node;设置原 tail 的 next 节点指向新的节点
在 cas 操作之后,t.next=node 操作之前。存在其他线程调用 unlock 方法从 head 开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。 就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问 题。
通过锁的释放,原本的结构就发生了一些变化。head 节点的 waitStatus 变成了 0, ThreadB 被唤醒
1.把 ThreadB 节点当成 head
2.把原 head 节点的 next 节点指向为 null
1.设置新 head 节点的 prev=null
2.设置原 head 节点的 next 节点为 null
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}