I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0
1.ThreadPoolExecutor#execute(Runnable command)

     public void execute(Runnable command) {
               
// 如果任务为空,则直接抛出空指针异常
                if (command == null)
                    
throw new NullPointerException();
                
// 1.如果线程池线程数目UnderCorePoolSize且RUNNING则直接添加worker线程并启动
                
// 2.如果超过了corePoolSize或者addIfUnderCorePoolSize失败则
                if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                
// 如果线程池是RUNNING状态且可将任务command加入workQueue(即不违反容量限制)
                    if (runState == RUNNING && workQueue.offer(command)) {
                   
// 因为是并发执行.如果此时发现线程池状态不再是RUNNING(可能执行了类似shutdown的操作)或者线程池中已无Worker线程
                        if (runState != RUNNING || poolSize == 0)
                  
//1.如果线程池状态不再是RUNNING且此时command依然在队列中,即还未执行则直接拒绝.
                 
// 2.否则如果线程池状态 < STOP,即可能是SHUTDOWN状态且任务队列中依然有任务且工作线程的数目不足corePoolSize,则额外添加一个Worker线程并启动
                            ensureQueuedTaskHandled(command);
                    }

                    
else if (!addIfUnderMaximumPoolSize(command))
                   
// 如果在UnderMaximumPoolSize下增加worker线程失败则执行拒绝策略,直接调用RejectedExecutionHandler#rejectedExecution
                        reject(command); // is shutdown or saturated
                }

            }


2. addIfUnderCorePoolSize(Runnable firstTask)

// poolSize < corePoolSize && RUNNING的情况下添加worker线程并启动worker线程
        private boolean addIfUnderCorePoolSize(Runnable firstTask) {
                Thread t = null;
               
final ReentrantLock mainLock = this.mainLock;
                    
//
                mainLock.lock();
               
try {
                   
// 初始poolSize为0,runState为0,即RUNNING
                   
// RUNNING = 0 / SHUTDOWN = 1 / STOP = 2
                   
// TERMINATED = 3
                    if (poolSize < corePoolSize && runState == RUNNING)
                        t = addThread(firstTask);
                }
finally {
                    mainLock.unlock();
                }

               
if (t == null)
                   
return false;
                t.start();
               
return true;
            }


3.addThread(Runnable firstTask)

private Thread addThread(Runnable firstTask) {
               
// 初始化Worker,传入firstTask
                Worker w = new Worker(firstTask);
               
// 利用线程工厂新建线程,注意这里传入的参数是w
                Thread t = threadFactory.newThread(w);
               
if (t != null) {
                    w.thread = t;
               
// 添加至workers
                    workers.add(w);
               
// ++poolSize
                    int nt = ++poolSize;
                   
if (nt > largestPoolSize)
                        largestPoolSize = nt;
                }

               
return t;
            }


4.Worker

private final class Worker implements Runnable {
       
/**
         * The runLock is acquired and released surrounding each task
         * execution. It mainly protects against interrupts that are
         * intended to cancel the worker thread from instead
         * interrupting the task being run.
        
*/

       
private final ReentrantLock runLock = new ReentrantLock();

       
/**
         * Initial task to run before entering run loop. Possibly null.
        
*/

       
private Runnable firstTask;

       
/**
         * Per thread completed task counter; accumulated
         * into completedTaskCount upon termination.
        
*/

       
volatile long completedTasks;

       
/**
         * Thread this worker is running in.  Acts as a final field,
         * but cannot be set until thread is created.
        
*/

        Thread thread;

        Worker(Runnable firstTask) {
           
this.firstTask = firstTask;
        }


       
boolean isActive() {
           
return runLock.isLocked();
        }


       
/**
         * 中断线程如果没有正在运行任务(可能在等待任务)
          * {@link ThreadPoolExecutor#interruptIdleWorkers}
          * {@link ThreadPoolExecutor#getTask}
          * {@link ThreadPoolExecutor#shutdown}
        
*/

       
void interruptIfIdle() {
           
final ReentrantLock runLock = this.runLock;
           
if (runLock.tryLock()) {
               
try {
                   
// 注意只有该方法是被其他线程调用才会执行interrupt.
                   
// 1.个人认为如果是当前自身线程执行到这里的时候,说明getTask返回了null.线程就会结束了.
         
// 2.Worker线程在自身任务的执行中调用此方法时没有作用的.即恰恰说明了运行时不被中断.(因为不太可能存在这样的类似业务,内部线程自己在运行任务的时候中断自己.没有任何作用.你懂的.这压根就是错误的做法)
          
// 3.还有一个很重要的原因是:这里加了运行锁.即如果此时有任务正在运行则独占runLock,则其他线程必须等待任务完毕释放锁才可以进行interrupt.
                    if (thread != Thread.currentThread())
                        thread.interrupt();
                }
finally {
                    runLock.unlock();
                }

            }

        }


       
/**
         * Interrupts thread even if running a task.
        
*/

       
void interruptNow() {
   
// 直接进行中断,无论是内部线程还是其他线程
   
// 无论是否正在运行任务
   
// 没有获得锁
   
// 此时如果线程正在等待任务或者任务执行过程中阻塞都可以被中断
   
// 个人认为该方法也肯定是由外部线程进行调用的,而非内部的线程,你懂的.用了也没有作用.
            thread.interrupt();
        }


       
/**
         * 运行任务在beforeExecute/afterExecute之间
        
*/

       
private void runTask(Runnable task) {
           
final ReentrantLock runLock = this.runLock;
            runLock.lock();
           
try {
               
/*
                 * Ensure that unless pool is stopping, this thread
                 * does not have its interrupt set. This requires a
                 * double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                
*/

           
// 这段代码乍看起来可能有些奇怪.个人认为是因为多线程的原因,如线程池调用了shutdownNow方法.
           
// 1.如果线程池是RUNNING/SHUTDOWN且之前被中断过,则清除中断状态(interrupted)  2.再次检查如果执行了shutdownNow的话,则会直接interrupt thread.而此时的中断状态可能被清除了.->需要需要再次调用interrupt重置中断状态.(还需要仔细考证)
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();

               
boolean ran = false;
          
// 任务执行前的一些业务,空实现,子类可覆盖
          
// 任务完成或者任务执行出现异常则可通过afterExecute(空实现)追踪
                beforeExecute(thread, task);
               
try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
              
// 任务计数
                    ++completedTasks;
                }
catch (RuntimeException ex) {
                   
if (!ran)
                        afterExecute(task, ex);
                   
throw ex;
                }

            }
finally {
                runLock.unlock();
            }

        }


       
/**
         * Work线程主任务循环
        
*/

       
public void run() {
           
try {
                Runnable task = firstTask;
                firstTask = null;
          
// 1.如果第一个任务不为null则一定会执行第一个任务
          
// 2.如果getTask为null则线程结束.
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }

            }
finally {
         
//  跳出while,线程即结束
         
// 1.completedTaskCount 计数
         
// 2.workers.remove(w) 从workers移除
        
// 3.--poolSize,如果poolSize为0则tryTerminate
                workerDone(this);
            }

        }

    }


5.Runnable getTask()

Runnable getTask() {
       
for (;;) {
           
try {
               
int state = runState;
          
// 线程池运行状态为STOP或者TERMINATED,直接返回null,则Worker线程跳出while,终止
                if (state > SHUTDOWN)
                   
return null;
                Runnable r;
          
// 如果线程池运行状态恰好是SHUTDOWN,则继续从队列获取任务(队列为空则返回null),也在该状态下如果线程池不为空则一直获取任务
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
               
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
              
// RUNNING状态下,poolSize超出了corePoolSize 或者allowCoreThreadTimeOut(允许核心线程超时) {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean value)}
              
// 在keepAliveTime时间内等待可用的元素,等待时可被中断.如果超时则返回null.
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
               
else
             
// Running状态下,poolSize未超出corPoolSize且不允许核心线程超时,则在元素变得可用之前一直等待,可被中断
                    r = workQueue.take();
               
if (r != null)
                   
return r;
          
// 如果此时返回的任务为null且worker线程可退出(该方法其实是重复校验,因为是并发执行.所以可能任务队列已经有了任务等条件出现)
                if (workerCanExit()) {
              
// 如果此时线程池状态不是RUNNING
                    if (runState >= SHUTDOWN) // Wake up others
               
// 唤醒可能阻塞的任务,{@link Worker#interruptIfIdle}
                        interruptIdleWorkers();
             
// 返回null,结束任务
                    return null;
                }

               
// Else retry
          
// 继续for-循环
            }
catch (InterruptedException ie) {
               
// On interruption, re-check runState
            }

        }

    }


6.workerCanExit()

// 判断worker线程是否可退出
private boolean workerCanExit() {
       
final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
       
boolean canExit;
       
try {
       
// 运行状态为STOP或者TERMINATED
       
// 或者任务队列为空
       
// 或者池中至少有一个线程且允许核心线程超时
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        }
finally {
            mainLock.unlock();
        }

       
return canExit;
    }


7.tryTerminate()

// 尝试终止
    private void tryTerminate() {
      
// 如果当前池中没有线程
        if (poolSize == 0) {
           
int state = runState;
       
// 如果当前运行状态时是Running/SHUTDOWN且任务队列不为空
            if (state < STOP && !workQueue.isEmpty()) {
          
// 重新设置为运行状态
                state = RUNNING; // disable termination check below
         
// 添加一个firstTask为null的worker并启动.因为队列不为空则可以getTask
                Thread t = addThread(null);
               
if (t != null)
                    t.start();
            }

  
            
// 如果运行状态为STOP或者SHUTDOWN则置状态为TERMINATED并唤醒等待终止的线程 {@link #awaitTermination(long timeout, TimeUnit unit)}
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();// 此方法暂时未实现
            }

        }

    }


8.awaitTermination(long timeout, TimeUnit unit)

// 等待线程池终止 {@link #tryTerminate()}
        public boolean awaitTermination(long timeout, TimeUnit unit)
               
throws InterruptedException {
               
long nanos = unit.toNanos(timeout);
               
final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
               
try {
               
// 注这是一个无限循环,直到线程池终止或者超时
                    for (;;) {
                       
if (runState == TERMINATED)
                           
return true;
                       
if (nanos <= 0)
                           
return false;
               
//  {@link Condition#long awaitNanos(long nanosTimeout)}
               
//  此方法返回的是一个估算(nanosTimeout - awaitTime),如果小于等于0则表示没有剩余时间,即超时.不过如果返回值是一个正值的话且线程池未终止的话->所以由将返回值继续传入了参数->确保肯定会发生超时而导致nanos<=0而跳出循环
                        nanos = termination.awaitNanos(nanos);
                    }

                }
finally {
                    mainLock.unlock();
                }

            }


9.shutdown()

public void shutdown() {
       
// 检查是否有shutdown的权限
        SecurityManager security = System.getSecurityManager();
       
if (security != null)
            security.checkPermission(shutdownPerm);

       
final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
       
try {
           
if (security != null) { // Check if caller can modify our threads
       
// 检查所有的worker线程是否有修改线程的权限
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }


           
int state = runState;
       
// 设置线程池当前状态是RUNNING,则设置为SHUTDOWN状态
            if (state < SHUTDOWN)
                runState = SHUTDOWN;

           
try {
       
// 尝试打断空闲的worker线程
                for (Worker w : workers) {
                    w.interruptIfIdle();
                }

            }
catch (SecurityException se) { // Try to back out
       
// 如果出现异常,则还原状态
                runState = state;
               
// tryTerminate() here would be a no-op 这个注释的意思是出现了这个异常,tryTerminate是不起作用的.因为tryTerminate的条件是poolSize == 0.但是异常说明interruptIfIdle失败则不可能poolSize == 0.
       
// 继续向上抛出异常,这个异常是SecurityException
                throw se;
            }

       
// 尝试终止(队列为空的时候直接终止)
            tryTerminate(); // Terminate now if pool and queue empty
        }
finally {
            mainLock.unlock();
        }

    }


10.shutdownNow()

public List<Runnable> shutdownNow() {
        
// 检查shutdown权限以及修改工作线程的权限
        SecurityManager security = System.getSecurityManager();
        
if (security != null)
            security.checkPermission(shutdownPerm);

        
final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        
try {
            
if (security != null// Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }


            
int state = runState;
    
// 置状态为STOP(可能未RUNNING或者SHUTDOWN)
            if (state < STOP)
                runState = STOP;

            
try {
                
for (Worker w : workers) {
         
// 直接中断
                    w.interruptNow();
                }

            }
 catch (SecurityException se) // Try to back out
                runState = state;
                
// tryTerminate() here would be a no-op
                throw se;
            }


    
// 将队列中的所有可用元素添加list中并返回
            List<Runnable> tasks = drainQueue();
    
// 尝试终止
            tryTerminate(); // Terminate now if pool and queue empty
            return tasks;
        }
 finally {
            mainLock.unlock();
        }

    }


11.总结:
      1.corePoolSize/maximumPoolSize/keepAliveTime/workQueue/threadFactory/rejectedExecutionHandler 为线程池6大参数.
     2.corePoolSize:当线程池poolSize少于corePoolSize时,则会新增worker线程.
     3.线程池数目超过corePoolSize则向workQueue offer 任务.如果offer失败则在maximumPoolSize下新增worker线程;如果超过了maximumPoolSize,则执行拒绝策略.
     4.keepAliveTime:poolSize超过了corePoolSize时(或者允许core thread timeout),此参数指明workQueue pool的超时时间,超时则返回null,即表示当前线程空闲.(workerCanExit中有判断workQueue为空的条件)然后worker线程结束(被回收).
     5.Worker有两个方法interruptIfIdle,这个方法会先获得运行锁,即如果当前有任务运行(占有锁),则其他线程无法中断.只有执行完workQueue的任务才会结束并释放锁.(shutdown);而另一个方法interruptNow则是不管任何条件,直接interrupt.
posted on 2013-12-26 11:43 landon 阅读(1650) 评论(2)  编辑  收藏 所属分类: Sources

FeedBack:
# re: JDK源码笔记1-ThreadPoolExecutor
2013-12-26 12:28 | 零柒锁业
对国有银行的办事效率深表怀疑  回复  更多评论
  
# re: JDK源码笔记1-ThreadPoolExecutor
2013-12-27 12:49 | 零柒锁业
支持博主分享  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: