paulwong

#

ACTIVEMQ设置预取消息数目

当ACTIVEMQ的某个QUEUE有多个消费者,为避免某个消息者取了更多个消息处理,而造成其他消费者无消息可处理的情况,可以设置每个消费者不预取消息,即每个消费者消费完单个消息后,再去取消息,这样其他消费者就能平均的有消息可处理。


https://stackoverflow.com/questions/35928089/activemq-how-to-prevent-message-from-going-to-dispatched-queue


设置方法,在CONNECT STRING中设置:
tcp://localhost:61616?jms.prefetchPolicy.all=0 

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0 

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); 
consumer = session.createConsumer(queue);

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

posted @ 2019-10-31 11:28 paulwong 阅读(837) | 评论 (0)编辑 收藏

EIP in SPRING INTEGRATION

  • idempotent receiver
幂等型,同一个MESSAGE,如MESSAGE ID都一样,在MESSAGING系统中不管运行多少次,结果都一样,为啥?因为重复的MESSAGE,都被忽略了。
方案:
消息被处理后,从消息中取出ID,放入META-DATA-STORE中,后续处理消息时,要从META-DATA-STORE中检查是否有值。

下面这个方案,ID的存储和判断是否重复消息都在一个INTERCEPTOR中搞定。
https://stackoverflow.com/questions/50401460/spring-integration-dsl-configure-idempotent-receiver-to-identify-duplicates
https://www.javacodegeeks.com/2015/09/monitoring-and-management.html


claim-check
将MESSAGE的PAYLOAD存在STORE中,返回一个ID,这个ID即claim-check,如果需要取MESSAGE的DETAIl时,可从STORE中取出MESSAGE。
https://github.com/spring-projects/spring-integration/blob/master/src/reference/asciidoc/claim-check.adoc


posted @ 2019-10-25 11:03 paulwong 阅读(496) | 评论 (0)编辑 收藏

SPRING INTEGRATION LESSONS

Lessons

Introduction to Enterprise Application Integration

In our first lesson, you will get introduced to the concepts of Enterprise Application Integration. You will learn about the and Enterprise integration patterns that can be applied to simplify integration between different platforms and the Integration strategies that can be followed for this purpose. Finally, we will discuss how and why to implement a Message driven architecture and how to achieve both Synchronous and asynchronous communication among nodes.

Spring Integration Fundamentals

In this lesson, you will get to understand how Spring Integration works under the hood. The core concepts of Spring Integration messaging system (like message channels and endpoints) will be introduced. Additionally, the components that build the framework will be discussed, including the channel adapters, transformers, filters, routers etc. Finally, the two distinct methods of communication (synchronous and asynchronous) are explained and the lesson ends with a discussion on error handling.

Spring Integration and Web Services

In this lesson, we will focus on the integration with external web services. Spring Integration comes with the necessary functionality (adapters, channels etc.) to support web services out of the box. A full example is built from scratch in order to better understand the topic.

Enterprise Messaging

In this lesson, we will focus on integrating our application with JMS messaging. For this purpose, we will use Active MQ, which will be our broker. We will show examples of sending and receiving JMS messages by using the Spring Integration JMS channel adapters. Following these examples, we will see some ways of customizing these invocations by configuring message conversion and destination resolution.

Spring Integration Full Example

In this lesson, we will wrap everything up by providing a complete application that uses several of the components provided by Spring Integration in order to provide a service to its users. We will discuss the system architecture, the actual implementation and the relevant error handling.

Monitoring and Management

In this lesson, we will examine different mechanisms of monitoring or gathering more information about what is going on within the messaging system. Some of these mechanisms consist of managing or monitoring the application through MBeans, which are part of the JMX specification. Another mechanism discussed in this chapter is how we will implement the EIP idempotent receiver pattern using a metadata store. Finally, the last mechanism described is the control bus. This will let us send messages that will invoke operations on components in the application context.

posted @ 2019-10-25 09:45 paulwong 阅读(316) | 评论 (0)编辑 收藏

SPRING INTEGRATION DSL DEMO

https://github.com/spring-projects/spring-integration/tree/master/src/reference/asciidoc

posted @ 2019-10-23 11:55 paulwong 阅读(337) | 评论 (0)编辑 收藏

Spring Integration 中文手册 - GOOD

Spring Integration 中文手册 (1)


Spring Integration 中文手册 (2)

posted @ 2019-10-11 10:28 paulwong 阅读(838) | 评论 (0)编辑 收藏

LINUX配置DNS

vi /etc/resolv.conf

nameserver 8.8.8.8

posted @ 2019-10-10 10:55 paulwong 阅读(332) | 评论 (0)编辑 收藏

在SPRING INTEGRATION中手动开始和停止JMS LISTENER

如果要对JMS BROKER生产和消费MESSAGE,一种方式是用JmsTemplate发送和消费消息,另一种方式是SPRING INTEGRATION。

SPRING INTEGRATION是实现了EIP模式的一种框架,即使用CHANNEL和JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER,完全脱离了JmsTemplate的API。

如果需要实现这种场景:从BROKER取一条消息,处理消息,且处理途中不要再从BROKER再取消息,处理完后再取消息,再处理。

这样要求手动开始和停止JMS LISTENER,即手动开始和停止JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER。

@Bean
@InboundChannelAdapter(value = "loaderResponseChannel")
public MessageSource loaderResponseSource() throws Exception {
    return Jms
            .inboundAdapter(oracleConnectionFactory())
            .configureJmsTemplate(
                    t -> t.deliveryPersistent(true)
                            .jmsMessageConverter(jacksonJmsMessageConverter())
            ).destination(jmsInbound).get();
}

当使用@InboundChannelAdapter时,会自动注册一个SourcePollingChannelAdapter ,但这个名字比较长:configrationName.loaderResponseSource.inboundChannelAdapter。

呼叫这个实例的start()和stop()方法即可。

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from("controlBus")
              .controlBus()
              .get();
}

Message operation = MessageBuilder.withPayload("@configrationName.loaderResponseSource.inboundChannelAdapter.start()").build();
operationChannel.send(operation)

https://stackoverflow.com/questions/45632469/shutdown-spring-integration-with-jms-inboundadapter

https://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/system-management-chapter.html#control-bus

https://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.java

https://stackoverflow.com/questions/50428552/how-to-stop-or-suspend-polling-after-batch-job-fail

posted @ 2019-10-09 17:16 paulwong 阅读(623) | 评论 (0)编辑 收藏

CountDownLatch、CyclicBarrier和Semaphore

CountDownLatch、CyclicBarrier和Semaphore这三个并发辅助类,可以在线程中呼叫,使得线程暂停等,但各有不同。

  • CountDownLatch
1、初始化,并传入计数器
2、向不同的线程传入CountDownLatch实例
3、如果在某一线程中呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
4、如果在某一线程中呼叫countDown(),计数器减1
5、最终如果计数器值为0时,则CountDownLatch实例不再起作用了,即为一次性的

  • CyclicBarrier
1、初始化,并传入计数器值,也可传入一个Runnable类,会在计数器为0时,被执行
2、向不同的线程传入CyclicBarrier实例
3、如果在某一线程中呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
4、其他线程呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
5、最终如果计数器值为0时,则CyclicBarrier实例会将计数器值恢复,又可重用

  • Semaphore
1、初始化,并传入计数器值
2、向不同的线程传入Semaphore实例
3、如果在某一线程中呼叫acquire(),则Semaphore实例会将计数器值减1,如果计数器值为-1,则将计数器值置为0,此线程被挂起,直到计数器值大于1时,才往下执行
4、此线程需呼叫release(),使得计数器值+1,以便其他线程在计数器值为0时不受阻


CountDownLatch 例子:
public class Test {
     public static void main(String[] args) {   
         final CountDownLatch latch = new CountDownLatch(2);
          
         new Thread(){
             public void run() {
                 try {
                     System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
          
         new Thread(){
             public void run() {
                 try {
                     System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                     Thread.sleep(3000);
                     System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                     latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
          
         try {
             System.out.println("等待2个子线程执行完毕");
            latch.await();
            System.out.println("2个子线程已经执行完毕");
            System.out.println("继续执行主线程");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
     }
}

结果:
线程Thread-0正在执行
线程Thread-1正在执行
等待2个子线程执行完毕
线程Thread-0执行完毕
线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程


CyclicBarrier例子:
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());   
            }
        });
         
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务");
        }
    }
}

执行结果:
线程Thread-0正在写入数据
线程Thread-1正在写入数据
线程Thread-2正在写入数据
线程Thread-3正在写入数据
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
当前线程Thread-3
所有线程写入完毕,继续处理其他任务
所有线程写入完毕,继续处理其他任务
所有线程写入完毕,继续处理其他任务
所有线程写入完毕,继续处理其他任务


Semaphore例子:
public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
     
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
         
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();           
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:
工人0占用一个机器在生产
工人1占用一个机器在生产
工人2占用一个机器在生产
工人4占用一个机器在生产
工人5占用一个机器在生产
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产
工人7占用一个机器在生产
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产
工人3释放出机器
工人7释放出机器
工人6释放出机器

https://www.cnblogs.com/dolphin0520/p/3920397.html

https://juejin.im/post/5aeec3ebf265da0ba76fa327

posted @ 2019-09-24 10:18 paulwong 阅读(340) | 评论 (0)编辑 收藏

使用 Jenkins 部署 Spring Boot

https://mp.weixin.qq.com/s?__biz=MzI4NDY5Mjc1Mg==&mid=2247489278&idx=2&sn=a48342d706bfd1651e277e1c24e81e3e&chksm=ebf6ce81dc81479764d1e6ff7b207257a78d52bed5ef8c2f16c76f70660d1da9609167ed7bbb&mpshare=1&scene=1&srcid=&sharer_sharetime=1568861026830&sharer_shareid=24856bf403968a883e437b859be0a9b5&pass_ticket=qB9yWQbj%2FGo7PDZNogjBwishDCx5Suu%2BvBWnS1TpKmY%3D#rd

posted @ 2019-09-19 17:44 paulwong 阅读(347) | 评论 (0)编辑 收藏

CI/CD 资源

Continuous delivery tool landscape
http://www.jamesbowman.me/post/continuous-delivery-tool-landscape/

posted @ 2019-09-18 17:08 paulwong 阅读(398) | 评论 (0)编辑 收藏

仅列出标题
共116页: First 上一页 23 24 25 26 27 28 29 30 31 下一页 Last