pursue wind pursue wind
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
  • mybatis

  • mybatis-plus

  • Spring

  • SpringBoot

  • SpringSecurity

  • SpringCloud

    • 文档

    • Gateway

    • Spring Cloud Stream

      • Spring Cloud Stream实现消息过滤消费
      • Spring Cloud Stream知识点盘点
        • 概念
          • group
          • destination binder
          • destination binding
          • partition
        • 注解
          • Input(Stream)
          • Output(Stream)
          • StreamListener(Stream)
          • SendTo(messaging)
          • InboundChannelAdapter(Integration)
          • ServiceActivator(Integration)
          • Transformer(Integration)
        • PollableMessageSource(Stream)
        • 本文首发
      • Spring Cloud Stream错误处理详解
    • Alibaba Sentinel 规则参数总结
    • Alibaba Sentinel规则持久化-拉模式-手把手教程【基于文件】
    • Feign常见问题总结
    • SentinelResource注解 属性总结
    • Spring Cloud Alibaba Sentienl相关配置项
    • SpringCloudAlibaba
    • SpringCloud入门
    • 使用Spring Cloud Feign上传文件
    • 如何使用Feign构造多参数的请求
    • 实用技巧:Hystrix传播ThreadLocal对象(两种方案)
    • 扩展Ribbon支持Nacos权重的三种方式
    • 扩展Ribbon支持基于元数据的版本管理
    • 搭建生产可用的Nacos集群
  • 单元测试框架Mockito
  • 框架
  • SpringCloud
  • Spring Cloud Stream
pursuewind
2020-11-23
目录

Spring Cloud Stream知识点盘点

# Spring Cloud Stream知识点盘点

前面,已经探讨了:

  • Spring Cloud Stream实现消息过滤消费 (opens new window)
  • Spring Cloud Stream错误处理详解 (opens new window)

本文来Spring Cloud Stream,做一个知识点盘点和总结,包括:

  • 概念
  • Stream注解
  • Spring Integration(Spring Cloud Stream的底层)注解
  • Spring Messaging(Spring消息编程模型)注解
  • Spring Cloud Stream API

# 概念

# group

组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费。

组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group。

# destination binder

与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder。

# destination binding

Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建。

# partition

TIPS

严格来说这个不是概念,而是一种Stream提高伸缩性、吞吐量的一种方式。不过不想另起标题了,写在这里吧。

一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上。

# 注解

# Input(Stream)

示例:

public interface Barista {
    @Input("inboundOrders")
    SubscribableChannel orders();
}
1
2
3
4

作用:

  • 用于接收消息
  • 为每个binding生成channel实例
  • 指定channel名称
  • 在spring容器中生成一个名为inboundOrders,类型为SubscribableChannel的bean
  • 在spring容器中生成一个类,实现Barista接口。

# Output(Stream)

示例:

public interface Source {
    @Output
    MessageChannel output();
}
1
2
3
4

作用:

类似Input,只是用来生产消息。

# StreamListener(Stream)

示例:

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void handle(String body) {
    System.out.println("Received: " + body);
}

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))
public MessageSource<String> test() {
    return () -> {
        Map<String, Object> map = new HashMap<>(1);
        map.put("type", "dog");
        return new GenericMessage<>("abcdef", map);
    };
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

作用:

用于消费消息

condition的作用:符合条件,才进入处理方法。

condition起作用的两个条件:

  • 注解的方法没有返回值
  • 方法是一个独立方法,不支持Reactive API

# SendTo(messaging)

示例:

// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
   return "handle...";
}
1
2
3
4
5
6

作用:

用于发送消息

# InboundChannelAdapter(Integration)

示例:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}
1
2
3
4
5
6

作用:

表示让定义的方法生产消息。

注:用 InboundChannelAdapter 注解的方法上即使有参数也没用。即下面test方法不要有参数。

  • fixedDelay:多少毫秒发送1次
  • maxMessagesPerPoll:一次发送几条消息。

# ServiceActivator(Integration)

示例:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
    return payload.toUpperCase();
}
1
2
3
4

作用:

表示方法能够处理消息或消息有效内容,监听input消息,用方法体的代码处理,然后输出到output中。

# Transformer(Integration)

示例:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
  return message.toUpperCase();
}
1
2
3
4

作用:

和 ServiceActivator 类似,表示方法能够转换消息,消息头,或消息有效内容

# PollableMessageSource(Stream)

示例代码:

@SpringBootApplication
@EnableBinding({ConsumerApplication.PolledProcessor.class})
@EnableScheduling
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Autowired
    private PolledProcessor polledProcessor;

    @Scheduled(fixedDelay = 5_000)
    public void poll() {
        polledProcessor.input().poll(message -> {
            byte[] bytes = (byte[]) message.getPayload();
            String payload = new String(bytes);
            System.out.println(payload);
        });
    }

    public interface PolledProcessor {
        @Input
        PollableMessageSource input();

        @Output
        MessageChannel output();
    }

    @Bean
    @InboundChannelAdapter(value = "output",
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> test() {
        return () -> {
            Map<String, Object> map = new HashMap<>(1);
            map.put("type", "dog");
            return new GenericMessage<>("adfdfdsafdsfa", map);
        };
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

如果不想自己做byte数组转换,可以添加配置:

spring:
  cloud:
    stream:
      bindings:
        output:
          # 指定content-type
          content-type: text/plain
1
2
3
4
5
6
7

作用:

允许消费者控制消费速率。

相关文章:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers

# 本文首发

http://www.itmuch.com/spring-cloud/spring-cloud-stream-pan-ta/

Last Updated: 2023/01/30, 11:01:00
Spring Cloud Stream实现消息过滤消费
Spring Cloud Stream错误处理详解

← Spring Cloud Stream实现消息过滤消费 Spring Cloud Stream错误处理详解→

Theme by Vdoing | Copyright © 2019-2023 pursue-wind | 粤ICP备2022093130号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
  • 飙升榜
  • 新歌榜
  • 云音乐民谣榜
  • 美国Billboard榜
  • UK排行榜周榜
  • 网络DJ