随笔 - 41  文章 - 7  trackbacks - 0
<2016年8月>
31123456
78910111213
14151617181920
21222324252627
28293031123
45678910

常用链接

留言簿

随笔分类

随笔档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜

Consumer Tags

从1.4.5版本开始,你可以提供一种策略来生成consumer tags.默认情况下,consumer tag是由broker来生成的.

public interface ConsumerTagStrategy {      String createConsumerTag(String queue);  }
该队列是可用的,所以它可以(可选)在tag中使用。

参考Section 3.1.15, “Message Listener Container Configuration”.

注解驱动的监听器Endpoints

介绍

从1.4版本开始,异步接收消息的最简单方式是使用注解监听器端点基础设施. 简而言之,它允许你暴露管理bean的方法来作为Rabbit 监听器端点.

@Component
public class MyService {      
 @RabbitListener(queues = "myQueue")
 public void processOrder(String data) {         
     ...
  }
}
上面例子的含义是,当消息在org.springframework.amqp.core.Queue "myQueue"上可用时, 会调用processOrder方法(在这种情况下,带有消息的负载).

通过使用RabbitListenerContainerFactory,注解端点基础设施在每个注解方法的幕后都创建了一个消息监听器容器.在上面的例子中,myQueue 必须是事先存在的,
并绑定了某个交换器上.从1.5.0版本开始
,只要在上下文中存在RabbitAdmin,队列可自动声明和绑定.

@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )public void processOrder(String data) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )public void processInvoice(String data) {
    ...
  }

}

在第一个例子中,队列myQueue 会与交换器一起自动声明(持久化的), 如果需要,可使用路由键来绑定到交换器上.在第二个例子中,匿名(专用的,自动删除的)队列将会声明并绑定.
可提供多个 QueueBinding 条目,允许监听器监听多个队列.

当前只支持DIRECT, FANOUT, TOPIC 和HEADERS的交换器类型.当需要高级配置时,可使用@Bean 定义.

注意第一个例子中交换器上的 ignoreDeclarationExceptions .这允许,例如, 绑定到有不同的设置(如.internal)的交换器上. 默认情况下,现有交换器的属性必须被匹配.

从1.6版本开始,你可为队列,交换器和绑定的@QueueBinding 注解中指定参数.示例:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "auto.headers", autoDelete = "true",
                        arguments = @Argument(name = "x-message-ttl", value = "10000",
                                                type = "java.lang.Integer")),
        exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
        arguments = {
                @Argument(name = "x-match", value = "all"),
                @Argument(name = "foo", value = "bar"),
                @Argument(name = "baz")
        })
)
public String handleWithHeadersExchange(String foo) { ... }

注意队列的x-message-ttl 参数设为了10秒钟,因为参数类型不是String, 因此我们指定了它的类型,在这里是Integer.有了这些声明后,如果队列已经存在了,参数必须匹配现有队列上的参数.对于header交换器,我们设置binding arguments 要匹配头中foo为bar,且baz可为任意值的消息. x-match 参数则意味着必须同时满足两个条件.

参数名称,参数值,及类型可以是属性占位符(${...}) 或SpEL 表达式(#{...}). name 必须要能解析为String; type的表达式必须能解析为Class 或类的全限定名. value 必须能由DefaultConversionService 类型进行转换(如上面例子中x-message-ttl).

如果name 解析为null 或空字符串,那么将忽略 @Argument.

元注解(Meta-Annotations)

有时,你想将同样的配置用于多个监听器上. 为减少重复配置,你可以使用元注解来创建你自己的监听器注解:

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public@interface MyAnonFanoutListener {
}

public class MetaListener {

    @MyAnonFanoutListener
public void handle1(String foo) {
        ...
    }

    @MyAnonFanoutListener
public void handle2(String foo) {
        ...
    }

}

在这个例子中,每个通过@MyAnonFanoutListener创建的监听器都会绑定一个匿名,自动删除的队列到fanout交换器 metaFanout. 元注解机制是简单的,在那些用户定义注解中的属性是不会经过检查的- 因此你不能从元注解中覆盖设置.当需要高级配置时,使用一般的 @Bean 定义.

Enable Listener Endpoint Annotations

为了启用 @RabbitListener 注解,需要在你的某个@Configuration类中添加@EnableRabbit 注解.

@Configuration
@EnableRabbit
publicclass AppConfig {

    @Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}

默认情况下,基础设施会查找一个名为rabbitListenerContainerFactory 的bean作为工厂来源来创建消息监听器容器. 在这种情况下,会忽略RabbitMQ 基础设施计划, processOrder 方法可使用核心轮询大小为3个线程最大10个线程的池大小来调用.

可通过使用注解或实现RabbitListenerConfigurer

接口来自定义监听器容器工厂. 默认只需要注册至少一个Endpoints,而不需要一个特定的容器工厂.查看javadoc来了解详情和例子.

如果你更喜欢XML配置,可使用 <rabbit:annotation-driven> 元素.

<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers "value="3"/>
<property name="maxConcurrentConsumers"value="10"/>
</bean>
注解方法的消息转换

在调用监听器之前,在管道中有两个转换步骤. 第一个使用 MessageConverter 来将传入的Spring AMQP Message 转换成spring-消息系统的消息. 当目标方法调用时,消息负载将被转换,如果有必要,也会参考消息参数类型来进行.

第一步中的默认 MessageConverter 是一个Spring AMQP SimpleMessageConverter ,它可以处理String 和 java.io.Serializable对象之间的转换; 其它所有的将保留为byte[]. 在下面的讨论中,我们称其为消息转换器.

第二个步骤的默认转换器是GenericMessageConverter ,它将委派给转换服务(DefaultFormattingConversionService的实例). 在下面的讨论中,我们称其为方法参数转换器.

要改变消息转换器,可在连接工厂bean中设置其相关属性:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    ...
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    ...
    return factory;
}

这配置了一个Jackson2 转换器,希望头信息能通过它来指导转换.

你也可以考虑使用ContentTypeDelegatingMessageConverter ,它可以处理不同内容类型的转换.

大多数情况下,没有必要来定制方法参数转换器,除非你想要用自定义的ConversionService.

在1.6版本之前,用于转换JSON的类型信息必须在消息头中提供或者需要一个自定义的ClassMapper. 从1.6版本开始,如果没有类型信息头,类型可根据目标方法参数推断.

类型推断只能用于 @RabbitListener 的方法级.

参考 the section called “Jackson2JsonMessageConverter” 来了解更多信息.

如果您希望自定义方法参数转换器,您可以这样做如下:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

    ...

    @Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    	DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    	factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
    	return factory;
    }

    @Bean
public ConversionService myConversionService() {
    	DefaultConversionService conv = new DefaultConversionService();
    	conv.addConverter(mySpecialConverter());
    	return conv;
    }

    @Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    	registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    ...

}
重要
对于多方法监听器(参考 the section called “Multi-Method Listeners”), 方法选择是基于消息转换后的消息负载,方法参数转换器只在方法被选择后才会调用.
编程式 Endpoint 注册

RabbitListenerEndpoint 提供了一个Rabbit endpoint 模型并负责为那个模型配置容器.除了通过RabbitListener注解检测外这个基础设施允许你通过编程来配置endpoints.

@Configuration
@EnableRabbit
publicclass AppConfig implements RabbitListenerConfigurer {

    @Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setQueueNames("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

在上面的例子中,我们使用了SimpleRabbitListenerEndpoint (它使用MessageListener 来进行处理),但你也可以构建你自己的endpoint变种来描述自定义的调用机制.

应该指出的是,你也可以跳过@RabbitListener 的使用,通过RabbitListenerConfigurer来编程注册你的endpoints.

Annotated Endpoint Method Signature
到目前为止,我们已经在我们的端点上注入了一个简单的字符串,但它实际上可以有一个非常灵活的方法签名。让我们重写它,以一个自定义的头来控制注入顺序:
@Component
publicclass MyService {

    @RabbitListener(queues = "myQueue")
publicvoid processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

下面是你可以在监听端点上注入的主要元素:

原生org.springframework.amqp.core.Message.

用于接收消息的com.rabbitmq.client.Channel

 org.springframework.messaging.Message 代表的是传入的AMQP消息.注意,这个消息持有自定义和标准的头部信息 (AmqpHeaders定义).


从1.6版本开始, 入站deliveryMode 头可以AmqpHeaders.RECEIVED_DELIVERY_MODE 使用,代替了AmqpHeaders.DELIVERY_MODE.

@Header-注解方法参数可 提取一个特定头部值,包括标准的AMQP头.

@Headers-注解参数为了访问所有头信息,必须能指定为java.util.Map.

非注解元素(非支持类型(如. Message 和Channel))可认为是负荷(payload).你可以使用 @Payload来明确标识. 你也可以添加额外的 @Valid来进行验证.

注入Spring消息抽象的能力是特别有用的,它可受益于存储在特定传输消息中的信息,而不需要依赖于特定传输API.

@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}

方法参数的处理是由DefaultMessageHandlerMethodFactory 提供的,它可以更进一步地定制以支持其它的方法参数. 转换和验证支持也可以定制.

例如,如果我们想确保我们的Order在处理之前是有效的,我们可以使用@Valid 来注解负荷,并配置必须验证器,就像下面这样:

@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

    @Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    @Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}
监听多个队列

当使用queues 属性时,你可以指定相关的容器来监听多个队列. 你可以使用 @Header 注解来指定对于那些队列中收到的消息对POJO方法可用:

@Component
public class MyService {

    @RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        ...
    }

}

从1.5版本开始,队列名称可以使用属性占位符和SpEL:

@Component
public class MyService {

    @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        ...
    }

}

1.5版本之前,只有单个队列可以这种方法进行指定,每个队列需要一个单独的属性.

回复管理

MessageListenerAdapter 现有的支持已经允许你的方法有一个非void的返回类型.在这种情况下,调用的结果被封装在一个发送消息中,其消息发送地址要么是原始消息的ReplyToAddress头指定的地址要么是监听器上配置的默认地址.默认地址现在可通过@SendTo 注解进行设置.

假设我们的processOrder 方法现在需要返回一个OrderStatus, 可将其写成下面这样来自动发送一个回复:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
 return status;
}

如果你需要以传输独立的方式来设置其它头,你可以返回Message,就像这样:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

@SendTo 值按照exchange/routingKey模式(其中的一部分可以省略)来作为对exchange 和 routingKey 的回复.有效值为:

foo/bar - 以交换器和路由键进行回复.

foo/ - 以交换器和默认路由键进行回复.

bar or /bar - 以路由键和默认交换器进行回复.

/ or empty - 以默认交换器和默认路由键进行回复.

 @SendTo 也可以没有value 属性. 这种情况等价于空的sendTo 模式. @SendTo 只能应用于没有replyToAddress 属性的入站消息中.

从1.5版本开始, @SendTo 值可以通过bean SpEL 表达式初始化,例如…​

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return"test.sendTo.reply.spel";
}

表达式必须能评估为String,它可以是简单的队列名称(将发送到默认交换器中) 或者是上面谈到的exchange/routingKey 形式.

在初始化时,#{...} 表达式只评估一次.

对于动态路由回复,消息发送者应该包含一个reply_to 消息属性或使用运行时SpEL 表达式.

从1.6版本开始, @SendTo 可以是SpEL 表达式,它可在运行时根据请求和回复来评估:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

SpEL 表达式的运行时性质是由 !{...} 定界符表示的. 表达式评估上下文的#root 对象有三个属性:

  • request - o.s.amqp.core.Message 请求对象.
  • source - 转换后的 o.s.messaging.Message<?>.
  • result - 方法结果.

上下文有一个map 属性访问器,标准类型转换器以及一个bean解析器,允许引用上下文中的其它beans (如.@someBeanName.determineReplyQ(request, result)).

总结一下, #{...} 只在初始化的时候评估一次, #root 对象代表的是应用程序上下文; beans可通过其名称来引用. !{...} 会在运行时,对于每个消息,都将使用root对象的属性进行评估,bean可以使用其名称进行引用,前辍为@.

多方法监听器

从1.5.0版本开始,@RabbitListener 注解现在可以在类级上进行指定.与新的@RabbitHandler 注解一起,基于传入消息的负荷类型,这可以允许在单个监听器上调用不同的方法.这可以用一个例子来描述:

@RabbitListener(id="multi", queues = "someQueue")
publicclass MultiListenerBean {

    @RabbitHandler
@SendTo("my.reply.queue")
public String bar(Bar bar) {
        ...
    }

    @RabbitHandler
public String baz(Baz baz) {
        ...
    }

    @RabbitHandler
public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
        ...
    }

}

在这种情况下,独立的 @RabbitHandler 方法会被调用,如果转换后负荷是BarBaz 或Qux. 理解基于负荷类型系统来确定唯一方法是很重要的.类型检查是通过单个无注解参数来执行的,否则就要使用@Payload 进行注解. 注意同样的方法签名可应用于方法级 @RabbitListener 之上.

注意,如果有必要,需要在每个方法上指定@SendTo, 在类级上它是不支持的.

@Repeatable @RabbitListener

从1.6版本开始,@RabbitListener 注解可用 @Repeatable进行标记. 这就是说,这个注解可多次出现在相同的注解元素上(方法或类).在这种情况下,对于每个注解,都会创建独立的监听容器,它们每个都会调用相同的监听器@Bean. Repeatable 注解能用于 Java 8+;当在Java 7-使用时,同样的效果可以使用 @RabbitListeners "container" 注解(包含@RabbitListener注解的数组)来达到.

Proxy @RabbitListener and Generics

如果你的服务是用于代理(如,在 @Transactional的情况中) ,当接口有泛型参数时,需要要一些考虑.要有一个泛型接口和特定实现,如:

interface TxService<P> {

   String handle(P payload, String header);

}

static class TxServiceImpl implements TxService<Foo> {

    @Override
 @RabbitListener(...)
 public String handle(Foo foo, String rk) {
         ...
    }

}

你被迫切换到CGLIB目标类代理,因为接口handle方法的实际实现只是一个桥接方法.在事务管理的情况下, CGLIB是通过注解选项来配置的- @EnableTransactionManagement(proxyTargetClass = true). 在这种情况下,所有注解都需要在实现类的目标方法上进行声明:

static class TxServiceImpl implements TxService<Foo> {

 @Override
@Transactional
@RabbitListener(...)
public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
        ...
    }

}
容器管理

由注解创建的容器不会在上下文中进行注册.你可以调用 RabbitListenerEndpointRegistrygetListenerContainers()方法来获取所有容器集合.然后,你可以迭代这个集合,例如,停止/启动所有容器或调用在其注册上调用Lifecycle 方法(调用每个容器中的操作).

你也可以使用id来获取单个容器的引用,即 getListenerContainer(String id); 例如registry.getListenerContainer("multi") .

从1.5.2版本开始,你可以调用getListenerContainerIds()方法来获取所有注册容器的id.

从1.5版本开始,你可在RabbitListener端点上为容器分配一个组(group).这提供了一种机制来获取子集容器的引用; 添加一个group 属性会使Collection<MessageListenerContainer> 类型的bean使用组名称注册在上下文中.

线程和异步消费者

一些不同的线程可与异步消费者关联。

当RabbitMQ Client投递消息时,来自于SimpleMessageListener 配置的TaskExecutor中的线程会调用MessageListener.如果没有配置,将会使用SimpleAsyncTaskExecutor. 如果使用了池化的executor,须确保池大小可以支撑并发处理.

当使用默认SimpleAsyncTaskExecutor时,对于调用监听器的线程,监听器容器的beanName 将用作threadNamePrefix. 这有益于日志分析,在日志appender配置中,一般建议总是包含线程名称.当在SimpleMessageListenerContainertaskExecutor属性中指定TaskExecutor 时,线程名称是不能修改的.建议你使用相似的技术来命名线程, 帮助在日志消息中的线程识别。

当创建连接时,在CachingConnectionFactory 配置的Executor将传递给RabbitMQ Client ,并且它的线程将用于投递新消息到监听器容器.在写作的时候,如果没有配置,client会使用池大小为5的内部线程池executor.

RabbitMQ client 使用ThreadFactory 来为低端I/O(socket)操作创建线程.要改变这个工厂,你需要配置底层RabbitMQ ConnectionFactory, 正如the section called “Configuring the Underlying Client Connection Factory”中所描述.

检测空闲异步消费者

虽然高效,但异步消费者存在一个问题:如何来探测它们什么是空闲的 - 当有一段时间没有收到消息时,用户可能想要采取某些动作.

从1.6版本开始, 当没有消息投递时,可配置监听器容器来发布ListenerContainerIdleEvent 事件. 当容器是空闲的,事件会每隔idleEventInterval 毫秒发布事件.

要配置这个功能,须在容器上设置idleEventInterval:

xml
<rabbit:listener-container connection-factory="connectionFactory"...idle-event-interval="60000"...
        >
<rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
Java
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    ...
    container.setIdleEventInterval(60000L);
    ...
    return container;
}
@RabbitListener
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setIdleEventInterval(60000L);
    ...
    return factory;
}

在上面这些情况中,当容器空闲时,每隔60秒就会发布事件.

事件消费

通过实现ApplicationListener 可捕获这些事件- 要么是一个一般的监听器,要么是一个窄化的只接受特定事件的监听器. 你也可以使用Spring Framework 4.2中引入的@EventListener.

下面的例子在单个类中组合使用了@RabbitListener 和@EventListener .重点要理解,应用程序监听器会收到所有容器的事件,因此如果你只对某个容器采取措施,那么你需要检查监听器id.你也可以使用@EventListener 条件来达到此目的.

事件有4个属性:

  • source - 监听容器实例
  • id - 监听器id(或容器bean名称)
  • idleTime - 当事件发布时,容器已经空闲的时间
  • queueNames - 容器监听的队列名称
public class Listener {

    @RabbitListener(id="foo", queues="#{queue.name}")
    public String listen(String foo) {
        return foo.toUpperCase();
    }

    @EventListener(condition = "event.listenerId == 'foo'")
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        ...
    }

}
重要
事件监听器会查看所有容器的事件,因此,在上面的例子中,我们根据监听器ID缩小了要接收的事件.
警告
如果你想使用idle事件来停止监听器容器,你不应该在调用监听器的线程上来调用container.stop() 方法- 它会导致延迟和不必要的日志消息. 相反,你应该把事件交给一个不同的线程,然后可以停止容器。

3.1.7 消息转换器

介绍

AmqpTemplate 同时也定义了多个发送和接收消息(委派给MessageConverter)的方法.

MessageConverter 本身是很简单的. 在每个方向上它都提供了一个方法:一个用于转换成Message,另一个用于从Message中转换.注意,当转换成Message时,除了object外,你还需要提供消息属性. "object"参数通常对应的是Message body.

public interface MessageConverter {

    Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException;

    Object fromMessage(Message message) throws MessageConversionException;

}

AmqpTemplate中相关的消息发送方法列举在下边. 这比我们前面提到的要简单,因为它们不需要Message 实例. 相反地,  MessageConverter 负责创建每个消息(通过将提供的对象转换成Message body的字节数组,以及添加提供的MessageProperties).

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
    throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
    throws AmqpException;

void convertAndSend(String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

在接收端,这里只有两个方法:一个接受队列名称,另一个依赖于模板设置的队列属性.

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;
在 the section called “Asynchronous Consumer” 中提到的MessageListenerAdapter也使用了MessageConverter.

SimpleMessageConverter

MessageConverter 策略的默认实现被称为SimpleMessageConverter. 如果你没有明确配置,RabbitTemplate实例会使用此转换器的实例.它能处理基于文本内容,序列化Java对象,以及简单的字节数组.

从 Message中转换

如果传入消息的内容类型以"text" (如. "text/plain")开头,它同时也会检查内容编码属性,以确定将消息body字节数组转换成字符串所要使用的字符集. 如果在输入消息中没有指定内容编码属性, 它默认会使用"UTF-8"字符集.如果你需要覆盖默认设置,你可以配置一个SimpleMessageConverter实例,设置其"defaultCharset" 属性,再将其注入到RabbitTemplate 实例中.

如果传入消息的内容类型属性值为"application/x-java-serialized-object", SimpleMessageConverter 将尝试将字节数组反序列化为一个Java object. 虽然这对于简单的原型是有用的,但一般不推荐依赖于Java序列化机制,因为它会生产者和消费者之间的紧密耦合。当然,这也排除了在两边使用非Java的可能性.由于AMQP 是线路级协议, 因这样的限制失去了许多优势,这是不幸的. 在后面的两个章节中,我们将探讨通过丰富的域对象的内容来替代java序列化.

对于其它内容类型,SimpleMessageConverter 会以字节数组形式直接返回消息body内容.

参考the section called “Java Deserialization” 来了解更多信息.

转换成消息

当从任意Java对象转换成Message时, SimpleMessageConverter 同样可以处理字节数组,字符串,以及序列化实例.它会将每一种都转换成字节(在字节数组的情况下,不需要任何转换), 并且会相应地设置内容类型属性.如果要转换的对象不匹配这些类型,Message body 将是null.

SerializerMessageConverter

除了它可以使用其它application/x-java-serialized-object转换的Spring框架Serializer 和 Deserializer 实现来配置外,此转换器类似于SimpleMessageConverter

参考the section called “Java Deserialization” 来了解更多信息.

Jackson2JsonMessageConverter

转换成消息

正如前面章节提到的,一般来说依赖于Java序列化机制不是推荐的.另一个常见更灵活且可跨语言平台的选择JSON (JavaScript Object Notation).可通过在RabbitTemplate实例上配置转换器来覆盖默认SimpleMessageConverter.Jackson2JsonMessageConverter 使用的是com.fasterxml.jackson 2.x 包.

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper"  ref="customClassMapper"/>
</bean>
</property>
</bean>

正如上面展示的, Jackson2JsonMessageConverter 默认使用的是DefaultClassMapper. 类型信息是添加到MessageProperties中的(也会从中获取). 如果入站消息在MessageProperties没有包含类型信息,但你知道预期类型,你可以使用defaultType 属性来配置静态类型

<bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
转换Message

入站消息会根据发送系统头部中添加的类型信息来转换成对象.

在1.6之前的版本中,如果不存在类型信息,转换将失败。从1.6版开始,如果类型信息丢失,转换器将使用Jsckson默认值(通常是一个map)来转换JSON.
此外,从1.6版本开始,当在方法上使用@RabbitListener 注解时, 推断类型信息会添加到MessageProperties; 这允许转换器转换成目标方法的参数类型.这只适用于无注解的参数或使用@Payload注解的单个参数. 在分析过程中忽略类型消息的参数。

重要
默认情况下,推断类型信息会覆盖inbound __TypeId__ 和发送系统创建的相关headers. 这允许允许接收系统自动转换成不同的领域对象. 这只适用于具体的参数类型(不是抽象的或不是接口)或者来自java.util 包中的对象.其它情况下,将使用 __TypeId__ 和相关的头.也可能有你想覆盖默认行为以及总是使用__TypeId__信息的情况. 例如, 让我们假设你有一个接受Foo参数的@RabbitListener ,但消息中包含了Bar( 它是的Foo (具体类)的子类). 推断类型是不正确的.要处理这种情况,需要设置Jackson2JsonMessageConverter 的TypePrecedence 属性为TYPE_ID 而替换默认的INFERRED. 这个属性实际上转换器的DefaultJackson2JavaTypeMapper ,但为了方便在转换器上提供了一个setter方法. 如果你想注入一个自定义类型mapper, 你应该设置属性mapper.
@RabbitListener
public void foo(Foo foo) {...}

@RabbitListener
public void foo(@Payload Foo foo, @Header("amqp_consumerQueue") String queue) {...}

@RabbitListener
public void foo(Foo foo, o.s.amqp.core.Message message) {...}

@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<Foo> message) {...}

@RabbitListener
public void foo(Foo foo, String bar) {...}

@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<?> message) {...}

上面前4种情况下,转换器会尝试转换成Foo 类型. 第五个例子是无效的,因为我们不能确定使用哪个参数来接收消息负荷. 在第六个例子中, Jackson 会根据泛型WildcardType来应用.

然而,你也可以创建一个自定义转换器,并使用targetMethod 消息属性来决定将JSON转换成哪种类型.

这种类型接口只能在@RabbitListener 注解声明在方法级上才可实现.在类级@RabbitListener, 转换类型用来选择调用哪个@RabbitHandler 方法.基于这个原因,基础设施提供了targetObject 消息属性,它可用于自定义转换器来确定类型.

MarshallingMessageConverter

还有一个选择是MarshallingMessageConverter.它会委派到Spring OXM 包的 Marshaller 和 Unmarshaller 策略接口实现. 

你可从here了解更多. 在配置方面,最常见的是只提供构造器参数,因为大部分Marshaller 的实现都将实现Unmarshaller.

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>

ContentTypeDelegatingMessageConverter

这个类是在1.4.2版本中引入的,并可基于MessagePropertiescontentType属性允许委派给一个特定的MessageConverter.默认情况下,如果没有contentType属性或值没有匹配配置转换器时,它会委派给SimpleMessageConverter.

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map> 
</property>
</bean>

Java 反序列化

重要
当从不可信任的来源反序列化Java对象时,存在一个可能的漏洞.如果从不可信来源,使用内容类型 application/x-java-serialized-object来接收消息,你可以考虑配置允许哪些包/类能反序列化.这既适用于SimpleMessageConverter,也适用于SerializerMessageConverter,当它被配置为使用一个DefaultDeserializer时 -或含蓄地或通过配置方式的。

默认情况下,白名单列表是空的,这意味着所有类都会反序列化.你可以设置模式列表,如 foo.*foo.bar.Baz 或 *.MySafeClass.模式会按照顺序进行检查,直到找到匹配的模式.如果没有找到匹配,将抛出SecurityException.在这些转换器上,可使用whiteListPatterns 属性来设置.

消息属性转换器

 MessagePropertiesConverter 策略接口用于Rabbit Client BasicProperties 与Spring AMQP MessageProperties之间转换. 默认实现(DefaultMessagePropertiesConverter)通常可满虽大部分需求,但如果有需要,你可以自己实现. 当大小不超过1024字节时,默认属性转换器将 BasicProperties 中的LongString 转换成String . 更大的 LongString 将不会进行转换(参考下面的内容.这个限制可通过构造器参数来覆盖.

从1.6版本开始, 现在headers 长超过 long string 限制(默认为1024) 将被DefaultMessagePropertiesConverter保留作为 LongString . 你可以通过 the getBytes[]toString(), 或getStream() 方法来访问内容.

此前, DefaultMessagePropertiesConverter 会将这样的头转换成一个 DataInputStream (实际上它只是引用了LongStringDataInputStream). 在输出时,这个头不会进行转换(除字符串外,如在流上调用toString()方法 java.io.DataInputStream@1d057a39).

更大输入LongString 头现在可正确地转换,在输出时也一样.

它提供了一个新的构造器来配置转换器,这样可像以前一样来工作:

/**
 * Construct an instance where LongStrings will be returned
 * unconverted or as a java.io.DataInputStream when longer than this limit.
 * Use this constructor with 'true' to restore pre-1.6 behavior.
 * @param longStringLimit the limit.
 * @param convertLongLongStrings LongString when false,
 * DataInputStream when true.
 * @since 1.6
 */
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

另外,从1.6版本开始,在 MessageProperties中添加了一个新属性correlationIdString.此前,当在RabbitMQ 客户端中转换BasicProperties 时,将会执行不必要的byte[] <-> String 转换,这是因为 MessageProperties.correlationId 是一个byte[] 而 BasicProperties 使用的是String

(最终,RabbitMQ客户端使用UTF-8字符串转化为字节并放在协议消息中).


为提供最大向后兼容性,新属性correlationIdPolicy 已经被加入到了DefaultMessagePropertiesConverter.它接受DefaultMessagePropertiesConverter.CorrelationIdPolicy 枚举参数.

默认情况下,它设置为BYTES (复制先前的行为).

对于入站消息:

  • STRING - 只映射correlationIdString 属性
  • BYTES - 只映射correlationId 属性
  • BOTH - 会同时映射两个属性
对于出站消息:
  • STRING - 只映射correlationIdString 属性
  • BYTES - 只映射correlationId 属性
  • BOTH - 两种属性都会考虑,但会优先考虑String 属性

也从1.6版本开始,入站deliveryMode 属性不再需要映射 MessageProperties.deliveryMode,相反使用MessageProperties.receivedDeliveryMode 来代替.另外,入站userId 属性也不需要再映射MessageProperties.userId,相反使用MessageProperties.receivedUserId 来映射. 

这种变化是为了避免这些属性的意外传播,如果同样的MessageProperties 对象用于出站消息时.

3.1.8 修改消息- 压缩以及更多

提供了许多的扩展点,通过它们你可以对消息执行预处理,要么在发送RabbitMQ之前,要么在接收到消息之后.

正如你在Section 3.1.7, “Message Converters”看到的,这样的扩展点存在于AmqpTemplate convertAndReceive 操作中,在那里你可以提供一个MessagePostProcessor

例如,你的POJO转换之后, MessagePostProcessor 允许你在Message上设置自定义的头或属性.

从1.4.2版本开始,额外的扩展点已经添加到RabbitTemplate - setBeforePublishPostProcessors() 和setAfterReceivePostProcessors(). 第一个开启了一个post processor来在发送消息到RabbitMQ之前立即运行.当使用批量时(参考 the section called “Batching”), 这会在批处理装配之后发送之前调用. 

第二个会在收到消息后立即调用.

这些扩展点对于压缩这此功能是有用的,基于这些目的,提供了多个MessagePostProcessor:

  • GZipPostProcessor
  • ZipPostProcessor

针对于发送前的消息压缩,以及

  • GUnzipPostProcessor
  • UnzipPostProcessor

针对于消息解压.

类似地, SimpleMessageListenerContainer 也有一个 setAfterReceivePostProcessors() 方法,

允许在消息收到由容器来执行解压缩.


posted on 2016-08-13 12:48 胡小军 阅读(12977) 评论(0)  编辑  收藏 所属分类: RabbitMQ

只有注册用户登录后才能发表评论。


网站导航: