pursue wind pursue wind
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
  • mybatis

  • mybatis-plus

  • Spring

  • SpringBoot

  • SpringSecurity

  • SpringCloud

    • 文档

    • Gateway

    • Spring Cloud Stream

      • Spring Cloud Stream实现消息过滤消费
        • condition
          • 生产者
          • 消费者
        • Tags
          • 生产者
          • 消费者
        • Sql 92
          • 开启SQL 92支持
          • 生产者
          • 消费者
        • 相关代码
        • 参考文档
        • 本文首发
      • Spring Cloud Stream知识点盘点
      • Spring Cloud Stream错误处理详解
    • Alibaba Sentinel 规则参数总结
    • Alibaba Sentinel规则持久化-拉模式-手把手教程【基于文件】
    • Feign常见问题总结
    • SentinelResource注解 属性总结
    • Spring Cloud Alibaba Sentienl相关配置项
    • SpringCloudAlibaba
    • SpringCloud入门
    • 使用Spring Cloud Feign上传文件
    • 如何使用Feign构造多参数的请求
    • 实用技巧:Hystrix传播ThreadLocal对象(两种方案)
    • 扩展Ribbon支持Nacos权重的三种方式
    • 扩展Ribbon支持基于元数据的版本管理
    • 搭建生产可用的Nacos集群
  • 单元测试框架Mockito
  • 框架
  • SpringCloud
  • Spring Cloud Stream
pursuewind
2020-11-23
目录

Spring Cloud Stream实现消息过滤消费

# Spring Cloud Stream实现消息过滤消费

TIPS

本文基于Spring Cloud Greenwich SR1 + spring-cloud-starter-stream-rocketmq 0.9.0

理论兼容:Spring Cloud Finchley+ + spring-cloud-starter-stream-rocketmq 0.2.2+

MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。

本文探讨Spring Cloud Stream & RocketMQ过滤消息的各种姿势。

在实际项目中,我们可能需要实现消息消费的过滤。

举个例子:实现消息的分流处理:

生产者生产的消息,虽然消息体可能一样,但是header不一样。可编写两个或者更多的消费者,对不同header的消息做针对性的处理!

# condition

# 生产者

生产者设置一下header,比如my-header,值根据你的需要填写:

@Autowired
private Source source;

public String testStream() {
  this.source.output()
    .send(
    MessageBuilder
    .withPayload("消息体")
    .setHeader("my-header","你的header")
    .build()
  );
  return "success";
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 消费者

@Service
@Slf4j
public class TestStreamConsumer {
    @StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")
    public void receive(String messageBody) {
        log.info("通过stream收到了消息:messageBody ={}", messageBody);
    }
}
1
2
3
4
5
6
7
8

如代码所示,使用 StreamListener 注解的 condition 属性。当 headers['my-header']=='你的header' 条件满足,才会进入到方法体。

# Tags

TIPS

该方式只支持RoketMQ,不支持Kafka/RabbitMQ

# 生产者

@Autowired
private Source source;

public String testStream() {
  this.source.output()
    .send(
    MessageBuilder
    .withPayload("消息体")
    // 注意:只能设置1个tag
    .setHeader(RocketMQHeaders.TAGS, "tag1")
    .build()
  );
  return "success";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 消费者

  • 接口

    public interface MySink {
        String INPUT1 = "input1";
        String INPUT2 = "input2";
    
        @Input(INPUT1)
        SubscribableChannel input();
    
        @Input(INPUT2)
        SubscribableChannel input2();
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
  • 注解

    @EnableBinding({MySink.class})
    
    1
  • 配置

    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
            bindings:
              input1:
                consumer:
                  # 表示input2消费带有tag1的消息
                  tags: tag1
              input2:
                consumer:
                  # 表示input2消费带有tag2或者tag3的消息
                  tags: tag2||tag3
          bindings:
            input1:
              destination: test-topic
              group: test-group1
            input2:
              destination: test-topic
              group: test-group2
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
  • 消费代码

    @Service
    @Slf4j
    public class MyTestStreamConsumer {
        /**
         * 我消费带有tag1的消息
         *
         * @param messageBody 消息体
         */
        @StreamListener(MySink.INPUT1)
        public void receive1(String messageBody) {
            log.info("带有tag1的消息被消费了:messageBody ={}", messageBody);
        }
    
        /**
         * 我消费带有tag1或者tag2的消息
         *
         * @param messageBody 消息体
         */
        @StreamListener(MySink.INPUT2)
        public void receive2(String messageBody) {
            log.info("带有tag2/tag3的消息被消费了:messageBody ={}", messageBody);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
  • 日志:

    2019-08-04 19:10:03.799  INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : 带有tag1的消息被消费了:messageBody =消息体
    
    1

# Sql 92

TIPS

  • 该方式只支持RoketMQ,不支持Kafka/RabbitMQ
  • 用了sql,就不要用Tag

RocketMQ支持使用SQL语法过滤消息。官方文档:http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/

Spring Clous Stream RocketMQ也为此特性提供了支持。

# 开启SQL 92支持

默认情况下,RocketMQ的SQL过滤支持是关闭的,要想使用SQL 92过滤消息,需要:

  • 在 conf/broker.conf 添加

    enablePropertyFilter = true
    
    1
  • 启动RocketMQ

    nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
    
    1

# 生产者

@Autowired
private Source source;

public String testStream() {
  this.source.output()
    .send(
    MessageBuilder
    .withPayload("消息体")
    .setHeader("index", 1000)
    .build()
  );
  return "success";
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 消费者

  • 接口

    public interface MySink {
        String INPUT1 = "input1";
        String INPUT2 = "input2";
    
        @Input(INPUT1)
        SubscribableChannel input();
    
        @Input(INPUT2)
        SubscribableChannel input2();
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
  • 注解

    @EnableBinding({MySink.class})
    
    1
  • 配置

    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
            bindings:
              input1:
                consumer:
                  sql: 'index < 1000'
              input2:
                consumer:
                  sql: 'index >= 1000'
          bindings:
            input1:
              destination: test-topic
              group: test-group1
            input2:
              destination: test-topic
              group: test-group2
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
  • 消费代码

    @Service
    @Slf4j
    public class MyTestStreamConsumer {
        /**
         * 我消费带有tag1的消息
         *
         * @param messageBody 消息体
         */
        @StreamListener(MySink.INPUT1)
        public void receive1(String messageBody) {
            log.info("index > 1000的消息被消费了:messageBody ={}", messageBody);
        }
    
        /**
         * 我消费带有tag1或者tag2的消息
         *
         * @param messageBody 消息体
         */
        @StreamListener(MySink.INPUT2)
        public void receive2(String messageBody) {
            log.info("index <=1000 的消息被消费了:messageBody ={}", messageBody);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
  • 日志

    2019-08-04 19:58:59.787  INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : index <=1000 的消息被消费了:messageBody =消息体
    
    1

# 相关代码

org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties

# 参考文档

  • Filter Messages By SQL92 In RocketMQ (opens new window)
  • RocketMQ 错误:The broker does not support consumer to filter message by SQL92 (opens new window)

# 本文首发

http://www.itmuch.com/spring-cloud-alibaba/spring-cloud-stream-rocketmq-filter-consume/

······················

Last Updated: 2023/01/30, 11:01:00
Spring Cloud Gateway限流
Spring Cloud Stream知识点盘点

← Spring Cloud Gateway限流 Spring Cloud Stream知识点盘点→

Theme by Vdoing | Copyright © 2019-2023 pursue-wind | 粤ICP备2022093130号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
  • 飙升榜
  • 新歌榜
  • 云音乐民谣榜
  • 美国Billboard榜
  • UK排行榜周榜
  • 网络DJ