AQS是什么 java concurrent包中有很多阻塞类如:ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore、Synchronous、FutureTask等,他们的底层都是根据aqs构建的,它可以说是java多线程编程最底层核心的抽象类。既然这么重要,我们就来看看它底层原理到底是什么。
aqs全称AbstractQueuedSynchronizer,它作为抽象类无法单独使用,需要有具体实现,不同的实现中自己定义什么状态意味着获取或者被释放
AQS的原理是什么 AQS内部维护一个先进先出(FIFO)的等待队列叫做CLH队列,当一个线程来请求资源时,AQS通过状态判断是否能获取资源,如果不能获取,则挂起这个线程,和状态一起封装成一个Node节点放在队尾,等待前面的线程释放资源好唤醒自己,所以谁先请求的谁最先获得机会唤醒,当然新线程可能加塞提前获取资源,在源码解析可以看到原因
AQS分独占和共享两种方式,独占模式,只有一个线程可以获得锁,比如ReentrantLock,共享模式下可以允许多个线程同时获取锁,比如CountDownLatch使用的就是共享方式,
源码解析 AQS的子类需要实现的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively () { throw new UnsupportedOperationException(); }
可以看到,子类调用这些方法如果没有实现的话会抛异常,当然也不是所有方法都要实现,找自己需要的实现就可以了。
为了更好的理解先实现一个最简单的锁,只需要实现tryAcquire和tryRelease方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class TestLock { private Sync sync = new Sync(); public void lock () { sync.acquire(1 ); } public void unLock () { sync.release(1 ); } public static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { assert arg == 1 ; if (compareAndSetState(0 ,1 )){ return true ; } return false ; } @Override protected boolean tryRelease (int arg) { assert arg == 1 ; if (getState() == 0 ){ throw new IllegalMonitorStateException(); } setState(0 ); return true ; } } }
再来写一个并发场景,简单的加法,先获取前值,用sleep模拟方法执行时间比较长,然后累加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public static void main (String[] args) { final AddCount count = new AddCount(); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ;i<3 ;i++){ executorService.submit(new Runnable() { @Override public void run () { try { count.add(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } public static class AddCount { private int countTotle = 0 ; public void add (int count) throws InterruptedException { int tmp = this .countTotle; Thread.sleep(100L ); this .countTotle = tmp+count; System.out.println(this .countTotle); } } 100 100 100
在add方法加上自定义的的锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static void main (String[] args) { final AddCount count = new AddCount(); final TestLock testLock = new TestLock(); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ;i<3 ;i++){ executorService.submit(new Runnable() { @Override public void run () { try { testLock.lock(); count.add(100 ); testLock.unLock(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } 100 200 300
根据这个简单的例子,我们来看一下源码中是怎么实现的
acquire lock方法首先调用的是AQS的acquire方法
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
它会调用tryAcquire尝试去取锁,如果没有取到的话调用addWaiter将Node放入队尾,同样也使用CAS的方式,AQS中有大量CAS的使用,不了解CAS的可以看浅析乐观锁、悲观锁与CAS
这里有新的线程在执行第一个判断!tryAcquire(arg)时,如果刚好有线程释放锁,那新的线程很有可能插队直接获取到锁,也就是有队列也无法公平的原因。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
在尾部添加node,将node双向关联,如果成功则直接返回,这里有一个问题,在设置队尾的时候,没有并发控制,有另一个线程也来设置,就只会有一个线程成功,没成功的线程或者队尾为空则执行enq方法。
enq方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这里看到如果tail是null,则cas设置head为一个新节点,也就是说第一个入队的节点head和tail是相同的。 如果队尾不为空,则用cas加自旋的方式放入队尾。
Node对象 node对象封装了状态和请求的线程以及前后节点的地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
没有使用condition,node常用的状态有 0 新建状态和 -1 挂起状态
acquireQueued 再看一下acquireQueued方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
这里的主要逻辑就是将新加入的节点设置为待唤醒状态,进入队列的节点都进入中断状态,head节点持有锁,锁被释放后后面的节点会代替之前的head成为新的head节点
release 释放锁的过程,掉用release方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
调用LockSupport.unpark后,唤醒后一个中断的线程,队列剔除之前的head,这样往复,释放锁后继续唤醒后面的线程。