Spring Cloud Stream实现消息过滤消费
# Spring Cloud Stream实现消息过滤消费
TIPS
本文基于
Spring Cloud Greenwich SR1+spring-cloud-starter-stream-rocketmq 0.9.0理论兼容:
Spring Cloud Finchley++spring-cloud-starter-stream-rocketmq 0.2.2+MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。
本文探讨Spring Cloud Stream & RocketMQ过滤消息的各种姿势。
在实际项目中,我们可能需要实现消息消费的过滤。
举个例子:实现消息的分流处理:
生产者生产的消息,虽然消息体可能一样,但是header不一样。可编写两个或者更多的消费者,对不同header的消息做针对性的处理!
# condition
# 生产者
生产者设置一下header,比如my-header,值根据你的需要填写:
@Autowired
private Source source;
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
.setHeader("my-header","你的header")
.build()
);
return "success";
}
2
3
4
5
6
7
8
9
10
11
12
13
# 消费者
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")
public void receive(String messageBody) {
log.info("通过stream收到了消息:messageBody ={}", messageBody);
}
}
2
3
4
5
6
7
8
如代码所示,使用 StreamListener 注解的 condition 属性。当 headers['my-header']=='你的header' 条件满足,才会进入到方法体。
# Tags
TIPS
该方式只支持RoketMQ,不支持Kafka/RabbitMQ
# 生产者
@Autowired
private Source source;
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
// 注意:只能设置1个tag
.setHeader(RocketMQHeaders.TAGS, "tag1")
.build()
);
return "success";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 消费者
接口
public interface MySink { String INPUT1 = "input1"; String INPUT2 = "input2"; @Input(INPUT1) SubscribableChannel input(); @Input(INPUT2) SubscribableChannel input2(); }1
2
3
4
5
6
7
8
9
10注解
@EnableBinding({MySink.class})1配置
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input1: consumer: # 表示input2消费带有tag1的消息 tags: tag1 input2: consumer: # 表示input2消费带有tag2或者tag3的消息 tags: tag2||tag3 bindings: input1: destination: test-topic group: test-group1 input2: destination: test-topic group: test-group21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22消费代码
@Service @Slf4j public class MyTestStreamConsumer { /** * 我消费带有tag1的消息 * * @param messageBody 消息体 */ @StreamListener(MySink.INPUT1) public void receive1(String messageBody) { log.info("带有tag1的消息被消费了:messageBody ={}", messageBody); } /** * 我消费带有tag1或者tag2的消息 * * @param messageBody 消息体 */ @StreamListener(MySink.INPUT2) public void receive2(String messageBody) { log.info("带有tag2/tag3的消息被消费了:messageBody ={}", messageBody); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23日志:
2019-08-04 19:10:03.799 INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : 带有tag1的消息被消费了:messageBody =消息体1
# Sql 92
TIPS
- 该方式只支持RoketMQ,不支持Kafka/RabbitMQ
- 用了sql,就不要用Tag
RocketMQ支持使用SQL语法过滤消息。官方文档:http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/
Spring Clous Stream RocketMQ也为此特性提供了支持。
# 开启SQL 92支持
默认情况下,RocketMQ的SQL过滤支持是关闭的,要想使用SQL 92过滤消息,需要:
在
conf/broker.conf添加enablePropertyFilter = true1启动RocketMQ
nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &1
# 生产者
@Autowired
private Source source;
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
.setHeader("index", 1000)
.build()
);
return "success";
}
2
3
4
5
6
7
8
9
10
11
12
13
# 消费者
接口
public interface MySink { String INPUT1 = "input1"; String INPUT2 = "input2"; @Input(INPUT1) SubscribableChannel input(); @Input(INPUT2) SubscribableChannel input2(); }1
2
3
4
5
6
7
8
9
10注解
@EnableBinding({MySink.class})1配置
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input1: consumer: sql: 'index < 1000' input2: consumer: sql: 'index >= 1000' bindings: input1: destination: test-topic group: test-group1 input2: destination: test-topic group: test-group21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20消费代码
@Service @Slf4j public class MyTestStreamConsumer { /** * 我消费带有tag1的消息 * * @param messageBody 消息体 */ @StreamListener(MySink.INPUT1) public void receive1(String messageBody) { log.info("index > 1000的消息被消费了:messageBody ={}", messageBody); } /** * 我消费带有tag1或者tag2的消息 * * @param messageBody 消息体 */ @StreamListener(MySink.INPUT2) public void receive2(String messageBody) { log.info("index <=1000 的消息被消费了:messageBody ={}", messageBody); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23日志
2019-08-04 19:58:59.787 INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : index <=1000 的消息被消费了:messageBody =消息体1
# 相关代码
org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties
# 参考文档
- Filter Messages By SQL92 In RocketMQ (opens new window)
- RocketMQ 错误:The broker does not support consumer to filter message by SQL92 (opens new window)
# 本文首发
http://www.itmuch.com/spring-cloud-alibaba/spring-cloud-stream-rocketmq-filter-consume/
······················