Vincent

Vicent's blog
随笔 - 74, 文章 - 0, 评论 - 5, 引用 - 0
数据加载中……

一个线程池的实现

设计目标
Ø    提供一个线程池的组件,具有良好的伸缩性,当线程够用时,销毁不用线程,当线程不够用时,自动增加线程数量;
Ø    提供一个工作任务接口和工作队列,实际所需要的任务都必须实现这个工作任务接口,然后放入工作队列中;
Ø    线程池中的线程从工作队列中,自动取得工作任务,执行任务。
主要控制类和功能接口设计
线程池管理器 ThreadPoolManager 的功能:
Ø    管理线程池中的各个属性变量
ü    最大工作线程数
ü    最小工作线程数
ü    激活的工作线程总数
ü    睡眠的工作线程总数
ü    工作线程总数 (即:激活的工作线程总数+睡眠的工作线程总数)
Ø    创建工作线程
Ø    销毁工作线程
Ø    启动处于睡眠的工作线程
Ø    睡眠处于激活的工作线程
Ø    缩任务:当工作线程总数小于或等于最小工作线程数时,销毁多余的睡眠的工作线程,使得现有工作线程总数等于最小工作任务总数
Ø    伸任务:当任务队列任务总数大于工作线程数时,增加工作线程总数至最大工作线程数
Ø    提供线程池启动接口
Ø    提供线程池销毁接口
工作线程 WorkThread  的功能:
Ø    从工作队列取得工作任务
Ø    执行工作任务接口中的指定任务
工作任务接口 ITask   的功能:
Ø    提供指定任务动作
工作队列 IWorkQueue  的功能:
Ø    提供获取任务接口,并删除工作队列中的任务;
Ø    提供加入任务接口;
Ø    提供删除任务接口;
Ø    提供取得任务总数接口;
Ø    提供自动填任务接口;(当任务总数少于或等于默认总数的25%时,自动装填)
Ø    提供删除所有任务接口;


Code


ThreadPoolManager:
=====================================
CODE:
package test.thread.pool1;
import java.util.ArrayList;
import java.util.List;
import test.thread.pool1.impl.MyWorkQueue;

/**
 * <p>Title: 线程池管理器</p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class ThreadPoolManager {
  /*最大线程数*/
  private int threads_max_num;

  /*最小线程数*/
  private int threads_min_num;
  
  /* 线程池线程增长步长 */
  private int threads_increase_step = 5;

  /* 任务工作队列 */
  private IWorkQueue queue;
  
  /* 线程池监视狗 */
  private PoolWatchDog poolWatchDog ;
  
  /* 队列线程 */
  private Thread queueThread ;
  
  /* 线程池 封装所有工作线程的数据结构 */
  private List pool = new ArrayList();
  
  /* 线程池中 封装所有钝化后的数据结构*/
  private List passivePool = new ArrayList();
  
  /* 空闲60秒 */
  private static final long IDLE_TIMEOUT = 60000L;
  
  /* 关闭连接池标志位 */
  private boolean close = false;
  
  /**
   * 线程池管理器
   * @param queue 任务队列
   * @param threads_min_num 工作线程最小数
   * @param threads_max_num 工作线程最大数
   */
  public ThreadPoolManager(int threads_max_num
                          ,int threads_min_num
                          ,IWorkQueue queue){
    this.threads_max_num = threads_max_num;
    this.threads_min_num = threads_min_num;
    this.queue = queue;    
  }

  /**
   * 线程池启动
   */
  public void startPool(){
    System.out.println("=== startPool..........");
    poolWatchDog = new PoolWatchDog("PoolWatchDog");
    poolWatchDog.setDaemon(true);
    poolWatchDog.start();
    System.out.println("=== startPool..........over");
  }

  /**
   * 线程池销毁接口
   */
  public void destoryPool(){
    System.out.println("==========================DestoryPool starting ...");
    this.close = true;
    int pool_size = this.pool.size();
    
    //中断队列线程
    System.out.println("===Interrupt queue thread ... ");
    queueThread.interrupt();
    queueThread = null;
    
    System.out.println("===Interrupt thread pool ... ");
    Thread pool_thread = null;
    for(int i=0; i<pool_size; i++){
      pool_thread = (Thread)pool.get(i);
      if(pool_thread !=null 
      && pool_thread.isAlive() 
      && !pool_thread.isInterrupted()){
        pool_thread.interrupt();
        System.out.println("Stop pool_thread:"
                          +pool_thread.getName()+"[interrupt] "
                          +pool_thread.isInterrupted());
      }
    }//end for
    
    if(pool != null){
      pool.clear();
    }
    if(passivePool != null){
      pool.clear();
    }
    
    try{
      System.out.println("=== poolWatchDog.join() starting ...");
      poolWatchDog.join();
      System.out.println("=== poolWatchDog.join() is over ...");
    }
    catch(Throwable ex){
      System.out.println("###poolWatchDog ... join method throw a exception ... "
                          +ex.toString());
    }
    
    poolWatchDog =null;
    System.out.println("==============================DestoryPool is over ...");    
  }
  
  
  public static void main(String[] args) throws Exception{
    ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
    
    threadPoolManager1.startPool();
    Thread.sleep(60000);
    threadPoolManager1.destoryPool();
  }
  
  /**
   * 线程池监视狗
   */
  private class PoolWatchDog extends Thread{
    public PoolWatchDog(String name){
      super(name);
    }
  
    public void run(){
      Thread workThread = null;
      Runnable run = null;
      
      //开启任务队列线程,获取数据--------
      System.out.println("===QueueThread starting ... ... ");
      queueThread = new Thread(new QueueThread(),"QueueThread");
      queueThread.start();
      
      System.out.println("===Initial thread Pool ... ...");
      //初始化线程池的最小线程数,并放入池中
      for(int i=0; i<threads_min_num; i++){
        run = new WorkThread();
        workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
        workThread.start();
        if(i == threads_min_num -1){
          workThread = null;
          run = null;
        }
      }
      System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());

      //线程池线程动态增加线程算法--------------
      while(!close){
      
        //等待5秒钟,等上述线程都启动----------
        synchronized(this){          
          try{
            System.out.println("===Wait the [last time] threads starting ....");
            this.wait(15000);
          }
          catch(Throwable ex){
            System.out.println("###PoolWatchDog invoking is failure ... "+ex);
          }
        }//end synchronized
          
        //开始增加线程-----------------------spread动作
        int queue_size = queue.getTaskSize();
        int temp_size = (queue_size - threads_min_num);
        
        if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
          System.out.println("================Spread thread pool starting ....");
          for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
            System.out.println("=== Spread thread num : "+i);
            run = new WorkThread();
            workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
            workThread.start();
          }//end for
          
          workThread = null;
          run = null;    
          System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
        }//end if
          
        //删除已经多余的睡眠线程-------------shrink动作
        int more_sleep_size = pool.size() - threads_min_num;//最多能删除的线程数
        int sleep_threads_size = passivePool.size();
        if(more_sleep_size >0 && sleep_threads_size >0){
          System.out.println("================Shrink thread pool starting ....");        
          for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
            System.out.println("=== Shrink thread num : "+i);
            Thread removeThread = (Thread)passivePool.get(0);
            if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
              removeThread.interrupt();
            }
          }
          System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());          
        }

        System.out.println("===End one return [shrink - spread operator] ....");    
      }//end while
    }//end run 
  }//end private class
  
  /**
   * 工作线程
   */
  class WorkThread implements Runnable{
  
    public WorkThread(){
    }
  
    public void run(){
      String name = Thread.currentThread().getName();
      System.out.println("===Thread.currentThread():"+name);
      pool.add(Thread.currentThread());    
    
      while(true){
      
        //获取任务---------
        ITask task = null;
        try{
          System.out.println("===Get task from queue is starting ... ");
          //看线程是否被中断,如果被中断停止执行任务----
          if(Thread.currentThread().isInterrupted()){
            System.out.println("===Breaking current thread and jump whlie [1] ... ");
            break;
          }
          task = queue.getTask();
        }
        catch(Throwable ex){
          System.out.println("###No task in queue:"+ex);
        }//end tryc
        
        if(task != null){
          //执行任务---------
          try{
            System.out.println("===Execute the task is starting ... ");
            //看线程是否被中断,如果被中断停止执行任务----
            if(Thread.currentThread().isInterrupted()){
              System.out.println("===Breaking current thread and jump whlie [1] ... ");
              break;
            }     
            task.executeTask();
            //任务执行完毕-------
            System.out.println("===Execute the task is over ... ");
          }
          catch(Throwable ex){
            System.out.println("###Execute the task is failure ... "+ex);
          }//end tryc
          
        }else{
          //没有任务,则钝化线程至规定时间--------
          synchronized(this){
            try{
              System.out.println("===Passivate into passivePool ... ");
              
              //看线程是否被中断,如果被中断停止执行任务----
              boolean isInterrupted = Thread.currentThread().isInterrupted();
              if(isInterrupted){
                System.out.println("===Breaking current thread and jump whlie [1] ... ");
                break;
              }
//              passivePool.add(this);
            passivePool.add(Thread.currentThread());

              
              //准备睡眠线程-------
              isInterrupted = Thread.currentThread().isInterrupted();
              if(isInterrupted){
                System.out.println("===Breaking current thread and jump whlie [2] ... ");
                break;
              }              
              this.wait(IDLE_TIMEOUT);
            }
            catch(Throwable ex1){
              System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
              break;
            }
          }          
        }        
      }//end while--------
      
      if(pool.contains(passivePool)){
        pool.remove(this);
      }
      if(passivePool.contains(passivePool)){
        passivePool.remove(this);
      }
      System.out.println("===The thread execute over ... "); 
    }//end run----------
  }
  
  
  class QueueThread implements Runnable{
  
    public QueueThread(){
    }
  
    public void run(){
      while(true){
        //自动装在任务--------
        queue.autoAddTask();
        System.out.println("===The size of queue's task is "+queue.getTaskSize());
      
        synchronized(this){
          if(Thread.currentThread().isInterrupted()){
            break;
          }else{
              try{
                this.wait(queue.getLoadDataPollingTime());
              }
              catch(Throwable ex){
                System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
                break;
              }
          }//end if
        }//end synchr
        
      }//end while
    }//end run
  } 
}






WorkQueue
=====================================
CODE:
package test.thread.pool1;

import java.util.LinkedList;
import test.thread.pool1.impl.MyTask;

/**
 * <p>Title: 工作队列对象 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public abstract class WorkQueue implements IWorkQueue{
  /* 预计装载量 */
  private int load_size;
  
  /* 数据装载轮循时间 */
  private long load_polling_time;
  
  /* 队列 */
  private LinkedList queue = new LinkedList();
  
  /**
   * 
   * @param load_size 预计装载量
   * @param load_polling_time 数据装载轮循时间
   */
  public WorkQueue(int load_size,long load_polling_time){
    this.load_size = (load_size <= 10) ? 10 : load_size;
    this.load_polling_time = load_polling_time;
  }

  /* 数据装载轮循时间 */
  public long getLoadDataPollingTime(){
    return this.load_polling_time;
  }


  /*获取任务,并删除队列中的任务*/
  public synchronized ITask getTask(){
    ITask task = (ITask)queue.getFirst();
    queue.removeFirst();
    return task;
  }

  /*加入任务*/
  public void  addTask(ITask task){
    queue.addLast(task);
  }

  /*删除任务*/
  public synchronized void removeTask(ITask task){
    queue.remove(task);
  }

  /*任务总数*/
  public synchronized int getTaskSize(){
    return queue.size();
  }

  /*自动装填任务*/
  public synchronized void autoAddTask(){
  
    synchronized(this){
      float load_size_auto = load_size - getTaskSize() / load_size;
      System.out.println("===load_size_auto:"+load_size_auto);
      
      if(load_size_auto > 0.25){        
        autoAddTask0();
      }
      else {
        System.out.println("=== Not must load new work queue ... Now! ");
      }    
    }
  }

  /*删除所有任务*/
  public synchronized void clearAllTask(){
    queue.clear();
  }
  
  /**
   * 程序员自己实现该方法
   */
  protected abstract void autoAddTask0();
}





MyWorkQueue
=====================================
CODE:
package test.thread.pool1.impl;

import java.util.LinkedList;
import test.thread.pool1.WorkQueue;

/**
 * <p>Title: 例子工作队列对象 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MyWorkQueue extends WorkQueue{

  /**
   * @param load_size 预计装载量
   * @param load_polling_time 数据装载轮循时间
   */
  public MyWorkQueue(int load_size,long load_polling_time){
    super(load_size,load_polling_time);
  }

  /**
   * 自动加载任务
   */
  protected synchronized void autoAddTask0(){
    //-------------------
    System.out.println("===MyWorkQueue ...  invoked autoAddTask0() method ...");
    for(int i=0; i<10; i++){
      System.out.println("===add task :"+i);
      this.addTask(new MyTask());
    }    
    //-------------------
  }
}





MyTask
=====================================
CODE:
package test.thread.pool1.impl;
import test.thread.pool1.ITask;

/**
 * <p>Title: 工作任务接口 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MyTask implements ITask {

  /**
   * 执行的任务
   * @throws java.lang.Throwable
   */
  public void executeTask() throws Throwable{
    System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
  }
}

posted on 2006-08-24 16:55 Binary 阅读(3708) 评论(2)  编辑  收藏 所属分类: j2se

评论

# re: 一个线程池的实现  回复  更多评论   

编译时,
ITask出错
IWorkQueue找不到.
是不是弄少了段代码?
2007-03-05 23:29 | Peng

# re: 一个线程池的实现  回复  更多评论   

文章不错,但是缺代码了,作者能补全吗?
2008-05-25 23:30 | lindily

只有注册用户登录后才能发表评论。


网站导航: