funinhand

funinhand
随笔 - 9, 文章 - 0, 评论 - 0, 引用 - 0
数据加载中……

用JAVA实现缓冲多线程无阻塞读取远程文件

    我平时比较喜欢从网上听歌,有些链接下载速度太慢了。如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂。

      简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。

      经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。我把实现多线程下载缓冲的方法写出来,不足之处恳请批评指正。

 

一、HttpReader类功能:HTTP协议从指定URL读取数据

/**
* author by 
http://www.bt285.cn http://www.5a520.cn
*/

package instream;   
  
import java.io.IOException;   
import java.io.InputStream;   
import java.net.HttpURLConnection;   
import java.net.URL;   
  
public final class HttpReader {   
    
public static final int MAX_RETRY = 10;   
    
private static long content_length;   
    
private URL url;   
    
private HttpURLConnection httpConnection;   
    
private InputStream in_stream;   
    
private long cur_pos;           //用于决定seek方法中是否执行文件定位   
    private int connect_timeout;   
    
private int read_timeout;   
       
    
public HttpReader(URL u) {   
        
this(u, 50005000);   
    }
   
       
    
public HttpReader(URL u, int connect_timeout, int read_timeout) {   
        
this.connect_timeout = connect_timeout;   
        
this.read_timeout = read_timeout;   
        url 
= u;   
        
if (content_length == 0{   
            
int retry = 0;   
            
while (retry < HttpReader.MAX_RETRY)   
                
try {   
                    
this.seek(0);   
                    content_length 
= httpConnection.getContentLength();   
                    
break;   
                }
 catch (Exception e) {   
                    retry
++;   
                }
   
        }
   
    }
   
       
    
public static long getContentLength() {   
        
return content_length;   
    }
   
       
    
public int read(byte[] b, int off, int len) throws IOException {   
        
int r = in_stream.read(b, off, len);   
        cur_pos 
+= r;   
        
return r;   
    }
   
       
    
public int getData(byte[] b, int off, int len) throws IOException {   
        
int r, rema = len;   
        
while (rema > 0{   
            
if ((r = in_stream.read(b, off, rema)) == -1{   
                
return -1;   
            }
   
            rema 
-= r;   
            off 
+= r;   
            cur_pos 
+= r;   
        }
   
        
return len;   
    }
   
       
    
public void close() {   
        
if (httpConnection != null{   
            httpConnection.disconnect();   
            httpConnection 
= null;   
        }
   
        
if (in_stream != null{   
            
try {   
                in_stream.close();   
            }
 catch (IOException e) {}   
            in_stream 
= null;   
        }
   
        url 
= null;   
    }
   
       
    
/*  
     * 抛出异常通知再试  
     * 响应码503可能是由某种暂时的原因引起的,例如同一IP频繁的连接请求可能遭服务器拒绝  
     
*/
  
    
public void seek(long start_pos) throws IOException {   
        
if (start_pos == cur_pos && in_stream != null)   
            
return;   
        
if (httpConnection != null{   
            httpConnection.disconnect();   
            httpConnection 
= null;   
        }
   
        
if (in_stream != null{   
            in_stream.close();   
            in_stream 
= null;   
        }
   
        httpConnection 
= (HttpURLConnection) url.openConnection();   
        httpConnection.setConnectTimeout(connect_timeout);   
        httpConnection.setReadTimeout(read_timeout);   
        String sProperty 
= "bytes=" + start_pos + "-";   
        httpConnection.setRequestProperty(
"Range", sProperty);   
        
//httpConnection.setRequestProperty("Connection", "Keep-Alive");   
        int responseCode = httpConnection.getResponseCode();   
        
if (responseCode < 200 || responseCode >= 300{   
            
try {   
                Thread.sleep(
500);   
            }
 catch (InterruptedException e) {   
                e.printStackTrace();   
            }
   
            
throw new IOException("HTTP responseCode="+responseCode);   
        }
   
  
        in_stream 
= httpConnection.getInputStream();   
        cur_pos 
= start_pos;   
    }
   
  
}
  


二、IWriterCallBack接口功能:实现读/写通信。

package instream;   
  
public interface IWriterCallBack {   
    
public boolean tryWriting(Writer w) throws InterruptedException;   
    
public void updateBuffer(int i, int len);   
    
public void updateWriterCount();   
    
public void terminateWriters();   
}
  

三、Writer类:下载线程,负责向buf[]写数据。

/**
http://www.bt285.cn http://www.5a520.cn 
*/

package instream;   
import java.io.IOException;   
import java.net.URL;   
  
public final class Writer implements Runnable {   
    
private static boolean isalive = true;   
    
private byte[] buf;   
    
private IWriterCallBack icb;   
    
protected int index;            //buf[]内"块"索引号   
    protected long start_pos;       //index对应的文件位置(相对于文件首的偏移量)   
    protected int await_count;      //用于判断:下载速度足够就退出一个"写"线程   
    private HttpReader hr;   
       
    
public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) {   
        hr 
= new HttpReader(u);   
        
if(HttpReader.getContentLength() == 0)  //实例化HttpReader对象都不成功   
            return;   
        icb 
= call_back;   
        buf 
= b;   
        Thread t 
= new Thread(this,"dt_"+i);   
        t.setPriority(Thread.NORM_PRIORITY 
+ 1);   
        t.start();   
    }
   
       
    
public void run() {   
        
int write_bytes=0, write_pos=0, rema = 0, retry = 0;   
        
boolean cont = true;   
        
while (cont) {   
            
try {   
                
// 1.等待空闲块   
                if(retry == 0{   
                    
if (icb.tryWriting(this== false)   
                        
break;   
                    write_bytes 
= 0;   
                    rema 
= BuffRandAcceURL.UNIT_LENGTH;   
                    write_pos 
= index << BuffRandAcceURL.UNIT_LENGTH_BITS;   
                }
   
                   
                
// 2.定位   
                hr.seek(start_pos);   
  
                
// 3.下载"一块"   
                int w;   
                
while (rema > 0 && isalive) {   
                    w 
= (rema < 2048? rema : 2048//每次读几K合适?   
                    if ((w = hr.read(buf, write_pos, w)) == -1{   
                        cont 
= false;   
                        
break;   
                    }
   
                    rema 
-= w;   
                    write_pos 
+= w;   
                    start_pos 
+= w;   
                    write_bytes 
+= w;   
                }
   
                   
                
//4.通知"读"线程   
                retry = 0;   
                icb.updateBuffer(index, write_bytes);   
            }
 catch (InterruptedException e) {   
                isalive 
= false;   
                icb.terminateWriters();   
                
break;   
            }
 catch (IOException e) {   
                
if(++retry == HttpReader.MAX_RETRY) {   
                    isalive 
= false;   
                    icb.terminateWriters();   
                    
break;   
                }
   
            }
   
        }
   
        icb.updateWriterCount();   
        
try {   
            hr.close();   
        }
 catch (Exception e) {}   
        hr 
= null;   
        buf 
= null;   
        icb 
= null;   
    }
   
  
}
  

 

 四、IRandomAccess接口:随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。

 

package instream;   
  
public interface IRandomAccess {   
    
public int read() throws Exception;   
    
public int read(byte b[]) throws Exception;   
    
public int read(byte b[], int off, int len) throws Exception;   
    
public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;   
    
public void seek(long pos) throws Exception;   
    
public long length();   
    
public long getFilePointer();   
    
public void close();   
}
  

五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。

关键是如何简单有效防止死锁?以下只是我的一次尝试,请指正。

/**
http://www.5a520.cn  http://www.bt285.cn
*/
 
package instream;   
  
import java.net.URL;   
import java.net.URLDecoder;   
import decode.Header;   
import tag.MP3Tag;   
import tag.TagThread;   
  
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {   
    
public static final int UNIT_LENGTH_BITS = 15;                  //32K   
    public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS;   
    
public static final int BUF_LENGTH = UNIT_LENGTH << 4;            //16块   
    public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;   
    
public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);   
    
private static final int MAX_WRITER = 8;   
    
private static long file_pointer;   
    
private static int read_pos;   
    
private static int fill_bytes;   
    
private static byte[] buf;      //同时也作读写同步锁:buf.wait()/buf.notify()   
    private static int[] buf_bytes;   
    
private static int buf_index;   
    
private static int alloc_pos;   
    
private static URL url = null;   
    
private static boolean isalive = true;   
    
private static int writer_count;   
    
private static int await_count;   
    
private long file_length;   
    
private long frame_bytes;   
       
    
public BuffRandAcceURL(String sURL) throws Exception {   
        
this(sURL,MAX_WRITER);   
    }
   
       
    
public BuffRandAcceURL(String sURL, int download_threads) throws Exception {   
        buf 
= new byte[BUF_LENGTH];   
        buf_bytes 
= new int[UNIT_COUNT];   
        url 
= new URL(sURL);   
           
        
//创建线程以异步方式解析ID3   
        new TagThread(url);   
           
        
//打印当前文件名   
        try {   
            String s 
= URLDecoder.decode(sURL, "GBK");   
            System.out.println(
"start>> " + s.substring(s.lastIndexOf("/"+ 1));   
            s 
= null;   
        }
 catch (Exception e) {   
            System.out.println(
"start>> " + sURL);   
        }
   
           
        
//创建"写"线程   
        for(int i = 0; i < download_threads; i++)   
            
new Writer(this, url, buf, i+1);   
        frame_bytes 
= file_length = HttpReader.getContentLength();   
        
if(file_length == 0{   
            Header.strLastErr 
= "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。";   
            
throw new Exception("retry " + HttpReader.MAX_RETRY);   
        }
   
        writer_count 
= download_threads;   
           
        
//缓冲   
        try_cache();   
           
        
//跳过ID3 v2   
        MP3Tag mP3Tag = new MP3Tag();   
        
int v2_size = mP3Tag.checkID3V2(buf,0);   
        
if (v2_size > 0{   
            frame_bytes 
-= v2_size;   
            
//seek(v2_size):   
            fill_bytes -= v2_size;   
            file_pointer 
= v2_size;   
            read_pos 
= v2_size;   
            read_pos 
&= BUF_LENGTH_MASK;   
            
int units = v2_size >> UNIT_LENGTH_BITS;   
            
for(int i = 0; i < units; i++{   
                buf_bytes[i] 
= 0;   
                
this.notifyWriter();   
            }
   
            buf_bytes[units] 
-= v2_size;   
            
this.notifyWriter();   
        }
   
        mP3Tag 
= null;   
    }
   
       
    
private void try_cache() throws InterruptedException {   
        
int cache_size = BUF_LENGTH;   
        
if(cache_size > (int)file_length - alloc_pos)   
            cache_size 
= (int)file_length - alloc_pos;   
        cache_size 
-= UNIT_LENGTH;   
           
        
//等待填写当前正在读的那"一块"缓冲区   
        /*if(fill_bytes >= cache_size && writer_count > 0) {  
            synchronized (buf) {  
                buf.wait();  
            }  
            return;  
        }
*/
  
           
        
//等待填满缓冲区   
        while (fill_bytes < cache_size) {   
            
if (writer_count == 0 || isalive == false)   
                
return;   
            
if(BUF_LENGTH > (int)file_length - alloc_pos)   
                cache_size 
= (int)file_length - alloc_pos - UNIT_LENGTH;   
            System.out.printf(
"\r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);   
            
synchronized (buf) {   
                buf.wait();   
            }
   
        }
   
        System.out.printf(
"\r");   
    }
   
       
    
private int try_reading(int i, int len) throws Exception {   
        
int n = (i == UNIT_COUNT - 1? 0 : (i + 1);   
        
int r = (buf_bytes[i] == 0? 0 : (buf_bytes[i] + buf_bytes[n]);   
        
while (r < len) {   
            
if (writer_count == 0 || isalive == false)   
                
return r;   
            try_cache();   
            r 
= (buf_bytes[i] == 0? 0 : (buf_bytes[i] + buf_bytes[n]);   
        }
   
           
        
return len;   
    }
   
       
    
/*  
     * 各个"写"线程互斥等待空闲块  
     
*/
  
    
public synchronized boolean tryWriting(Writer w) throws InterruptedException {   
        await_count
++;   
        
while (buf_bytes[buf_index] != 0 && isalive) {   
            
this.wait();   
        }
   
           
        
//下载速度足够就结束一个"写"线程   
        if(writer_count > 1 && w.await_count >= await_count &&   
                w.await_count 
>= writer_count)   
            
return false;   
           
        
if(alloc_pos >= file_length)   
            
return false;   
        w.await_count 
= await_count;   
        await_count
--;   
        w.start_pos 
= alloc_pos;   
        w.index 
= buf_index;   
        alloc_pos 
+= UNIT_LENGTH;   
        buf_index 
= (buf_index == UNIT_COUNT - 1? 0 : buf_index + 1;   
        
return isalive;   
    }
   
       
    
public void updateBuffer(int i, int len) {   
        
synchronized (buf) {   
            buf_bytes[i] 
= len;   
            fill_bytes 
+= len;   
            buf.notify();   
        }
   
    }
   
       
    
public void updateWriterCount() {   
        
synchronized (buf) {   
            writer_count
--;   
            buf.notify();   
        }
   
    }
   
       
    
public synchronized void notifyWriter() {   
        
this.notifyAll();   
    }
   
       
    
public void terminateWriters() {   
        
synchronized (buf) {   
            
if (isalive) {   
                isalive 
= false;   
                Header.strLastErr 
= "读取文件超时。重试 " + HttpReader.MAX_RETRY   
                        
+ " 次后放弃,请您稍后再试。";   
            }
   
            buf.notify();   
        }
   
           
        notifyWriter();        
    }
   
       
    
public int read() throws Exception {   
        
int iret = -1;   
        
int i = read_pos >> UNIT_LENGTH_BITS;   
        
// 1."等待"有1字节可读   
        while (buf_bytes[i] < 1{   
            try_cache();   
            
if (writer_count == 0)   
                
return -1;   
        }
   
        
if(isalive == false)   
            
return -1;   
  
        
// 2.读取   
        iret = buf[read_pos] & 0xff;   
        fill_bytes
--;   
        file_pointer
++;   
        read_pos
++;   
        read_pos 
&= BUF_LENGTH_MASK;   
        
if (--buf_bytes[i] == 0)   
            notifyWriter();     
// 3.通知   
  
        
return iret;   
    }
   
       
    
public int read(byte b[]) throws Exception {   
        
return read(b, 0, b.length);   
    }
   
  
    
public int read(byte[] b, int off, int len) throws Exception {   
        
if(len > UNIT_LENGTH)   
            len 
= UNIT_LENGTH;   
        
int i = read_pos >> UNIT_LENGTH_BITS;   
           
        
// 1."等待"有足够内容可读   
        if(try_reading(i, len) < len || isalive == false)   
            
return -1;   
  
        
// 2.读取   
        int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH   
        if (tail_len < len) {   
            System.arraycopy(buf, read_pos, b, off, tail_len);   
            System.arraycopy(buf, 
0, b, off + tail_len, len - tail_len);   
        }
 else  
            System.arraycopy(buf, read_pos, b, off, len);   
  
        fill_bytes 
-= len;   
        file_pointer 
+= len;   
        read_pos 
+= len;   
        read_pos 
&= BUF_LENGTH_MASK;   
        buf_bytes[i] 
-= len;   
        
if (buf_bytes[i] < 0{   
            
int ni = read_pos >> UNIT_LENGTH_BITS;   
            buf_bytes[ni] 
+= buf_bytes[i];   
            buf_bytes[i] 
= 0;   
            notifyWriter();   
        }
 else if (buf_bytes[i] == 0)   
            notifyWriter();   
           
        
return len;   
    }
   
       
    
/*  
     * 从src_off位置复制,不移动文件"指针"  
     
*/
  
    
public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {   
        
int rpos = read_pos + src_off;   
        
if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false)   
            
return -1;   
        
int tail_len = BUF_LENGTH - rpos;   
        
if (tail_len < len) {   
            System.arraycopy(buf, rpos, b, dst_off, tail_len);   
            System.arraycopy(buf, 
0, b, dst_off + tail_len, len - tail_len);   
        }
 else  
            System.arraycopy(buf, rpos, b, dst_off, len);   
        
// 不发信号   
  
        
return len;   
    }
   
       
    
public long length() {   
        
return file_length;   
    }
   
       
    
public long getFilePointer() {   
        
return file_pointer;   
    }
   
  
    
public void close() {   
        
//   
    }
   
       
    
//   
    public void seek(long pos) throws Exception {   
        
//   
    }
   
       
}
  

posted on 2009-06-22 22:25 funinhand 阅读(1868) 评论(0)  编辑  收藏