Chan Chen Coding...

Pool resources using Apache's Commons Pool Framework

Refer to: http://www.javaworld.com/javaworld/jw-01-2005/jw-0124-pool.html?page=1

Resource usage could prove critical at times for heavy-duty applications. Some famous Websites have shut down because of their inability to handle heavy loads. Most problems related to heavy loads can be handled, at a macro level, using clustering and load-balancing capabilities. Concerns remain at the application level with respect to excessive object creation and the availability of limited server resources like memory, CPU, threads, and database connections, which could represent potential bottlenecks and, when not utilized optimally, bring down the whole server.

In some situations, the database usage policy could enforce a limit on the number of concurrent connections. Also, an external application could dictate or restrict the number of concurrent open connections. A typical example is a domain registry (like Verisign) that limits the number of available active socket connections for registrars (like BulkRegister). Pooling resources has proven to be one of the best options in handling these types of issues and, to a certain extent, also helps in maintaining the required service levels for enterprise applications.

Most J2EE application server vendors provide resource pooling as an integral part of their Web and EJB (Enterprise JavaBean) containers. For database connections, the server vendor usually provides an implementation of the DataSource interface, which works in conjunction with the JDBC (Java Database Connectivity) driver vendor's ConnectionPoolDataSource implementation. TheConnectionPoolDataSource implementation serves as a resource manager connection factory for pooled java.sql.Connection objects. Similarly, EJB instances of stateless session beans, message-driven beans, and entity beans are pooled in EJB containers for higher throughput and performance. XML parser instances are also candidates for pooling, because the creation of parser instances consumes much of a system's resources.

A successful open source resource-pooling implementation is the Commons Pool framework's DBCP, a database connection pooling component from the Apace Software Foundation that is extensively used in production-class enterprise applications. In this article, I briefly discuss the internals of the Commons Pool framework and then use it to implement a thread pool.

Let's first look at what the framework provides.

The Commons Pool framework offers a basic and robust implementation for pooling arbitrary objects. Several implementations are provided, but for this article's purposes, we use the most generic implementation, the GenericObjectPool. It uses aCursorableLinkedList, which is a doubly-linked-list implementation (part of the Jakarta Commons Collections), as the underlying datastructure for holding the objects being pooled.

On top, the framework provides a set of interfaces that supply lifecycle methods and helper methods for managing, monitoring, and extending the pool.

The interface org.apache.commons.PoolableObjectFactory defines the following lifecycle methods, which prove essential for implementing a pooling component:

   // Creates an instance that can be returned by the pool
public Object makeObject() {}
// Destroys an instance no longer needed by the pool
public void destroyObject(Object obj) {}
// Validate the object before using it
public boolean validateObject(Object obj) {}
// Initialize an instance to be returned by the pool
public void activateObject(Object obj) {}
// Uninitialize an instance to be returned to the pool
public void passivateObject(Object obj) {}

As you can make out by the method signatures, this interface primarily deals with the following:

  • makeObject(): Implement the object creation
  • destroyObject(): Implement the object destruction
  • validateObject(): Validate the object before it is used
  • activateObject(): Implement the object initialization code
  • passivateObject(): Implement the object uninitialization code


Another core interface—org.apache.commons.ObjectPool—defines the following methods for managing and monitoring the pool:

  // Obtain an instance from my pool
Object borrowObject() throws Exception;
// Return an instance to my pool
void returnObject(Object obj) throws Exception;
// Invalidates an object from the pool
void invalidateObject(Object obj) throws Exception;
// Used for pre-loading a pool with idle objects void addObject() throws Exception;
// Return the number of idle instances
int getNumIdle() throws UnsupportedOperationException;
// Return the number of active instances int getNumActive() throws UnsupportedOperationException;
// Clears the idle objects void clear() throws Exception, UnsupportedOperationException;
// Close the pool void close() throws Exception;
//Set the ObjectFactory to be used for creating instances
void setFactory(PoolableObjectFactory factory) throws IllegalStateException, UnsupportedOperationException;


The ObjectPool interface's implementation takes a PoolableObjectFactory as an argument in its constructors, thereby delegating object creation to its subclasses. I don't talk much about design patterns here since that is not our focus. For readers interested in looking at the UML class diagrams, please see Resources.

As mentioned above, the class org.apache.commons.GenericObjectPool is only one implementation of theorg.apache.commons.ObjectPool interface. The framework also provides implementations for keyed object pools, using the interfaces org.apache.commons.KeyedObjectPoolFactory and org.apache.commons.KeyedObjectPool, where one can associate a pool with a key (as in HashMap) and thus manage multiple pools.

The key to a successful pooling strategy depends on how we configure the pool. Badly configured pools can be resource hogs, if the configuration parameters are not well tuned. Let's look at some important parameters and their purpose.

Configuration details

The pool can be configured using the GenericObjectPool.Config class, which is a static inner class. Alternatively, we could just use theGenericObjectPool's setter methods to set the values.

The following list details some of the available configuration parameters for the GenericObjectPool implementation:

  • maxIdle: The maximum number of sleeping instances in the pool, without extra objects being released.
  • minIdle: The minimum number of sleeping instances in the pool, without extra objects being created.
  • maxActive: The maximum number of active instances in the pool.
  • timeBetweenEvictionRunsMillis: The number of milliseconds to sleep between runs of the idle-object evictor thread. When negative, no idle-object evictor thread will run. Use this parameter only when you want the evictor thread to run.
  • minEvictableIdleTimeMillis: The minimum amount of time an object, if active, may sit idle in the pool before it is eligible for eviction by the idle-object evictor. If a negative value is supplied, no objects are evicted due to idle time alone.
  • testOnBorrow: When "true," objects are validated. If the object fails validation, it will be dropped from the pool, and the pool will attempt to borrow another.


Optimal values should be provided for the above parameters to achieve maximum performance and throughput. Since the usage pattern varies from application to application, tune the pool with different combinations of parameters to arrive at the optimal solution.

To understand more about the pool and its internals let's implement a thread pool.

Proposed thread pool requirements

Suppose we were told to design and implement a thread pool component for a job scheduler to trigger jobs at specified schedules and report the completion and, possibly, the result of the execution. In such a scenario, the objective of our thread pool is to pool a prerequisite number of threads and execute the scheduled jobs in independent threads. The requirements are summarized as follows:

  • The thread should be able to invoke any arbitrary class method (the scheduled job)
  • The thread should be able to return the result of an execution
  • The thread should be able to report the completion of a task


The first requirement provides scope for a loosely coupled implementation as it doesn't force us to implement an interface likeRunnable. It also makes integration easy. We can implement our first requirement by providing the thread with the following information:

  • The name of the class
  • The name of the method to be invoked
  • The parameters to be passed to the method
  • The parameter types of the parameters passed


The second requirement allows a client using the thread to receive the execution result. A simple implementation would be to store the result of the execution and provide an accessor method likegetResult().

The third requirement is somewhat related to the second requirement. Reporting a task's completion may also mean that the client is waiting to get the result of the execution. To handle this capability, we can provide some form of a callback mechanism. The simplest callback mechanism can be implemented using the java.lang.Object's wait() and notify() semantics. Alternatively, we could use the Observer pattern, but for now let's keep things simple. You might be tempted to use thejava.lang.Thread class's join() method, but that won't work since the pooled thread never completes its run() method and keeps running as long as the pool needs it.

Now that we have our requirements ready and a rough idea as to how to implement the thread pool, it's time to do some real coding.

At this stage, our UML class diagram of the proposed design looks like the figure below.




Implementing the thread pool

The thread object we are going to pool is actually a wrapper around the thread object. Let's call the wrapper the WorkerThreadclass, which extends the java.lang.Thread class. Before we can start coding WorkerThread, we must implement the framework requirements. As we saw earlier, we must implement the PoolableObjectFactory, which acts as a factory, to create our poolableWorkerThreads. Once the factory is ready, we implement the ThreadPool by extending the GenericObjectPool. Then, we finish ourWorkerThread.

Implementing the PoolableObjectFactory interface

We begin with the PoolableObjectFactory interface and try to implement the necessary lifecycle methods for our thread pool. We write the factory class ThreadObjectFactory as follows:

public class ThreadObjectFactory implements PoolableObjectFactory{


   public Object makeObject() {
      return new WorkerThread();
   }
   
   public void destroyObject(Object obj) {
      if (obj instanceof WorkerThread) {
         WorkerThread rt = (WorkerThread) obj;
         rt.setStopped(true);//Make the running thread stop
      }
   }
   
   public boolean validateObject(Object obj) {
      if (obj instanceof WorkerThread) {
         WorkerThread rt = (WorkerThread) obj;
         if (rt.isRunning()) {
            if (rt.getThreadGroup() == null) {
               return false;
            }
            return true;
         }
      }
      return true;
   }
   
   public void activateObject(Object obj) {
      log.debug(" activateObject");
   }


   public void passivateObject(Object obj) {
      log.debug(" passivateObject" + obj);
      if (obj instanceof WorkerThread) {
         WorkerThread wt = (WorkerThread) obj;
         wt.setResult(null); //Clean up the result of the execution
      }
   }
}

Let's walk through each method in detail:

Method makeObject() creates the WorkerThread object. For every request, the pool is checked to see whether a new object is to be created or an existing object is to be reused. For example, if a particular request is the first request and the pool is empty, the ObjectPool implementation calls makeObject() and adds the WorkerThread to the pool.

Method destroyObject() removes the WorkerThread object from the pool by setting a Boolean flag and thereby stopping the running thread. We will look at this piece again later, but notice that we are now taking control over how our objects are being destroyed.


Method validateObject() tries to validate the object it receives from the pool. The validation method returns a Boolean value indicating whether an object returned from the pool is usable. Here, we actually check to see if the thread is running or not. Only when it is running, do we assign the task. We may, alternatively, choose to always return true, without any validation performed. If the thread is not running due to any reason, we cannot execute the task. Suppose the underlying object is a database connection; we would check to see whether the physical connection is alive. Also note that this method is called after the activateObject() method is called, and only when the testOnBorrowtestOnReturn, or testOnIdle is set.

Method activateObject() tries to activate or initialize the state of the underlying object. In our case, the WorkerThread object does all the necessary state management so we don't need any help from the framework here.

Method passivateObject() tries to uninitialize the state of the underlying object. After an object's state is uninitialized, we clean up the execution result by calling setResult(null) so our WorkerThread is ready to handle the next request.

Implement ThreadPool by extending GenericObjectPool

Now, let's implement our ThreadPool class, which usesThreadObjectFactory and extends GenericObjectPool:

public class ThreadPool extends GenericObjectPool {


   //First constructor.
   public ThreadPool(ThreadObjectFactory objFactory) {
      super(objFactory);
      this.setMaxIdle(2); // Maximum idle threads.
      this.setMaxActive(4); // Maximum active threads.
      this.setMinEvictableIdleTimeMillis(30000); //Evictor runs every 30 secs.
      this.setTestOnBorrow(true); // Check if the thread is still valid.
      
//this.setMaxWait(1000); // Wait 1 second till a thread is available
   }


   //Second constructor.
   public ThreadPool(ThreadObjectFactory objFactory,
         GenericObjectPool.Config config) {
      super(objFactory, config);
   }


   public Object borrowObject() throws Exception {
      log.debug(" borrowing object..");


      return super.borrowObject();
   }


   public void returnObject(Object obj) throws Exception {
      log.debug(" returning object.." + obj);
      super.returnObject(obj);
   }
}

The first constructor takes ThreadObjectFactory as an argument. We set the parameters to their required values for our thread pool. When we call setTestOnBorrow(true), we ensure that we are actually validating our WorkerThread by having the framework call thevalidateObject() method.

The setMaxWait(1000), which is commented out, tells the pool to wait for 1,000 milliseconds if a WorkerThread is not available. ThemaxWait value , however, depends on the strategy we specify. The default strategy is to block. The other possible values arefail and grow which are self-explanatory. These values are represented internally as a byte value.

The second constructor can be used as illustrated below:

GenericObjectPool.Config config = new GenericObjectPool.Config();


     config.maxActive = 4;
     config.maxIdle = 2;
     config.maxWait = 1000; // Wait for one second if thread not available.
     config.whenExhaustedAction = 1; //BLOCK
     config.minEvictableIdleTimeMillis = 30000;


 //With first constructor
     ThreadPool pool1 = new ThreadPool(new ThreadObjectFactory());
     //With second constructor
     ThreadPool pool2 = new ThreadPool(new ThreadObjectFactory(), config);

The poolable thread wrapper: WorkerThread

Now it is time to look at WorkerThread's code, which extendsjava.lang.Thread:

public synchronized void execute(String clsName, String methName,
         Object[] params, Class[] paramTypes, Object synObj) {
      this.className = clsName;
      this.methodName = methName;
      this.methodParams = params;
      this.syncObject = synObj;
      this.parmTypes = paramTypes;
      this.done = false;
      
      if (!running) { //If this is the first time, then kick off the thread
         this.setDaemon(true);
         this.start();


      } else { // We already have a thread running, so wake up the waiting thread
         this.notifyAll();
      }
   }

If you recall, one of our requirements states that we should be able to invoke any arbitrary method on a class, which is what we are doing in the execute(Strng, String, Object[], Class[], Object) method above. That method takes the name of the class, the method name to be invoked, the parameters to be passed as an object array, the parameter types associated with the parameters, and, optionally, an object to notify the completion of the task.

Now let's look at WorkerThread's run() method:

 public void run() {
      running = true;
      while (!stopped) { // The thread keeps running.
         if (done) { // We are waiting for a task now.
            synchronized (this) {
               try {
                  this.wait(); //Wait until we get a task to execute.
               } catch (InterruptedException e) {
                stopped = true;
                  log.error("", e);
               }
            }
         } else { //There is a task.let us execute it.
            try {
               execute();
               if (syncObject != null) {
                  synchronized (syncObject) {
                     syncObject.notifyAll(); //Notify the completion.
                  }
               }
            } catch (Exception e) {
               log.error("", e);
            } finally {
               reset();
            }
         }
      }
   }

As you can see, the run() method maintains a while loop as long as the pool uses the thread, irrespective of whether it sits idle in the pool. When the pool decides to remove the thread object, the framework invokes the destroyObject() method, which, in turn, calls the WorkerThread's setStopped(true) method. This method call forces the thread to run to its completion, since it comes out of the while loop.

Also, within the loop, the code checks to see if a task needs to be executed by checking the done variable's value. If done is "true," then the thread waits until it is notified. This notification happens in the execute() method (the method with arguments). If done's value is "false," we have a task to execute, which is executed by calling the method execute().

Once the task's execution is complete, the thread that holds the lock on the syncObject is notified. Now we need to make our thread ready by doing all the housekeeping; thus, we call the reset() method:


public void reset() {
      this.done = true;
      this.className = null;
      this.methodName = null;
      this.methodParams = null;
      this.parmTypes = null;
      this.syncObject = null;
   }


   public void setStopped(boolean stopped) {
      this.stopped = stopped;
   }


   public Object getResult() {
      return result;
   }

Let's now look at the execute() method that takes no arguments:

private void execute() {
      try {
         Class cls = getClass(this.getClassName());
         Object obj = cls.newInstance();
         this.result = MethodUtils.invokeExactMethod(obj, this


               .getMethodName(), this.getMethodParams(), this.getParmTypes());
         log.debug(" #### Execution Result = " + result);
      } catch (Exception e) {


         log.error("Exception - " + e);
      }
   }

The execute() method is where our thread actually executes the required task by instantiating an object of the specified class and invoking the specified method on it by using the passed parameters. Note that we use the Commons BeanUtils class's MethodUtils to invoke the method.

Did you realize that we just finished writing our thread pool code? It wasn't really difficult, was it? Now comes the easy part, putting our thread pool to use.

The thread pool in action

  • Step 1: Create the thread pool:
     // We are using the first constructor here ThreadPool pool = new ThreadPool(new ThreadObjectFactory());   
  • Step 2: Borrow a worker thread from the pool:
     WorkerThread rt1  = (WorkerThread) pool.borrowObject();  
  • Step 3 (optional): Create a dummy object to wait for notifications:
     Object synObj = new Object();  
  • Step 4: Execute the task using the worker thread:
     rt1.execute("com.findonnet.services.pooling.test.SampleWork", "executeTask", null, null, synObj);  
  • Step 5 (optional): Wait until the task is complete:
     synchronized (synObj) {         synObj.wait(); }  
  • Step 6 (optional): Get the result and return the object to the pool after the task completes:
     Object result = rt1.getResult();  
  • Step 7: Return the object to the pool after the task completes:
     pool.returnObject(rt1);    


Note on Steps 3, 5, and 6

Steps 3, 5, and 6 are optional. You use them only when you want to wait for the completion of the task(s) or when you want to wait for the result. You may need to wait for the completion of the task(s) under these scenarios:

  • You need the result of the execution.
  • You want to run two or more concurrent tasks and must wait for their results to do some aggregation or merging. The code for handling such a scenerio could look something like the following:

     WorkerThread wt1 = pool.borrowObject();
    WorkerThread wt2 = pool.borrowObject();

    wt1.execute("com.findonnet.services.pooling.test.SampleWork","executeTask", null, null, null);
    wt2.execute("com.findonnet.services.pooling.test.SampleWork", "executeTask", null, null, null);

    while (!wt1.isDone() || !wt2.isDone()) {
    Thread.sleep(100);
    }
    Object result1 = wt1.getResult();
    Object result2 = wt2.getResult();
    pool.returnObject(wt1); pool.returnObject(wt2);


    If you look carefully, you notice we are not using any wait() methods inside the loop, since we are not passing any object to lock upon. We are running a while loop to see if both the threads are done. If they are not done, we sleep for about 100 milliseconds and the loop continues.



Note on Step 4

Step 4 executes the method executeTask(), which is defined in the class com.findonnet.services.pooling.test.SampleWork, by passing in null as the parameter and null as the parameter types. The method that matches the null arguments is theexecuteTask() method, which takes no arguments.

If we wanted to execute the method with the signature executeTask(String strArg, int intArg), the code in Step 4 would look something like this:

 Object[] params = new Object[] { "Hello", new Integer(1)}; Class[] parmTypes = new Class[] { String.class, int.class }; rt1.execute("com.findonnet.services.pooling.test.SampleWork", "executeTask", params, parmTypes, synObj); Object result = rt1.getResult(); 


Note: Here we are giving the client complete control over the threads. The client should ensure the threads return to the pool. There are some pros and cons in going this route; I leave you the task to figure out what those pros and cons are.

That's all there is to it. We just used our thread pool with great success. As you have seen, implementing a custom thread pool by leveraging the Commons Pool framework is easy. You could easily extend the WorkerThread class to implement a multithreaded socket pool by attaching a socket object to WorkerThread. Similarly, you could completely replace the WorkerThreadobject with a socket object and pool the sockets by implementing the necessary functionality.

A note of caution

Though the implementation seems easy, thread pools sometimes do have some side effects depending on how you use them. Issues such as deadlocks and race conditions can crop up when an application's thread use is either not designed well or poorly written. Configuration issues could also play a major role and could manifest as high severity bugs under heavy loads. To illustrate such a scenario, let's add a new method in the ThreadPool class:

 public synchronized WorkerThread[] borrowObjects(int num) {
      WorkerThread[] rtArr = new WorkerThread[num];
      for (int i = 0; i < num; i++) {
         WorkerThread rt; 
         try {
            rt = (WorkerThread) borrowObject();
            rtArr[i] = rt;
         } catch (Exception e) {
            log.error(" borrowObjects failed.. ", e);
         }
      }
      return rtArr;
   }

At a quick glance, this method appears alright. Now if we invoke it like so:

 WorkerThread[] rtArr = pool.borrowObjects(10); //Number greater than maxActive 

Clearly, the borrowObjects() method is a blocking call since it calls the borrowObject() method. If the maxActive value is set to a lesser value than the passed value 10, the code just sits and waits for the pool to provide the required threads. However, if the strategy for an exhausted pool is configured to grow, the problem will resolve itself. In most cases, the programmer is responsible for handling these issues and documenting their proper usage.

Conclusion

In this article, I touched upon the Commons Pool framework, its design, and how it can be effectively used to create custom resource-pooling solutions for applications. Though the implementation focuses on pooling thread resources, you can easily adapt and apply the concepts to other resources like sockets, database connections, and even plain objects, which are expensive to instantiate and load.

J2SE 5 includes a robust thread pool implementation with queuing facilities and so forth (see Resources). However, you may encounter some situations, like the one illustrated in this article, that require a custom implementation for more control over how the pooled threads are managed and reused



-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;

posted on 2013-04-18 17:03 Chan Chen 阅读(305) 评论(0)  编辑  收藏 所属分类: Scala / Java


只有注册用户登录后才能发表评论。


网站导航: