pursue wind pursue wind
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
  • mybatis

  • mybatis-plus

  • Spring

  • SpringBoot

  • SpringSecurity

  • SpringCloud

    • 文档

    • Gateway

    • Spring Cloud Stream

      • Spring Cloud Stream实现消息过滤消费
      • Spring Cloud Stream知识点盘点
      • Spring Cloud Stream错误处理详解
        • 本文首发
    • Alibaba Sentinel 规则参数总结
    • Alibaba Sentinel规则持久化-拉模式-手把手教程【基于文件】
    • Feign常见问题总结
    • SentinelResource注解 属性总结
    • Spring Cloud Alibaba Sentienl相关配置项
    • SpringCloudAlibaba
    • SpringCloud入门
    • 使用Spring Cloud Feign上传文件
    • 如何使用Feign构造多参数的请求
    • 实用技巧:Hystrix传播ThreadLocal对象(两种方案)
    • 扩展Ribbon支持Nacos权重的三种方式
    • 扩展Ribbon支持基于元数据的版本管理
    • 搭建生产可用的Nacos集群
  • 单元测试框架Mockito
  • 框架
  • SpringCloud
  • Spring Cloud Stream
pursuewind
2020-11-23
目录

Spring Cloud Stream错误处理详解

# Spring Cloud Stream错误处理详解

TIPS

本文基于Spring Cloud Greenwich SR1,理论支持Finchley及更高版本。

本节详细探讨Spring Cloud Stream的错误处理。

# 应用处理

# 局部处理【通用】

配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-destination
          group: my-group
        output:
          destination: my-destination
1
2
3
4
5
6
7
8
9

代码:

@Slf4j
@SpringBootApplication
@EnableBinding({Processor.class})
@EnableScheduling
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @StreamListener(value = Processor.INPUT)
    public void handle(String body) {
        throw new RuntimeException("x");
    }

    @ServiceActivator(inputChannel = "my-destination.my-group.errors")
    public void handleError(ErrorMessage message) {
        Throwable throwable = message.getPayload();
        log.error("截获异常", throwable);

        Message<?> originalMessage = message.getOriginalMessage();
        assert originalMessage != null;

        log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload()));
    }

    @Bean
    @InboundChannelAdapter(value = Processor.OUTPUT,
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> test() {
        return () -> new GenericMessage<>("adfdfdsafdsfa");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 全局处理【通用】

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    throw new RuntimeException("x");
}

@StreamListener("errorChannel")
public void error(Message<?> message) {
    ErrorMessage errorMessage = (ErrorMessage) message;
    System.out.println("Handling ERROR: " + errorMessage);
}
1
2
3
4
5
6
7
8
9
10

# 系统处理

系统处理方式,因消息中间件不同而异。如果应用没有配置错误处理,那么error将会被传播给binder,binder将error回传给消息中间件。消息中间件可以丢弃消息、requeue(重新排队,从而重新处理)或将失败的消息发送给DLQ(死信队列)。

# 丢弃

默认情况下,错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产。

# DLQ【RabbitMQ】

TIPS

  • 虽然RocketMQ也支持DLQ,但目前RocketMQ控制台并不支持在界面上操作,将死信放回消息队列,让客户端重新处理。所以使用很不方便,而且用法也和本节有一些差异。
  • 如使用RocketMQ,建议参考上面【应用处理】一节的用法,也可额外订阅这个Topic %DLQ%+consumerGroup
  • 个人给RocketMQ控制台提的Issue:https://github.com/apache/rocketmq/issues/1334

配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-destination
          group: my-group
        output:
          destination: my-destination
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14

代码:

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    throw new RuntimeException("x");
}

@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
        poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
    return () -> new GenericMessage<>("adfdfdsafdsfa");
}
1
2
3
4
5
6
7
8
9
10
11

这样,消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。

如果想获取原始错误的异常堆栈,可添加如下配置:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              republish-to-dlq: true
1
2
3
4
5
6
7
8

# requeue【RabbitMQ】

Rabbit/Kafka的binder依赖RetryTemplate实现重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不再重试。此时可通过requeue方式处理异常。

添加如下配置:

# 默认是3,设为1则禁用重试
spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1
# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
1
2
3
4

这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。

# RetryTemplate【通用】

# 配置方式

RetryTemplate重试也是错误处理的一种手段。

spring:
  cloud:
    stream:
      bindings:
        <input channel名称>:
          consumer:
            # 最多尝试处理几次,默认3
            maxAttempts: 3
            # 重试时初始避退间隔,单位毫秒,默认1000
            backOffInitialInterval: 1000
            # 重试时最大避退间隔,单位毫秒,默认10000
            backOffMaxInterval: 10000
            # 避退乘数,默认2.0
            backOffMultiplier: 2.0
            # 当listen抛出retryableExceptions未列出的异常时,是否要重试
            defaultRetryable: true
            # 异常是否允许重试的map映射
            retryableExceptions:
              java.lang.RuntimeException: true
              java.lang.IllegalStateException: false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

测试代码:

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    throw new RuntimeException(body);
}

private AtomicInteger count = new AtomicInteger(0);

@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
        poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
    return () -> new GenericMessage<>(count.getAndAdd(1) + "");
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 编码方式

多数场景下,使用配置方式定制重试行为都是可以满足需求的,但配置方式可能无法满足一些复杂需求。此时可使用编码方式配置RetryTemplate:

@Configuration
class RetryConfiguration {
    @StreamRetryTemplate
    public RetryTemplate sinkConsumerRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());

        return retryTemplate;
    }

    private ExceptionClassifierRetryPolicy retryPolicy() {
        BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
                Collections.singletonList(IllegalAccessException.class
                ));
        keepRetryingClassifier.setTraverseCauses(true);

        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
        AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

        ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
        retryPolicy.setExceptionClassifier(
                classifiable -> keepRetryingClassifier.classify(classifiable) ?
                        alwaysRetryPolicy : simpleRetryPolicy);

        return retryPolicy;
    }

    private FixedBackOffPolicy backOffPolicy() {
        final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2);

        return backOffPolicy;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

然后添加配置:

spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate
1

注意:

Spring Cloud Stream 2.2才支持设置retry-template-name

# 本文首发

http://www.itmuch.com/spring-cloud/spring-cloud-stream-error-handling/

Last Updated: 2023/01/30, 11:01:00
Spring Cloud Stream知识点盘点
Alibaba Sentinel 规则参数总结

← Spring Cloud Stream知识点盘点 Alibaba Sentinel 规则参数总结→

Theme by Vdoing | Copyright © 2019-2023 pursue-wind | 粤ICP备2022093130号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
  • 飙升榜
  • 新歌榜
  • 云音乐民谣榜
  • 美国Billboard榜
  • UK排行榜周榜
  • 网络DJ