当ACTIVEMQ的某个QUEUE有多个消费者,为避免某个消息者取了更多个消息处理,而造成其他消费者无消息可处理的情况,可以设置每个消费者不预取消息,即每个消费者消费完单个消息后,再去取消息,这样其他消费者就能平均的有消息可处理。
https://stackoverflow.com/questions/35928089/activemq-how-to-prevent-message-from-going-to-dispatched-queue
设置方法,在CONNECT STRING中设置:
tcp://localhost:61616?jms.prefetchPolicy.all=0
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
http://activemq.apache.org/what-is-the-prefetch-limit-for.html
Lessons
In our first lesson, you will get introduced to the concepts of Enterprise Application Integration. You will learn about the and Enterprise integration patterns that can be applied to simplify integration between different platforms and the Integration strategies that can be followed for this purpose. Finally, we will discuss how and why to implement a Message driven architecture and how to achieve both Synchronous and asynchronous communication among nodes.
In this lesson, you will get to understand how Spring Integration works under the hood. The core concepts of Spring Integration messaging system (like message channels and endpoints) will be introduced. Additionally, the components that build the framework will be discussed, including the channel adapters, transformers, filters, routers etc. Finally, the two distinct methods of communication (synchronous and asynchronous) are explained and the lesson ends with a discussion on error handling.
In this lesson, we will focus on the integration with external web services. Spring Integration comes with the necessary functionality (adapters, channels etc.) to support web services out of the box. A full example is built from scratch in order to better understand the topic.
In this lesson, we will focus on integrating our application with JMS messaging. For this purpose, we will use Active MQ, which will be our broker. We will show examples of sending and receiving JMS messages by using the Spring Integration JMS channel adapters. Following these examples, we will see some ways of customizing these invocations by configuring message conversion and destination resolution.
In this lesson, we will wrap everything up by providing a complete application that uses several of the components provided by Spring Integration in order to provide a service to its users. We will discuss the system architecture, the actual implementation and the relevant error handling.
In this lesson, we will examine different mechanisms of monitoring or gathering more information about what is going on within the messaging system. Some of these mechanisms consist of managing or monitoring the application through MBeans, which are part of the JMX specification. Another mechanism discussed in this chapter is how we will implement the EIP idempotent receiver pattern using a metadata store. Finally, the last mechanism described is the control bus. This will let us send messages that will invoke operations on components in the application context.
vi /etc/resolv.conf
nameserver 8.8.8.8
如果要对JMS BROKER生产和消费MESSAGE,一种方式是用JmsTemplate发送和消费消息,另一种方式是SPRING INTEGRATION。
SPRING INTEGRATION是实现了EIP模式的一种框架,即使用CHANNEL和JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER,完全脱离了JmsTemplate的API。
如果需要实现这种场景:从BROKER取一条消息,处理消息,且处理途中不要再从BROKER再取消息,处理完后再取消息,再处理。
这样要求手动开始和停止JMS LISTENER,即手动开始和停止JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER。
@Bean
@InboundChannelAdapter(value = "loaderResponseChannel")
public MessageSource loaderResponseSource() throws Exception {
return Jms
.inboundAdapter(oracleConnectionFactory())
.configureJmsTemplate(
t -> t.deliveryPersistent(true)
.jmsMessageConverter(jacksonJmsMessageConverter())
).destination(jmsInbound).get();
}
当使用@InboundChannelAdapter时,会自动注册一个SourcePollingChannelAdapter ,但这个名字比较长:configrationName.loaderResponseSource.inboundChannelAdapter。
呼叫这个实例的start()和stop()方法即可。
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("controlBus")
.controlBus()
.get();
}
Message operation = MessageBuilder.withPayload("@configrationName.loaderResponseSource.inboundChannelAdapter.start()").build();
operationChannel.send(operation)
https://stackoverflow.com/questions/45632469/shutdown-spring-integration-with-jms-inboundadapterhttps://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/system-management-chapter.html#control-bushttps://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.javahttps://stackoverflow.com/questions/50428552/how-to-stop-or-suspend-polling-after-batch-job-fail
CountDownLatch、CyclicBarrier和Semaphore这三个并发辅助类,可以在线程中呼叫,使得线程暂停等,但各有不同。
1、初始化,并传入计数器
2、向不同的线程传入CountDownLatch实例
3、如果在某一线程中呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
4、如果在某一线程中呼叫countDown(),计数器减1
5、最终如果计数器值为0时,则CountDownLatch实例不再起作用了,即为一次性的
1、初始化,并传入计数器值,也可传入一个Runnable类,会在计数器为0时,被执行
2、向不同的线程传入CyclicBarrier实例
3、如果在某一线程中呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
4、其他线程呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
5、最终如果计数器值为0时,则CyclicBarrier实例会将计数器值恢复,又可重用
1、初始化,并传入计数器值
2、向不同的线程传入Semaphore实例
3、如果在某一线程中呼叫acquire(),则Semaphore实例会将计数器值减1,如果计数器值为-1,则将计数器值置为0,此线程被挂起,直到计数器值大于1时,才往下执行
4、此线程需呼叫release(),使得计数器值+1,以便其他线程在计数器值为0时不受阻
CountDownLatch 例子:
public class Test {
public static void main(String[] args) {
final CountDownLatch latch =
new CountDownLatch(2);
new Thread(){
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try {
System.out.println("等待2个子线程执行完毕

");
latch.await();
System.out.println("2个子线程已经执行完毕");
System.out.println("继续执行主线程");
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果:
线程Thread-0正在执行
线程Thread-1正在执行
等待2个子线程执行完毕

线程Thread-0执行完毕
线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程
CyclicBarrier例子:
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier =
new CyclicBarrier(N,
new Runnable() {
@Override
public void run() {
System.out.println("当前线程"+Thread.currentThread().getName());
}
});
for(
int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer
extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据

");
try {
Thread.sleep(5000);
//以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务

");
}
}
}
执行结果:
线程Thread-0正在写入数据

线程Thread-1正在写入数据

线程Thread-2正在写入数据

线程Thread-3正在写入数据

线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
当前线程Thread-3
所有线程写入完毕,继续处理其他任务

所有线程写入完毕,继续处理其他任务

所有线程写入完毕,继续处理其他任务

所有线程写入完毕,继续处理其他任务

Semaphore例子:
public class Test {
public static void main(String[] args) {
int N = 8;
//工人数
Semaphore semaphore =
new Semaphore(5);
//机器数目
for(
int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker
extends Thread{
private int num;
private Semaphore semaphore;
public Worker(
int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+
this.num+"占用一个机器在生产

");
Thread.sleep(2000);
System.out.println("工人"+
this.num+"释放出机器");
semaphore.release();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行结果:
工人0占用一个机器在生产

工人1占用一个机器在生产

工人2占用一个机器在生产

工人4占用一个机器在生产

工人5占用一个机器在生产

工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产

工人7占用一个机器在生产

工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产

工人3释放出机器
工人7释放出机器
工人6释放出机器
https://www.cnblogs.com/dolphin0520/p/3920397.html
https://juejin.im/post/5aeec3ebf265da0ba76fa327