I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0
1.Object.wait/Object.notify 代码示例,详解见代码注释

package com.landon.mavs.example.concurrent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * 线程间协作基础例子 {@link Object#wait()} {@link Object#notify()}
 * 
 * @author landon
 * 
 
*/

public class ObjectWatiNotifyExample {
    
private static final Logger LOGGER = LoggerFactory
            .getLogger(ObjectWatiNotifyExample.class);

    
public static void main(String[] args) throws InterruptedException {
        WaitNotifyRunnable wnr1 = new WaitNotifyRunnable();
        Thread oneThread = new Thread(wnr1, "Thread-1");
        oneThread.start();

        
// 主线程暂停3秒->这里要做一个小的延迟.否则结果可能无法预知.因为可能先执行主线程的retrieveData,即先执行了notify.
        
// 而此时可能还没有wait.则可能会造成oneThread一直阻塞在wait方法.
        Thread.sleep(3 * 1000);

        
// 主线程执行retrieveData
        wnr1.retrieveData();

        Thread twoThread = new Thread(new WaitNotifyRunnable(), "Thread-2");
        twoThread.start();

        Thread.sleep(2 * 1000);

        
// 主线程执行中断twoThread,使得twoThread从wait中唤醒
        twoThread.interrupt();

        
// 3,4,5三个线程同时执行wnr2这个任务,则相当于有3个线程都在wait
        WaitNotifyRunnable wnr2 = new WaitNotifyRunnable();
        Thread threeThread = new Thread(wnr2, "Thread-3");
        Thread fourThread = new Thread(wnr2, "Thread-4");
        Thread fiveThread = new Thread(wnr2, "Thread-5");

        threeThread.start();
        fourThread.start();
        fiveThread.start();

        Thread.sleep(2 * 1000);
        
// 唤醒3个线程中的某一个,从输出看,只有其中某一个线程被唤醒了,其余两个线程仍在wait
        wnr2.retrieveData();

        Thread.sleep(1 * 1000);
        
// 唤醒全部,从输出看,另外的两个线程也被唤醒了
        wnr2.retrieveAll();

        WaitTimeoutRunnable wtr = new WaitTimeoutRunnable();
        Thread sixThread = new Thread(wtr, "Thread-6");
        sixThread.start();

        Thread.sleep(5 * 1000);
        wtr.retrieveData();

        WaitNanosTimeoutRunnable wntr = new WaitNanosTimeoutRunnable();
        Thread sevenhread = new Thread(wntr, "Thread-7");
        sevenhread.start();

        Thread.sleep(20);
        wntr.retrieveData();
    }


    
private static class WaitNotifyRunnable implements Runnable {
        
private final Object lock = new Object();

        @Override
        
public void run() {
            Thread curThread = Thread.currentThread();

            LOGGER.debug(this + "[" + curThread.getName() + "]" + "begin");
            LOGGER.debug(this + "[" + curThread.getName() + "]"
                    
+ " is waiting for data.");
            waitData();
            LOGGER.debug(this + "[" + curThread.getName() + "]"
                    
+ " retrived data and end waitting");
        }


        
/**
         * 等待数据
         
*/

        
public void waitData() {
            
synchronized (lock) {
                
try {
                    
// Object#public final void wait() throws
                    
// InterruptedException
                    
// 其他线程调用此对象的notify/notifyAll之前->导致当前线程等待
                    
// 从throws列表看出,当前线程在等待通知时,任何线程中断了当前线程,则抛出此异常.注意->中断状态会被清除
                    
// 当前线程必须拥有此对象监视器(见notify
                    
// api,通过synchronized线程即可成为此对象的监视器的所有者),否则会抛出IllegalMonitorStateException
                    
// 执行了wait方法后会释放锁
                    
// 其源码是直接调用wait(0)->参数0表示永不超时{@link Object#wait(long
                    
// timeout)}
                    lock.wait();
                }
 catch (InterruptedException e) {
                    LOGGER.warn("WaitNotifyRunnable#waitData..was interrupted.");
                    
// Thread#public static native Thread currentThread()
                    
// 返回正在执行的线程对象的引用
                    LOGGER.debug(Thread.currentThread().getName() + " status:"
                            
+ Thread.currentThread().isInterrupted());
                    
// 因为中断状态被清除.->所以再次调用interrupt恢复中断状态
                    Thread.currentThread().interrupt();
                    LOGGER.debug(Thread.currentThread().getName()
                            
+ " restore status:"
                            
+ Thread.currentThread().isInterrupted());
                    
// 因为中断状态被清除.->所以再次调用interrupt恢复中断状态
                }


            }

        }


        
/**
         * 获得了数据
         
*/

        
public void retrieveData() {
            
synchronized (lock) {
                
// Object#public final native void notify()
                
// 唤醒在此对象监视器上等待的某个线程.如果所有线程都在此对象上等待,则会选择唤醒其中一个线程.选择是任意性的.
                
// 直到当期线程放弃此对象的锁定,才能继续执行被唤醒的线程
                
// 同wait,当先线程必须持有对象监视器,否则抛出IllegalMonitorStateException
                lock.notify();

                LOGGER.debug(Thread.currentThread().getName()
                        
+ " executing retrieveData ok");
            }

        }


        
// 唤醒所有等待线程
        public void retrieveAll() {
            
synchronized (lock) {
                
// Object#public final native void notifyAll()
                
// 唤醒此对象监视器上等待的所有线程->直到当前对象放弃此对象上的锁定->才能继续执行被唤醒的线程
                lock.notifyAll();
            }

        }


        @Override
        
public String toString() {
            
return getClass().getSimpleName();
        }

    }


    
private static class WaitTimeoutRunnable implements Runnable {
        
private final Object lock = new Object();

        
// wait条件:是否已经收到了数据,没有收到数据则等待
        private volatile boolean isRetrieveData;

        @Override
        
public void run() {
            Thread curThread = Thread.currentThread();
            LOGGER.debug(this + "[" + curThread.getName() + "]" + " begin");

            
synchronized (lock) {
                
while (!isRetrieveData) {
                    
try {
                        
// Object#public final native void wait(long timeout)
                        
// throws InterruptedException
                        
// 该方法导致当前线程T将其自身放置在对象的等待集中->放弃此对象上的所有同步要求->在下面4中情况之前,线程则一直阻塞
                        
// 1.其他某个线程调用此对象的notify方法且T恰巧被选为唤醒的线程
                        
// 2.其他某个线程调用对象的notifyAll方法
                        
// 3.其他某个线程中断线程T
                        
// 4.大约到了指定的超时时间,timeout为0则不考虑实际时间->则获得通知前线程一直等待
                        
// ->从对象的等待集中删除线程T,重新进行线程调度->该线程已常规方式和其他线程竞争以获得该对象上同步的权利->一旦获得该对象的控制权
                        
// ->该对象上的所有其同步声明将被恢复到以前的状态->线程T从wait的调用中返回
                        
// 注:在没有被通知,中断或者超时的情况下,线程还可以唤醒一个所谓的虚假唤醒(spurious
                        
// wakeup)->所以应用程序需要用循环测试线程被提醒的条件
                        
// ->否则如果用if(condition)(wait)的话,如果出现了虚假唤醒->则可能condition还未达到,则唤醒->后续依靠condition的逻辑可能出错
                        lock.wait(3 * 1000);
                        LOGGER.debug(curThread.getName()
                                
+ " wait(3000) timeout.");
                    }
 catch (InterruptedException e) {
                        LOGGER.debug(curThread.getName()
                                
+ " wait(3000) was interrupted");
                    }

                }

            }


            
if (isRetrieveData) {
                LOGGER.debug(this
                        
+ " already retrieved data.go on execute next step.");
            }

        }


        
public void retrieveData() {
            isRetrieveData = true;
            
synchronized (lock) {
                lock.notifyAll();
            }

        }


        @Override
        
public String toString() {
            
return getClass().getSimpleName();
        }

    }


    
private static class WaitNanosTimeoutRunnable implements Runnable {
        
private final Object lock = new Object();

        
private volatile boolean isRetrieveData;

        @Override
        
public void run() {
            Thread curThread = Thread.currentThread();
            LOGGER.debug(this + "[" + curThread.getName() + "]" + " begin");

            
synchronized (lock) {
                
while (!isRetrieveData) {
                    
try {
                        
// Object#public final void wait(long timeout, int
                        
// nanos) throws InterruptedException
                        
// 此方法类似wait(long)方法,只是允许更好的控制在放弃之前等待通知的时间量.用纳秒度量的实际时间量公式:
                        
// 1000000 * timeout + nanos
                        
// wait(0,0)与wait(0)相同
                        
// 从源码看,nanos参数必须在[0,999999]这个区间,即不超过1毫秒
                        
// 另外从源码看,如果nanos大于500000或者timeout为0,nanos不为0,则执行timeout++.也就是说如果nanos超过了半毫秒,则直接相当于1毫秒对待
                        
// 个人认为此方法用于非常精确的 线程间协作操作
                        lock.wait(10300000);
                        LOGGER.debug(curThread.getName()
                                
+ " wait(10,300000) timeout.");
                    }
 catch (InterruptedException e) {
                        LOGGER.debug(curThread.getName()
                                
+ " wait(10,300000) was interrupted");
                    }

                }

            }


            
if (isRetrieveData) {
                LOGGER.debug(this
                        
+ " already retrieved data.go on execute next step.");
            }

        }


        
public void retrieveData() {
            isRetrieveData = true;
            
synchronized (lock) {
                lock.notifyAll();
            }

        }


        @Override
        
public String toString() {
            
return getClass().getSimpleName();
        }

    }

}



2.Thread.join代码示例,详解见代码注释


package com.landon.mavs.example.concurrent;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* 线程底层API用例 {@link Thread#join()}
*
* @author landon
*
*/

public class ThreadBaseExample {
   
private static final Logger LOGGER = LoggerFactory
            .getLogger(ThreadBaseExample.class);

   
public static void main(String[] args) {
       
// public Thread(Runnable target, String name)
        Thread oneThread = new Thread(new JoinRunnable(), "Thread-1");
        oneThread.start();

       
// public final void join() throws InterruptedException
       
// 当前线程等待执行join的线程终止
       
// 当前线程可被任何线程中断,当抛出InterruptedException,当前线程中断状态被清除
       
// 个人认为join的用处在于线程间顺序的协作.如A线程执行某个业务的时候必须等待B线程执行完毕后才可继续执行
       
// 从源码上看调用join(0)
        try {
           
// 当前主线程等待oneThread终止
            oneThread.join();
        }
catch (InterruptedException e) {
            LOGGER.warn(Thread.currentThread().getName()
                   
+ "execute oneThread.join was interrupted");
        }


       
// 从输出看到,主线程阻塞在了join处,待oneThread业务处理结束->主线程继续运行
        LOGGER.debug(Thread.currentThread().getName() + " join() oneThread ok");

        Thread twoThread = new Thread(new JoinTimeoutRunnable(), "Thread-2");
        twoThread.start();

       
try {
           
// public final synchronized void join(long millis) throws
           
// InterruptedException
           
// 当前线程等待执行join的线程终止的最长时间为millis毫秒,超时为0表示要一直等下去
           
// 从源码上看:
           
// 1.方法声明为synchronized->因为其内部执行了wait方法.其相当于当前线程调用了twoThread.wait->所以当前线程会被阻塞
           
// 2.内部实现是一个while
           
// (isAlive())(wait)->即循环条件为isAlive(twoThread线程是否处于活动状态)
           
// 3.当millis不为0时->timeout后返回->如果线程不在处于活动状态则直接返回;否则额外计算了一下wait的时间,如果大于等于millis(这是最正常的情况),
           
// 则直接break,跳出循环返回->反者则表明可能有线程修改了系统时间->此时jdk的实现是继续wait
           
// 4.当millis为0是,则直接是while(isAlive())(wait(0))->则会一直wait.疑惑?twoThread.notify什么执行的?个人粗略推断在twoThread线程
           
// 终止的时候会唤醒自己等待集中的线程.如本例中的主线程->所以从wait返回再次判断isAlive()条件的时候->直接返回
           
// 5.因为join最终是调用wait->所以join的几个变体join()/join(long millis)/join(long
           
// millis, int nanos)和wait如出一辙
            twoThread.join(3 * 1000);
            LOGGER.debug(Thread.currentThread().getName()
                   
+ " join(3000) twoThread-->timeout");
        }
catch (InterruptedException e) {
            LOGGER.warn(Thread.currentThread().getName()
                   
+ "execute twoThread.join(3000) was interrupted");
        }


        Thread threeThread = new Thread(new JoinTimeoutRunnable(), "Thread-3");
        threeThread.start();

       
try {
            threeThread.join(10 * 1000);
           
// 从输出的时间可以看到,在threeThread终止后,主线程就从join返回.从这点也可以佐证线程终止的时候会唤醒该线程对象上的等待集线程(待考证)
            LOGGER.debug(Thread.currentThread().getName()
                   
+ " join(10000) threeThread-->timeout");
        }
catch (InterruptedException e) {
            LOGGER.warn(Thread.currentThread().getName()
                   
+ "execute threeThread.join(10000) was interrupted");
        }

    }


   
private static class JoinRunnable implements Runnable {
        @Override
       
public void run() {
            LOGGER.debug(this + " begin");

           
// 用sleep来模拟执行业务耗时
            try {
                TimeUnit.SECONDS.sleep(3);
            }
catch (InterruptedException e) {
                LOGGER.warn(this + "was interrutped");
            }


            LOGGER.debug(this + " end");
        }


        @Override
       
public String toString() {
           
return getClass().getSimpleName() + "["
                   
+ Thread.currentThread().getName() + "]";
        }

    }


   
private static class JoinTimeoutRunnable implements Runnable {
        @Override
       
public void run() {
            LOGGER.debug(
this + " begin");

           
// 用sleep来模拟执行业务耗时
            try {
                TimeUnit.SECONDS.sleep(
5);
            }
catch (InterruptedException e) {
                LOGGER.warn(
this + "was interrutped");
            }


            LOGGER.debug(
this + " end");
        }


        @Override
       
public String toString() {
           
return getClass().getSimpleName() + "["
                   
+ Thread.currentThread().getName() + "]";
        }

    }

}


3.本篇以代码示例并结合JDK源码的方式介绍了Java线程底层的多线程协作机制(wait/notify/join).(代码注释很详细)
posted on 2013-12-12 19:24 landon 阅读(1587) 评论(1)  编辑  收藏 所属分类: Program

FeedBack:
# re: Java多线程笔记2-Object.wait/Object.notify/Thread.join
2013-12-14 11:35 | zujixie
受益匪浅,谢谢  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: