posted @ 2019-07-11 11:45 paulwong | Views (27) | Comments (0)


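When Spring Batch reads a file, it reads the data one line at a time and then submits it to the remote step in chunks of CHUNKSIZE. Sometimes the current line has to be combined with the next few lines before the chunk size is decided, so that related data is processed in order by the same remote processor. If related data is split across several chunks, it may be processed out of order. This means the chunk size has to be adjusted dynamically.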
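The grouping idea can be sketched in plain Java, without any Spring Batch classes. This is only an illustration under stated assumptions: the class name DynamicChunker, the keyOf helper, and the rule that related lines share the first comma-separated field are all hypothetical. In a real job the same look-ahead would have to be wired into the reader or the chunk completion logic, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DynamicChunker {

    // Hypothetical rule: lines are related when their first comma-separated field matches.
    static String keyOf(String line) {
        int comma = line.indexOf(',');
        return comma < 0 ? line : line.substring(0, comma);
    }

    // Packs lines into chunks of roughly targetSize without ever splitting a group of related lines.
    static List<List<String>> chunk(List<String> lines, int targetSize) {
        List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int i = 0;
        while (i < lines.size()) {
            // Look ahead: the current line plus the following lines that share its key.
            String key = keyOf(lines.get(i));
            int j = i;
            while (j < lines.size() && keyOf(lines.get(j)).equals(key)) {
                j++;
            }
            List<String> group = lines.subList(i, j);
            // Close the current chunk early if the whole group would not fit into it.
            if (!current.isEmpty() && current.size() + group.size() > targetSize) {
                chunks.add(current);
                current = new ArrayList<>();
            }
            current.addAll(group);
            i = j;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("A,1", "A,2", "B,1", "B,2", "B,3", "C,1");
        // With a target size of 3 the chunks hold 2, 3 and 1 lines.
        System.out.println(chunk(lines, 3));
    }
}
```

A group that is larger than the target size ends up in one oversized chunk of its own, which trades a bounded chunk size for keeping related lines together.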



posted @ 2019-07-02 11:13 paulwong | Views (59) | Comments (0)


Message Queues: RabbitMQ

Using RabbitMQ in Spring Boot

posted @ 2019-06-28 10:24 paulwong | Views (32) | Comments (0)

How to implement JMS ReplyTo using SpringBoot

Request-response is a message exchange pattern. In some cases, a message producer may want the consumers to reply to a message. The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either a Topic or a Queue).

In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time.
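The reply-to mechanics described above can be sketched without a broker. The example below is a simplified in-memory stand-in, not JMS itself: the Msg class, the ReplyToSketch class, and the "shipped:" reply body are hypothetical, and a BlockingQueue plays the role of both the request queue and the temporary reply destination.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ReplyToSketch {

    // Hypothetical stand-in for a JMS message: body, correlation id and reply destination.
    static final class Msg {
        final String body;
        final String correlationId;
        final BlockingQueue<Msg> replyTo; // plays the role of the JMSReplyTo header

        Msg(String body, String correlationId, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Consumer side: take one request and reply on the destination it names, if any.
    static void serveOne(BlockingQueue<Msg> requests, long timeoutMillis) {
        try {
            Msg request = requests.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (request != null && request.replyTo != null) {
                request.replyTo.offer(new Msg("shipped:" + request.body, request.correlationId, null));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Producer side: send a request, then block until the reply arrives or the timeout expires.
    static String requestReply(BlockingQueue<Msg> requests, String body, long timeoutMillis) {
        BlockingQueue<Msg> temporaryReplyQueue = new LinkedBlockingQueue<>();
        String correlationId = UUID.randomUUID().toString();
        requests.offer(new Msg(body, correlationId, temporaryReplyQueue));
        try {
            Msg reply = temporaryReplyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            // A null result means the timeout expired or the reply belongs to another request.
            if (reply == null || !correlationId.equals(reply.correlationId)) {
                return null;
            }
            return reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Msg> orderQueue = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> serveOne(orderQueue, 1000));
        consumer.start();
        System.out.println(requestReply(orderQueue, "order-42", 1000));
        consumer.join();
    }
}
```

The requester here waits synchronously with a timeout. An asynchronous variant would register a listener on the reply queue and match replies by correlation id instead of blocking.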

For more information, check here.

Now, let’s jump into the code. In Spring, there are at least two ways to implement this.

  1. Using JmsTemplate
  2. Using Spring Integration

For demo purposes, I used ActiveMQ. However, you can implement this in other messaging systems like IBM MQ, RabbitMQ, Tibco EMS, etc. In this demo, I send an ObjectMessage of type Order and reply with a Shipment object.

Using JmsTemplate

  1. First, we include the required dependencies. Replace the activemq dependency with your messaging system’s jars if you are not using ActiveMQ.

  2. We use the default spring.activemq.* properties to configure the application for ActiveMQ. However, you can do this inside a @Configuration class as well.

    spring:
      activemq:
        broker-url: tcp://localhost:61616
        non-blocking-redelivery: true
        packages:
          trust-all: true
  3. Note that spring.activemq.packages.trust-all in the above configuration can be changed to spring.activemq.packages.trusted with the appropriate packages.
  4. Now Spring will do its magic and inject all the required beans as usual :) However, in our code, we need to add @EnableJms.

    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;

    // @EnableJms turns on processing of JMS listener annotations.
    @EnableJms
    @Configuration
    public class ActiveMQConfig {

        public static final String ORDER_QUEUE = "order-queue";
        public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";
    }

  5. First, we will configure the Producer

    @Component
    public class Producer {

        @Autowired
        JmsMessagingTemplate jmsMessagingTemplate;

        @Autowired
        JmsTemplate jmsTemplate;
        private Session session;

        // Create one JMS session after injection; createConnection() and createSession() can throw JMSException.
        @PostConstruct
        public void init() throws JMSException {
            session = jmsMessagingTemplate.getConnectionFactory().createConnection()
                    .createSession(false, Session.AUTO_ACKNOWLEDGE);
        }

        public Shipment sendWithReply(Order order) throws JMSException {
            ObjectMessage objectMessage = session.createObjectMessage(order);
            // Tell the consumer where to send its reply.
            objectMessage.setJMSReplyTo(new ActiveMQQueue(ORDER_REPLY_2_QUEUE));

            return jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue(ORDER_QUEUE),
                    objectMessage, Shipment.class); // this operation seems to be blocking + sync
        }
    }
  6. Note in the above code that JmsMessagingTemplate is used instead of JmsTemplate because we are interested in the method convertSendAndReceive. As seen in the method signature, it waits to receive the Shipment object from the consumer.
  7. Next, we can see the Receiver

    @Component
    public class Receiver implements SessionAwareMessageListener<Message> {

        @Override
        @JmsListener(destination = ORDER_QUEUE)
        public void onMessage(Message message, Session session) throws JMSException {
            Order order = (Order) ((ActiveMQObjectMessage) message).getObject();
            Shipment shipment = new Shipment(order.getId(), UUID.randomUUID().toString());

            // Done handling the request; now create a response message.
            final ObjectMessage responseMessage = new ActiveMQObjectMessage();
            responseMessage.setObject(shipment);

            // Send the response back to the replyTo address of the incoming message.
            final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
            producer.send(responseMessage);
        }
    }
  8. Using the javax.jms.Session, the javax.jms.MessageProducer is created and used to send the reply message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

Using Spring Integration

  1. First, we include the required dependencies in addition to the above dependencies.

  2. We use the default spring.activemq.* properties to configure the application for ActiveMQ. However, you can do this inside a @Configuration class as well.

    spring:
      activemq:
        broker-url: tcp://localhost:61616
        non-blocking-redelivery: true
        packages:
          trust-all: true
  3. Note that spring.activemq.packages.trust-all in the above configuration can be changed to spring.activemq.packages.trusted with the appropriate packages.
  4. Next, we create the required beans for Spring Integration.

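    @Configuration
    public class ActiveMQConfig {

        public static final String ORDER_QUEUE = "order-queue";
        public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

        @Bean
        public MessageConverter messageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            return converter;
        }

        @Bean
        public MessageChannel requests() {
            return new DirectChannel();
        }

        // Sends each message from the "requests" channel to the order queue and waits for the reply.
        @Bean
        @ServiceActivator(inputChannel = "requests")
        public JmsOutboundGateway jmsGateway(ActiveMQConnectionFactory activeMQConnectionFactory) {
            JmsOutboundGateway gateway = new JmsOutboundGateway();
            gateway.setConnectionFactory(activeMQConnectionFactory);
            gateway.setRequestDestinationName(ORDER_QUEUE);
            gateway.setReplyDestinationName(ORDER_REPLY_2_QUEUE);
            return gateway;
        }

        @Autowired
        Receiver receiver;

        // Listens on the order queue and delegates each Order to the Receiver component.
        @Bean
        public DefaultMessageListenerContainer responder(ActiveMQConnectionFactory activeMQConnectionFactory) {
            DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
            container.setConnectionFactory(activeMQConnectionFactory);
            container.setDestinationName(ORDER_QUEUE);
            MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() {

                public Shipment handleMessage(Order order) {
                    return receiver.receiveMessage(order);
                }
            });
            container.setMessageListener(adapter);
            return container;
        }
    }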
  5. Next, we will configure the MessagingGateway

    @MessagingGateway(defaultRequestChannel = "requests")
    public interface ClientGateway {
        Shipment sendAndReceive(Order order);
    }
  6. We then autowire this gateway in our component class wherever we want to send and receive the message.
  7. Next, we configure the component that processes the Order message. After successful execution, this component sends the Shipment message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

    @Component
    public class Receiver {
        public Shipment receiveMessage(@Payload Order order) {
            Shipment shipment = new Shipment(order.getId(), UUID.randomUUID().toString());
            return shipment;
        }
    }

For those who just want to clone the code, head over to aniruthmp/jms.

Written on June 5, 2018

posted @ 2019-06-27 09:20 paulwong | Views (39) | Comments (0)


posted @ 2019-06-26 14:13 paulwong | Views (38) | Comments (0)





posted @ 2019-06-24 17:42 paulwong | Views (39) | Comments (0)

Multiple MongoDB connectors with Spring Boot

posted @ 2019-06-20 15:12 paulwong | Views (36) | Comments (0)

Setting up an NFS server

  1. Install the nfs-utils package.
    [root@kvm5 ~]# yum install -y nfs-utils
  2. Create the NFS shared directories.
    [root@kvm5 ~]# mkdir /public /protected
  3. Change the SELinux file context of the NFS shared directories.
    [root@kvm5 ~]# semanage fcontext -a -t public_content_t "/public(/.*)?"
    [root@kvm5 ~]# semanage fcontext -a -t public_content_t "/protected(/.*)?"
    [root@kvm5 ~]# restorecon -Rv /public /protected
  4. During the exam you do not need to generate the Kerberos keytab yourself. Just download it from the specified location and store it in /etc/ under the file name krb5.keytab.
    [root@kvm5 ~]# wget -O /etc/krb5.keytab 
  5. Kerberos keytab validation is time-sensitive, so both the server and the client must synchronize their clocks.
    [root@kvm5 ~]# date
    Sun Jan  7 14:50:04 CST 2018
    [root@kvm5 ~]# chronyc -a makestep
    200 OK
    200 OK
    [root@kvm5 ~]# date
    Mon Nov 20 15:53:22 CST 2017
  6. Create the subdirectory restricted under /protected and set its owner to deyu3 so that deyu3 can write data to it.
    [root@kvm5 ~]# mkdir -p /protected/restricted
    [root@kvm5 ~]# chown deyu3 /protected/restricted
  7. Edit the configuration file /etc/exports to share the /protected and /public directories with the domain.
    [root@kvm5 ~]# echo '/protected,sync,sec=krb5p)' > /etc/exports
    [root@kvm5 ~]# echo '/public,sync)' >> /etc/exports
    [root@kvm5 ~]# vim /etc/exports
    [root@kvm5 ~]# cat /etc/exports

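  8. The NFS mount options are described below. For details, see the man 5 nfs manual page.
    1. rw: read-write access;
    2. ro: read-only access;
    3. sec=mode: security authentication mode;
      1. sec=sys (default): authenticate using local UNIX UIDs and GIDs.
      2. sec=krb5: authenticate with Kerberos V5 instead of local UNIX UIDs and GIDs.
      3. sec=krb5i: authenticate with Kerberos V5 and check data integrity to prevent tampering.
      4. sec=krb5p: authenticate with Kerberos V5, check data integrity, and encrypt NFS traffic to prevent tampering. This is the most secure mode.
    4. sync: data is written synchronously to memory and disk;
    [root@kvm5 ~]# man 5 nfs
  9. Configure NFS to use version 4.2 so that SELinux contexts are exported. If no suitable version is available, mounting on the client fails with the message mount.nfs: Protocol not supported.
    [root@kvm5 ~]# vim /etc/sysconfig/nfs
    [root@kvm5 ~]# sed -i 's/^\(RPCNFSDARGS=\).*$/\1\"-V 4.2\"/' /etc/sysconfig/nfs
    [root@kvm5 ~]# grep ^RPCNFSDARGS /etc/sysconfig/nfs
    RPCNFSDARGS="-V 4.2"
  10. Enable the NFS services at boot. The server-side services are nfs-server and nfs-secure-server. In this version, starting nfs-server also starts nfs-secure-server, and tab completion does not list the nfs-secure-server service. In some versions the two are separate, so you must confirm that both services are started.
    [root@kvm5 ~]# systemctl enable nfs-server.service nfs-secure-server.service
  11. Start the NFS services.
    [root@kvm5 ~]# systemctl start nfs-server.service nfs-secure-server.service
  12. Check which NFS versions are currently enabled. Because the server is configured to use 4.2, seeing -4.2 means the NFS server did not start successfully.
    [root@kvm5 ~]# cat /proc/fs/nfsd/versions
    -2 +3 +4 +4.1 +4.2
  13. Make sure both the nfs-secure-server and nfs-server services are running normally.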
    [root@kvm5 ~]# systemctl status nfs-secure-server.service nfs-server.service 
    nfs-secure-server.service - Secure NFS Server
       Loaded: loaded (/usr/lib/systemd/system/nfs-secure-server.service; enabled)
       Active: active (running) since Mon 2015-09-21 20:04:10 CST; 8s ago
      Process: 3075 ExecStart=/usr/sbin/rpc.svcgssd $RPCSVCGSSDARGS (code=exited, status=0/SUCCESS)
     Main PID: 3077 (rpc.svcgssd)
       CGroup: /system.slice/nfs-secure-server.service
               └─3077 /usr/sbin/rpc.svcgssd

    Sep 21 20:04:10 systemd[1]: Starting Secure NFS Server
    Sep 21 20:04:10 systemd[1]: Started Secure NFS Server.

    nfs-server.service - NFS Server
       Loaded: loaded (/usr/lib/systemd/system/nfs-server.service; enabled)
       Active: active (exited) since Mon 2015-09-21 20:04:10 CST; 8s ago
      Process: 3078 ExecStopPost=/usr/sbin/exportfs -f (code=exited, status=0/SUCCESS)
      Process: 3076 ExecStop=/usr/sbin/rpc.nfsd 0 (code=exited, status=0/SUCCESS)
      Process: 3087 ExecStart=/usr/sbin/rpc.nfsd $RPCNFSDARGS $RPCNFSDCOUNT (code=exited, status=0/SUCCESS)
      Process: 3084 ExecStartPre=/usr/sbin/exportfs -r (code=exited, status=0/SUCCESS)
      Process: 3083 ExecStartPre=/usr/libexec/nfs-utils/scripts/nfs-server.preconfig (code=exited, status=0/SUCCESS)
     Main PID: 3087 (code=exited, status=0/SUCCESS)
       CGroup: /system.slice/nfs-server.service

    Sep 21 20:04:10 systemd[1]: Starting NFS Server
    Sep 21 20:04:10 systemd[1]: Started NFS Server.
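  14. It is recommended to start both services regardless of whether tab completion suggests them. On CentOS, the installed version nfs-utils-1.3.0-8.el7.x86_64 reports an error when starting nfs-secure-server. In that case, run yum downgrade nfs-utils to switch to the nfs-utils-1.3.0-0.el7.x86_64 package.
    [root@kvm5 ~]# rpm -qa | grep nfs-utils
    [root@kvm5 ~]# yum downgrade nfs-utils -y
    [root@kvm5 ~]# rpm -qa | grep nfs-utils
  15. Restart the NFS services and check that they are running normally.
    [root@kvm5 ~]# systemctl restart nfs-server.service nfs-secure-server.service
  16. Export all configured NFS shared directories.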
    [root@kvm5 ~]# exportfs -arv

posted @ 2019-06-18 09:08 paulwong | Views (50) | Comments (0)

Installing and configuring MySQL on CentOS 7

posted @ 2019-06-14 10:24 paulwong | Views (38) | Comments (0)


posted @ 2019-06-13 15:22 paulwong | Views (42) | Comments (0)
