posts - 155,  comments - 601,  trackbacks - 0
The Cafe Sample(小卖部订餐例子)

    小卖部有一个订饮料服务,客户可以通过订单来订购所需要饮料。小卖部提供两种咖啡饮料
        LATTE(拿铁咖啡)和MOCHA(摩卡咖啡)。每种又都分冷饮和热饮
    整个流程如下:
        1.有一个下订单模块,用户可以按要求下一个或多个订单。
        2.有一个订单处理模块,处理订单中那些是关于订购饮料的。
        3.有一个饮料订购处理模块,处理拆分订购的具体是那些种类的饮料,把具体需要生产的饮料要求发给生产模块
        4.有一个生产模块
   
       
    这个例子利用Spring Integration实现了灵活的,可配置化的模式集成了上述这些服务模块。
   
    先来看一下配置文件
  
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans
="http://www.springframework.org/schema/beans"
    xmlns:context
="http://www.springframework.org/schema/context"
    xsi:schemaLocation
="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-2.5.xsd"
>

    
<!-- 启动Message bus 消息服务总线 支持四个属性 
            auto-startup[boolean是否自动启动 default=true]如果设置false,则需要手动调用applicationContext.start()方法
            auto-create-channels[boolean是否自动注册MessageChannel default=false],如果使用的MessagChannle不存在
            error-channel 设置错误时信息发送的MessageChannle,如果不设置,则使用DefaultErrorChannel
            dispatcher-pool-size 使用的启动线程数,默认为10
-->
    
<message-bus/>
    
<!-- 启动支持元数据标记 -->
    
<annotation-driven/>
    
<!-- 设置 @Component标识的元数据扫描包(package) -->
    
<context:component-scan base-package="org.springframework.integration.samples.cafe"/>

        
<!-- 下面启动了四个 MessageChannel服务 处理接收发送端发过来的消息和把消息流转到消息的消费端 -->
        
<!-- 属性说明: capacity 消息最大容量默认为100 publish-subscribe是否是发布订阅模式,默认为否
                                        id bean的id名称 datatype ? 
-->
    
<channel id="orders"/> <!-- 订单Channel -->
    
<channel id="drinks"/> <!-- 饮料订单Channel,处理饮料的类别 -->
    
<channel id="coldDrinks"/> <!-- 饮生产Channel -->
    
<channel id="hotDrinks"/> <!-- 饮生产Channel -->

        
<!-- 消息处理终端 接收 channel coldDrinks的消息后,执行barista.prepareColdDrink方法 生产冷饮 -->
        
<!-- 属性说明: input-channel 接收消息的Channel必须 default-output-channel设置默认回复消息Channel
                                        handler-ref 引用bean的id名称 handler-method Handler处理方法名(参数类型必须与发送消息的payLoad使用的一致)
                                        error-handler设置错误时信息发送的MessageChannle   reply-handler 消息回复的Channel 
-->
    
<endpoint input-channel="coldDrinks" handler-ref="barista"
                                         handler-method
="prepareColdDrink"/>

        
<!-- 消息处理终端 接收 channel hotDrinks的消息后,执行barista.prepareHotDrink方法 生产热饮 -->
    
<endpoint input-channel="hotDrinks" handler-ref="barista"
                                        handler-method
="prepareHotDrink"/>

        
<!-- 定义一个启动下定单操作的bean,它通过 channel orders下定单 -->
    
<beans:bean id="cafe" class="org.springframework.integration.samples.cafe.Cafe">
        
<beans:property name="orderChannel" ref="orders"/>
    
</beans:bean>
</beans:beans>
   
    下面我们来看一下源代码目录:
   
   
    我们来看一下整体服务是怎么启动的
   
    首先我们来看一下CafeDemo这个类,它触发下定单操作、
   
  
 1 public class CafeDemo {
 2 
 3         public static void main(String[] args) {
 4             //加载Spring 配置文件
 5             AbstractApplicationContext context = null;
 6             if(args.length > 0) {
 7                 context = new FileSystemXmlApplicationContext(args);
 8             }
 9             else {
10                 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11             }
12             //启动 Spring容器(启动所有实现 org.springframework.context.Lifecycle接口的实现类的start方法)
13             context.start();
14             //从Spring容器 取得cafe实例
15             Cafe cafe = (Cafe) context.getBean("cafe");
16             DrinkOrder order = new DrinkOrder();
17             //一杯热饮               参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
18             Drink hotDoubleLatte = new Drink(DrinkType.LATTE, 2false);
19             Drink icedTripleMocha = new Drink(DrinkType.MOCHA, 3true);
20             order.addDrink(hotDoubleLatte);
21             order.addDrink(icedTripleMocha);
22             //下100个订单
23             for (int i = 0; i < 100; i++) {
24                 //调用cafe的placeOrder下订单
25                 cafe.placeOrder(order);
26             }
27         }
28     }

   
    下面是Cafe的源代码
   
 1 public class Cafe {
 2 
 3         private MessageChannel orderChannel;
 4    
 5    
 6         public void setOrderChannel(MessageChannel orderChannel) {
 7             this.orderChannel = orderChannel;
 8         }
 9        
10         //其实下订单操作,调用的是orderChannel(orders channel)的send方法,把消息发出去
11         public void placeOrder(DrinkOrder order) {
12             this.orderChannel.send(new GenericMessage<DrinkOrder>(order));
13                 //GenericMessage有三个构建方法,参考如下
14                 //new GenericMessage<T>(Object id, T payload);
15                 //new GenericMessage<T>(T payload);
16                 //new GenericMessage<T>(T payload, MessageHeader headerToCopy)
17         }
18     }

   
     
    下面我们来看一下哪个类标记有@MessageEndpoint(input="orders") 表示它会消费orders Channel的消息
    我们发现OrderSplitter类标记这个元数据,下面是源代码,我们来分析
   
       
 1 //标记 MessageEndpoint 元数据, input表示 设置后所有 orders Channel消息都会被OrderSplitter收到
 2         @MessageEndpoint(input="orders")
 3         public class OrderSplitter {
 4        
 5             //@Splitter表示,接收消息后,调用这个类的该方法. 其的参数类型必须与message的 payload属性一致。
 6             //即在new GenericMessage<T>的泛型中指定
 7             //元数据设置的 channel属性表示,方法执行完成后,会把方法返回的结果保存到message的payload属性后,发送到指定的channel中去
 8             //这里指定发送到 drinks channel
 9             @Splitter(channel="drinks")
10             public List<Drink> split(DrinkOrder order) {
11                 return order.getDrinks(); //方法中,是把订单中的饮料订单取出来
12             }
13         }

   
    接下来,与找OrderSplitter方法相同,我们要找哪个类标记有@MessageEndpoint(input="drinks") 表示它会消费drinks Channel的消息
    找到DrinkRouter这个类
   
      
 1 @MessageEndpoint(input="drinks")
 2         public class DrinkRouter {
 3        
 4             //@Router表示,接收消息后,调用这个类的该方法. 其的参数类型必须与message的 payload属性一致。
 5             //方法执行完毕后,其返回值为 在容器中定义的channel名称。channel名称必须存在
 6             @Router
 7             public String resolveDrinkChannel(Drink drink) {
 8                 return (drink.isIced()) ? "coldDrinks" : "hotDrinks"//方法中,是根据处理饮料是否是冷饮,送不同的channel处理
 9             }
10         }

       
       
备注:@Router可以把消息路由到多个channel,实现方式如下
            @Router
            
public MessageChannel route(Message message) {}
           
            @Router
            
public List<MessageChannel> route(Message message) {}
           
            @Router
            
public String route(Foo payload) {}
           
            @Router
            
public List<String> route(Foo payload) {}

       
       
        接下来,我们就要找 MessageEndpoint 标记为处理 "coldDrinks" 和 "hotDrinks" 的类,我们发现
        这个两个类并不是通过元数据@MessageEndpoint来实现的,而是通过容器配置
        (下面会演示如何用元数据配置,但元数据配置有局限性。这两种配置方式看大家喜好,系统中都是可以使用)

        下面是容器配置信息:
       
<!-- 消息处理终端 接收 channel coldDrinks的消息后,执行barista.prepareColdDrink方法 生产冷饮 -->
    
<endpoint input-channel="coldDrinks" handler-ref="barista"
                                         handler-method
="prepareColdDrink"/>

        
<!-- 消息处理终端 接收 channel hotDrinks的消息后,执行barista.prepareHotDrink方法 生产热饮 -->
    
<endpoint input-channel="hotDrinks" handler-ref="barista"
                                        handler-method
="prepareHotDrink"/>

                                       
        我们来看一下源代码:
       
 1 @Component //这个必须要有,表示是一个消息处理组件
 2         public class Barista {
 3        
 4             private long hotDrinkDelay = 1000;
 5        
 6             private long coldDrinkDelay = 700;
 7        
 8             private AtomicInteger hotDrinkCounter = new AtomicInteger();
 9            
10             private AtomicInteger coldDrinkCounter = new AtomicInteger();
11        
12        
13             public void setHotDrinkDelay(long hotDrinkDelay) {
14                 this.hotDrinkDelay = hotDrinkDelay;
15             }
16        
17             public void setColdDrinkDelay(long coldDrinkDelay) {
18                 this.coldDrinkDelay = coldDrinkDelay;
19             }
20        
21             public void prepareHotDrink(Drink drink) {
22                 try {
23                     Thread.sleep(this.hotDrinkDelay);
24                 } catch (InterruptedException e) {
25                     Thread.currentThread().interrupt();
26                 }
27                 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + "" + drink);
28             }
29        
30             public void prepareColdDrink(Drink drink) {
31                 try {
32                     Thread.sleep(this.coldDrinkDelay);
33                 } catch (InterruptedException e) {
34                     Thread.currentThread().interrupt();
35                 }
36                 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + "" + drink);
37             }
38        
39         }

       
        如果要用元数据标识实现上述方法:
        要用元数据配置,它不像容器配置,可以在一个类中,支持多个不同的Handler方法。以处理prepareColdDrink方法为例
       
 1 @MessageEndpoint(input="coldDrinks"//加了该元数据,它会自动扫描,并作为@Componet标记处理
 2         public class Barista {
 3        
 4             private long hotDrinkDelay = 1000;
 5        
 6             private long coldDrinkDelay = 700;
 7        
 8             private AtomicInteger hotDrinkCounter = new AtomicInteger();
 9            
10             private AtomicInteger coldDrinkCounter = new AtomicInteger();
11        
12        
13             public void setHotDrinkDelay(long hotDrinkDelay) {
14                 this.hotDrinkDelay = hotDrinkDelay;
15             }
16        
17             public void setColdDrinkDelay(long coldDrinkDelay) {
18                 this.coldDrinkDelay = coldDrinkDelay;
19             }
20        
21             public void prepareHotDrink(Drink drink) {
22                 try {
23                     Thread.sleep(this.hotDrinkDelay);
24                 } catch (InterruptedException e) {
25                     Thread.currentThread().interrupt();
26                 }
27                 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + "" + drink);
28             }
29            
30             @Handler//回调处理的方法
31             public void prepareColdDrink(Drink drink) {
32                 try {
33                     Thread.sleep(this.coldDrinkDelay);
34                 } catch (InterruptedException e) {
35                     Thread.currentThread().interrupt();
36                 }
37                 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + "" + drink);
38             }
39         }   

       
        这样整个流程就执行完了,最终我们的饮料产品就按照订单生产出来了。累了吧,喝咖啡提神着呢!!!


初充:
下面是针对 Spring Integration adapter扩展的学习笔记

JMS Adapters
jms adapters 目前有两种实现
JmsPollingSourceAdapter 和 JmsMessageDrivenSourceAdapter. 前者是使用Srping的JmsTemplate模板类通过轮循的方式接收消息
后者是使用则通过代理Spring的DefaultMessageListenerContainer实例,实现消息驱动的方式。

xml配置如下:
JmsPollingSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsPollingSourceAdapter">
    <constructor-arg ref="jmsTemplate"/>
    <property name="channel" ref="exampleChannel"/>
    <property name="period" value="5000"/> <!-- 轮循时间间隔 -->
    <property name="messageMapper" ref=""/> <!-- message转换 -->
</bean>

<!-- 备注:消息的转换方式如下:
收到JMS Message消息后,SourceAdapter会调用Spring的MessageConverter实现类,把javax.jms.Message对象
转换成普通Java对象,再调用Spring Integration的MessageMapper把该对象转成 org.springframework.integration.message.Message对象 -->

JmsMessageDrivenSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsMessageDrivenSourceAdapter">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="exampleQueue"/>
    <property name="channel" ref="exampleChannel"/>
    <property name="messageConverter" ref=""/> <!-- jms消息对象转换 -->
    <property name="messageMapper" ref="" /> <!-- 普通java对象转换成 Spring Integration Message -->
    <property name="sessionAcknowledgeMode" value="1" />
    <!-- sesssion回复模式 AUTO_ACKNOWLEDGE=1 CLIENT_ACKNOWLEDGE=2  DUPS_OK_ACKNOWLEDGE=3 SESSION_TRASACTED=0-->
</bean>

另外还有一个比较有用的类JmsTargetAdapter 它实现了MessageHandler接口。它提把Spring Integration Message对象转换成
JMS消息并发送到指定的消息队列。与JMS服务连接的实现可以通过设定 jmsTemplate属性引用或是connectionFactory和destination
或destinationName属性。

<bean class="org.springframework.integration.adapter.jms.JmsTargetAdapter">
        <constructor-arg ref="connectionFactory"/>
        <constructor-arg value="example.queue"/>
<!--或是以下配置
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="exampleQueue"/>
    或是
    <constructor-arg ref="jmsTemplate"/> -->

</bean>


Good Luck!
Yours Matthew!
posted on 2008-05-28 11:49 x.matthew 阅读(8203) 评论(6)  编辑  收藏 所属分类: Spring|Hibernate|Other framework

只有注册用户登录后才能发表评论。


网站导航: