java之AQS

AQS概述

AQS(AbstractQueuedSynchronizer),类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。
在AQS中维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)

AQS在重入锁中的使用

  • 以ReentrantLock为例,state初始化为0,表示未锁定状态
  • 在ReentrantLock中lock()方法,实际上调用的是AQS的acquire(1)方法;而在acquire(1)中实际上会调用tryAcquire()方法,tryAcquire()方法在AQS中是无方法体的,需要自己实现;
  • unlock()方法实际上调用的是AQS的release(1)方法,而在该方法中会调用tryRelease(int arg),tryRelease(int arg)方法是没有方法体的,需要自己实现
  • 上述这种模式采用了模板方法设计模板,不同的功能需要开发者自己实现
  • 在ReentrantLock的tryAcquire()方法中,会判断当前的status值,如果为0,则通过cas修改为1;此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁
  • 如果不为0,则表示当前已经线程已经获得锁,然后在当前值的基础上+1;此处有两点需要注意
    • ReentrantLock名为重入锁表示,可以重复加锁,不会有其它影响;
    • 但是当前线程调用了几次lock()就需要调用几次unlock(),而不是只需要调用一次unlock解锁
    • 而Mutex是不可重入锁,当加锁后,同一线程也无法再获取到锁;

AQS中重要方法及属性

state

共享资源,通过竞争修改该值,尝试获取锁资源

compareAndSetState

通过CAS操作,修改state值

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

acquire

如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • addWaiter()将该线程封装为Node对象并加入等待队列的尾部,同时标记为独占模式;此处说是队列实际是Node链表,通过cas保证线程安全
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

acquireQueued

使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

该方法中会调用方法parkAndCheckInterrupt,我们会发现在该方法中会通过LockSupport.park挂起线程,直到前面的线程执行完毕,则从队列中获取到对应的thread实例,并通过调用LockSupport.unpark(thread)唤醒线程

tryAcquire

如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
  • AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现。至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
  • 这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口

release

它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

tryRelease

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false,比如ReentrantLock中该资源被锁了多次。

signal

  • ReentrantLock在lock过程中支持条件挂起及休眠
  • 在ReentrantLock中存在方法newCondition(),该方法实际会新建AQS中的ConditionObject对象
  • 可以通过condition.await()condition.awaitNanos(delay)挂起线程
    • await会将当前线程添加至封装为Node
    • 然后调用release释放Lock锁,由于ReentrantLock为可重入锁,故在ReentrantLock中status减为0则释放锁
    • 释放锁成功后将当前线程对应的Node对象通过cas添加到队列中
    • 添加队列成功则通过LockSupport.park挂起线程
  • 可以通过condition.signal()唤醒线程
    • 获取首个wait的node 对象first
    • 将AQS node链表对应的firstWaiter修改为first
    • 通过调用LockSupport.unpark(node.thread)唤醒线程

通过AQS实现简易锁功能

package com.kevin.lock;
import java.util.Date;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
 * Created by liukai on 2019/10/20.
 */
public class AQSApp extends AbstractQueuedSynchronizer implements Runnable {
    private static AQSApp lock = new AQSApp();
    @Override
    protected boolean tryAcquire(int arg) {
        return this.compareAndSetState(0, arg);
    }
    @Override
    protected boolean tryRelease(int arg) {
        return this.compareAndSetState(1, arg);
    }
    public void lock() {
        this.acquire(1);
    }
    public void unlock() {
        this.release(0);
    }
    @Override
    public void run() {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "  inter:" + new Date());
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "  out:" + new Date());
            lock.unlock();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        Thread ts[] = new Thread[10];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(new AQSApp());
        }
        for (Thread t : ts) {
            t.start();
        }
        for (Thread t : ts) {
            t.join();
        }
    }
}
输出:
Thread-0  inter:Sun Oct 20 16:47:02 CST 2019
Thread-0  out:Sun Oct 20 16:47:03 CST 2019
Thread-1  inter:Sun Oct 20 16:47:03 CST 2019
Thread-1  out:Sun Oct 20 16:47:04 CST 2019
Thread-2  inter:Sun Oct 20 16:47:04 CST 2019
Thread-2  out:Sun Oct 20 16:47:05 CST 2019
Thread-3  inter:Sun Oct 20 16:47:05 CST 2019
Thread-3  out:Sun Oct 20 16:47:06 CST 2019
Thread-4  inter:Sun Oct 20 16:47:06 CST 2019
Thread-4  out:Sun Oct 20 16:47:07 CST 2019
Thread-5  inter:Sun Oct 20 16:47:07 CST 2019
Thread-5  out:Sun Oct 20 16:47:08 CST 2019
Thread-6  inter:Sun Oct 20 16:47:08 CST 2019
Thread-6  out:Sun Oct 20 16:47:09 CST 2019
Thread-7  inter:Sun Oct 20 16:47:09 CST 2019
Thread-7  out:Sun Oct 20 16:47:10 CST 2019
Thread-8  inter:Sun Oct 20 16:47:10 CST 2019
Thread-8  out:Sun Oct 20 16:47:11 CST 2019
Thread-9  inter:Sun Oct 20 16:47:11 CST 2019
Thread-9  out:Sun Oct 20 16:47:12 CST 2019

总结

  • 使用ReentrantLock时,线程挂起存在两个场景
    • 调用lock方法未获取到锁
    • 在lock方法中执行condition.await()方法
      • 释放当前的锁
      • 将当前线程添加至队列中
      • 挂起当前线程
  • AQS中的线程node存储至Node链表中,通过cas修改链表,保障线程安全

附1