我的家园

我的家园

处理不可中断的阻塞-JCIP7.1读书笔记

Posted on 2012-04-15 16:26 zljpp 阅读(194) 评论(0)  编辑  收藏

[本文是我对Java Concurrency In Practice 7.1的归纳和总结.  转载请注明作者和出处,  如有谬误, 欢迎在评论中指正. ]

并不是所有的阻塞都是可中断的, 比如InputStream.read方法. 在检测到输入数据可用, 到达流末尾或者抛出异常前, 该方法一直阻塞. 而且阻塞的时候不会检查中断标记, 所以中断线程无法使read从阻塞状态返回. 但是关闭流可以使得read方法抛出异常, 从而从阻塞状态返回. 

public class ReaderThread extends Thread {
	private static final int BUFSZ = 1024;
	private final Socket socket;
	private final InputStream in;

	public ReaderThread(Socket socket) throws IOException {
		this.socket = socket;
		this.in = socket.getInputStream();
	}

	// 覆盖Thread类的interrupt方法, 加入关闭socket的代码
	// 如果发生中断时, 线程阻塞在read方法上, socket的关闭会导致read方法抛出SocketException, 然后run方法运行完毕
	public void interrupt() {
		try {
			socket.close();
		} catch (IOException ignored) {
		} finally {
			super.interrupt();
		}
	}

	public void run() {
		try {
			byte[] buf = new byte[BUFSZ];
			while (true) {
				int count = in.read(buf);
				if (count < 0)
					break;
				else if (count > 0)
					processBuffer(buf, count);
			}
		} catch (IOException e) { /*  Allow thread to exit  */
		}
	}

	private void processBuffer(byte[] buf, int count) {
		//...
	}
}

如果task并非在自己创建的线程里运行, 而是提交给线程池运行的话, 就无法使用上例的方式处理不可中断阻塞了. 之前有过分析, 对于提交给线程池执行的task, 应该通过Future.cancel方法提前终止task的运行, 所以可以考虑重写Future.cancel方法, 在其中加入关闭socket的操作. Future对象是由submit方法返回的, 其源代码如下:

public <T> Future<T> submit(Callable<T> task) {
	if (task == null) 
		throw new NullPointerException();
	RunnableFuture<T> ftask = newTaskFor(task);
	execute(ftask);
	return ftask;
} 

可知submit方法返回的Future对象是调用newTaskFor方法获得的:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
	return new FutureTask<T>(callable);
} 

newTaskFor方法被声明为protected, 所以我们可以通过继承覆盖该方法, 返回自定义的Future对象.

首先将需要覆盖的2个方法定义在接口中:

public interface CancellableTask<T> extends Callable<T> { 
	void cancel(); 
	RunnableFuture<T> newTask(); 
}  

然后让task类实现CancellableTask接口:

public abstract class SocketUsingTask<T> implements CancellableTask<T> {
	private Socket socket;

	protected synchronized void setSocket(Socket s) {
		socket = s;
	}

	public synchronized void cancel() {
		try {
			if (socket != null)
				socket.close();
		} catch (IOException ignored) {
		}
	}

	public RunnableFuture<T> newTask() {
		return new FutureTask<T>(this) {
			// 定义FutureTask的匿名内部类, 并覆盖cancel方法, 向其中加入关闭socket的操作
			public boolean cancel(boolean mayInterruptIfRunning) {
				try {
					SocketUsingTask.this.cancel();
				} finally {
					return super.cancel(mayInterruptIfRunning);
				}
			}
		};
	}
}

接着继承ThreadPoolExecutor类并覆盖newTaskFor方法, 让该方法返回自定义的FutureTask对象:

public class CancellingExecutor extends ThreadPoolExecutor {
	// ...
	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
		// 如果callable是CancellableTask对象, 那么就返回自定义的FutureTask(通过调用其newTaskFor方法实现)
		if (callable instanceof CancellableTask)
			return ((CancellableTask<T>) callable).newTask();
		else
			return super.newTaskFor(callable);
	}
}

测试代码:

public static void main(String[] args) {
	CancellingExecutor executor = new CancellingExecutor();
	SocketUsingTask task = new SocketUsingTask();
	task.setSocket(new Socket("www.baidu.com", 80));
	Future<V> future = executor.submit(task);
	future.cancel(true);
}





只有注册用户登录后才能发表评论。


网站导航: