JUC-AQS原理


大量使用CAS进行操作

关键字:

volatile int state: 同步状态

等待队列(链表):head、tail

等待节点Node:链表结构中的节点

acquire:尝试一次加锁,失败则入队列

cas循环尝试,添加到队列尾

acquireShared:加锁的操作

tryAcquire、tryAcquireShared:需要被子类实现的方法

release、releaseShared:解锁的操作

tryRelease、tryReleaseShared:需要被子类实现的方法

AQS基本原理

AQS(AbstractQueuedSynchronizer)是Java并发编程中的一个重要组件,它是一个抽象类,提供了线程同步的底层实现机制。AQS的作用是实现线程的同步和互斥操作,它提供了两种主要的锁机制,分别是排他锁和共享锁。
排他锁也称为独占锁,在多个线程竞争同一共享资源时,同一时刻只允许一个线程访问该共享资源,即多个线程中只有一个线程获得锁资源。

在AQS中,排他锁是通过内置的同步状态来实现的。

  • 当同步状态为0时,表示锁是未被获取的;
  • 当同步状态大于0时,表示锁已经被获取且被占用;
  • 当同步状态小于0时,表示锁已经被获取但是处于等待状态。

** 共享锁允许多个线程同时获得锁资源,但是在同一时刻只有一个线程可以获取到锁的拥有权,其他线程需要等待该线程释放锁**。在AQS中,共享锁的实现与排他锁类似,也是通过内置的同步状态来实现的。
AQS通过一个内置的FIFO(先进先出)等待队列来实现线程的排队和调度。当线程需要获取锁资源时,如果锁已经被其他线程获取,则该线程会被加入到等待队列中等待。当锁被释放时,等待队列中的第一个线程会获得锁资源并继续执行。
在实现AQS时,需要继承自AQS类并实现其抽象方法。其中比较重要的方法包括:

  • tryAcquire()和tryRelease()方法,用于实现锁的获取和释放;
  • acquire()和release()方法,用于实现阻塞和唤醒操作;isHeldExclusively()方法,用于判断是否是排他锁。

总之,AQS是Java并发编程中的重要组件之一,它提供了线程同步的底层实现机制。在使用AQS时,需要根据具体的应用场景选择合适的锁机制来实现线程的同步和互斥操作。

基本结构

画板

int state:代表锁的状态(独占、非独占,取决于不同的实现子类)

队列:head节点代表当前加锁成功的线程;其余节点代表等待的线程

基础API

getState()

setState()

compareAndSetState()

底层API

enq():死循环,入等待队列,CAS实现

属性架构

Node

节点的状态

Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常

加锁

判断是否有头结点

若有,则封装当前线程为Node,加入队列,并使用park()阻塞当前线程

解锁

由获取到锁的_线程,设置head节点到后一个节点,并断开head节点的链接,供回收_

然后唤醒下一个节点的线程,unpark()

独占锁

AQS和子类合作实现独占锁,则子类Sync必须实现【tryAcquire() && tryRelease()

acquire

  • acquire 是【独占模式】获取资源的方式

流程分析

1、调用自定义同步器 tryAcquire()尝试获取资源,成功则直接返回
2、没成功,则addWaiter()将线程节点加入等待队列
3、acquireQueue 在等待队列中自旋尝试获取资源。unpark()时,则自旋获取。park(),则进入阻塞
4、只有资源获取后进行自我中断selfInterpt(),如果获取资源过程中线程中断,则selfInterpt()不相应

tryAcquire

  • getState() == 0 检测资源是否上锁
  • 如果locked(state=1) ,则unlock : 流程如下

1setExclusiveOwnerThread(null)解绑线程
2state=0逻辑解锁

acquireQueued()

  • 在等待队列中获取资源,【自旋直到获取资源返回为止
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;// 标记是否取得资源,默认未false
        try {
            boolean interrupted = false;// 标记等待过程中是否被中断
            for (;;) {//自旋 尝试获取资源
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { 
                // 当前线程节点node为首节点 && 尝试获取资源成功
                    setHead(node);// 将node设置为head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 根据prev.waitStatus 决定 park() & 调用parkAndInterpt()实现park() 并检测是否interpt()
                    interrupted = true;
            }
        } finally {
        // 中断则跳转至finally 将节点从等待队列中取出
            if (failed)
                cancelAcquire(node);
        }
    }
流程分析
  1. 节点进入队尾后,检测prev.waitStatus,寻找安全休息点
  2. 通过park()进入waiting状态,等待unpark() || interpt()唤醒
  3. if(自身是等待队列首节点 && 尝试获取资源)
    • 资源获取成功则head指向当前节点,并return interpt ;即 [获取资源流程中是否被中断过]
    • 否则,根据prev.waitStatus判断是否park() 跳转至流程1循环

shouldParkAfterFailedAcquire(p, node)

  • 判断prev.waitStatus,对当前节点做出不同操作
  • 整个流程中,prev.waitStatus != Node.SIGNAL;则不能park()安心休息。需要不断寻找安心的休息点,同时尝试自己是否能否获取资源。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // prev节点处于 通知状态(release资源后会通知当前节点) ,则可进入waiting()
            return true;
        if (ws > 0) {
             // prev节点被CANCELLED,则不断跳过prev,直到寻找到最近【正常等待】的前驱节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
             // prev节点正常,则prev.waitStatus = Node.SIGNAL 设置为通知状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 设置prev.waitStatus = Node.SIGNAL
           
        }
        return false;
    }

parkAndCheckInterrupt()

  • 阻塞当前当前线程,并检测是否被中断
  • park() 使当前线程进入waiting状态。
    • 解除waiting状态方法 : unpark()唤醒 || interupt() 中断

addWaiter(Node model)

  • 将该线程加入【等待队列尾部】,并【标记独占模式
  • 参数 Node model 含义:  Node.EXCLUSIVE【独占模式】 || Node.SHARED【共享模式】
Q:addWaiter 功能和 enq十分相似,为什么特地调用addwaiter呢?
  • 并发编程中,将最可能成功执行代码写在最常用处,优先执行】而不是先执行不太可能的代码,白白判断
  • 调用enq需要方法调用、进入循环,执行和null比较等一系列指令,然后才到【最可能执行的代码处】
  private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
       
        Node pred = tail;
        if (pred != null) { 
        // 尝试enq的最快路径:Try the fast path of enq; backup to full enq on failure
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//如果不是【队列不空,在尾部成功添加节点】的常规情况,则调用enq插入当前线程节点
        return node;
    }
enq(node)
  • enq(node)意义:【自旋添加节点,直到成功添加为止】,如果队列为空,则初始化队列
  • for(;;)实现自旋
    private Node enq(final Node node) {
        for (;;) {
        // 如果队列为空,则至少两次for循环。第一次初始化队列,第二次插入当前线程节点
        // for(;;)实现自旋
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

selfInterrupt()

等待过程中断则不起作用,获取资源后selfInterrupt 中断

release(int )

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

unparkSuccessor(Node)

  • 用unpark()唤醒等待队列中最前边的那个未放弃线程
 private void unparkSuccessor(Node node) {
     //这里,node一般为当前线程所在的结点。
     int ws = node.waitStatus;
     if (ws < 0)//置零当前线程结点状态,允许失败。
         compareAndSetWaitStatus(node, ws, 0);
 
     Node s = node.next;//找到下一个需要唤醒的结点s
     if (s == null || s.waitStatus > 0) {//如果为空或已取消
         s = null;
         for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
             if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                 s = t;
     }
     if (s != null)
         LockSupport.unpark(s.thread);//唤醒
 }

使用场景

公平锁和非公平锁

公平非公平,区别在于,加锁是否先来先到(队列中的线程非队列中的其他线程 竞争锁的时候)

非公平锁公平锁的区别:非公平锁性能高于公平锁性能。非公平锁可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量

非公平锁性能虽然优于公平锁,但是会存在导致线程饥饿的情况。在最坏的情况下,可能存在某个线程一直获取不到锁。不过相比性能而言,饥饿问题可以暂时忽略,这可能就是ReentrantLock默认创建非公平锁的原因之一了。

Condition

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition中的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition。且不会产生死锁(原因???)

其中AbstractQueueSynchronizer中实现了Condition中的方法,主要对外提供awaite(Object.wait())signal(Object.notify())调用。

wait()/waitAll(): 需要和synchronized一起使用

nitify()/notifyAll():需要和synchronized一起使用

await():挂起当前线程,并释放锁

signal():唤醒线程

Condition不会产生死锁:

- <font style="color:black;">Condition 需要使用 Lock 进行控制,使用的时候要注意 lock() 后及时的 unlock(),Condition 有类似于 await 的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是 park/unpark 的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是 wait/notify 会产生先唤醒再挂起的死锁。</font>

子类使用需要重写的方法

  • protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • protected boolean tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
  • protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。

设计模式:

1、模板方法

AQS顶层的各种设计,模板已经固定,将具体的实现交由子类去实现

AQS依赖的底层API:LockSupport

参考文档:

1、https://mp.weixin.qq.com/s/trsjgUFRrz40Simq2VKxTA

2、https://blog.csdn.net/xushiyu1996818/article/details/103528041 (更详细)

3、https://www.cnblogs.com/waterystone/p/4920797.html


文章作者: 王利康
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 王利康 !
  目录