天道酬勤

点点滴滴的足迹,幻化成漫天的云彩
posts - 22, comments - 0, trackbacks - 0, articles - 2
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

ActiveMQ之三 -- 使用ActiveMQ来传送文件

Posted on 2011-08-28 23:40 匆匆过客 阅读(5230) 评论(0)  编辑  收藏 所属分类: Java
这个方法还有待研究,目前还有如下几个疑点:
1. ActiveMQ 报出这样的信息:
INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this
destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info
2. 这种以异步方式传送资料,能保证客户端能以正确的顺序接收到文件段麽?

使用ActiveMQ传送文件,发送端必须将文件拆成一段一段,每段封装在独立的Message中,逐次发送到客户端。例如下面的例子,Producer通过发送命令,告诉文件传送的开始,发送中,结束。客户端接收到这些命令之后,就知道如何接收资料了。

客户端收到内容后,根据命令将内容合并到一个文件中。 
package org.apache.activemq.exchange.file;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    
/**
     * 
@param args
     
*/
    
public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory 
= new ActiveMQConnectionFactory("tcp://localhost:61616");

        Connection connection 
= factory.createConnection();
        connection.start();

        Session session 
= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination 
= session.createTopic("EXCHANGE.FILE");

        MessageConsumer consumer 
= session.createConsumer(destination);

        
boolean appended = false;
        
try {
            
while (true) {
                Message message 
= consumer.receive(5000);
                
if (message == null) {
                    
continue;
                }

                
if (message instanceof StreamMessage) {
                    StreamMessage streamMessage 
= (StreamMessage) message;
                    String command 
= streamMessage.getStringProperty("COMMAND");
                    
                    
if ("start".equals(command)) {
                        appended 
= false;
                        
continue;
                    }

                    
if ("sending".equals(command)) {
                        
byte[] content = new byte[4096];
                        String file_name 
= message.getStringProperty("FILE_NAME");
                        BufferedOutputStream bos 
= null;
                        bos 
= new BufferedOutputStream(new FileOutputStream("c:/" + file_name, appended));
                        
if (!appended) {
                            appended 
= true;
                        }
                        
while (streamMessage.readBytes(content) > 0) {
                            bos.write(content);
                        }
                        bos.close();
                        
continue;
                    }

                    
if ("end".equals(command)) {
                        appended 
= false;
                        
continue;
                    }
                }
            }
        } 
catch (JMSException e) {
            
throw e;
        } 
finally {
            
if (connection != null) {
                connection.close();
            }
        }

    }

}

发送端将文件分包,逐次发送到客户端 
package org.apache.activemq.exchange.file;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publisher {

    
public static String FILE_NAME = "01.mp3";
    
    
public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory 
= new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection 
= factory.createConnection();
        connection.start();
        Session session 
= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination 
= session.createTopic("EXCHANGE.FILE");        
        MessageProducer producer 
= session.createProducer(destination);
        
long time = System.currentTimeMillis();
        
        
//通知客户端开始接受文件
        StreamMessage message = session.createStreamMessage();
        message.setStringProperty(
"COMMAND""start");
        producer.send(message);
        
        
//开始发送文件
        byte[] content = new byte[4096];
        InputStream ins 
= Publisher.class.getResourceAsStream(FILE_NAME);
        BufferedInputStream bins 
= new BufferedInputStream(ins);
        
while (bins.read(content) > 0) {
            
//
            message = session.createStreamMessage();
            message.setStringProperty(
"FILE_NAME", FILE_NAME);
            message.setStringProperty(
"COMMAND""sending");
            message.clearBody();
            message.writeBytes(content);
            producer.send(message);
        }
        bins.close();
        ins.close();
        
        
//通知客户端发送完毕
        message = session.createStreamMessage();
        message.setStringProperty(
"COMMAND""end");
        producer.send(message);
        
        connection.close();
        
        System.out.println(
"Total Time costed : " + (System.currentTimeMillis() - time) + " mili seconds");
    }
}

只有注册用户登录后才能发表评论。


网站导航: