廉颇老矣,尚能饭否

java:从技术到管理

常用链接

统计

最新评论

Apache ActiveMQ学习笔记【mq的方式有两种:点到点和发布/订阅】

.简介ActiveMQ

ActiveMQ 是最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

.下载ActiveMQ

首先去http://activemq.apache.org/download.html 下载稳定版本4.1.0release

Download Here

Description ==Download Link ==PGP Signature file of download

Binary for Windows== apache-activemq-4.1.0-incubator.zip== incubator-activemq-4.1.0.zip.asc

Binary-for-Unix/Linux/Cygwin==apache-activemq-4.1.0-incubator.tar.gz==incubator-activemq-4.1.0.tar.gz.as 

解压后目录如下:

+bin (windows下面的bat和unix/linux下面的sh)

+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

+data (默认是空的)

+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)

+example (几个例子

+lib (activemMQ使用到的lib)

-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)

-LICENSE.txt

-NOTICE.txt

-README.txt

-user-guide.html

.启动ActiveMQ

可以使用bin\activemq.bat(activemq) 启动,如果一切顺利,你就会看见类似下面的信息(此处解压到D盘根目录下).几个小提示

1. 这个仅仅是最基础的ActiveMQ的配置,很多地方都没有配置因此不要直接使用这个配置用于生产系统

2. 有的时候由于端口被占用,导致ActiveMQ错误,ActiveMQ可能需要以下端口1099(JMX),61616(默认的TransportConnector)

3. 如果没有物理网卡,或者MS的LoopBackAdpater Multicast会报一个错误

.监控ActiveMQ

启动JDK自带的java控制台查看程序查看客户端(如C:\jdk1.6.0_07\bin\jconsole.exe)

远程进程:127.0.0.1:1099

. 测试ActiveMQ

由于ActiveMQ是一个独立的jms provider,所以我们不需要其他任何第三方服务器就可以马上做我们的测试了.编译example目录下面的程序ProducerTool/ConsumerTool 是JMS参考里面提到的典型应用,Producer产生消息,Consumer消费消息,而且这个例子还可以加入参数帮助你测试刚才启动的本地ActiveMQ或者是远程的ActiveMQ

ProducerTool [url] broker的地址,默认的是tcp://localhost:61616

[true|flase] 是否使用topic,默认是false

[subject] subject的名字,默认是TOOL.DEFAULT

[durabl] 是否持久化消息,默认是false

[messagecount] 发送消息数量,默认是10

[messagesize] 消息长度,默认是255

[clientID] durable为true的时候,需要配置clientID

[timeToLive] 消息存活时间

[sleepTime] 发送消息中间的休眠时间

[transacte] 是否采用事务

ConsumerTool [url] broker的地址,默认的是tcp://localhost:61616

[true|flase] 是否使用topic,默认是false

[subject] subject的名字,默认是TOOL.DEFAULT

[durabl] 是否持久化消息,默认是false

[maxiumMessages] 接受最大消息数量,0表示不限制

[clientID] durable为true的时候,需要配置clientID

[transacte] 是否采用事务

[sleepTime] 接受消息中间的休眠时间,默认是0,onMeesage方法不休眠

[receiveTimeOut] 接受超时

.使用应用程序发送点到点消息队列

TestSender类

package test;

import javax.jms.DeliveryMode;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.FileSystemXmlApplicationContext;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

public class TestSender {

    public static void main(String[] args) {

        ApplicationContext ctx = new FileSystemXmlApplicationContext("conf/applicationContext.xml");

        JmsTemplate template = (JmsTemplate) ctx.getBean("myJmsTemplate");

        template.setDeliveryMode(DeliveryMode.PERSISTENT);

        template.send(new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {

                Message message = session.createTextMessage();

                message.setStringProperty("name", "wangwu");

                message.setStringProperty("password", "ww");

                System.out.println("send success");

                return message;

            }

        });

    }

}

applicationContext.xml配置文件

<beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"

        destroy-method="stop">

        <property name="connectionFactory">

            <bean class="org.apache.activemq.ActiveMQConnectionFactory">

                <property name="brokerURL" value="tcp://127.0.0.1:61616" />

            </bean>

        </property>

    </bean>

    <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">

        <property name="defaultDestinationName" value="Hello.Queue" />

        <property name="connectionFactory">

            <ref local="jmsFactory" />

        </property>

    </bean>

</beans>

. 使用应用程序接受点到点消息队列

ExampleListener类

package test;

import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.ObjectMessage;

import javax.jms.Session;

import javax.jms.StreamMessage;

import javax.jms.TextMessage;

 

import org.springframework.jms.listener.SessionAwareMessageListener;

public class ExampleListener implements SessionAwareMessageListener {

    public void onMessage(Message message, Session session) throws JMSException {

        if(message instanceof TextMessage) {

            System.out.println("TextMessage begin");

            System.out.println("name = " + message.getStringProperty("name"));

            System.out.println("password = " +message.getStringProperty("password"));

        }

        if (message instanceof ObjectMessage) {

            System.out.println("ObjectMessage");

        } else if (message instanceof TextMessage) {

            System.out.println("TextMessage");

        } else if (message instanceof StreamMessage) {

            System.out.println("StreamMessage");

        } else if (message instanceof MapMessage) {

            System.out.println("MapMessage");

        }

    }

}

TestReceiver类

package test;

import javax.jms.JMSException;

import org.springframework.context.support.FileSystemXmlApplicationContext;

public class TestReceiver {

    public static void main(String[] args) throws JMSException {

        new FileSystemXmlApplicationContext("conf/context.xml");

    }

}

context.xml配置文件.

<beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

    <bean id="ExampleListener" class="test.ExampleListener" />

    <bean id="destinationa" class="org.apache.activemq.command.ActiveMQQueue">

        <constructor-arg>

            <value>Hello.Queue</value>

        </constructor-arg>

    </bean>

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"

        destroy-method="stop">

        <property name="connectionFactory">

            <bean class="org.apache.activemq.ActiveMQConnectionFactory">

                <property name="brokerURL" value="tcp://127.0.0.1:61616" />

            </bean>

        </property>

    </bean>

    <bean id="listenerContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">

        <property name="concurrentConsumers" value="1" />

        <property name="connectionFactory" ref="jmsFactory" />

        <property name="destination" ref="destinationa" />

        <property name="messageListener" ref="ExampleListener" />

    </bean>

</beans>

. 使用WEB程序发送点到点消息队列

TestWebSender类

package com.jl.material.adapter.impl;

import javax.jms.JMSException;

import com.jl.framework.jms.JmsManager;

import com.jl.framework.jms.JmsManagerFactory;

import com.jl.framework.jms.MessageCreateable;

import com.jl.framework.jms.MessageCreatorDefault;

import com.jl.framework.jms.util.JmsUtil;

import com.jl.material.util.MaterialConstants;

public class TestWebSender{

    public void sender(String name, String password) {

        MessageCreateable mc = new MessageCreatorDefault();       

        mc.setStringProperty("name", name);

        mc.setStringProperty("password", password);       

        JmsManagerFactory jmsManagerFactory = new JmsManagerFactory();

        JmsManager jmsTXManager = jmsManagerFactory.createJmsManager(MaterialConstants.MATERIAL_MODULE_NAME);

        try {

            jmsTXManager.send(JmsUtil.getDestinationFromConfig("quality_synVerifyBatch_queue_M2Q"), mc);

            jmsTXManager.commit();

        } catch (JMSException e) {

            jmsTXManager.rollback();

            throw new RuntimeException(e);

        }

    }

}

. 使用WEB程序接受点到点消息队列

TestWebReceiver类

package com.jl.material.adapter.impl;

import javax.jms.MapMessage;

import javax.jms.ObjectMessage;

import javax.jms.StreamMessage;

import javax.jms.TextMessage;

import com.jl.framework.jms.util.support.JMSCallbackable;

public class TestWebReceiver implements JMSCallbackable {

    public void mdCallback(Object TextMessage_textMessage) {}

    public void logJMSMessageInfo(String arg0) {}

    public void mdCallback(ObjectMessage arg0) throws Exception {}

    public void mdCallback(TextMessage textMessage) throws Exception {

        String name = textMessage.getStringProperty("name");

        String password = textMessage.getStringProperty("password");

        System.out.println("name = " + name);

        System.out.println("password = " + password);

    }

    public void mdCallback(StreamMessage arg0) throws Exception {}

    public void mdCallback(MapMessage arg0) throws Exception {}

}

test_JMS_Spring_Listener.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>

<beans>

    <bean id="test_testWebReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">

       <property name="jmsCallbackable">

           <bean class="com.jl.material.adapter.impl.TestWebReceiver "/>

       </property>

    </bean>

    <bean id="test_testWebReceiver _queue" class="org.apache.activemq.command.ActiveMQQueue">

       <constructor-arg>

           <value>${quality_assayVerifyBatch_queue_Q2M}</value>

       </constructor-arg>

    </bean>

 

    <bean id="test_listenerContainerA"

       class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">

        <property name="concurrentConsumers" value="1" />

       <property name="connectionFactory" ref="jmsRecieveFactory" />

       <property name="destination" ref=" test_testWebReceiver _queue ">

       <property name="messageListener" ref=" test_testWebReceiver " />

       <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>

    </bean>   

</beans>

jms.properties资源文件

jms.sendip=tcp://10.1.1.40:61616,tcp://activemq:61616

jms.listenip=tcp://10.1.1.40:61616,tcp://activemq:61616

quality_assayVerifyBatch_queue_Q2M=

Quality_Assay_VerifyBatch_Q2M.Queue

context.xml配置文件

<?xml version='1.0' encoding='utf-8'?>

<Context path="/identity" reloadable="false">

    <Environment name="jms/jms.sendip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />

    <Environment name="jms/jms.listenip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />

</Context>

十. 使用发布/订阅方式
使用该方式的发布方基本与点到点方式一样,区别只在队列名的后缀从 .Queue 变成了 .Topic
区别主要在接收方配置文件
<bean id="pound_removeBindingInfoReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
    <property name="jmsCallbackable">
        <bean class="com.jl.pound.adapter.impl.RemoveBindingInfoReceiver">
         <property name="initialInfomationService" ref="initialInfomationService" />
         <property name="productPoundServiceFacade" ref="productPoundServiceFacade" />
        </bean>
    </property>
 </bean>

 <bean id="pound_removeBindingInfomation_topic" class="org.apache.activemq.command.ActiveMQTopic">
   <constructor-arg><value>${CraftPound_RemoveBindingInfomation}</value></constructor-arg>
 </bean>

 <bean id="pound_listenerContainerB"
     class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
      <property name="concurrentConsumers" value="1"/>
      <property name="connectionFactory" ref="jmsRecieveFactory" />
      <property name="destination" ref="pound_removeBindingInfomation_topic" />
      <property name="messageListener" ref="pound_removeBindingInfoReceiver" />
      <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
      <property name="subscriptionDurable" value="true"></property>
      <property name="pubSubDomain" value="true"></property>
      <!-- <property name="clientId" value="100554"></property> -->
      <property name="clientId" ref="generateClientIdentityCode1"></property>
 </bean>

 <bean id="generateClientIdentityCode1"
   class="com.jl.pound.security.GenerateClientIdentityCode">
   <property name="prefix" value="1"></property>
 </bean>

import org.apache.log4j.Logger;
import org.springframework.beans.factory.FactoryBean;

import com.**.util.NetworkUtils;

public class GenerateClientIdentityCode implements FactoryBean {

    /**
     * LOGGER for sample classify delegate
     */
    private static final Logger LOGGER = Logger.getLogger(GenerateClientIdentityCode.class);
   
    /**
     * prefix
     */
    private String prefix;

 /**
  * @return the prefix
  */
 public String getPrefix() {
  return prefix;
 }

 /**
  * @param prefix the prefix to set
  */
 public void setPrefix(String prefix) {
  this.prefix = prefix;
 }
 
    public String generateIdentityCode() {
        String mac = NetworkUtils.getMACAddress();
        mac = prefix + mac;
        LOGGER.info("this client identity code is:" + mac);
        return mac;
    }

 public Object getObject() throws Exception {
  return generateIdentityCode();
 }

 public Class getObjectType() {
  return String.class;
 }

 public boolean isSingleton() {
  return false;
 }
}

在JMS中,Topic实现publish和subscribe语义。一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。

JMS Queue执行load balancer语义。一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡

可以使用queue方式发送注册邮件 好友动态数据等

<一>表说明:
当在启动ActiveMQ时,先判断表是否存在,如果不存在,将去创建表,如下:
(1)ACTIVEMQ_ACKS:持久订阅者列表
1.CONTAINER:类型://主题
如:topic://basicInfo.topic
2.SUB_DEST:应该是描述,与1内容相同
3.CLIENT_ID:持久订阅者的标志ID,必须唯一
4.SUB_NAME:持久订阅者的名称.(durableSubscriptionName)
5.SELECTOR:消息选择器,consumer可以选择自己想要的
6.LAST_ACKED_ID:最后一次确认ID,这个字段存的该该订阅者最后一次收到的消息的ID

(2)ACTIVEMQ_LOCK:进行数据访问的排斥锁
1.ID:值为1
2.TIME:时间
3.BROKER_NAME:broker的名称
   这个表似为集群使用,但现在ActiveMQ并不能共享数据库.

(3)ACTIVEMQ_MSGS:存储Queue和Topic消息的表
1.ID:消息的ID
2.CONTAINER: 类型://主题
如:queue://my.queue
Topic://basicInfo.topic
3.MSGID_PROD:发送消息者的标志
MSGID_PROD =ID:[computerName][…..]
注意computerName,不要使用中文,消息对象中会存储这个部分,解析connectID时会出现Bad String错误.
4.MSGID_SEQ:还不知用处
5.EXPIRATION:到期时间.
6.MSG:消息本身,Blob类型.
可以在JmsTemplate发送配置中,加上<property name=”timeToLive” value=”432000000”/>,5天的生命期,如果消息一直没有被处理,消息会被删除,但是表中会存在CONTAINER为queue://ActiveMQ.DLQ的记录.也就是说,相当于将过期的消息发给了一个ActiveMQ自定义的删除队列..

<二>关于ActiveMQ的持久订阅消息删除操作
1.主题消息只有一条,所有订阅了这个消息的持久订阅者都要收到消息,只有所有订阅者收到消息并确认(Acknowledge)之后.才会删除.
说明:ActiveMQ支持批量(optimizeAcknowledge为true)确认,以提高性能
2.ActiveMQ执行删除Topic消息的cleanup()操作的时间间隔为5 minutes..



柳德才
13691193654
18942949207
QQ:422157370
liudecai_zan@126.com
湖北-武汉-江夏-庙山

posted on 2009-04-08 11:18 liudecai_zan@126.com 阅读(18444) 评论(3)  编辑  收藏 所属分类: 程序人生

评论

# re: Apache ActiveMQ学习笔记【mq的方式有两种:点到点和发布/订阅】 2011-07-01 11:16 easy518网址导航

http://www.easy518.com  回复  更多评论   

# re: Apache ActiveMQ学习笔记【mq的方式有两种:点到点和发布/订阅】 2012-08-17 11:40 geek

好文章  回复  更多评论   

# re: Apache ActiveMQ学习笔记【mq的方式有两种:点到点和发布/订阅】 2016-08-03 13:32 zcf

大神 ActiveMQ支不支持大消息拆分呢,求解释  回复  更多评论   


只有注册用户登录后才能发表评论。


网站导航: