java thread 之AQS

JDK1.5引入了Doug Lea大神的concurrent框架,其中AbstractQueuedSynchronizer是concurrent框架的基本,从大神的paper中可以看到
1.传统的synchronized不能进行中段,这个不合适
2.如果将concurrent重心放在少数竞争下优化锁,而在其他情况下放任缓慢执行的策略是不正确的
3.需要可预测的维护效率,即使在同步竞争激烈的情况下,理想中无论多少线程试图通过一个同步点的开销应该是恒定的
4.设计的目标是总时间的减少,因为有可能在此之间一个线程可以通过同步点,然后他没有立即执行
5.在高吞吐量的基本上,更重要的是线程的公平调度

AQS设计思路:
原子的管理同步状态;阻塞和解除阻塞线程;保持线程的阻塞队列。

实现
1.在CLH queue的基础上进行改造
2.单个int state 计数synchronization状态

队列的每个节点是内部类Node:
  static final class Node {
       
static final int CANCELLED =  1;
    
        
static final int SIGNAL    = -1;
      
        
static final int CONDITION = -2;
      
        
static final Node SHARED = new Node();
       
        
static final Node EXCLUSIVE = null;

        
volatile int waitStatus;

        
volatile Node prev;
  
        
volatile Node next;
    
        
volatile Thread thread;

        Node nextWaiter;
 
        
final boolean isShared() {
            
return nextWaiter == SHARED;
        }
}

对于waitStatus>0的node在等待的遍历当中是会被抛弃掉的,而nextWaiter在共享锁和Lock的Condition中会用到

void acquire(int arg)方法
该方法是其他基于AQS实现的具体类获得锁或者信号等的基础实现
    public final void acquire(int arg) {
        
if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
tryAcquire(int arg) 留给具体子类实现,如果返回false,说明没有得到锁,则调用acquireQueued(final Node node, int arg)方法,在调用此方法前需要将当前线程的node加入队列:

    private Node addWaiter(Node mode) {
       //关键点1.
        Node node 
= new Node(Thread.currentThread(), mode);
        
// Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        
if (pred != null) {
            node.prev 
= pred;
            
if (compareAndSetTail(pred, node)) {
                pred.next 
= node;
                
return node;
            }
        }
        enq(node);
        
return node;
    }
首先尝试快速加入到队尾,如果加入队尾失败,说明在此期间,其余的线程入队了,那么真正的执行入队操作:
 
 
private Node enq(final Node node) {
        
for (;;) {
            Node t 
= tail;
            
if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev 
= h;
                
if (compareAndSetHead(h)) {
                    tail 
= node;
                    
return h;
                }
            }
            
else {
                node.prev 
= t;
                
if (compareAndSetTail(t, node)) {
                    t.next 
= node;
                    
return t;
                }
            }
        }
    }
这是个循环,只到node加入到队列中才返回,当是对队列的第一次插入node时,必须初始化队列,创建一个傀儡header,完成入队操作后执行acquireQueued()方法:

    final boolean acquireQueued(final Node node, int arg) {
        
try {
            
boolean interrupted = false;
            
for (;;) {
                
final Node p = node.predecessor();
                //关键点2.
                if (p == head && tryAcquire(arg)) {
               //关键点4.
                    setHead(node);
                    p.next 
= null// help GC
                    return interrupted;
                }
                //关键点3
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted 
= true;
            }
        } 
catch (RuntimeException ex) {
            cancelAcquire(node);
            
throw ex;
        }
    }
前面的方法可以看出当前thread的对应的node就是参数传入的node,获得node的前驱,如果当前前驱是傀儡header而且子类的tryAcquire()方法返回为真,那么当前的node变成header
,而且返回node对应的thread的中断状态,如果前面流程没有执行,执行shouldParkAfterFailedAcquire():
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        
int s = pred.waitStatus;
        
if (s < 0)
            
/*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             
*/
            
return true;
        
if (s > 0) {
            
/*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             
*/
        
do {
        node.prev 
= pred = pred.prev;
        } 
while (pred.waitStatus > 0);
        pred.next 
= node;
    }
        
else
            
/*
             * Indicate that we need a signal, but don't park yet. Caller
             * will need to retry to make sure it cannot acquire before
             * parking.
             
*/
            compareAndSetWaitStatus(pred, 
0, Node.SIGNAL);
        
return false;
    }
得到node的前驱prev的waitStatus,当waitStatus>0,那么prev处于退出状态,从队列中去除prev,如果waitStatus<0,说明线程需要park住,如果没有为0,就是新建的,需要设置成
-1,在下一次循环中,如果还不能获得锁那么就需要park住线程,parkAndCheckInterrupt()就是执行park()线程,然后返回线程中断状态.
acquireQueued()方法是个死循环,只到header之后的一个线程获得锁之后,才会重新设置header,被设置为新header的是header的next node,就这样返回到acquire()中,如果返回的中断状态为true,当前线程中断.
总结下:
关键点1.将线程入队列,不管是否是第一个线程。在和其他线程竞争前,尝试入队一次
关键点2.header只是一个傀儡node,他代表上次获得的线程,在这里只有header的后面一个thread尝试成功了,才会跳出循环,然后其他的线程只能在循环中
关键点3.检查加入的node对应thread是否需要park(),当node的waitStatus>0就是取消的node,不需要park(),其他的需要park().park住的线程就一直阻塞到unpark
关键点4.这里setHead()方法把当前取得锁的node设置为header,使得队列往前走了一步。

boolean release(int arg) 方法
这个方法是unlock()等方法的基础方法

    public final boolean release(int arg) {
        
if (tryRelease(arg)) {
            Node h 
= head;
            
if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            
return true;
        }
        
return false;
    }
tryRelease()留给子类实现,在队列header处的node不为空而且状态不为初始状态的话(初始为0),需要为这个header所持有的thread unpark

    private void unparkSuccessor(Node node) {
        
/*
         * Try to clear status in anticipation of signalling.  It is
         * OK if this fails or if status is changed by waiting thread.
         
*/
        compareAndSetWaitStatus(node, Node.SIGNAL, 
0);

        
/*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor. 这里貌似是cancelAcquire()方法引起从后向前遍历
         
*/
        Node s 
= node.next;
        
if (s == null || s.waitStatus > 0) {
            s 
= null;
            
for (Node t = tail; t != null && t != node; t = t.prev)
                
if (t.waitStatus <= 0)
                    s 
= t;
        }
        
if (s != null)
            LockSupport.unpark(s.thread);
    }
在acquire()方法中知道header是个傀儡header,所以这个方法中得到header的next,如果next不为空或者next为取消的node,则从tail开始向前遍历,找到最前面waitStatus<=0的node,然后unpark node的thread。

void cancelAcquire(Node node)
该方法是一切取消获得的基础实现,代码有些地方需要注意:
    private void cancelAcquire(Node node) {
    
// Ignore if node doesn't exist
        if (node == null)
        
return;

    node.thread 
= null;

    
// Skip cancelled predecessors
    Node pred = node.prev;
    
while (pred.waitStatus > 0)
        node.prev 
= pred = pred.prev;

    
// Getting this before setting waitStatus ensures staleness
    
//前面的操作使得前置改变了 但是prex的后置还是没有变
    Node predNext = pred.next;

    
// Can use unconditional write instead of CAS here
    node.waitStatus = Node.CANCELLED;

    
// If we are the tail, remove ourselves
    if (node == tail && compareAndSetTail(node, pred)) {
        
//tail就set null 了
        compareAndSetNext(pred, predNext, null);
    } 
else {
        
// If "active" predecessor found
        
//

        
if (pred != head
        
&& (pred.waitStatus == Node.SIGNAL
            
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
        
&& pred.thread != null) {

        
// If successor is active, set predecessor's next link
        Node next = node.next;
        
if (next != null && next.waitStatus <= 0)
            compareAndSetNext(pred, predNext, next);
        } 
else {
        
//如果前面没有阻塞的node,到本node下面的话,需要unpark了
        unparkSuccessor(node);
        }

        node.next 
= node; // help GC  这个会影响next节点的秩序  还好每次pred.waitStatus>0的检测使得受影响的时间窗口比较小
    }
    }



共享锁
从acquireShared(int arg)开始:
   public final void acquireShared(int arg) {
        
if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
private void doAcquireShared(int arg) {
        
final Node node = addWaiter(Node.SHARED);
        
try {
            
boolean interrupted = false;
            
for (;;) {
                
final Node p = node.predecessor();
                
if (p == head) {
                    
int r = tryAcquireShared(arg);
                    
if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next 
= null// help GC
                        if (interrupted)
                            selfInterrupt();
                        
return;
                    }
                }
                
if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted 
= true;
            }
        } 
catch (RuntimeException ex) {
            cancelAcquire(node);
            
throw ex;
        }
    }
 
      private void setHeadAndPropagate(Node node, int propagate) {
         //队列向后移动一位
        setHead(node);
        //propagate>0说明,共享数值是大于前面要求的数值的
        if (propagate > 0 && node.waitStatus != 0) {
            /*
             * Don't bother fully figuring out successor.  If it
             * looks null, call unparkSuccessor anyway to be safe.
             */
            Node s = node.next;
            //如果只剩下一个node节点 那么直接unpark是可以的,因为就这个node,propagate数也大于0
            //如果是共享的也可以unpark,unpark后还在doAcquireShared循环的,如果发现acquires数值过大,这个线程还是会park住的
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
    }

 共享的和独占的在实现上面是类似得,共享实现上,对于获得能成功,只要是子类实现上面能获得成功,如信号量的实现(state的可用量是大于1的),就不用进入队列阻塞。


Condition
condition是服务于单个Lock的,condition.await()等等待方法在Lock上面形成了一个condition的等待队列,condition.singal()方法在Lock上面处理condition的等待队列,
一个一个化解,然后将队列的node加入到AQS的阻塞队列中等待对应的线程被unpark

    public final void await() throws InterruptedException {
            
if (Thread.interrupted())
                
throw new InterruptedException();
            Node node 
= addConditionWaiter();//关键点1
            
int savedState = fullyRelease(node);//关键点2
            
int interruptMode = 0;
            
while (!isOnSyncQueue(node)) {//关键点3
                LockSupport.park(
this);
                
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    
break;
            }
            
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//关键点4
                interruptMode 
= REINTERRUPT;
            
if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            
if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
其中的关键点:
关键点1.加入到condition的对应的lock私有的队列中,和AQS的阻塞队列形式相似
关键点2.释放这个condition对应的lock的锁,因为在使用的过程当中,考虑到如果这个wait()方法阻塞住,而lock如果没有释放锁,那么对于其他的线程的node来说肯定是阻塞住的,
因为condition对应的lock获得了锁,肯定在AQS的header处,其他线程肯定是得不到锁阻塞在那里,这样两边都阻塞的话就死锁了,所以这里需要释放对应lock的锁的
关键点3.判断condition是否已经转化成为AQS阻塞队列的一个节点,如果没有park线程阻塞在这里
关键点4.到这一步的话就需要signal()或者signalAll()的方法的执行,说明这个线程已经被unpark,然后运行直到acquireQueued尝试再次获得锁,因为condition对应的lock的锁在关键
点2是被释放了的
    public final void signal() {
            
if (!isHeldExclusively())
                
throw new IllegalMonitorStateException();
            Node first 
= firstWaiter;
            
if (first != null)
                doSignal(first);
        }

    
private void doSignal(Node first) {
            
do {
                
if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter 
= null;
                first.nextWaiter 
= null;
            } 
while (!transferForSignal(first) &&
                     (first 
= firstWaiter) != null);
        }
        
final boolean transferForSignal(Node node) {
        
/*
         * If cannot change waitStatus, the node has been cancelled.
         
*/
        
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            
return false;

        
/*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         
*/
        Node p 
= enq(node);//进入到AQS的阻塞队列中
        int c = p.waitStatus;
        
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        
return true;
    }

    在整个AQS存在两种链表。 一个链表就是整个Sync Node链表,横向链表。另一种链表就是Condition的wait Node链表,相对于Sync node,它属于node节点的一个纵向链表。当纵向列表被single通知后,会进入对应的Sync Node进行排队处理。

最后面上个图理解下condition:


图片转载自:
http://www.goldendoc.org/2011/06/juc_condition/









posted on 2011-09-15 22:57 nod0620 阅读(646) 评论(0)  编辑  收藏 所属分类: 多线程java


只有注册用户登录后才能发表评论。


网站导航:
 
<2024年5月>
2829301234
567891011
12131415161718
19202122232425
2627282930311
2345678

导航

统计

常用链接

留言簿

随笔分类

随笔档案

文章分类

文章档案

搜索

最新评论

阅读排行榜

评论排行榜