[本文是我对Java Concurrency In Practice 7.2的归纳和总结. 转载请注明作者和出处, 如有谬误, 欢迎在评论中指正. ]
以ExecutorService为例, 该类内部封装有多个线程, 类外部无法直接停止这些线程. 相反, 外部调用Service的shutDown和shutDownNow方法关闭Service, 而Service负责停止其拥有的线程.
大多数server应用会使用到log, 下例中的LogWriter是一个使用生产者消费者模式构建的log service, 需要打印log的线程将待打印的内容加入到阻塞队列中, 而logger线程则不断的从阻塞队列中取出数据输出:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
/**
* 需要打印数据的线程调用该方法, 将待打印数据加入阻塞队列
*/
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
/**
* 负责从阻塞队列中取出数据输出的线程
*/
private class LoggerThread extends Thread {
private final PrintWriter writer;
// ...
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
LogWriter内部封装有LoggerThread线程, 所以LogWriter是一个基于线程构建的Service. 根据ExecutorService的经验, 我们需要在LogWriter中提供停止LoggerThread线程的方法. 看起来这并不是很难, 我们只需在LogWriter中添加shutDown方法:
/**
* 该方法用于停止LoggerThread线程
*/
public void shutDown() {
logger.interrupt();
}
当LogWriter.shutDown方法被调用时, LoggerThread线程的中断标记被设置为true, 之后LoggerThread线程执行queue.take()方法时会抛出InterruptedException异常, 从而使得LoggerThread线程结束.
然而这样的shutDown方法并不是很恰当:
1. 丢弃了队列中尚未来得及输出的数据.
2. 更严重的是, 假如线程A对LogWriter.log方法的调用因为队列已满而阻塞, 此时停止LoggerThread线程将导致线程A永远阻塞在queue.put方法上.
对上例的改进:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
/**
* 表示是否关闭Service
*/
private boolean isShutdown;
/**
* 队列中待处理数据的数量
*/
private int reservations;
public void start() {
loggerThread.start();
}
public void shutDown() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
// service已关闭后调用log方法直接抛出异常
if (isShutdown)
throw new IllegalStateException("Service has been shut down");
++reservations;
}
// BlockingQueue本身就是线程安全的, put方法的调用不在同步代码块中
// 我们只需要保证isShutdown和reservations是线程安全的即可
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (this) {
// 当service已关闭且处理完队列中的所有数据时才跳出while循环
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) {
// 发生InterruptedException异常时不应该立刻跳出while循环
// 而应该继续输出log, 直到处理完队列中的所有数据
}
}
} finally {
writer.close();
}
}
}
}
上面的处理显得过于复杂, 利用ExecutorService可以编写出相对更简洁的程序:
public class LogService {
/**
* 创建只包含单个线程的线程池, 提交给该线程池的任务将以串行的方式逐个运行
* 本例中, 此线程用于执行打印log的任务
*/
private final ExecutorService exec = Executors.newSingleThreadExecutor();
private final PrintWriter writer;
public void start() {
}
public void shutdown() throws InterruptedException {
try {
// 关闭ExecutorService后再调用其awaitTermination将导致当前线程阻塞, 直到所有已提交的任务执行完毕, 或者发生超时
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
} finally {
writer.close();
}
}
public void log(String msg) {
try {
// 线程池关闭后再调用其execute方法将抛出RejectedExecutionException异常
exec.execute(new WriteTask(msg));
} catch (RejectedExecutionException ignored) {
}
}
private final class WriteTask implements Runnable {
private String msg;
public WriteTask(String msg) {
this.msg = msg;
}
@Override
public void run() {
writer.println(msg);
}
}
}