Posted on 2011-03-23 20:41 
snake 阅读(172) 
评论(0)  编辑  收藏  
			 
			
		 
		
              
                第二步,是实现一个线程池
因为之前看书的时候留了个心眼,看线程池相关的内容的时候特意没去研究JDK的实现
因为学跟做不是一码事,写一个线程池,算是给自己看完并发实践这书的一个练习吧
废话不多说,练习开始
首先,整理一下要实现的功能,或者说要注意的元素
1.实现Executor接口
2.实现一个等待队列(可否配置,优先级等)
3.是否需要预启动线程(可否配置)
4.执行开始前,结束后,需要留接口
5.Runable在任务中的存放形式
6.线程的启动,唤醒
7.线程池的关闭(优雅地结束),需要线程提供中断,线程池提供给使用者的提示,线程池返回取消线程等
8.线程队列(空闲队列?)
9.取空闲线程算法(任务入队时?线程执行一个任务结束时?)
10.将所有需要同步的地方尽量使用非阻塞算法(通过侦察,更新一个原子变量实现)
11.减少线程切换开销(轮询是否有任务,n微秒后再进入等待)
暂时就考虑到这些,剩下的以后再补
总的来说,计划写n个版本(毕竟是第二次动手,写一个庞大的需要细致考虑的东西功力还差远呢,只能从最简单的,最方便的实现开始,然后慢慢加强)
测试先行:
	public static void main(String[] args) throws InterruptedException
	{
		testEasyRunnableThreadPool(new ThreadPoolTest1(10), 10000, 10);
		testEasyRunnableThreadPool(Executors.newFixedThreadPool(10), 10000, 10);
	}
	/**
	 * 一个产生随机数的方法,防止jvm优化
	 * @param seed
	 * @return
	 */
	static int getRandomNum(int seed)
	{
		seed ^= (seed << 6);
		seed ^= (seed >>> 21);
		seed ^= (seed << 7);
		return seed;
	}
	
	/**
	 * 执行一个简单的计算,只占用cpu,没有io和其他阻塞的方法
	 * @param pool
	 * @param tryTime
	 * @param threadNum
	 * @throws InterruptedException
	 */
	static void testEasyRunnableThreadPool(Executor pool,int tryTime,int threadNum) throws InterruptedException
	{
		//construct runnable
		Runnable command = new Runnable() {			
			public void run() {
				final int addTime = 1000000;
				long sum = 0;
				int temp = this.hashCode() ^ (int)System.currentTimeMillis();
				for(int i = 0;i<addTime;i++)
				{
					sum += (temp = getRandomNum(temp));
				}				
			}
		};		
		testThreadPool(tryTime, pool, command);
	}
	
	/**
	 * 
	 * @param tryNum 
	 * @param pool
	 * @param commandList
	 * @throws InterruptedException
	 */
	static void testThreadPool(int tryNum,Executor pool,final Runnable command) throws InterruptedException
	{
		final CountDownLatch latch = new CountDownLatch(tryNum);
		Runnable wrapper = new Runnable() {			
			public void run() {
				command.run();
				//想测试并发,在并发中加入适当的同步操作是无法避免的,只能减少
				//,在这,只是做了一个简单的countdown,影响不大
				latch.countDown();
			}
		};
		long startTime = System.nanoTime();
		for(int i = 0;i<tryNum;i++)
		{
			pool.execute(wrapper);
		}		
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime-startTime);
	}
线程池代码:
第一版本的目标很简单,只要能跑,没死锁,就是完胜
可惜结果很让人绝望~
写完了,调了近3个小时,仍然没发现问题,最后加了一堆输出,又加了多个锁,终于勉勉强强跑起来了……
并发的调试真难,debug完全没用,看输出又看不出什么来,只能是一遍一遍地检查代码,写下一个版本前先找点资料,研究下调试方法吧
后来发现错误是一个简单的i++……
public class ThreadPoolTest1 implements Executor {
	//等待队列
	Queue<Runnable> waitingQueue = null;
	
	ConcurrentLinkedQueue<ThreadNode> freeThread;
	//相当于一个freeThread的状态,根据状态决定行为,原则上将freeThread.size()+busyThreadsNum=MAXTHREADNUM
	private AtomicInteger busyThreadsNum = new AtomicInteger(0);
	//最大线程数
	final int MAXTHREADNUM;
	
	public ThreadPoolTest1 (int threadNum)
	{
		this.MAXTHREADNUM = threadNum;
		init(MAXTHREADNUM,new ConcurrentLinkedQueue<Runnable>());
	}	
	private void init(int threadNum,ConcurrentLinkedQueue<Runnable> queue)
	{
		freeThread = new ConcurrentLinkedQueue<ThreadNode>();
		waitingQueue = queue;
		//初始化空线程,一开始不是这样实现的,后来发现一堆问题,暂时只能先加锁,无论怎么样,跑起来再说把
		synchronized(this)
		{
			for(int i = 0;i<threadNum;i++)
			{
				ThreadNode node = new ThreadNode();
				busyThreadsNum.incrementAndGet();				
				node.start();
			}
		}
	}
	
	private synchronized void threadExecute(Runnable command)
	{
		//用了个挺弱智的非阻塞算法
		for(;;)
		{
			//得到开始的值
			int expect = busyThreadsNum.get();
			if(expect == MAXTHREADNUM)
			{
				waitingQueue.add(command);
				return;
			}
			else
			{
				//比较并设置,如果失败,重来
				if(busyThreadsNum.compareAndSet(expect, ++expect))//之前写的是expect++,检查了n久,硬是看不出啥问题,只能怪自己天资愚鲁吧
				{
					ThreadNode t = freeThread.remove();
					t.setCommand(command);
					synchronized(t)
					{t.notify();}
					return;
				}
				else
					continue;
			}
		}
	}
	
	private class ThreadNode extends Thread
	{
		Runnable command = null;
		Exception e = null;
		
		Exception getException()
		{
			return e;
		}
		
		void setCommand(Runnable c)
		{
			command = c;
		}
		
		
		@Override
		public void run() {
			try {
				for(;;)
				{
					if(command == null)
					{					
						ThreadPoolTest1.this.waitThread(this);					
					}
					command.run();				
					command = ThreadPoolTest1.this.getCommand();			
				}
			}catch (InterruptedException e) {
			}
		}
	}
	
	Runnable getCommand() throws InterruptedException
	{
		return waitingQueue.poll();
	}
	
	void waitThread(Thread t) throws InterruptedException
	{
		synchronized(this)
		{		
			freeThread.add((ThreadNode) t);
			busyThreadsNum.decrementAndGet();
		}
		synchronized(t)
		{
			t.wait();					
		}
	}
	
	protected void beforeExecute()
	{
		
	}
	public void execute(Runnable command) {
		beforeExecute();		
		threadExecute(command);
		afterExecute();
	}
	
	protected void afterExecute()
	{
		
	}
}
第一版本就写成这样了,之后再慢慢加强了,毕竟学java第一个玩意儿也是helloworld,原谅自己了
              
              
              
              
                已有 0 人发表留言,猛击->>这里<<-参与讨论
              
              
ItEye推荐