I want to fly higher
programming Explorer
posts - 112,comments - 263,trackbacks - 0

Java7并发编程实战手册

线程管理

  • Thread/Runnable/Thread.State
    • 线程的信息获取和设置
    • 线程的中断
    • sleep/yield
    • join
    • daemon
    • UncaughtExceptionHandler
      • Thread#setDefaultUncaughtExceptionHandler
    • ThreadLocal/InheritableThreadLocal
    • ThreadGroup
      • uncaughtException
    • ThreadFactory

线程同步基础

  • synchronized 同步方法 this
  • synchronized 属性对象 object
  • 同步代码块中使用条件
    • Object#wait/notify/notifyAll
    • 虚假唤醒(while)
  • Lock
    • ReentrantLock
      • try/finally
    • ReadWriteLock
    • 公平性 fair
    • Condition
      • while
      • lock/unlock之间

线程同步辅助类

  • Semaphore
    • 内部计数器
    • acquire/release
    • acquireUninterruptibly
      • 忽略线程中断且不会抛出任何异常
  • CountDownLatch
    • 内部计数器被初始化之后就不能被再次初始化,唯一能改值的是countDown
  • CylicBarrier
    • BrokenBarrierException
    • reset
  • Phaser
    • 在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步
    • Phaser(3) 指定参与阶段同步的线程数是3个
    • 被Phaser类置于休眠的线程不会响应中断事件
      • awaitAdvanceInterruptibly(int phaser),被中断会抛出InterruptedException
    • onAdavance,阶段改变时被自动执行
  • Exchanger
    • 只能同步两个线程
    • exchange调用后将休眠直到其他的线程到达

线程执行器

  • ThreadPoolExecutor/Executors
    • shutdown,当执行完所有待运行的任务后,它将结束执行,调用完毕立即返回,结合awaitTermination判断是否线程池是否关闭
    • awaitTermination(long timeout, TimeUnit unit)
    • 提供很多方法获取自身状态的信息
    • newCachedThreadPool/newFixedThreadPool
  • Future/Callbale
    • submit
    • call() throws Exception
  • ThreadPoolExecutor#invokeAny
    • 返回第一个完成任务且没有抛出异常的任务的执行结果
  • ThreadPoolExecutor#invokeAll
    • 等待所有任务的完成
    • landon:是否可以用这个方法做场景心跳

       while(!isDone)
       {
        - long startTime = getCurrent()
        - 场景调度器提交场景任务,等待所有场景任务执行完毕(invokeAll),这样亦可以保证潜在的顺序问题,因为每次都是将当前的tick执行完毕
         - 而非之前submit,如果一次tick执行超过50ms则下次循环又会向线程池提交任务,会出现同一个任务多个线程执行的潜在情况
        - long endTime = getCurrent()
        - long sleepTime = startTime + TickInterval - endTime
        - if(sleepTime > 0) sleep(sleepTime)
        - else println("busy,oneTick executeTime:" + sleepTime)
       }
    

  • ScheduledThreadPoolExecutor
    • schedule
    • scheduleAtFixedRate
    • scheduleWithFixDelay
  • Future
    • cancel
    • get
  • FutureTask
    • done,允许在执行器中的任务执行结束之后还可以执行一些代码
    • FutureTask implements RunnableFuture
    • FutureTask(Callable callable)
  • CompletionService
    • submit,提交任务
    • poll/take,获取任务已经完成的Future对象
    • 即任务完成后将Future对象放到一个完成的阻塞队列中
  • RejectedExecutionHandler

Fork/Join框架

  • 分治
    • fork-将一个任务拆分成更小的多个任务
    • join-等待子任务的完成执行
    • 工作窃取算法-work-stealing algorithm
  • ForkJoinPool

    • ForkJoinTask
      • RecursiveAction 任务无返回结果
      • RecursiveTask 任务有返回结果
      • 递归

if(problem size > default size)
{
    tasks = divide(task)
    execute(tasks)
 }
 else {resolve problem using another algorithm}

  • Task extends RecursiveAction // 无返回结果
    • compute

      if(...) // divide
      {
       Task t1 = new Task(...)
       Task t2 = new Task(...)
       invokeAll(t1,t2) // 同步调用,执行创建的多个子任务
      }
      else {...}
    

    • ForkJoinPool#execute(task) --默认创建一个线程数等于计算机cpu数目的线程池
  • 合并任务的结果

    if(problem size > default size)
      tasks = divide(task)
      execute(tasks)
      groupResults
      return result
     else
      resolve problem
      return result
    

    • ForkJoinTask#get 等待返回任务计算结果
  • 异步运行任务
    • 同步方式如invokeAll,任务被挂起,直到任务被发送到fork/join线程池中执行。该方式允许ForkJoinPool采用工作窃取算法
    • 异步方式如fork时(立即返回),无法使用该算法
    • ForkJoinTask#V join()
    • get和join有区别
  • 任务中抛出异常
    • ForkJoinTask#isCompletedAbnormally 检查主任务或者子任务是否抛出了异常
    • getException 获取异常信息
    • 任务抛出运行时异常,会影响其父任务...父任务..
  • 取消任务
    • 在任务开始前可以取消
    • 例:在数字数组中寻找一组数字,拆分为更小的问题,但仅关心数字的一次出现。当我们找到他时,就会取消其他子任务
      • 可存储发送到线程池中的所有任务,当发现当前任务找到数字后,取消非当前任务的所有任务
      • 如果任务正在运行或者已经执行结束,则不能取消,cancel返回false。因此可以尝试去取消所有的任务而不用担心可能带来的间接影响

并发集合

  • ConcurrentLinkedDeque 非阻塞
    • getFirst.../peekFirst.../removeFirst.../pollFirst... - 双端队列
  • LinkedBlockingDeque 阻塞
    • put/take/poll... 双端队列
  • PriorityBlockingQueue
    • 队列的元素必须实现Comparable接口
    • 按照排序结果决定插入元素的位置
  • DelayQueue
    • 元素必须实现Delayed接口
    • public interface Delayed extends Comparable
    • 两个待实现方法
      • compareTo(Delayed o)
      • getDelay(Timeunit unit)
    • 从队列取元素时,到期的元素会返回(未来的元素等待到期_延迟时间)
    • landon:是否可以用于游戏服务器中的计时器如果有多个timer按照到期时间排队_
  • ConcurrentSkipListMap
    • 根据键值排序所有元素
    • 内部机制-Skip List
    • # firstEntry/lastEntry/subMap/...
    • landon:可以和TreeMap做比较,一个线程不安全,一个线程安全
  • ThreadLocalRandom
    • # current,该方法是一个静态方法,如果调用线程还未关联随机数对象,就会初始化一个(localInit)
  • Atomic Variable
    • compareAndSet,这是是最主要的方法
      • 判断内存变量的值是否是expect,如果是说明未被其他线程改过,可以直接用update新值更新
      • 否则说明被其他线程改过,进而可以选择下一步处理方式
    • AtomicReference#public final boolean compareAndSet(V expect, V update)
    • CAS
    • AtomicIntegerArray -原子数组
  • LinkedTransferQueue
  • AtomicReference--实现单例

 public class Singleton {   
    private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();  
    private Singleton (){}   
    public static  Singleton getInstance() {            
    for (;;) {            
        Singleton current = INSTANCE.get();                 
        if (current != null) {                
        return current;            
        }            
        current = new Singleton();            
        if (INSTANCE.compareAndSet(null, current)) {                
        return current;           
        }       
    }    
    }
}

定制并发类

  • 定制ThreadPoolExecutor
    • 继承该类
    • 覆写_记得调用super
    • shutdown
    • shutdownNow
      • 输出如等待执行的任务数目 getQueue().size...
      • getCompletedTaskCount,获得已执行过的任务数
      • getActiveCount,获得正在执行的任务数
    • beforeExecute
    • afterExecute
  • 实现基于优先级的Executor类
    • ThreadPoolExector参数传入PriorityBlockingQueue
    • 如果是fixed两个线程,那么前2个任务是被2个线程执行的;后面的排队任务按照优先级顺序执行
  • 实现ThreadFactory接口生成定制线程
    • 覆写newThread方法
    • 返回的Thread可以是定制的Thread对象(MyThread extends Thread)
    • 可外部直接调用Threadfactory#newThread返回线程
  • 在Executor对象中使用ThreadFactory
    • 线程池参数中指定线程工厂
    • Executors内部有一个DefaultThreadFactory
      • Executors$DefaultThreadFactory
  • 定制运行在定时线程池中的任务
    • 继承ScheduledThreadPoolExecutor
    • 覆写protected RunnableScheduledFuture decorateTask(
      • Runnable runnable, RunnableScheduledFuture task)
    • 可自定义一个调度类extends FutureTask implements RunnableScheduledFuture
      • 参考ScheduledThreadPoolExecutor$ScheduledFutureTask
  • 通过实现ThreadFactory接口为Fork/Join框架生成定制线程
    • ForkJoinPool内部实现
      • 一个任务队列,存放等待被执行的任务
      • 一个执行这些任务的线程池
      • ForkJoinWorkerThread持有一个ForkJoinPool.WorkQueue workQueue
        • work-stealing mechanics
    • MyWorkerThread extends ForkJoinWorkerThread
      • onStart
      • onTermination
      • 调用super
    • MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory
      • newThread
  • 定制运行在Fork/Join框架中的任务
    • MyWorkTask extends ForkJoinTask(Void)
      • getRawResult
      • exec
        • 调用compute抽象方法
  • 实现定制Lock类
    • ReentrantLock内部有一个很重要的类Sync(AQS)
    • abstract static class Sync extends AbstractQueuedSynchronizer
      • static final class FairSync extends Sync
      • static final class NonfairSync extends Sync
    • 自定义实现一个MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
      • tryActuire
      • tryRelease
    • 自定义实现MyLock implements Lock
      • lock
      • unlock
      • tryLock
      • newCondition
  • 实现基于优先级的传输队列
    • MyPriorityTransferQueue extends PriorityBlockingQueue implements TransferQueue
      • tryTransfer
      • transfer
      • hasWaitingConsumer
      • getWaitingConsumerCount
      • take
  • 实现自己的原子对象
    • MyCounter extends AtomicInteger

   for(;;)
     {
      int value = get();
      if(value == 10) return false;
      else
      {
       int newValue = value + 1;
       boolean changed = compareAndSet(value,newValue);
       if(changed) return true;
      }
     }

测试并发应用程序

  • 监控Lock接口
    • ReentrantLock内部方法都是protected的,所以可以继承
    • MyLock extends ReentrantLock
      • 调用Thread getOwner()
      • 调用Collection getQueuedThreads()
      • ...
  • 监控Phaser类
    • getPhase
    • getRegisteredParties
    • getArrivedParties
    • ...
  • 监控执行器框架
    • ThreadPoolExecutor
      • getPoolSize
      • getCorePoolSize
      • getActiveCount
      • getTaskCount
      • ...
  • 监控Fork/Join池
    • getPoolSize
    • getParallelism
    • getActiveThreadCount
    • getStealCount
    • ...
  • 输出高效的日志信息
    • 输出必要的信息
    • 为消息设定恰当的级别
  • 使用FindBugs分析并发代码
  • 配置Eclipse调试并发代码
    • 可选择Default suspend policy for new breakpoints的值为Suspend VM
    • 默认为Suspend Thread
  • 配置NetBeans调试并发代码
  • 使用MultithreadedTC测试并发代码
    • MultithreadedTestCase
    • waitForTick
posted on 2017-03-03 21:00 landon 阅读(1503) 评论(0)  编辑  收藏 所属分类: Book

只有注册用户登录后才能发表评论。


网站导航: