@import url(http://www.blogjava.net/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css); 面向服务的架构和事件驱动的架构天生就有着对分布式系统的适应性,这些架构都有着模块性、松散耦合,和适应性等特性。
“面向服务”这个术语已经演变成一个架构,在那里服务作为一个软件组件嵌入在企业业务逻辑和特新的核心中,特性如下:
·        松散耦合:服务部与其它组件有着根深蒂固的关系
·        协议独立:多种协议透明访问
·        位置不可知:一个服务执行一组业务逻辑,针对这次调用返回一个结果
·        粗粒度:不论在什么位置均可访问该服务。
·        维护无用户状态


一个事件驱动框架(EDA)定义了一个设计和实现一个应用系统得方法学,在这个系统里事件可传输于松散耦合的软件组件和服务之间。一个事件驱动系统典型地由事件消费者和事件产生者组成。事件消费者向事件管理器订阅事件,事件产生者向事件管理器发布事件。当事件管理器从事件产生者那接收到一个事件时,事件管理把这个事件转送给相应的事件消费者。如果这个事件消费者是不可用的,事件管理这将保留这个事件,一段间隔之后再次转送该事件消费者。这种事件传送方法在基于消息的系统里就是:储存(store)和转送(forward)。

构建一个包含事件驱动构架的应用程序和系统,这样就使得这些应用程序和系统响应更灵敏,因为事件驱动的系统更适合应用在不可预知的和异步的环境里。

事件驱动设计和开发的优势:
事件驱动设计和开发所提供的优势如下:
·        可以更容易开发和维护大规模分布式应用程序和不可预知的服务或异步服务
·        可以很容易,低成本地集成、再集成、再配置新的和已存在的英勇程序和服务
·        促进远程组件和服务的再使用,拥有一个更灵敏、没有Bug的开发环境
·        短期利益:更容易定制。因为设计对动态处理又更好的响应。
·        长期利益:系统和组织的状态变得更精准,对实时变化的响应接近于同步

Mule是一个开源消息ESB框架,一个消息代理,一个分级事件驱动的框架(SEDA)。SEDA定义了一个依照分级队列、高度并行的企业级平台。Mule使用SED的概念增加事件处理的性能。

Mule事件对象
Mule事件对象包含事件数据和被组件所感知和操控的属性。属性是任意的,在事件创建之后任何时间可被设置。
org.mule.umo.UMOEvent类代表了一个在Mule环境中出现的时间。所有在组件之间发送或接收的数据都是org.mule.umo.UMOEvent的一个实体。可以访问一个原始的或被转换的Mule事件对象中的数据能。一个Mule事件对象使用一个与提供者管理的提供者转换数据,提供者收到数据后把事件中的有效载荷转换成当前组件所识别的格式。

一个Mule事件对象的有效载荷能通过org.mule.umo.UMOMessage接口访问,一个org.mule.umo.UMOMessage实例由有效载荷和它的属性组成。这个接口是不同技术实现的消息对象的一个抽象。

org.mule.extras.client.MuleClient类定义了一个简单的借口,允许Mule客户端从Mule服务器接收和发送事件数据。在大多数Mule应用程序里,事件是被一些外部的并发行为所触发,例如一个主题上接收到消息或在目录里一个文件被删除。

Mule支持同步、异步和请求响应事件,事件处理和传输实用不同的技术例如JMS,HTTP,电子邮件和基于XML的RPC。Mule能很容易地嵌入到任何应用框架中,明确支持Spring框架。Mule也支持动态的,预定义的,基于内容的和基于规则的消息路由。Mule使得预定义的和计划性的事务更容易,包括XA事务支持。Mule提供一个有代表性的状态调用(REST)API提供给与Web的事件访问。

Mule ESB模式驱动系统中所有服务,这个系统有着一个分离的消息通讯中枢。服务注册在总线上,但不知道其他任何被注册的消息;因此,每个服务只关心处理它收到的事件。Mule也把容器,传输,转换细节从服务中分离出来,允许任何对象作为服务注册到总线的。

下面演示了如何去发送一个同步事件到另外的Mule组件:
String componentName = "MyReceiver";     // The name of the receiving component. 
String transformers = null;                      // A comma-separated list of transformers
                                                         
// to apply to the result message. 
String payload = "A test event";              // The payload of the event. 
java.util.Map messageProperties = null;    // Any properties to be associated
                                                        
// with the payload.
MuleClient client = new MuleClient();
UMOMessage message 
= client.sendDirect(componentName,
                                       transformers,
                                       payload,
                                       messageProperties);
System.out.println(
"Event result: " + message.getPayloadAsString())

MuleClient类需要一个服务器URL区定义它所连接的远程Mule服务器的终端。URL定义了传输协议、接收消息的地址,提供者在派遣一个事件时可以随时使用这些信息。终端例示如下:
·        vm://com.jeffhanson.receivers.Default: 使用虚拟机的提供者派遣到一个com.jeffhanson.receivers.Default
·        jms://jmsProvider/accounts.topic:使用全局注册的jmsProvider派遣一个JMS消息到ccounts.topic.
·        jms://accounts.topic: 使用第一个(默认)的JMS提供者派遣JMS消息

Mule事件处理
Mule可以在三种不同的方式发送和接收事件:
1.异步方式:一个组件可通过不同的线程同时处理多个事件的发送和接收
2.同步方式:在一个组件重新工作之前,一个单一的事件必须被处理完。换言之,一个创建了事件的组件发送事件时将被阻断,直到发送任务完成,因此,一次只允许处理一个事件
3.请求-应答方式:一个组件专门请求一个事件,然后等待一个特定的时间去接收回应。
org.mule.impl.MuleComponent实现类提供了一个具体的组件类,它包括又有创建,发送和接收事件的功能。
执行同步动作的对象应该实现org.mule.umo.lifecycle.Callable接口,这个定义了一个简单的方法Object onCall(UMOEventContext eventContext)。Callable接口提供支持事件调用的UMO对象。虽然不是强制的,但这个接口提供了一个生命周期控制的方法,当实现这个接口的组建接收到一个消息时执行这个方法。下面展示了这个接口的简单实现。
import org.mule.umo.lifecycle.Callable;

public class EchoComponent
   
implements Callable
{
    
public Object onCall(UMOEventContext context) throws Exception
    {
        String msg 
= context.getMessageAsString();
        
// Print message to System.out
        System.out.println("Received synchronous message: " + msg);
        
// Echo transformed message back to sender
        return context.getTransformedMessage();
    }
}

    从onCall()方法可返回任何对象。当组件的UMOLifecycleAdapter接收这个对象时,它首先看看这个对象是否是一个UMOMessage;如果这个对象既不是UMOMessage也不是Null,那么以这个对象作为有效载荷去创建一个新的消息。这个新事件经由所配制的出站路有器发布,如果UMO对象已经配制了一个出站路由器,那么在UMOEventContext实例中不能调用setStopFurtherProcessing(true)方法

Mule使用的一个简单的事件框架
让我们把这几段Mule的代码放到一起去构建一个简单的事件框架。这个框架包含一个负责注册和注销事件的管理器,可以接收事件,和负责路由同步和异步消息到他们相应的服务。

Mule的虚拟机协议要求有一个放置事件管理器工作目录META-INF/services/org/mule/providers/vm路径下的可配制文件,配制文件为协议定义了大量的组件,例如连接器和调度工厂。配制文件的内容如下:
connector=org.mule.providers.vm.VMConnector
dispatcher.factory
=org.mule.providers.vm.VMMessageDispatcherFactory
message.receiver
=org.mule.providers.vm.VMMessageReceiver
message.adapter
=org.mule.providers.vm.VMMessageAdapter
endpoint.builder
=org.mule.impl.endpoint.ResourceNameEndpointBuilder

一个简单的接口定义了事件管理器的公有结构:
package com.jeffhanson.mule;

import org.mule.umo.FutureMessageResult;

public interface EventManager{
   
/**
    * Sends an event message synchronously to a given service.
    *
    * 
@param serviceName    The name of the service to which the event
    *                       message is to be sent.
    * 
@param payload        The content of the event message.
    * 
@return Object          The result, if any.
    * 
@throws EventException on error
    
*/
   
public Object sendSynchronousEvent(String serviceName,  Object payload) throws EventException;

   
/**
    * Sends an event message asynchronously to a given service.
    *
    * 
@param serviceName    The name of the service to which the event message is to be sent.
    * 
@param payload           The content of the event message.
    * 
@return FutureMessageResult The result, if any.
    * 
@throws EventException on error
    
*/
   
public FutureMessageResult sendAsynchronousEvent(String serviceName,  Object payload) throws EventException;

   
/**
    * Starts this event manager.
    
*/
   
public void start();

   
/**
    * Stops this event manager.
    
*/
   
public void stop();

   
/**
    * Retrieves the protocol this event manager uses.
    * 
@return
    
*/
   
public String getProtocol();

   
/**
    * Registers a service to receive event messages.
    *
    * 
@param serviceName      The name to associate with the service.
    * 
@param implementation   Either a container reference to the service   or a fully-qualified class name.
    
*/
   
public void registerService(String serviceName, String implementation) throws EventException;

   
/**
    * Unregisters a service from receiving event messages.
    *
    * 
@param serviceName  The name associated with the service to unregister.
    
*/
   
public void unregisterService(String serviceName)  throws EventException;
}

事件管理器类是被封装在一个工厂类里,因此,可以依据需要去改变它的实现而不会影响到它的客户端。事件管理器实现如下:
package com.jeffhanson.mule;

import org.mule.umo.*;
import org.mule.extras.client.MuleClient;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.config.QuickConfigurationBuilder;

import java.util.HashMap;
import java.util.Map;

public class EventManagerFactory{
   
private static HashMap instances = new HashMap();
   
/**
    * Retrieves the event manager instance for a given protocol.
    *
    * 
@param protocol      The protocol to use.
    * 
@return EventManager The event manager instance.
    
*/
   
public static EventManager getInstance(String protocol){
      EventManager instance 
= (EventManager)instances.get(protocol);
      
if (instance == null){
         instance 
= new EventManagerImpl(protocol);
         instances.put(protocol, instance);
      }

      
return instance;
   }

   
/**
    * A concrete implementation for a simple event manager.
    
*/
   
private static class EventManagerImpl  implements EventManager {
      
private UMOManager manager = null;
      
private QuickConfigurationBuilder builder = null;
      
private MuleClient eventClient = null;
      
private String protocol = null;
      
private MuleEndpoint receiveEndpoint = null;
      
private MuleEndpoint sendEndpoint = null;

      
private EventManagerImpl(String protocol){
         
this.protocol = protocol;
      }

      
/**
       * Starts this event manager.
       
*/
      
public void start() {
         
try {
            builder 
= new QuickConfigurationBuilder();
            manager 
= builder.createStartedManager(true,  protocol + "tmp/events");
            eventClient 
= new MuleClient();
            receiveEndpoint 
= new MuleEndpoint(protocol + "tmp/events/receive");
            sendEndpoint 
= new MuleEndpoint(protocol + "tmp/events/send");
         }
         
catch (UMOException e) {
            System.err.println(e);
         }
      }

      
/**
       * Stops this event manager.
       
*/
      
public void stop() {
         
try {
            manager.stop();
         }
         
catch (UMOException e)  {
            System.err.println(e);
         }
      }

      
/**
       * Retrieves the protocol this event manager uses.
       * 
@return
       
*/
      
public String getProtocol()   {
         
return protocol;
      }

      
/**
       * Registers a service to receive event messages.
       *
       * 
@param serviceName      The name to associate with the service.
       * 
@param implementation   Either a container reference to the service
       *                         or a fully-qualified class name
       *                         to use as the component implementation.
       
*/
      
public void registerService(String serviceName,   String implementation) throws EventException  {
         
if (!manager.getModel().isComponentRegistered(serviceName))    {
            
try   {
               builder.registerComponent(implementation,  serviceName,   receiveEndpoint, sendEndpoint);
            }  
catch (UMOException e) {
               
throw new EventException(e.toString());
            }
         }
      }

      
/**
       * Unregisters a service from receiving event messages.
       *
       * 
@param serviceName  The name associated with the service to unregister.
       
*/
      
public void unregisterService(String serviceName)   throws EventException {
         
try  {
            builder.unregisterComponent(serviceName);
         }
         
catch (UMOException e)   {
            
throw new EventException(e.toString());
         }
      }

      
/**
       * Sends an event message synchronously to a given service.
       *
       * 
@param serviceName    The name of the service to which the event
       *                       message is to be sent.
       * 
@param payload        The content of the event message
       * 
@return Object        The result, if any.
       * 
@throws EventException on error
       
*/
      
public Object sendSynchronousEvent(String serviceName,  Object payload)  throws EventException  {
         
try    {
            
if (!manager.getModel().isComponentRegistered(serviceName)){
               
throw new EventException("Service: " + serviceName  + " is not registered.");
            }

            String transformers 
= null;
            Map messageProperties 
= null;
            UMOMessage result 
= eventClient.sendDirect(serviceName,  transformers,  payload, messageProperties);
            
if (result == null)     {
                 
return null;
            }
            
return result.getPayload();
         }  
catch (UMOException e)   {
            
throw new EventException(e.toString());
         }   
catch (Exception e)  {
            
throw new EventException(e.toString());
         }
      }

      
/**
       * Sends an event message asynchronously.
       *
       * 
@param serviceName    The name of the service to which the event
       *                       message is to be sent.
       * 
@param payload        The content of the event message.
       * 
@return FutureMessageResult The result, if any
       * 
@throws EventException on error
       
*/
      
public FutureMessageResult sendAsynchronousEvent(String serviceName,   Object payload)  throws EventException  {
         FutureMessageResult result 
= null;
         
try   {
            
if (!manager.getModel().isComponentRegistered(serviceName))  {
               
throw new EventException("Service: " + serviceName + " is not registered.");
            }

            String transformers 
= null;
            Map messageProperties 
= null;
            result 
= eventClient.sendDirectAsync(serviceName,   transformers,   payload,  messageProperties);
         }   
catch (UMOException e) {
            
throw new EventException(e.toString());
         }

         
return result;
      }
   }
}

Mule框架依据消息有效载荷的类型来派遣消息。事件框架使用基于有效载荷的派遣机制,这种派遣机制把注册到事件管理器中一般定义的事件方法作为事件接收器。下面的类定义了一个包含三个重载的receiveEvent()方法的服务:
package com.jeffhanson.mule;

import java.util.Date;

public class TestService{
   
public void receiveEvent(String eventMessage){
      System.out.println(
"
           TestService.receiveEvent(String) received 
" + "event message:  " + eventMessage + "");
     }

   
public void receiveEvent(Integer eventMessage){
      System.out.println(
"
      TestService.receiveEvent(Integer) received 
"  +"event message:  " + eventMessage + "");
    }

   
public void receiveEvent(Date eventMessage){
      System.out.println(
"
      TestService.receiveEvent(Date) received " + "event message:  " + eventMessage + "");
   }
}

事件管理器客户端应用程序发送三个事件到测试服务中,去测试每一个receiveEvent()方法。客户端应用程序如下:
package com.jeffhanson.mule;

import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.BasicConfigurator;

import java.util.Date;

public class EventClient{
   
static Logger logger = Logger.getLogger(EventClient.class);
   
public static void main(String[] args)  {
      
// Set up a simple configuration that logs on the console.
      BasicConfigurator.configure();
      logger.setLevel(Level.ALL);
      
try  {
         EventManager eventManager 
= EventManagerFactory.getInstance("vm://");
         eventManager.start();

         String serviceName 
= TestService.class.getName();
         String implementation 
= serviceName;

         eventManager.registerService(serviceName, implementation);

         Object result 
=  eventManager.sendSynchronousEvent(serviceName, "A test message");

         
if (result != null)  {
            System.out.println(
"Event result: " + result.toString());
         }

         result 
=   eventManager.sendSynchronousEvent(serviceName, new Integer(23456));
         
if (result != null)  {
            System.out.println(
"Event result: " + result.toString());
         }

         result 
=  eventManager.sendSynchronousEvent(serviceName, new Date());

         
if (result != null) {
            System.out.println(
"Event result: " + result.toString());
         }

         eventManager.stop();
      }
      
catch (EventException e)
      {
         System.err.println(e.toString());
      }
   }
}

Mule平台简化和抽象了前面所叙述框架的事件方面的处理,使得你发送和接收穿越一个层级结构的同步和异步消息时,不需要知道下层系统的细节。工厂模式和SOA准则的应用,则使得这个框架有了一个松散耦合和可扩展的设计。

总结
当服务和进程需要穿越多层结构,使用多种协议去交互时,设计一个有效地事件驱动的软件系统可能变得复杂了。可是,一个使用标准模式包含适当事件管理层的面向服务架构能减少,甚至消除这些问题。

Mule 平台提供API,组件和抽象对象,这些都可以用于去建立一个强大,健壮,事件驱动的有着良好的伸缩性和可维护性的系统。

@import url(http://www.blogjava.net/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css);