最近需要一个能根据请求数变化的线程池,JAVA有这样的东西,可是C++下好像一般只是固定大小的线程池。所以就基于ACE写了个,只做了初步测试。
主要思想是:
1. 重载ACE_Task,这相当于是个固定线程池,用一个信号量(ACE_Thread_Semaphore)来记数空闲线程数。
2. 初始化时根据用户的输入,确定最少线程数minnum和最大线程数maxnum,当多个请求到来,并且无空闲线程(信号量用光),判断总线程数小于maxnum,就开始强迫增加线程数。
3. 当线程响应完一个请求(任务)后,如果当前任务队列为空,且线程数大于minnum,就退出本线程。这里做了一个优化,就算满足条件,线程也会在队列上再等待10秒,防止线程池抖动带来不必要的开销。
使用:
重载这个类,重载service_func函数实现自己的任务处理。
start_pool初始化线程池,之后,就可以用add_task向线程池添加任务。
它会根据请求的数量自动控制池大小进行处理。
已经在LINUX下测试通过。由于ACE是跨平台的,所以这个实现也应该可以在WINDOWS下工作。
编译:
带THREAD_POOL_UNIT_TEST选项,则编译出自测程序test
gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl
thread_pool.h头文件:
 #ifndef THREAD_POOL
#ifndef THREAD_POOL
 #define THREAD_POOL
#define THREAD_POOL

 #include "ace/Task.h"
#include "ace/Task.h"
 #include "ace/Thread_Mutex.h"
#include "ace/Thread_Mutex.h"
 #include "ace/Thread_Semaphore.h"
#include "ace/Thread_Semaphore.h"

 class thread_pool : public ACE_Task<ACE_MT_SYNCH>
class thread_pool : public ACE_Task<ACE_MT_SYNCH>


 {
{
 public:
public:
 thread_pool ();
    thread_pool ();

 ~thread_pool ();
    ~thread_pool ();

 // begin the initial threads and waiting for request
    // begin the initial threads and waiting for request
 int start_pool (
    int start_pool (
 int minnum = 5, // min number of thread
        int minnum = 5, // min number of thread
 int maxnum = 100,  // max number of thread
        int maxnum = 100,  // max number of thread
 int waitsize = 1024, // request queue length
        int waitsize = 1024, // request queue length
 int parsize = 1024); // your parameter size
        int parsize = 1024); // your parameter size


 // pending request in work queue
    // pending request in work queue
 int wait_cnt ();
    int wait_cnt ();

 // add one task to thread pool
    // add one task to thread pool
 int add_task (void *arg, int size);
    int add_task (void *arg, int size);

 // user defined work thread function
    // user defined work thread function
 virtual int service_func (void* arg);
    virtual int service_func (void* arg);

 // overide base class function for thread pool logical
    // overide base class function for thread pool logical
 virtual int svc (void);
    virtual int svc (void);

 // not use
    // not use
 virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);
    virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);

 private:
private:
 int minnum_, maxnum_;
    int minnum_, maxnum_;
 int waitsize_, parsize_;
    int waitsize_, parsize_;

 //    ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;
//    ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;

 ACE_Thread_Semaphore *pfree_thread_; // for free thread count
    ACE_Thread_Semaphore *pfree_thread_; // for free thread count

 long thread_flags_; // ace thread create flag
    long thread_flags_; // ace thread create flag
 };
};



 #endif /**//* THREAD_POOL */
#endif /**//* THREAD_POOL */


thread_pool.cpp实现文件:
 #include "thread_pool.h"
#include "thread_pool.h"

 #define THREAD_POOL_DONOT_ACQUIRE    0x1001 // do not aquire again in new added thread
#define THREAD_POOL_DONOT_ACQUIRE    0x1001 // do not aquire again in new added thread


 thread_pool::thread_pool ()
thread_pool::thread_pool ()  {
{
 thread_flags_ = THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED;
    thread_flags_ = THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED;
 pfree_thread_ = NULL;
    pfree_thread_ = NULL;
 }
}


 thread_pool::~thread_pool ()
thread_pool::~thread_pool ()  {
{
 if (pfree_thread_)
    if (pfree_thread_)
 delete pfree_thread_;
        delete pfree_thread_;
 }
}


 int thread_pool::wait_cnt ()
int thread_pool::wait_cnt ()  {
{
 return this->msg_queue()->message_count ();
    return this->msg_queue()->message_count ();
 }
}


 int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg)
int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg)  {
{
 return 0;
    return 0;
 }
}

 int thread_pool::start_pool (
int thread_pool::start_pool (
 int minnum,
    int minnum,
 int maxnum,
    int maxnum, 
 int waitsize,
    int waitsize, 

 int parsize)
    int parsize)  {
{
 minnum_ = minnum;
    minnum_ = minnum;
 maxnum_ = maxnum;
    maxnum_ = maxnum;
 waitsize_ = waitsize;
    waitsize_ = waitsize;
 parsize_ = parsize;
    parsize_ = parsize;
 
    
 this->msg_queue()->high_water_mark (waitsize * parsize);
    this->msg_queue()->high_water_mark (waitsize * parsize);

 pfree_thread_ = new ACE_Thread_Semaphore (minnum);
    pfree_thread_ = new ACE_Thread_Semaphore (minnum);

 int ret = this->activate (thread_flags_, minnum);
    int ret = this->activate (thread_flags_, minnum);

 return ret;
    return ret;
 }
}


 int thread_pool::add_task (void *arg, int size)
int thread_pool::add_task (void *arg, int size)  {
{
 ACE_Message_Block *mb = new ACE_Message_Block (parsize_);
    ACE_Message_Block *mb = new ACE_Message_Block (parsize_);
 
    
 // test free threads condition
    // test free threads condition

 if (pfree_thread_->tryacquire () == -1)
    if (pfree_thread_->tryacquire () == -1)  { // acquire one free thread to do work
{ // acquire one free thread to do work
 printf ("free thread used up\n");
        printf ("free thread used up\n");


 if (this->thr_count () < maxnum_)
        if (this->thr_count () < maxnum_)  {
{
 this->activate (thread_flags_, 1, 1);
            this->activate (thread_flags_, 1, 1);
 
            
 printf ("new thread release\n");
            printf ("new thread release\n");
 pfree_thread_->release ();
            pfree_thread_->release ();
 
            
 printf ("new one thread, now %d\n", this->thr_count ());
            printf ("new one thread, now %d\n", this->thr_count ());

 } else
        } else  {
{
 printf ("can't new more threads, queue len %d\n", wait_cnt () + 1);
            printf ("can't new more threads, queue len %d\n", wait_cnt () + 1);
 }
        }

 } else
    } else  {
{
 // pfree_thread_->release (); // restore cnt, let svc function do acquire work
        // pfree_thread_->release (); // restore cnt, let svc function do acquire work
 printf ("new task acquire\n");
        printf ("new task acquire\n");
 mb->set_flags (THREAD_POOL_DONOT_ACQUIRE);
        mb->set_flags (THREAD_POOL_DONOT_ACQUIRE);
 }
    }
 
    
 // create msg
    // create msg
 printf ("add msg\n");
    printf ("add msg\n");

 memcpy (mb->wr_ptr (), (char*) arg, size);
    memcpy (mb->wr_ptr (), (char*) arg, size);
 
            
 this->putq (mb);
    this->putq (mb);

 return 0;
    return 0;
 }
}



 int thread_pool::service_func (void* arg)
int thread_pool::service_func (void* arg)  {
{
 sleep (1);
    sleep (1);
 printf ("finished task %d in thread %02X\n", *(int*) arg, (int)ACE_Thread::self ());
    printf ("finished task %d in thread %02X\n", *(int*) arg, (int)ACE_Thread::self ());
 return 0;
    return 0;
 }
}



 int thread_pool::svc (void)
int thread_pool::svc (void)  {
{
 printf ("thread started\n");
    printf ("thread started\n");

 while (1)
    while (1)

 
     {
{                
 ACE_Message_Block *b = 0;
        ACE_Message_Block *b = 0;
 ACE_Time_Value wait = ACE_OS::gettimeofday ();
        ACE_Time_Value wait = ACE_OS::gettimeofday ();
 wait.sec (wait.sec () + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
        wait.sec (wait.sec () + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
 
        

 if (this->getq (b, &wait) < 0)
        if (this->getq (b, &wait) < 0)  {
{

 if (this->thr_count () > minnum_)
            if (this->thr_count () > minnum_)  {
{
 printf ("over task acquire\n");
                printf ("over task acquire\n");
 pfree_thread_->acquire ();
                pfree_thread_->acquire ();
 printf ("delete one thread, now %d\n", this->thr_count ()-1);
                printf ("delete one thread, now %d\n", this->thr_count ()-1);
 
                
 return 0;
                return 0;
 } else
            } else 
 continue; // I'm the one of last min number of threads
                continue; // I'm the one of last min number of threads
 }
        }


 if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0)
        if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0)  {
{
 printf ("queue task acquire\n");
            printf ("queue task acquire\n");
 pfree_thread_->acquire (); // I'll use one free thread
            pfree_thread_->acquire (); // I'll use one free thread
 }
        }
 else
        else 
 printf ("no need to acquire\n");
            printf ("no need to acquire\n");

 this->service_func ((void*)b->rd_ptr());
        this->service_func ((void*)b->rd_ptr());
 
                            
 printf ("finished release\n");
        printf ("finished release\n");
 b->release();
        b->release();
 
        
 pfree_thread_->release (); // added one free thread
        pfree_thread_->release (); // added one free thread
 }
    }

 return 0;
    return 0;
 }
}


 #ifdef THREAD_POOL_UNIT_TEST
#ifdef THREAD_POOL_UNIT_TEST 

 int main (int argc, int ** argv)
int main (int argc, int ** argv)  {
{
 printf ("begin test:\n");
    printf ("begin test:\n");

 /**//*
/**//*
 ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
    ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
 s->release (3);
    s->release (3);
 s->acquire ();
    s->acquire ();
 s->acquire ();
    s->acquire ();
 s->acquire ();
    s->acquire ();
 printf ("ok");
    printf ("ok");
 return 0;
    return 0;
 */
*/    
 thread_pool t;
    thread_pool t;
 t.start_pool (10, 100);
    t.start_pool (10, 100);


 for (int i=0; i<200; i++)
    for (int i=0; i<200; i++)  {
{
 t.add_task (&i, sizeof(i));
        t.add_task (&i, sizeof(i));
 if (i % 20 == 0)
        if (i % 20 == 0)
 sleep (1);
            sleep (1);
 }
    }

 sleep (1000);
    sleep (1000);
 
    
 printf ("end test:\n");
    printf ("end test:\n");
 return 0;
    return 0;
 }
}

 #endif
#endif


posted on 2007-08-14 17:56 
我爱佳娃 阅读(6111) 
评论(4)  编辑  收藏  所属分类: 
自写类库