I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0
1.CountDownLatch demo
package com.landon.mavs.example.concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * CountdownLatch用法
 * 
 * <pre>
 *     1.同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
 *     2.给定的计数 初始化 CountDownLatch.计数器到达零之前,所以调用await的线程会一直阻塞.之后,会释放所有等待的线程,执行await的后续调用.
 *  3.计数无法被重置.如需重置计数,可考虑{
@link java.util.concurrent.CyclicBarrier}
 *  4.计数 1初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:
 *      在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待;
 *      用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待
 * </pre>
 * 
 * <pre>
 *     1.CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch
 *  2.await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断
 *  3.await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间
 *  4.countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程
 *  5.getCount() 返回当前计数
 *  6.toString() 返回标识此锁存器及其状态的字符串
 * </pre>
 * 
 * 
@author landon
 * 
 
*/

public class CountdownLatchExample {
    
public static void main(String[] args) throws Exception {
        
// 示例1:
        
// Master启动多个worker线程处理任务.所有的worker线程在执行任务前需等待Master初始化,Master线程初始化完毕,则startSignal.countdown,表示开始
        
// 工作线程被唤醒;然后Master阻塞,等待所有的worker线程执行完毕任务;每个worker线程执行完毕任务,则countdown一下,直至执行所有的任务完成;Master被唤醒,执行收尾工作.

        
// 示例2:
        
// 将一个问题分成 N 个部分,用执行每个部分并让锁存器倒计数的 Runnable 来描述每个部分,然后将所有 Runnable 加入到
        
// Executor 队列。
        
// 当所有的子部分完成后,协调线程就能够通过 await

        Master master 
= new Master(3);
        master.start();

        
// 任务分为5部分,交个线程池去执行任务.
        CountDownLatch doneSignal = new CountDownLatch(5);
        
// 启动一个线程池去执行任务.这里是一个单线程(这里不关心有多少个线程去执行任务,这里只关心任务完成后计数递减,使得主线程可以继续执行)
        ExecutorService executor = Executors.newSingleThreadExecutor();

        
// 向线程池提交5个任务
        for (int i = 0; i < 5; i++{
            executor.execute(
new WorkerTask(doneSignal, i));
        }


        
// 主线程等待任务完成
        doneSignal.await();

        
// 此时toString:[Count = 0]
        System.out.println("主线程:问题全部解决.继续:" + doneSignal.toString());
    }


    
private static class Master {
        
private CountDownLatch startSignal;
        
private CountDownLatch endSignal;

        
public Master(int workerNum) {
            startSignal 
= new CountDownLatch(1);
            endSignal 
= new CountDownLatch(workerNum);

            
// 启动所有worker线程
            for (int i = 0; i < workerNum; i++{
                
new Thread(new Worker(startSignal, endSignal)).start();
            }

        }


        
private void init() {
            System.out.println(
"Master 初始化环境");
        }


        
public void start() {
            
try {
                init();

                
// 初始化完毕,则唤醒工作线程执行任务.
                startSignal.countDown();

                
// 等待所有worker线程完成任务
                endSignal.await();

                dispose();

            }
 catch (Exception e) {
            }

        }

        
private void dispose() {
            System.out.println(
"Master 执行收尾操作");
        }

    }


    
private static class Worker implements Runnable {
        
private CountDownLatch startSignal;
        
private CountDownLatch endSignal;

        
public Worker(CountDownLatch startSignal, CountDownLatch endSignal) {
            
this.startSignal = startSignal;
            
this.endSignal = endSignal;
        }


        @Override
        
public void run() {
            
try {
                
// 等待Master线程初始化完毕
                startSignal.await();

                System.out.println(
"worker 执行任务");

                
// 表示任务完成,计数递减,计数为0时,表示所有的任务完成
                endSignal.countDown();

                System.out.println(
"endSignal.counter:" + endSignal.getCount());
            }
 catch (Exception e) {
            }

        }

    }


    
private static class WorkerTask implements Runnable {
        
// 所有任务完成信号
        private CountDownLatch doneSignal;
        
// 表示任务序号
        private int i;

        
public WorkerTask(CountDownLatch doneSignal, int i) {
            
this.doneSignal = doneSignal;
            
this.i = i;
        }


        @Override
        
public void run() {
            
try {
                System.out.println(
"Worker[" + i + "]" + " 任务完成");
                doneSignal.countDown();
            }
 catch (Exception e) {
            }

        }

    }

}



2.CylicBarrier demo


package com.landon.mavs.example.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 
 * CyclicBarrier用法
 * 
 * <pre>
 * 1.同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
 * 2.粗浅的理解即有一道屏障,目的是等待一组线程完成操作.某一线程完成操作后,则等待在屏障下(await).直至所有线程均到了屏障下,
 *   则可执行指定的屏障操作.待执行完执行的屏障操作后,所有的线程则结束await,即越过屏障,继续执行后续操作.
 * 3.该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier
 * </pre>
 * 
 * 
@author landon
 * 
 
*/

public class CyclicBarrierExample {
    
public static void main(String[] args) throws Exception {
        Master master 
= new Master();
        master.start();
    }

}


// 计算1²到10²和.分发到每个worker线程,最后合并
class Master {
    
// 用来保存计算结果
    private static List<Integer> result = new ArrayList<>();

    
// public CyclicBarrier(int parties, Runnable barrierAction)
    
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作
    
// 该操作由最后一个进入 barrier 的线程执行
    private CyclicBarrier barrier = new CyclicBarrier(10new Runnable() {

        @Override
        
public void run() {
            
int sum = 0;
            
for (int tmp : result) {
                sum 
+= tmp;
            }


            
// final result:285_Worker-9
            
// 从输出看Worker-9执行了屏障操作.而Worker-9在线程的索引为0.即await的返回值.
            
// 执行完该操作后,所有的线程越过屏障,执行后续操作.
            System.out.println("final result:" + sum + "_"
                    
+ Thread.currentThread().getName());
        }

    }
);

    
public void start() throws Exception {
        
for (int i = 0; i < 10; i++{
            Thread thread 
= new Thread(new Worker(i, barrier), "Worker-" + i);
            thread.start();
        }

    }

    
public static synchronized void addASum(int sum) {
        result.add(sum);
    }

}


class Worker implements Runnable {
    
private int i;
    
private CyclicBarrier barrier;

    
public Worker(int i, CyclicBarrier barrier) {
        
this.i = i;
        
this.barrier = barrier;
    }


    @Override
    
public void run() {
        
int sum = i * i;
        Master.addASum(sum);

        
try {

            
// 模拟一下耗时
            Thread.sleep(i * 100);

            
// public int getNumberWaiting()
            
// 返回当前在屏障处等待的参与者数目
            System.out.println(Thread.currentThread().getName()
                    
+ "_curNumberWaitting:" + barrier.getNumberWaiting());

            
// public int await() throws InterruptedException,
            
// BrokenBarrierException
            
// 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待
            int curIndex = barrier.await();

            System.out.println(Thread.currentThread().getName() 
+ " end wait:"
                    
+ curIndex);
        }
 catch (InterruptedException e) {
        }
 catch (BrokenBarrierException e) {
        }

    }

}


posted on 2014-03-01 11:53 landon 阅读(1715) 评论(4)  编辑  收藏 所属分类: Program

FeedBack:
# re: Java多线程笔记7-CountdownLatch/CyclicBarrier
2014-03-02 09:44 | 鹏达锁业
支持博主分享啊  回复  更多评论
  
# re: Java多线程笔记7-CountdownLatch/CyclicBarrier
2014-03-03 13:07 | 鹏达锁业
给力支持,还要 回访  回复  更多评论
  
# re: Java多线程笔记7-CountdownLatch/CyclicBarrier
2014-03-04 10:02 | 鹏达锁业
支持博主,欢迎回访、、、、、、、、、、、

  回复  更多评论
  
# re: Java多线程笔记7-CountdownLatch/CyclicBarrier
2014-03-21 16:24 | 老米
老米来访,欢迎  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: