keep moving!

We must not cease from exploration. And the end of all our exploring will be to arrive where we began and to know the place for the first time.
随笔 - 37, 文章 - 2, 评论 - 3, 引用 - 0
数据加载中……

多线程 — Concurrent包简介

多线程 — Concurrent包简介

Util.concurrent工具包概述
Doug Lea
State University of New York at Oswego
dl@cs.oswego.edu

http://gee.cs.oswego.edu

翻译:

Cocia Lin(cocia@163.com)

Huihoo.org

原文

http://gee.cs.oswego.edu/dl/cpjslides/util.pdf

要点
–目标和结构

–主要的接口和实现

Sync:获得/释放(acquire/release) 协议

Channel:放置/取走(put/take) 协议

Executor:执行Runnable任务

–每一个部分都有一些关联的接口和支持类

–简单的涉及其他的类和特性

目标
–一些简单的接口

-但是覆盖大部分程序员需要小心处理代码的问题

– 高质量实现

-正确的,保守的,有效率的,可移植的

–可能作为将来标准的基础

-获取经验和收集反馈信息

Sync
– acquire/release协议的主要接口

-用来定制锁,资源管理,其他的同步用途

- 高层抽象接口

- 没有区分不同的加锁用法

–实现

-Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore,FIFOSemaphore, PrioritySemaphore

n 还有,有几个简单的实现,例如ObservableSync, LayeredSync

独占锁
try {

lock.acquire();

try {

action();

}

finally {

lock.release();

}

}

catch (InterruptedException ie) { ... }

– Java同步块不适用的时候使用它

- 超时,回退(back-off)

- 确保可中断

- 大量迅速锁定

- 创建Posix风格应用(condvar)

独占例子
class ParticleUsingMutex {

int x; int y;

final Random rng = new Random();

final Mutex mutex = new Mutex();

public void move() {

try {

mutex.acquire();

try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; }

finally { mutex.release(); }

}

catch (InterruptedException ie) {

Thread.currentThread().interrupt(); }

}

public void draw(Graphics g) {

int lx, ly;

try {

mutex.acquire();

try { lx = x; ly = y; }

finally { mutex.release(); }

}

catch (InterruptedException ie) {

Thread.currentThread().interrupt(); return; }

g.drawRect(lx, ly, 10, 10);

}

}

回退(Backoff)例子
class CellUsingBackoff {

private long val;

private final Mutex mutex = new Mutex();

void swapVal(CellUsingBackoff other)

throws InterruptedException {

if (this == other) return; // alias check

for (;;) {

mutex.acquire();

try {

I f (other.mutex.attempt(0)) {

try {

long t = val;

val = other.val;

other.val = t;

return;

}

finally { other.mutex.release(); }

}

}

finally { mutex.release(); };

Thread.sleep(100); // heuristic retry interval

}

}

}

读写锁
interface ReadWriteLock {

Sync readLock();

Sync writeLock();

}

– 管理一对锁

- 和普通的锁一样的使用习惯

– 对集合类很有用

-半自动的方式实现SyncSet, SyncMap, …

– 实现者使用不同的锁策略

- WriterPreference, ReentrantWriterPreference,

ReaderPreference, FIFO

ReadWriteLock例子
– 示范在读写锁中执行任何Runnable的包装类

class WithRWLock {

final ReadWriteLock rw;

public WithRWLock(ReadWriteLock l) { rw = l; }

public void performRead(Runnable readCommand)

throws InterruptedException {

rw.readLock().acquire();

try { readCommand.run(); }

finally { rw.readlock().release(); }

}

public void performWrite(…) // similar

}

闭锁(Latch)
– 闭锁是开始时设置为false,但一旦被设置为true,他将永远保持true状态

- 初始化标志

- 流结束定位

- 线程中断

- 事件出发指示器

– CountDown和他有点类似,不同的是,CountDown需要一定数量的触发设置,而不是一次

– 非常简单,但是广泛使用的类

- 替换容易犯错的开发代码

Latch Example 闭锁例子
class Worker implements Runnable {

Latch startSignal;

Worker(Latch l) { startSignal = l; }

public void run() {

startSignal.acquire();

// … doWork();

}

}

class Driver { // …

void main() {

Latch ss = new Latch();

for (int i = 0; i < N; ++i) // make threads

new Thread(new Worker(ss)).start();

doSomethingElse(); // don’t let run yet

ss.release(); // now let all threads proceed

}

}

信号(Semaphores)
-- 服务于数量有限的占有者

- 使用许可数量构造对象(通常是0)

- 如果需要一个许可才能获取,等待,然后取走一个许可

- 释放的时候将许可添加回来

-- 但是真正的许可并没有转移(But no actual permits change hands.)

- 信号量仅仅保留当前的计数值

-- 应用程序

- 锁:一个信号量可以被用作互斥体(mutex)

- 一个独立的等待缓存或者资源控制的操作

- 设计系统是想忽略底层的系统信号

-- (phores ‘remember’ past signals)记住已经消失的信号量

信号量例子
class Pool {

ArrayList items = new ArrayList();

HashSet busy = new HashSet();

final Semaphore available;

public Pool(int n) {

available = new Semaphore(n);

// … somehow initialize n items …;

}

public Object getItem() throws InterruptedException {

available.acquire();

return doGet();

}

public void returnItem(Object x) {

if (doReturn(x)) available.release();

}

synchronized Object doGet() {

Object x = items.remove(items.size()-1);

busy.add(x); // put in set to check returns

return x;

}

synchronized boolean doReturn(Object x) {

return busy.remove(x); // true if was present

}

}

屏障(Barrier)
– 多部分同步接口

- 每一部分都必须等待其他的分不撞倒屏障

– CyclicBarrier类

- CountDown的一个可以重新设置的版本

- 对于反复划分算法很有用(iterative partitioning algorithms)

– Rendezvous类

- 一个每部分都能够和其他部分交换信息的屏障

- 行为类似同时的在一个同步通道上put和take

- 对于资源交换协议很有用(resource-exchange protocols)

通道(Channel)
–为缓冲,队列等服务的主接口

– 具体实现

- LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot

通道属性
– 被定义为Puttable和Takable的子接口

- 允许安装生产者/消费者模式执行

– 支持可超时的操作offer和poll

- 当超时值是0时,可能会被阻塞

- 所有的方法能够抛出InterruptedException异常

– 没有接口需要size方法

- 但是一些实现定义了这个方法

- BoundedChannel有capacity方法

通道例子
class Service { // …

final Channel msgQ = new LinkedQueue();

public void serve() throws InterruptedException {

String status = doService();

msgQ.put(status);

}

public Service() { // start background thread

Runnable logger = new Runnable() {

public void run() {

try {

for(;;)

System.out.println(msqQ.take());

}

catch(InterruptedException ie) {} }

};

new Thread(logger).start();

}

}

运行器(Executor)
– 类似线程的类的主接口

- 线程池

- 轻量级运行框架

- 可以定制调度算法

– 只需要支持execute(Runnable r)

- 同Thread.start类似

– 实现

- PooledExecutor, ThreadedExecutor,QueuedExecutor, FJTaskRunnerGroup

- 相关的ThreadFactory类允许大多数的运行器通过定制属性使用线程

PooledExecutor
– 一个可调的工作者线程池,可修改得属性如下:

- 任务队列的类型

- 最大线程数

- 最小线程数

- 预热(预分配)和立即(分配)线程

- 保持活跃直到工作线程结束

– 以后如果需要可能被一个新的代替

- 饱和(Saturation)协议

– 阻塞,丢弃,生产者运行,等等

PooledExecutor例子
class WebService {

public static void main(String[] args) {

PooledExecutor pool =

new PooledExecutor(new BoundedBuffer(10), 20);

pool.createThreads(4);

try {

ServerSocket socket = new ServerSocket(9999);

for (;;) {

final Socket connection = socket.accept();

pool.execute(new Runnable() {

public void run() {

new Handler().process(connection);

}});

}

}

catch(Exception e) { } // die

}

}

class Handler { void process(Socket s); }

前景(Future)和可调用(Callable)
– Callabe是类似于Runnable的接口,用来作为参数和传递结果

interface Callable {

Object call(Object arg) throws Exception;

}

– FutureResult管理Callable的异步执行

class FutureResult { // …

// block caller until result is ready

public Object get()

throws InterruptedException, InvocationTargetException;

public void set(Object result); // unblocks get

// create Runnable that can be used with an Executor

public Runnable setter(Callable function);

}

FutureResult例子
class ImageRenderer { Image render(byte[] raw); }

class App { // …

Executor executor = …; // any executor

ImageRenderer renderer = new ImageRenderer();

public void display(byte[] rawimage) {

try {

FutureResult futureImage = new FutureResult();

Runnable cmd = futureImage.setter(new Callable(){

public Object call() {

return renderer.render(rawImage);

}});

executor.execute(cmd);

drawBorders(); // do other things while executing

drawCaption();

drawImage((Image)(futureImage.get())); // use future

}

catch (Exception ex) {

cleanup();

return;

}

}

}

其他的类
– CopyOnWriteArrayList

- 支持整个集合复制时每一个修改的无锁访问

- 适合大多数的多路广播应用程序

– 工具包还包括了一个java.beans多路广播类的COW版本

– SynchronizedDouble, SynchronizedInt,SynchronizedRef, etc

- 类似于java.lang.Double,提供可变操作的同步版本.例如,addTo,inc

- 添加了一些象swap,commit这样的实用操作

未来计划
– 并发数据构架

- 一组繁重线程连接环境下有用的工具集合

–支持侧重I/O的程序

- 事件机制的IO系统

– 小版本的实现

- 例如SingleSourceQueue

–小幅度的改善

- 使运行器更容易使用

– 替换

- JDK1.3 java.util.Timer 被ClockDaemon取代



张金鹏 2007-01-31 13:08 发表评论

posted on 2008-09-07 11:10 大石头 阅读(456) 评论(0)  编辑  收藏


只有注册用户登录后才能发表评论。


网站导航: