I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0
1.SynchronousQueue示例代码,详解见代码注释

package com.landon.mavs.example.concurrent;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * {@link java.util.concurrent.SynchronousQueue}示例
 * 
 * <pre>
 *     一种无缓冲的等待队列,类似于无中介的直接交易
 * </pre>
 * 
 * <pre>
 * 1.public class SynchronousQueue<E> extends AbstractQueue<E>
 *     implements BlockingQueue<E>, java.io.Serializable
 * 2.其是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作,反之亦然.
 * 3.同步队列没有任何内部容量,甚至连一个队列的容量都没有.(isEmpty永远返回true)
 * 4.不能在同步队列上peek(获取但并不移除此队列的头),因为仅在试图移除元素时该元素才存在.
 * 5.除非另一个线程试图移除某个元素,否则也不能使用任何方法插入元素.
 * 6.无法迭代队列,因为没有元素可用于迭代.
 * 7.该队列的头是尝试添加到队列的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且pool方法将返回null.
 * 6.同步队列类似于传递性的设计,即一个线程中运行的对象要将某些信息/事件/任务传递给另一个线程中运行的对象,->同步
 * 7.SynchronousQueue构造方法中可指定fairness的策略.如果fair,则等待线程以FIFO的顺序竞争访问;否则顺序是未指定的.
 * </pre>
 * 
 * <pre>
 *     总结:1.进行相应插入和移除的操作时必须要有相应的移除和插入操作(__>同步).否则失败.
 *        2.公平策略:内部实现为TransferQueue
 *             FIFO:先进先出_用队列(Queue)来阻塞多余的生产者和消费者
 *        3.非公平策略:内部实现为TransferStack
 *             LIFO:后进先出_用栈(Stack)来阻塞多余的生产者和消费者
 * </pre>
 * 
 * @author landon
 * 
 
*/

public class SynchronousQueueExample {
    
private static final Logger LOGGER = LoggerFactory
            .getLogger(SynchronousQueueExample.class);

    
public static void main(String[] args) {
        SynchronousQueue<SQElement> sq = new SynchronousQueue<SQElement>();

        
// 启动take线程
        SQTakeThread sqtt = new SQTakeThread(sq, "sqtt");
        sqtt.start();

        
// 启动put线程,此时有sqtt线程正在执行take.所以put成功,take成功.
        SQPutThread sqpt = new SQPutThread(sq, 1"sqpt");
        sqpt.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 启动put线程
        SQPutThread sqpt2 = new SQPutThread(sq, 2"sqpt2");
        sqpt2.start();

        
// 启动take线程,此时有sqpt2线程正在进行put.所以take成功.put成功.
        SQTakeThread sqtt2 = new SQTakeThread(sq, "sqpt2");
        sqtt2.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 启动take线程
        SQTakeThread sqtt3 = new SQTakeThread(sq, "sqtt3");
        sqtt3.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 主线程启动offer.则发现sqtt3线程正在take.所以offer成功,take成功.
        boolean offerResult = sq.offer(new SQElement(3));
        LOGGER.debug("offerResult:{}", offerResult);

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 启动offer_timeout线程
        SQOfferTimeoutThread sqott = new SQOfferTimeoutThread(sq, 4);
        sqott.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 启动poll线程,启动发现sqott线程正在offer.所以poll成功,offer_timeout成功
        SQPollThread sqPollThread = new SQPollThread(sq);
        sqPollThread.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 启动poll_timeout线程
        SQPollTimeoutThread sqptot = new SQPollTimeoutThread(sq);
        sqptot.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 启动offer线程,此时发现有poll_timeout正在poll.所以offer成功,poll成功.
        SQOfferThread sqot = new SQOfferThread(sq, 5);
        sqot.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 这个必然返回false.因为此时执行插入的时候并没有一个移除的操作
        boolean mainOfferResult = sq.offer(new SQElement(6));
        LOGGER.debug("mainOfferResult:{}", mainOfferResult);

        
try {
            
// 主线程同样会超时等待2秒.结果也返回false.原因同上.
            boolean mainOfferResult2 = sq.offer(new SQElement(7), 2,
                    TimeUnit.SECONDS);
            LOGGER.debug("mainOfferResult2:{}", mainOfferResult2);
        }
 catch (InterruptedException e) {
        }


        
// 输出必然返回null.因为只有在试图移除元素时,该元素才存在.该方法始终返回null.
        SQElement peekElement = sq.peek();
        LOGGER.debug("peekElement:{}", peekElement);

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 测试公平访问策略.
        SynchronousQueue<SQElement> sq2 = new SynchronousQueue<SQElement>(true);

        
// 从输出看:consumer是按照FIFO的顺序,即consumer1->consumer2->consumer3分别拿到数据->公平策略

        
// consumer1
        SQTakeThread sq2TakeThread1 = new SQTakeThread(sq2, "sq2TakeThread1");
        sq2TakeThread1.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// consumer2
        SQTakeThread sq2TakeThread2 = new SQTakeThread(sq2, "sq2TakeThread2");
        sq2TakeThread2.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// producer1
        SQPutThread sq2PutThread1 = new SQPutThread(sq2, 11"sq2PutThread1");
        sq2PutThread1.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// consumer3
        SQTakeThread sq2TakeThread3 = new SQTakeThread(sq2, "sq2TakeThread3");
        sq2TakeThread3.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// producer2
        SQPutThread sq2PutThread2 = new SQPutThread(sq2, 12"sq2PutThread2");
        sq2PutThread2.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// producer3
        SQPutThread sq2PutThread3 = new SQPutThread(sq2, 13"sq2PutThread3");
        sq2PutThread3.start();

        
// 主线程暂停1秒
        try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }


        
// 测试非公平访问策略.这也是同步队列的默认策略
        SynchronousQueue<SQElement> sq3 = new SynchronousQueue<SQElement>(false);

        
// 从输出看:
        
// 1.当启动producer1的时候,因为consumer2是LI的->所以其率先take
        
// 2.当启动producer2的时候,因为consumer3是LI的->所以其率先take
        
// 3.最后take的是最早启动的consumer1
        
// 所以整体是不公平的,即LIFO

        
// consumer1
        SQTakeThread sq3TakeThread1 = new SQTakeThread(sq3, "sq3TakeThread1");
        sq3TakeThread1.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// consumer2
        SQTakeThread sq3TakeThread2 = new SQTakeThread(sq3, "sq3TakeThread2");
        sq3TakeThread2.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// producer1
        SQPutThread sq3PutThread1 = new SQPutThread(sq3, 21"sq3PutThread1");
        sq3PutThread1.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// consumer3
        SQTakeThread sq3TakeThread3 = new SQTakeThread(sq3, "sq3TakeThread3");
        sq3TakeThread3.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// producer2
        SQPutThread sq3PutThread2 = new SQPutThread(sq3, 22"sq3PutThread2");
        sq3PutThread2.start();
        
try {
            Thread.sleep(1 * 1000L);
        }
 catch (InterruptedException e) {
        }

        
// producer3
        SQPutThread sq3PutThread3 = new SQPutThread(sq3, 23"sq3PutThread3");
        sq3PutThread3.start();
    }


    
/**
     * 
     * SynchronousQueue element
     * 
     * @author landon
     * 
     
*/

    
private static class SQElement {
        
private int id;

        
public SQElement(int id) {
            
this.id = id;
        }


        @Override
        
public String toString() {
            
return "SQElement [id=" + id + "]";
        }

    }


    
// producer
    private static class SQPutThread extends Thread {
        
private SynchronousQueue<SQElement> sq;
        
private int curElementNum;

        
public SQPutThread(SynchronousQueue<SQElement> sq, int elementNum,
                String name) {
            
super(name);
            
this.sq = sq;
            curElementNum = elementNum;
        }


        @Override
        
public void run() {
            SQElement element = new SQElement(curElementNum);

            LOGGER.debug(getName() + ".sq.put.{} begin.", element);

            
try {
                sq.put(element);
            }
 catch (InterruptedException e) {
                LOGGER.warn(getName() + ".sq.put.err.", e);
            }


            LOGGER.debug(getName() + ".sq.put end:{}.", curElementNum);
        }

    }


    
// consumer
    private static class SQTakeThread extends Thread {
        
private SynchronousQueue<SQElement> sq;

        
public SQTakeThread(SynchronousQueue<SQElement> sq, String name) {
            
super(name);
            
this.sq = sq;
        }


        @Override
        
public void run() {
            LOGGER.debug(getName() + ".sq.take begin");

            SQElement element = null;
            
try {
                element = sq.take();
            }
 catch (InterruptedException e) {
                LOGGER.warn(getName() + ".sq.take.err", e);
            }


            LOGGER.debug(getName() + ".sq.take:{}", element);
        }

    }


    
private static class SQPollThread extends Thread {
        
private SynchronousQueue<SQElement> sq;

        
public SQPollThread(SynchronousQueue<SQElement> sq) {
            
this.sq = sq;
        }


        @Override
        
public void run() {
            LOGGER.debug("SQPollThread.sq.poll begin");

            SQElement element = sq.poll();

            LOGGER.debug("SQPollThread.sq.poll :{}", element);
        }

    }


    
private static class SQOfferTimeoutThread extends Thread {
        
private SynchronousQueue<SQElement> sq;
        
private int elementNum;

        
public SQOfferTimeoutThread(SynchronousQueue<SQElement> sq,
                
int elementNum) {
            
this.sq = sq;
            
this.elementNum = elementNum;
        }


        @Override
        
public void run() {
            LOGGER.debug("SQPollThread.sq.offer.timeout(3) begin");

            SQElement element = new SQElement(elementNum);
            
try {
                
boolean offerTimeoutResult = sq.offer(element, 3,
                        TimeUnit.SECONDS);
                LOGGER.debug("SQPollThread.sq.offer.timeout(3).result:{}",
                        offerTimeoutResult);
            }
 catch (InterruptedException e) {
            }

        }

    }


    
private static class SQOfferThread extends Thread {
        
private SynchronousQueue<SQElement> sq;
        
private int elementNum;

        
public SQOfferThread(SynchronousQueue<SQElement> sq, int elementNum) {
            
this.sq = sq;
            
this.elementNum = elementNum;
        }


        @Override
        
public void run() {
            LOGGER.debug("SQPollThread.sq.offer begin");

            SQElement element = new SQElement(elementNum);
            
boolean offerResult = sq.offer(element);

            LOGGER.debug("SQPollThread.sq.offer.result:{}", offerResult);
        }

    }


    
private static class SQPollTimeoutThread extends Thread {
        
private SynchronousQueue<SQElement> sq;

        
public SQPollTimeoutThread(SynchronousQueue<SQElement> sq) {
            
this.sq = sq;
        }


        @Override
        
public void run() {
            LOGGER.debug("SQPollThread.sq.poll.timeout(3) begin");

            SQElement element = null;
            
try {
                element = sq.poll(3, TimeUnit.SECONDS);
            }
 catch (InterruptedException e) {
            }


            LOGGER.debug("SQPollThread.sq.poll.timeout(3):{}", element);
        }

    }

}



2.LinkedBlockingQueue示例代码,详解见代码注释

package com.landon.mavs.example.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * {@link java.util.concurrent.LinkedBlockingQueue}示例
 *
 * <pre>
 * 1.public class LinkedBlockingQueue<E> extends AbstractQueue<E>
 *         implements BlockingQueue<E>, java.io.Serializable
 * 2.无界阻塞队列并可以指定capacity防止队列过度扩展
 * 3.不能插入null,否则报空指针异常
 * 4.其内部锁实现:takeLock/putLock,即采用了锁分离技术->因为二者分别操作队列的首和尾,理论上,二者并不冲突
 * 5.内部阻塞实现:Condition notEmpty/Condition notFull,即take的时候如果队列为空则等待/put的时候如果队列已满则等待
 * </pre>
 *
 * @author landon
 *
 */

public class LinkedBlockingQueueExample {
   
private static final Logger LOGGER = LoggerFactory
            .getLogger(LinkedBlockingQueueExample.class);

   
public static void main(String[] args) {
       
// public LinkedBlockingQueue(int capacity) 指定capacity
        LinkedBlockingQueue<LBQElement> lbq = new LinkedBlockingQueue<LBQElement>(
               
1);
       
// AbstractQueue#public boolean add(E e)
       
// 源码实现:
       
// 1. if (offer(e)){return true}
       
// else{throw new IllegalStateException("Queue full")},
       
// 即该方法首先调用offer,如果offer失败即违反容量限制则抛出一个异常.
        try {
           
boolean addResult = lbq.add(new LBQElement(1));
            LOGGER.debug("sq.add(new LBQElement(1)).result:{}", addResult);
        }
catch (IllegalStateException e) {
            LOGGER.warn("sq.add(new SQElement(1)).err.", e);
        }


       
// public boolean offer(E e) 将指定元素插入到此队列的尾部,成功时返回 true,如果此队列已满,则返回 false
       
// 当使用有容量限制的队列时,此方法通常要优于 add 方法,后者可能无法插入元素,而只是抛出一个异常
       
// 从输出结果上看,因为队列容量是1,所以offer失败.
        boolean offerResult = lbq.offer(new LBQElement(2));
        LOGGER.debug("sq.offer(new LBQElement(2)).result:{}", offerResult);

       
try {
           
// public boolean offer(E e, long timeout, TimeUnit unit) throws
           
// InterruptedException
           
// 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用;如果超时则返回false
            boolean offerTimeoutResult = lbq.offer(new LBQElement(3), 3,
                    TimeUnit.SECONDS);
           
// 从输出看,超时3秒后返回false
            LOGGER.warn(
                   
" sq.offer(new LBQElement(3), 3,TimeUnit.SECONDS).result:{}",
                    offerTimeoutResult);
        }
catch (InterruptedException e) {
            LOGGER.warn("sq.offer(new LBQElement(3), 3,TimeUnit.SECONDS).err.",
                    e);
        }


       
// 启动LBQ_Put_Thread,调用阻塞队列的put方法,则LBQ_Put_Thread则阻塞等待可用空间
        LBQPutThread lbqpt = new LBQPutThread(lbq);
        lbqpt.start();

       
// AbstractQueue#public E element()
       
// 源码实现:
       
// 调用peek方法返回E.如果E不为null,则直接返回E.否则抛出NoSuchElementException
       
// 即获取但是不移除此队列的头->队列为空时抛出NoSuchElementException
        LBQElement headElement = lbq.element();
        LOGGER.debug("headElement:{}", headElement);

       
// public E peek()
       
// 获取但不移除此队列的头;如果此队列为空,则返回 null
        LBQElement headPeekElement = lbq.peek();
        LOGGER.debug("headPeekElement:{}", headPeekElement);

       
// public E poll()
       
// 获取并移除此队列的头,如果此队列为空,则返回 null
        LBQElement headPollElement = lbq.poll();
        LOGGER.debug("headPollElement:{}", headPollElement);

       
// 主线程sleep 1秒,尝试等待LBQPutThread操作完成
       
// 从输出看:因为从队列进行了移除.所以lbqpt线程的put的等待操作被唤醒,插入了元素4.
        try {
            Thread.sleep(1 * 1000L);
        }
catch (InterruptedException e) {
        }


       
// 再次poll,获取并移除了元素4
        LBQElement headPollElement2 = lbq.poll();
        LOGGER.debug("headPollElement2:{}", headPollElement2);

       
try {
           
// 再次element,因为此时队列为空.所以element抛出异常
            lbq.element();
        }
catch (NoSuchElementException e) {
            LOGGER.warn("lbq.element().err.", e);
        }


       
// 再次peek,队列为空则返回null
        LBQElement headPeekElement2 = lbq.peek();
        LOGGER.debug("headPeekElement2:{}", headPeekElement2);

       
try {
           
// public E poll(long timeout, TimeUnit unit) throws
           
// InterruptedException
           
// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)
           
// 超时则返回null
            LBQElement headPollTimeoutElement = lbq.poll(3, TimeUnit.SECONDS);
            LOGGER.debug("lbq.poll(3, TimeUnit.SECONDS) result:{}",
                    headPollTimeoutElement);
        }
catch (InterruptedException e) {
            LOGGER.warn("lbq.poll(3, TimeUnit.SECONDS).err", e);
        }


       
try {
           
// AbstractQueue#public E remove()
           
// 源码实现:
           
// 先调用poll方法返回E,如果E不为null则直接返回否则抛出异常:NoSuchElementException
            lbq.remove();
        }
catch (NoSuchElementException e) {
            LOGGER.warn("lbq.remove().err.", e);
        }


        LBQElement removeElement = new LBQElement(5);
        lbq.add(removeElement);

       
// public boolean remove(Object o)
       
// 遍历队列进行移除指定的元素
        boolean removeResult = lbq.remove(removeElement);
       
// 返回true
        LOGGER.debug("removeResult:{}", removeResult);
       
// 再次执行remove,返回false
        boolean removeResult2 = lbq.remove(removeElement);
        LOGGER.debug("removeResult2:{}", removeResult2);

       
// 启动take线程,该线程会一直阻塞,直到有可用元素
        LBQTakeThread lbqtt = new LBQTakeThread(lbq);
        lbqtt.start();

       
// 唤醒take线程
        lbq.offer(new LBQElement(6));

       
// 主线程暂停1秒,等待take线程执行完毕
        try {
            Thread.sleep(1 * 1000L);
        }
catch (InterruptedException e) {
        }


       
// 未指定容量的lbq2-无界阻塞队列
        LinkedBlockingQueue<LBQElement> lbq2 = new LinkedBlockingQueue<LBQElement>();

        List<LBQElement> drainList = new ArrayList<LBQElement>();
       
// public int drainTo(Collection<? super E> c)
       
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中.
       
// 1.正在进行此操作时修改指定的 collection,则此操作行为是不确定的
       
// 2.在试图向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时,元素会同时在两个 collection
       
// 中出现,或者在其中一个 collection 中出现,也可能在两个 collection 中都不出现
       
// 从LinkedBlockingQueue的源码实现中发现:
       
// 1.其是要首先拿到takeLock.
       
// 2.if (c == this)
       
// IllegalArgumentException,即不能指定drainTo的collection为当前队列自身
        lbq2.offer(new LBQElement(7));
        lbq2.offer(new LBQElement(8));
        lbq2.drainTo(drainList);
        LOGGER.debug("lbq2.drainList:{}", drainList);
    }


   
/**
     *
     * LinkedBlocking element
     *
     * @author landon
     *
    
*/

   
private static class LBQElement {
       
private int id;

       
public LBQElement(int id) {
           
this.id = id;
        }


        @Override
       
public String toString() {
           
return "LBQElement [id=" + id + "]";
        }

    }


   
private static class LBQPutThread extends Thread {
       
private LinkedBlockingQueue<LBQElement> lbq;

       
public LBQPutThread(LinkedBlockingQueue<LBQElement> lbq) {
           
this.lbq = lbq;
        }


        @Override
       
public void run() {
           
try {
               
// BlockingQueue#void put(E e) throws InterruptedException
               
// 将指定元素插入队列中,如有必要则等待可用空间
                LOGGER.debug("LBQPutThread.lbq.put(new LBQElement(4)) begin.");
                lbq.put(new LBQElement(4));
                LOGGER.debug("LBQPutThread.lbq.put(new LBQElement(4)) ok(end)");
            }
catch (InterruptedException e) {
                LOGGER.warn("sq.put(new SQElement(2)).err.", e);
            }

        }

    }


   
private static class LBQTakeThread extends Thread {
       
private LinkedBlockingQueue<LBQElement> lbq;

       
public LBQTakeThread(LinkedBlockingQueue<LBQElement> lbq) {
           
this.lbq = lbq;
        }


        @Override
       
public void run() {
           
try {
                LOGGER.debug("LBQTakeThread.lbq.take begin.");
               
// public E take() throws InterruptedException
               
// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)
                LBQElement takeElement = lbq.take();
                LOGGER.debug("LBQTakeThread.lbq.take end.result:{}",
                        takeElement);
            }
catch (InterruptedException e) {
            }

        }

    }

}



posted on 2013-12-31 11:44 landon 阅读(2408) 评论(0)  编辑  收藏 所属分类: Program

只有注册用户登录后才能发表评论。


网站导航: