paulwong

EVENT DRIVEN - SPRING CLOUD STREAM - Routing Function

SPRING CLOUD STREAM内置了一个RoutingFunction,能将MESSAGE路由到应用的其他FUNCTION中。
要使用RoutingFunction,可以直接向其绑定的外部DESTINATION发送消息,也可以用“|”连接符把它接在其他FUNCTION之后。

application.yaml
# This setting can increase or decrease the rate of message production (1000 = 1s)
#
 spring.cloud.stream.poller.fixed-delay=1000
#
 DefaultPollerProperties

# This setting can control which function method in our code will be triggered if there are multiple
#
 spring.cloud.function.definition=supplyLoan

# Application, function, binding and binder configuration for the loan-check service
spring:
   application:
      name: loan-check-rabbit
   banner:
      location: classpath:/banner-rabbit.txt
   cloud:
      #BindingServiceProperties
      stream:
         #StreamFunctionProperties
         function:
            # Semicolon-separated list of functions to bind; the last entry composes
            # loanCheckerProcessor with functionRouter using the "|" operator.
            # NOTE(review): no bean named loadCheckerFunction is visible in the accompanying
            # LoanCheckConfiguration - confirm it is defined elsewhere or is a typo.
            # NOTE(review): a trailing backslash is not a line continuation in a YAML plain
            # scalar - verify the parsed value does not contain a stray "\ ".
            definition: loadCheckerFunction;loanCheckerDecieder;loanCheckerConsumer;\
                        loanDeclinedConsumer;loanApprovedConsumer;loanCheckerProcessor|functionRouter
            # Turns on the routing function that the definition above refers to as functionRouter.
            routing:
               enabled: true
         #BindingProperties
         bindings:
            # Input of the composed loanCheckerProcessor|functionRouter function.
            loanCheckerProcessor|functionRouter-in-0:
               destination: queue.pretty.log.messages
               binder: local_rabbit
               
            # Inputs of the approved/declined consumers.
            # NOTE(review): destinations are spelled "load.*" rather than "loan.*" - confirm
            # they match the names the producers send to.
            loanApprovedConsumer-in-0:
               destination: load.approved
               binder: local_rabbit
            loanDeclinedConsumer-in-0:
               destination: load.declined
               binder: local_rabbit
               
            # loanCheckerDecieder-out-0 and loanCheckerConsumer-in-0 share the same destination.
            loanCheckerDecieder-in-0:
               destination: queue.pretty.log.messages.222
               binder: local_rabbit
            loanCheckerDecieder-out-0:
               destination: queue.pretty.approved.messages
               binder: local_rabbit
            loanCheckerConsumer-in-0:
               destination: queue.pretty.approved.messages
               binder: local_rabbit
         #BinderProperties
         binders:
            # The single RabbitMQ binder referenced by every binding above.
            local_rabbit:
               type: rabbit
               environment:
                  spring:
                     rabbitmq:
                        # NOTE(review): hard-coded host and guest/guest credentials - acceptable
                        # for a demo, externalize for any shared environment.
                        host: 10.80.27.69
                        port: 5672
                        username: guest
                        password: guest
                        virtual-host: my-virtual-host
                        
                        
logging:
   level:
      root: info
      # Debug output for org.springframework.cloud.function only.
      org.springframework:
         cloud.function: debug
         #retry: debug


LoanCheckConfiguration.java
package com.paul.testspringcloudstream.loancheck.config;

import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.paul.testspringcloudstream.common.model.Loan;
import com.paul.testspringcloudstream.common.model.Status;
import com.paul.testspringcloudstream.loancheck.router.LoanCheckerRouter;
import com.paul.testspringcloudstream.loancheck.service.LoanProcessor;
import com.paul.testspringcloudstream.loancheck.service.LoanService;

@Configuration
public class LoanCheckConfiguration {
    
    private static final Logger log = LoggerFactory.getLogger(LoanCheckConfiguration.class);
    private static final Long MAX_AMOUNT = 10000L;
    private static final String LOG_PATTERN = "{} - {} {} for ${} for {}";
    
    @Autowired
    public void test(Consumer<Loan> loanCheckerConsumer) {
        log.info("{}", loanCheckerConsumer.getClass());
    }
    
    
    @Bean
    public Consumer<Loan> loanCheckerConsumer(){
        return loan -> 
            log.info(LOG_PATTERN, "loanCheckerConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    }
    
    @Bean
    public Consumer<Loan> loanDeclinedConsumer(){
        return loan -> 
            log.info(LOG_PATTERN, "loanDeclinedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    }
    
    @Bean
    public Consumer<Loan> loanApprovedConsumer(){
        return loan -> 
            log.info(LOG_PATTERN, "loanApprovedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    }
    
    @Bean
    public MessageRoutingCallback loanCheckerRouter() {
        return new LoanCheckerRouter();
    }
    
    @Bean
    public Function<Loan, Loan> loanCheckerProcessor(
        LoanService loanService
    ){
        return loan -> loanService.check(loan);
    }
    
    @Bean
    public Function<Loan, Message<Loan>> loanCheckerProcessorBak(
        LoanService loanService
    ){
        return loan -> {
            Loan result = loanService.check(loan);
            String sendTo = Status.DECLINED.name().equals(result.getStatus()) ? 
                        LoanProcessor.DECLINED_OUT : LoanProcessor.APPROVED_OUT;
            
            return MessageBuilder.withPayload(result)
                        .setHeader("spring.cloud.stream.sendto.destination", sendTo)
                        .build();
        };
    }
    
    @Bean
    public Consumer<Loan> loanCheckerDecieder(StreamBridge streamBridge){
        return loan -> {
            log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());

            if (loan.getAmount() > MAX_AMOUNT) {
                loan.setStatus(Status.DECLINED.name());
                streamBridge.send(LoanProcessor.DECLINED_OUT, "local_rabbit", loan);
            } else {
                loan.setStatus(Status.APPROVED.name());
                streamBridge.send(LoanProcessor.APPROVED_OUT, "local_rabbit", loan);
            }

            log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
        };
    }

}


LoanCheckerRouter.java,将路由条件统一在此处
package com.paul.testspringcloudstream.loancheck.router;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.messaging.Message;

import com.paul.testspringcloudstream.common.model.Loan;
import com.paul.testspringcloudstream.common.model.Status;

public class LoanCheckerRouter implements MessageRoutingCallback{
    
    private static final Logger log = LoggerFactory.getLogger(LoanCheckerRouter.class);

    @Override
    public String functionDefinition(Message<?> message) {
        
//        byte[] resultByte = (byte[])message.getPayload();
//        String resultString = new String(resultByte);
//        
//        return "loanDeclinedConsumer";
        
        Loan result = (Loan)message.getPayload();
        
        log.info("Loan status: {}", result.getStatus());
        
        return Status.DECLINED.name().equals(result.getStatus()) ? 
                    "loanDeclinedConsumer" : "loanApprovedConsumer";
    }

}

posted on 2021-11-15 14:46 paulwong 阅读(293) 评论(0)  编辑  收藏 所属分类: SPRING CLOUD


只有注册用户登录后才能发表评论。


网站导航: