paulwong


Spring Integration Java DSL

This time I decided to play a little with the Spring Integration Java DSL, which has been merged directly into Spring Integration Core 5.0. That is a smart and obvious move because:

  • Everyone starting a new Spring project based on Java Config uses it
  • The SI Java DSL lets you use powerful Java 8 features such as lambdas
  • You can build your flow using the builder pattern based on IntegrationFlowBuilder

Let's take a look at the samples showing how to use it with ActiveMQ JMS.


https://bitbucket.org/tomask79/spring-integration-java-dsl/src/master/

posted @ 2019-07-18 13:16 paulwong

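Processing multiple files concurrently in Spring Batch remote chunking mode

In Spring Batch remote chunking mode, processing several files at the same time fails with the demo's default configuration. The masters handling the different files all use the same queue names, so when the slaves process several job instances, the replies that come back carry different JOB-INSTANCE-IDs, which causes the error.

The fix is to change the gateway component in the Spring Integration setup that Spring Batch uses.

A gateway works in request/response mode: after sending a message to a queue, it waits on the reply queue until the consumer returns a result, and only then continues.

Outbound gateway: takes a message from a channel, sends it to the request queue, waits on the reply queue for the consumer's result, then sends that message to the next channel.

Inbound gateway: takes a message from a queue, sends it to a request channel, waits on the reply channel for the result, then sends that message to the next queue.

For details, see this article: https://blog.csdn.net/alexlau8/article/details/78056064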
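To make the role of the correlation key concrete, here is a minimal JDK-only sketch. It does not use Spring Integration or JMS: the class name CorrelationSketch, the Msg type and the in-memory queues are assumptions made for illustration. Two requesters share one request queue and one reply queue, and each picks up only the reply that carries its own correlation ID.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.UUID;

final class CorrelationSketch {

    /** Minimal stand-in for a JMS message: a correlation ID plus a payload. */
    static final class Msg {
        final String correlationId;
        final String body;

        Msg(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Shared in-memory "queues", as when every master uses the same queue names
    static final Deque<Msg> requestQueue = new ArrayDeque<>();
    static final Deque<Msg> replyQueue = new ArrayDeque<>();

    /** Master side: tag the request with a fresh correlation ID and send it. */
    static String send(String body) {
        String id = UUID.randomUUID().toString();
        requestQueue.addLast(new Msg(id, body));
        return id;
    }

    /** Slave side: process every pending request and copy its correlation ID to the reply. */
    static void drainRequests() {
        Msg request;
        while ((request = requestQueue.pollFirst()) != null) {
            // Replies go to the front to show that arrival order cannot be relied on
            replyQueue.addFirst(new Msg(request.correlationId, "done:" + request.body));
        }
    }

    /** Master side: take only the reply with a matching correlation ID, leave the rest queued. */
    static String receive(String correlationId) {
        for (Iterator<Msg> it = replyQueue.iterator(); it.hasNext(); ) {
            Msg reply = it.next();
            if (reply.correlationId.equals(correlationId)) {
                it.remove();
                return reply.body;
            }
        }
        return null; // no reply for this requester yet
    }

    public static void main(String[] args) {
        String idA = send("chunk-of-file-A");
        String idB = send("chunk-of-file-B");
        drainRequests();
        System.out.println(receive(idA)); // done:chunk-of-file-A
        System.out.println(receive(idB)); // done:chunk-of-file-B
    }
}
```

Without the ID check in receive, the first requester would simply take whatever reply sits at the head of the shared queue, which is the mix-up described above.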

    <!-- Master jms -->
    <int:channel id="MasterRequestChannel">
        <int:dispatcher task-executor="RequestPublishExecutor"/>
    </int:channel>
    <task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
<!--    <int-jms:outbound-channel-adapter 
        connection-factory="connectionFactory" 
        destination-name="RequestQueue" 
        channel="MasterRequestChannel"/> 
-->

    <int:channel id="MasterReplyChannel"/>
<!--    <int-jms:message-driven-channel-adapter 
        connection-factory="connectionFactory" 
        destination-name="ReplyQueue"
        channel="MasterReplyChannel"/> 
-->

    <int-jms:outbound-gateway
        connection-factory="connectionFactory"
        correlation-key="JMSCorrelationID"
        request-channel="MasterRequestChannel"
        request-destination-name="RequestQueue"
        receive-timeout="30000"
        reply-channel="MasterReplyChannel"
        reply-destination-name="ReplyQueue"
        async="true">
        <int-jms:reply-listener />
    </int-jms:outbound-gateway>

    <!-- Slave jms -->
    <int:channel id="SlaveRequestChannel"/>
<!--    <int-jms:message-driven-channel-adapter
        connection-factory="connectionFactory" 
        destination-name="RequestQueue"
        channel="SlaveRequestChannel"/> 
-->

    <int:channel id="SlaveReplyChannel"/>
<!--    <int-jms:outbound-channel-adapter 
        connection-factory="connectionFactory" 
        destination-name="ReplyQueue"
        channel="SlaveReplyChannel"/> 
-->

    <int-jms:inbound-gateway
        connection-factory="connectionFactory"
        correlation-key="JMSCorrelationID"
        request-channel="SlaveRequestChannel"
        request-destination-name="RequestQueue"
        reply-channel="SlaveReplyChannel"
        default-reply-queue-name="ReplyQueue"/>

Master configuration:
package com.paul.testspringbatch.config.master;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.config.CustomScopeConfigurer;
//import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.context.annotation.Scope;
import org.springframework.context.support.SimpleThreadScope;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.JmsOutboundGateway;

import com.paul.testspringbatch.common.constant.IntegrationConstant;

@Configuration
@EnableIntegration
@Profile("batch-master")
public class IntegrationMasterConfiguration {
    
//    @Value("${broker.url}")
//    private String brokerUrl;


//    @Bean
//    public ActiveMQConnectionFactory connectionFactory() {
//        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//        connectionFactory.setBrokerURL(this.brokerUrl);
//        connectionFactory.setTrustAllPackages(true);
//        return connectionFactory;
//    }

    /*
     * Configure outbound flow (requests going to workers)
     */
    @Bean
//    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public DirectChannel requests() {
        return new DirectChannel();
    }

//    @Bean
//    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(requests())
//                .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
//                .get();
//    }
    
     @Bean
     public CustomScopeConfigurer customScopeConfigurer() {
         CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
         customScopeConfigurer.addScope("thread", new SimpleThreadScope());
         return customScopeConfigurer;
     }
     
//     @Bean
//     public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
//         return new BeanFactoryPostProcessor() {
//                
//             @Override
//             public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
//                    beanFactory.registerScope("thread", new SimpleThreadScope());
//                }
//              };
//     }
    
    /*
     * Configure inbound flow (replies coming from workers)
     */
    @Bean
    @Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
//    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public QueueChannel replies() {
        return new QueueChannel();
    }

//    @Bean
//    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
//                .channel(replies())
//                .get();
//    }

    @Bean
    public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
        JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
        jmsOutboundGateway.setConnectionFactory(connectionFactory);
        jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION); // 2. send the message to this destination
        jmsOutboundGateway.setRequiresReply(true);
        jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY); // 3. let the broker filter the messages
        jmsOutboundGateway.setAsync(true); // must be async so that JMS_CORRELATION_KEY works
        jmsOutboundGateway.setUseReplyContainer(true);
        jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION); // 4. wait for the response from this destination
        jmsOutboundGateway.setReceiveTimeout(30_000);
        return jmsOutboundGateway;
    }

    @Bean
    public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                        .from(requests())//1. receive message from this channel
                        .handle(jmsOutboundGateway(connectionFactory))
                        .channel(replies())//5. send back the response to this channel
                        .get();
    }

}


Slave configuration:
package com.paul.testspringbatch.config.slave;

import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

import com.paul.testspringbatch.common.constant.IntegrationConstant;

@Configuration
@EnableIntegration
@Profile("batch-slave")
public class IntegrationSlaveConfiguration {
    

    /*
     * Configure inbound flow (requests coming from the master)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

//    @Bean
//    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
//                .channel(requests())
//                .get();
//    }

    /*
     * Configure outbound flow (replies going to the master)
     */
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

//    @Bean
//    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
//        return IntegrationFlows
//                .from(replies())
//                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
//                .get();
//    }

    @Bean
    public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                    .from(Jms
                            .inboundGateway(connectionFactory)
                            .destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION) // 1. receive messages from this destination
                            .correlationKey(IntegrationConstant.JMS_CORRELATION_KEY) // 2. let the broker filter the messages
                            .requestChannel(requests()) // 3. send the message to this channel
                            .replyChannel(replies()) // 4. wait for the result from this channel
                            .defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION) // 5. send the result back to the master through this destination
                            )
                    .get();
    }

}

posted @ 2019-07-16 14:38 paulwong

Build Messaging Between Ruby/Rails Applications with ActiveMQ

https://dev.to/kirillshevch/build-messaging-between-ruby-rails-applications-with-activemq-4fin

posted @ 2019-07-12 17:12 paulwong

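A router within a step

In Spring Batch, routing is normally done between steps. If a single step has several writers, each writing to a different file, you need a router inside the step so that items can be routed to the right writer.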


https://gist.github.com/benas/bfe2be7386b99ce496425fac9ff35fb8
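The routing idea can be sketched with the JDK alone. Spring Batch ships a ClassifierCompositeItemWriter for this kind of job; the class below is not that API, only an illustration of the principle. The names RoutingWriterSketch, classifier and writers are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

final class RoutingWriterSketch<T> {

    private final Function<T, String> classifier;         // picks a route name for each item
    private final Map<String, Consumer<List<T>>> writers; // route name -> delegate writer

    RoutingWriterSketch(Function<T, String> classifier, Map<String, Consumer<List<T>>> writers) {
        this.classifier = classifier;
        this.writers = writers;
    }

    /** Splits one chunk by route and hands each sub-list to its own writer. */
    void write(List<T> chunk) {
        Map<String, List<T>> byRoute = new LinkedHashMap<>();
        for (T item : chunk) {
            byRoute.computeIfAbsent(classifier.apply(item), k -> new ArrayList<>()).add(item);
        }
        for (Map.Entry<String, List<T>> entry : byRoute.entrySet()) {
            Consumer<List<T>> writer = writers.get(entry.getKey());
            if (writer == null) {
                throw new IllegalStateException("No writer for route " + entry.getKey());
            }
            writer.accept(entry.getValue());
        }
    }

    public static void main(String[] args) {
        // Two lists stand in for two output files
        List<Integer> evenFile = new ArrayList<>();
        List<Integer> oddFile = new ArrayList<>();
        Map<String, Consumer<List<Integer>>> writers = new LinkedHashMap<>();
        writers.put("even", evenFile::addAll);
        writers.put("odd", oddFile::addAll);

        RoutingWriterSketch<Integer> router =
                new RoutingWriterSketch<Integer>(n -> n % 2 == 0 ? "even" : "odd", writers);
        router.write(Arrays.asList(1, 2, 3, 4));
        System.out.println(evenFile + " " + oddFile); // [2, 4] [1, 3]
    }
}
```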

posted @ 2019-07-11 11:45 paulwong

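Dynamically changing the Spring Batch chunk size

In Spring Batch remote chunking mode:
Spring Batch reads a file line by line and submits the data to the remote side in chunks of CHUNKSIZE items. Sometimes the current line has to be considered together with the following lines before the chunk size can be decided, so that related data is processed in order by the same remote processor. If related data is split across several chunks, it may be processed out of order. This requires adjusting the chunk size dynamically.

See:
https://stackoverflow.com/questions/37390602/spring-batch-custom-completion-policy-for-dynamic-chunk-size

Combine this with SingleItemPeekableItemReader (a decorator that lets you peek at the next item; the actual reading is delegated to the wrapped reader).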
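The peek-then-decide logic can be sketched without Spring Batch. The code below is a JDK-only illustration, not the CompletionPolicy or SingleItemPeekableItemReader API. The names PeekingChunkerSketch, Peekable and chunkByKey are assumptions, and so is the rule that a chunk ends when the next line has a different key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class PeekingChunkerSketch {

    /** Decorator that lets the caller look at the next element without consuming it. */
    static final class Peekable<T> {
        private final Iterator<T> delegate;
        private T peeked;
        private boolean hasPeeked;

        Peekable(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        /** Returns the next element without consuming it, or null at end of input. */
        T peek() {
            if (!hasPeeked) {
                peeked = delegate.hasNext() ? delegate.next() : null;
                hasPeeked = true;
            }
            return peeked;
        }

        /** Consumes and returns the next element, or null at end of input. */
        T read() {
            T item = peek();
            hasPeeked = false;
            peeked = null;
            return item;
        }
    }

    /** Builds chunks that close only when the next item has a different key or the input ends. */
    static <T> List<List<T>> chunkByKey(Iterator<T> source, Function<T, String> keyOf) {
        Peekable<T> reader = new Peekable<>(source);
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            current.add(item);
            T next = reader.peek();
            boolean chunkComplete = next == null || !keyOf.apply(next).equals(keyOf.apply(item));
            if (chunkComplete) {
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("A,1", "A,2", "B,1", "C,1", "C,2", "C,3");
        // The key is the text before the comma, so related lines stay in one chunk
        System.out.println(chunkByKey(lines.iterator(), line -> line.split(",")[0]));
        // [[A,1, A,2], [B,1], [C,1, C,2, C,3]]
    }
}
```

The sketch treats null as end of input, so the source must not contain null items.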

posted @ 2019-07-02 11:13 paulwong

RabbitMQ resources

Message queues: RabbitMQ
https://www.jianshu.com/p/79ca08116d57

Using RabbitMQ in Spring Boot
https://juejin.im/post/59f194e06fb9a0451329ec53




posted @ 2019-06-28 10:24 paulwong

How to implement JMS ReplyTo using SpringBoot

Request-Response is a message-exchange-pattern. In some cases, a message producer may want the consumers to reply to a message. The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).

In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time.
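The difference between the synchronous and asynchronous styles can be sketched with plain JDK classes. This is not JMS code: AsyncReplySketch, send, onReply and await are hypothetical names, and the broker is left out entirely. Each request registers a future under its correlation ID. A reply listener completes it later, and a caller who wants the synchronous style blocks on the future with a timeout.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncReplySketch {

    // Requests still waiting for a reply, keyed by correlation ID
    private static final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    static String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    /** Requester side: remember the correlation ID and return at once. */
    static CompletableFuture<String> send(String correlationId, String body) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // A real client would now put `body` on the request queue with this ID and a reply-to destination
        return reply;
    }

    /** Reply listener: completes the matching request; returns false for unknown or expired IDs. */
    static boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(body);
    }

    /** Synchronous style: block until the reply arrives or the timeout expires (null on timeout). */
    static String await(String correlationId, CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.remove(correlationId); // a late reply will be dropped
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        String id = newCorrelationId();
        CompletableFuture<String> reply = send(id, "order-1");
        onReply(id, "shipment-1");
        System.out.println(await(id, reply, 100)); // shipment-1

        String lost = newCorrelationId();
        System.out.println(await(lost, send(lost, "order-2"), 100)); // null
        System.out.println(onReply(lost, "too late"));               // false
    }
}
```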

For more information, check here.

Now, let's jump into the code. In Spring, there are two ways to implement this (at least that I know of).

  1. Using JMSTemplate
  2. Using Spring Integration

For demo purposes, I used ActiveMQ. However, you can implement this with other messaging systems such as IBM MQ, RabbitMQ, Tibco EMS, etc. In this demo, I send an ObjectMessage of type Order and reply with a Shipment object.

Using JMSTemplate

  1. First, we include the required dependencies. Replace the activemq dependency with your messaging system’s jars if not using ActiveMQ

     <dependencies>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-activemq</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.activemq.tooling</groupId>
             <artifactId>activemq-junit</artifactId>
             <version>${activemq.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
  2. Use the default spring.activemq.* properties to configure the application for ActiveMQ. You can also do this inside a @Configuration class.

     spring:
       activemq:
         broker-url: tcp://localhost:61616
         non-blocking-redelivery: true
         packages:
           trust-all: true
  3. Note that in the above configuration, spring.activemq.packages.trust-all can be replaced with spring.activemq.packages.trusted listing the appropriate packages.
  4. Now Spring will do its magic and inject all the required beans as usual :) However, in our code, we need to add @EnableJms.

     import org.springframework.context.annotation.Configuration;
     import org.springframework.jms.annotation.EnableJms;

     @EnableJms
     @Configuration
     public class ActiveMQConfig {

         public static final String ORDER_QUEUE = "order-queue";
         public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

     }
  5. First, we will configure the Producer

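     import java.util.UUID;

     import javax.annotation.PostConstruct;
     import javax.jms.DeliveryMode;
     import javax.jms.JMSException;
     import javax.jms.ObjectMessage;
     import javax.jms.Session;

     import org.apache.activemq.command.ActiveMQQueue;
     import org.springframework.beans.factory.annotation.Autowired;
     import org.springframework.jms.core.JmsMessagingTemplate;
     import org.springframework.jms.core.JmsTemplate;
     import org.springframework.stereotype.Service;

     import lombok.extern.slf4j.Slf4j;

     @Slf4j
     @Service
     public class Producer {

         @Autowired
         JmsMessagingTemplate jmsMessagingTemplate;

         @Autowired
         JmsTemplate jmsTemplate;

         private Session session;

         @PostConstruct
         public void init() {
             jmsTemplate.setReceiveTimeout(1000L);
             jmsMessagingTemplate.setJmsTemplate(jmsTemplate);
             try {
                 // This session is only used to create ObjectMessage instances
                 session = jmsMessagingTemplate.getConnectionFactory().createConnection()
                         .createSession(false, Session.AUTO_ACKNOWLEDGE);
             } catch (JMSException e) {
                 throw new IllegalStateException("Could not create JMS session", e);
             }
         }

         public Shipment sendWithReply(Order order) throws JMSException {
             ObjectMessage objectMessage = session.createObjectMessage(order);

             // The correlation ID only needs to be set once
             objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
             objectMessage.setJMSReplyTo(new ActiveMQQueue(ActiveMQConfig.ORDER_REPLY_2_QUEUE));
             objectMessage.setJMSExpiration(1000L);
             objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);

             // This operation seems to be blocking and synchronous
             return jmsMessagingTemplate.convertSendAndReceive(
                     new ActiveMQQueue(ActiveMQConfig.ORDER_QUEUE), objectMessage, Shipment.class);
         }
     }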
  6. Note in the above code that JmsMessagingTemplate is used instead of JmsTemplate because we are interested in the method convertSendAndReceive. As seen in the method signature, it waits to receive the Shipment object from the consumer.
  7. Next, we can see the Receiver

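     import java.util.UUID;

     import javax.jms.JMSException;
     import javax.jms.Message;
     import javax.jms.MessageProducer;
     import javax.jms.ObjectMessage;
     import javax.jms.Session;

     import org.apache.activemq.command.ActiveMQObjectMessage;
     import org.springframework.jms.annotation.JmsListener;
     import org.springframework.jms.listener.SessionAwareMessageListener;
     import org.springframework.stereotype.Component;

     @Component
     public class Receiver implements SessionAwareMessageListener<Message> {

         @Override
         @JmsListener(destination = ActiveMQConfig.ORDER_QUEUE)
         public void onMessage(Message message, Session session) throws JMSException {
             Order order = (Order) ((ActiveMQObjectMessage) message).getObject();
             Shipment shipment = new Shipment(order.getId(), UUID.randomUUID().toString());

             // Done handling the request, now create a response message
             final ObjectMessage responseMessage = new ActiveMQObjectMessage();
             responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
             responseMessage.setObject(shipment);

             // Send the message back to the replyTo address of the incoming message
             final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
             producer.send(responseMessage);
         }
     }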
  8. Using the javax.jms.Session, a javax.jms.MessageProducer is created and used to send the reply message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

Using Spring Integration

  1. First, we include the required dependencies in addition to the above dependencies

     <dependency>
         <groupId>org.springframework.integration</groupId>
         <artifactId>spring-integration-jms</artifactId>
     </dependency>
  2. Use the default spring.activemq.* properties to configure the application for ActiveMQ. You can also do this inside a @Configuration class.

     spring:
       activemq:
         broker-url: tcp://localhost:61616
         non-blocking-redelivery: true
         packages:
           trust-all: true
  3. Note in the above configuration spring.activemq.packages.trust-all can be changed to spring.activemq.packages.trusted with the appropriate packages.
  4. Next we create the required Beans for the Spring Integration.

     @EnableIntegration
     @IntegrationComponentScan
     @Configuration
     public class ActiveMQConfig {

         public static final String ORDER_QUEUE = "order-queue";
         public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

         @Bean
         public MessageConverter messageConverter() {
             MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
             converter.setTargetType(MessageType.TEXT);
             converter.setTypeIdPropertyName("_type");
             return converter;
         }

         @Bean
         public MessageChannel requests() {
             return new DirectChannel();
         }

         @Bean
         @ServiceActivator(inputChannel = "requests")
         public JmsOutboundGateway jmsGateway(ActiveMQConnectionFactory activeMQConnectionFactory) {
             JmsOutboundGateway gateway = new JmsOutboundGateway();
             gateway.setConnectionFactory(activeMQConnectionFactory);
             gateway.setRequestDestinationName(ORDER_QUEUE);
             gateway.setReplyDestinationName(ORDER_REPLY_2_QUEUE);
             gateway.setCorrelationKey("JMSCorrelationID");
             gateway.setSendTimeout(100L);
             gateway.setReceiveTimeout(100L);
             return gateway;
         }

         @Autowired
         Receiver receiver;

         @Bean
         public DefaultMessageListenerContainer responder(ActiveMQConnectionFactory activeMQConnectionFactory) {
             DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
             container.setConnectionFactory(activeMQConnectionFactory);
             container.setDestinationName(ORDER_QUEUE);
             MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() {

                 @SuppressWarnings("unused")
                 public Shipment handleMessage(Order order) {
                     return receiver.receiveMessage(order);
                 }

             });
             container.setMessageListener(adapter);
             return container;
         }
     }
  5. Next, we will configure the MessagingGateway

     @MessagingGateway(defaultRequestChannel = "requests")
     public interface ClientGateway {
         Shipment sendAndReceive(Order order);
     }
  6. We then autowire this gateway into whichever component needs to send an Order and receive a Shipment. The Receiver that produces the Shipment is shown below.

     @Slf4j
     @Component
     public class Receiver {
         public Shipment receiveMessage(@Payload Order order) {
             Shipment shipment = new Shipment(order.getId(), UUID.randomUUID().toString());
             return shipment;
         }
     }
  7. Next we configure the Component to process the Order message. After successful execution, this component will send the Shipment message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

For those who just want to clone the code, head over to aniruthmp/jms

Written on June 5, 2018
https://aniruthmp.github.io/Spring-JMS-request-response/

posted @ 2019-06-27 09:20 paulwong

ActiveMQ advanced features

https://blog.51cto.com/1754966750/category17.html

posted @ 2019-06-26 14:13 paulwong

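Ordering of JMS message consumption

Today's system architectures are distributed, with multiple message producers and multiple message consumers. Take an order-created message and an order-paid message: we need to guarantee that the order-created message is consumed first and the order-paid message afterwards.

How to solve the MQ message consumption ordering problem
https://segmentfault.com/a/1190000014512075

JMS: how to guarantee message order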
https://leokongwq.github.io/2017/01/23/jms-message-order.html
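One common way to keep per-order ordering with several consumers is to send every message for the same order to the same consumer, which is what ActiveMQ message groups (the JMSXGroupID property) do at the broker level. The JDK-only sketch below illustrates that idea and is not taken from the linked articles. The class name OrderedDispatchSketch, the "orderId:event" message format and the hash-based assignment are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OrderedDispatchSketch {

    /** Picks a consumer index so that all messages with the same key go to the same consumer. */
    static int consumerFor(String groupKey, int consumerCount) {
        return Math.floorMod(groupKey.hashCode(), consumerCount);
    }

    /** Distributes "orderId:event" messages over per-consumer FIFO queues. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String message : messages) {
            String orderId = message.substring(0, message.indexOf(':'));
            // Same order ID, same queue: the send order is preserved within each order
            queues.get(consumerFor(orderId, consumerCount)).add(message);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("o1:created", "o2:created", "o1:paid", "o2:paid");
        List<List<String>> queues = dispatch(sent, 3);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("consumer " + i + " -> " + queues.get(i));
        }
    }
}
```

Messages for different orders can still be handled in parallel; only messages sharing an order ID are serialized through a single consumer.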







posted @ 2019-06-24 17:42 paulwong

Multiple MongoDB connectors with Spring Boot

http://blog.marcosbarbero.com/multiple-mongodb-connectors-in-spring-boot/

https://github.com/yinjihuan/spring-boot-starter-mongodb-pool

https://github.com/lish1le/mongodb-plus

posted @ 2019-06-20 15:12 paulwong
