AQS概述
AQS(AbstractQueuedSynchronizer),类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。
在AQS中维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)
AQS在重入锁中的使用
- 以ReentrantLock为例,state初始化为0,表示未锁定状态
- 在ReentrantLock中
lock()
方法,实际上调用的是AQS的acquire(1)
方法;而在acquire(1)
中实际上会调用tryAcquire()
方法,tryAcquire()
方法在AQS中是无方法体的,需要自己实现; unlock()
方法实际上调用的是AQS的release(1)
方法,而在该方法中会调用tryRelease(int arg)
,tryRelease(int arg)
方法是没有方法体的,需要自己实现- 上述这种模式采用了模板方法设计模板,不同的功能需要开发者自己实现
- 在ReentrantLock的
tryAcquire()
方法中,会判断当前的status值,如果为0,则通过cas修改为1;此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁 - 如果不为0,则表示当前已经线程已经获得锁,然后在当前值的基础上+1;此处有两点需要注意
- ReentrantLock名为重入锁表示,可以重复加锁,不会有其它影响;
- 但是当前线程调用了几次lock()就需要调用几次unlock(),而不是只需要调用一次unlock解锁
- 而Mutex是不可重入锁,当加锁后,同一线程也无法再获取到锁;
AQS中重要方法及属性
state
共享资源,通过竞争修改该值,尝试获取锁资源
compareAndSetState
通过CAS操作,修改state值
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
acquire
如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- addWaiter()将该线程封装为Node对象并加入等待队列的尾部,同时标记为独占模式;此处说是队列实际是Node链表,通过cas保证线程安全
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
acquireQueued
使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
该方法中会调用方法parkAndCheckInterrupt,我们会发现在该方法中会通过LockSupport.park挂起线程,直到前面的线程执行完毕,则从队列中获取到对应的thread实例,并通过调用LockSupport.unpark(thread)唤醒线程
tryAcquire
如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现。至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
- 这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口
release
它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
tryRelease
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false,比如ReentrantLock中该资源被锁了多次。
signal
- ReentrantLock在lock过程中支持条件挂起及休眠
- 在ReentrantLock中存在方法newCondition(),该方法实际会新建AQS中的ConditionObject对象
- 可以通过
condition.await()
或condition.awaitNanos(delay)
挂起线程- await会将当前线程添加至封装为Node
- 然后调用release释放Lock锁,由于ReentrantLock为可重入锁,故在ReentrantLock中status减为0则释放锁
- 释放锁成功后将当前线程对应的Node对象通过cas添加到队列中
- 添加队列成功则通过LockSupport.park挂起线程
- 可以通过
condition.signal()
唤醒线程- 获取首个wait的node 对象first
- 将AQS node链表对应的firstWaiter修改为first
- 通过调用LockSupport.unpark(node.thread)唤醒线程
通过AQS实现简易锁功能
package com.kevin.lock;
import java.util.Date;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Created by liukai on 2019/10/20.
*/
public class AQSApp extends AbstractQueuedSynchronizer implements Runnable {
private static AQSApp lock = new AQSApp();
@Override
protected boolean tryAcquire(int arg) {
return this.compareAndSetState(0, arg);
}
@Override
protected boolean tryRelease(int arg) {
return this.compareAndSetState(1, arg);
}
public void lock() {
this.acquire(1);
}
public void unlock() {
this.release(0);
}
@Override
public void run() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " inter:" + new Date());
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " out:" + new Date());
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
Thread ts[] = new Thread[10];
for (int i = 0; i < ts.length; i++) {
ts[i] = new Thread(new AQSApp());
}
for (Thread t : ts) {
t.start();
}
for (Thread t : ts) {
t.join();
}
}
}
输出:
Thread-0 inter:Sun Oct 20 16:47:02 CST 2019
Thread-0 out:Sun Oct 20 16:47:03 CST 2019
Thread-1 inter:Sun Oct 20 16:47:03 CST 2019
Thread-1 out:Sun Oct 20 16:47:04 CST 2019
Thread-2 inter:Sun Oct 20 16:47:04 CST 2019
Thread-2 out:Sun Oct 20 16:47:05 CST 2019
Thread-3 inter:Sun Oct 20 16:47:05 CST 2019
Thread-3 out:Sun Oct 20 16:47:06 CST 2019
Thread-4 inter:Sun Oct 20 16:47:06 CST 2019
Thread-4 out:Sun Oct 20 16:47:07 CST 2019
Thread-5 inter:Sun Oct 20 16:47:07 CST 2019
Thread-5 out:Sun Oct 20 16:47:08 CST 2019
Thread-6 inter:Sun Oct 20 16:47:08 CST 2019
Thread-6 out:Sun Oct 20 16:47:09 CST 2019
Thread-7 inter:Sun Oct 20 16:47:09 CST 2019
Thread-7 out:Sun Oct 20 16:47:10 CST 2019
Thread-8 inter:Sun Oct 20 16:47:10 CST 2019
Thread-8 out:Sun Oct 20 16:47:11 CST 2019
Thread-9 inter:Sun Oct 20 16:47:11 CST 2019
Thread-9 out:Sun Oct 20 16:47:12 CST 2019
总结
- 使用ReentrantLock时,线程挂起存在两个场景
- 调用lock方法未获取到锁
- 在lock方法中执行condition.await()方法
- 释放当前的锁
- 将当前线程添加至队列中
- 挂起当前线程
- AQS中的线程node存储至Node链表中,通过cas修改链表,保障线程安全