百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Spring Cloud Stream使用详解及部分重点源码分析

moboyou 2025-07-13 21:09 10 浏览

环境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12


简介

Spring Cloud Stream是一个框架,用于构建与MQ连接的高度可伸缩的事件驱动微服务。其目的是为了简化消息在 Spring Cloud 应用程序中的开发。屏蔽了各种MQ之间的差异,使得在更换MQ的时候不需要修改代码。

Spring Cloud Stream支持多种绑定器实现,如下:

  • RabbitMQ
  • Apache Kafka
  • Kafka Streams
  • Amazon Kinesis
  • Google PubSub (partner maintained)
  • Solace PubSub+ (partner maintained)
  • Azure Event Hubs (partner maintained)
  • AWS SQS (partner maintained)
  • AWS SNS (partner maintained)
  • Apache RocketMQ (partner maintained)

详细查看官方文档,对应每一个MQ都有一个Github地址。

Spring Cloud Stream的核心构建块是:

  • 目标绑定器(Destination Binders):负责与MQ集成的组件。
  • 目标绑定(Destination Bindings):MQ中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
  • 消息(Message):生产者和消费者用来与目标绑定器(以及通过MQ与其他应用程序)通信的规范数据结构。

快速入门

依赖:

<properties>
  <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

应用配置:

spring:
  rabbitmq:
    host: localhost
    virtual-host: bus
    port: 5672
    username: xxx
    password: xxx
---
spring:
  cloud:
    stream:
      bindings:
        #自定义输入输出
        myInput:
          #指定输入通道对应的主题名
          destination: demo
        myOutput:
          destination: demo

创建消息通道绑定的接口:

public interface StreamBinding {
 
  String INPUT = "myInput";
  String OUTPUT = "myOutput";
 
  @Input(StreamBinding.INPUT)
  SubscribableChannel input();
 
  @Output(StreamBinding.OUTPUT)
  MessageChannel output();
}

通过 @Input和 @Output注解定义输入通道和输出通道名称,这里的名称与上面配置文件中的是对应的。

当定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方法;定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法。

这里的Input,Output两个方法容器会分别创建一个Bean对象

创建消费者:

@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
 
  private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);

  @StreamListener(StreamBinding.INPUT)
  public void receive(String message) {
    logger.info("接收到消息: {}", message);
  }
}

@EnableBinding 注解用来指定一个或多个定义了 @Input 或 @Output 注解的接口,以此实现对消息通道(Channel)的绑定。上面我们通过 @EnableBinding(value = {StreamClient.class}) 绑定了 StreamClient 接口,该接口是我们自己实现的对输入输出消息通道绑定的定义

@StreamListener,主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。上面我们将 receive 方法注册为 myInput 消息通道的监听处理器,当我们往这个消息通道发送信息的时候,receiver 方法会执行。

消息发送接口:

@Resource
private StreamBinding streamBinding;
@GetMapping("/send")
public void send() {
  streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}

启动服务:

查看RabbitMQ

自动为我们创建了一个队列,队列的名称是以我们在配置文件中配置的开头,后面是随机生成的。这个队列会自动删除AD,服务关闭后就自动删除队列;Excl:排他的,存在该队列就不会在创建了。

修改端口后,再启动一个服务:

创建了2个队列,使用其中一个发送消息:

两个服务都收到了消息。

消费者组

上面启动了2个服务都能收到消息,在集群的环境下这样肯定会带来问题,如果是业务方面的就会出现重复数据,这时候我们可以通过设置分组的解决此问题。修改配置:

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定输入通道对应的主题名
          destination: demo
          #指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
          #多个实例会轮询的接收消息
          group: g_test
        myOutput:
          destination: demo

再次启动服务后,两个服务会轮询的接收到消息。

启动服务后,两个服务都同时监听同一个队列。队列也不是随机生成的了,并且队列是持久化的,服务断开后队列也不会自动删除。

消息分区

通过消费组的设置,虽然能保证同一消息只被一个消费者进行接收和处理,但是对于特殊业务情况,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能被同一个实例消费,这个就可以使用 Spring Cloud Stream 提供的消息分区功能。修改配置

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定输入通道对应的主题名
          destination: demo
          #指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
          #多个实例会轮询的接收消息
          group: g_test
          consumer:
            #通过该参数开启消费者分区功能
            partitioned: true
        myOutput:
          destination: demo
          producer:
            #这里的配置也可以是SpEL表达式,比如:headers['partition']通过消息header获取属性
            #这里会通过表达式及消息对象进行计算得到一个Key,然后获取key的hashCode
            # 得到hashCode以后会与partitionCount进行取模运算得到具体的分区
            partitionKeyExpression: '1' #我这里给的值就是对应的instanceIndex的值,你希望谁接收就设置谁配置的值即可
            partitionCount: 2
      #实例总数
      instanceCount: 2
      #该参数设置了当前实例的索引号,从 0 开始
      instanceIndex: 0

计算分区源码:

最后得到分区信息后会在消息头中放入一个scst_partition为key,partition为值的头信息。

启动多个实例后,测试发现所有的消息都只是同一个实例收到消息

交换机分别与每一个服务进行绑定使用不同的Routing Key这样在发送消息的时候就可以根据计算处理的分区进行定向发送消息了。

通过源码查看:

这里通过我们的配置交换机为demo。接着是获取路由key了

这里会从消息header中获取key = scst_partition的头信息。

这样针对使用RabbitMQ的中间件发送消息所需要的交换机及路由key就确定下来了。

完毕!!!

在Spring Cloud 中你还在使用Ribbon快来试试Load-Balancer

Spring Cloud Gateway应用详解1之谓词

Spring Cloud Bus使用说明详解

SpringCloud Nacos 整合feign

SpringCloud Alibaba 之 Nacos 服务

Spring Cloud Sentinel整合Feign

SpringCloud Hystrix实现资源隔离应用

SpringCloud zuul 动态网关配置

Spring Cloud 微服务日志收集管理Elastic Stack完整详细版

SpringCloud Feign实现原理源分析

Spring Cloud Sentinel 热点参数限流

相关推荐

Excel技巧:SHEETSNA函数一键提取所有工作表名称批量生产目录

首先介绍一下此函数:SHEETSNAME函数用于获取工作表的名称,有三个可选参数。语法:=SHEETSNAME([参照区域],[结果方向],[工作表范围])(参照区域,可选。给出参照,只返回参照单元格...

Excel HOUR函数:“小时”提取器_excel+hour函数提取器怎么用

一、函数概述HOUR函数是Excel中用于提取时间值小时部分的日期时间函数,返回0(12:00AM)到23(11:00PM)之间的整数。该函数在时间数据分析、考勤统计、日程安排等场景中应用广泛。语...

Filter+Search信息管理不再难|多条件|模糊查找|Excel函数应用

原创版权所有介绍一个信息管理系统,要求可以实现:多条件、模糊查找,手动输入的内容能去空格。先看效果,如下图动画演示这样的一个效果要怎样实现呢?本文所用函数有Filter和Search。先用filter...

FILTER函数介绍及经典用法12:FILTER+切片器的应用

EXCEL函数技巧:FILTER经典用法12。FILTER+切片器制作筛选按钮。FILTER的函数的经典用法12是用FILTER的函数和切片器制作一个筛选按钮。像左边的原始数据,右边想要制作一...

office办公应用网站推荐_office办公软件大全

以下是针对Office办公应用(Word/Excel/PPT等)的免费学习网站推荐,涵盖官方教程、综合平台及垂直领域资源,适合不同学习需求:一、官方权威资源1.微软Office官方培训...

WPS/Excel职场办公最常用的60个函数大全(含卡片),效率翻倍!

办公最常用的60个函数大全:从入门到精通,效率翻倍!在职场中,WPS/Excel几乎是每个人都离不开的工具,而函数则是其灵魂。掌握常用的函数,不仅能大幅提升工作效率,还能让你在数据处理、报表分析、自动...

收藏|查找神器Xlookup全集|一篇就够|Excel函数|图解教程

原创版权所有全程图解,方便阅读,内容比较多,请先收藏!Xlookup是Vlookup的升级函数,解决了Vlookup的所有缺点,可以完全取代Vlookup,学完本文后你将可以应对所有的查找难题,内容...

批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数

批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数在电商运营、物流对账等工作中,经常需要统计快递“揽收到签收”的耗时——比如判断某快递公司是否符合“3天内送达”的服务承...

Excel函数公式教程(490个实例详解)

Excel函数公式教程(490个实例详解)管理层的财务人员为什么那么厉害?就是因为他们精通excel技能!财务人员在日常工作中,经常会用到Excel财务函数公式,比如财务报表分析、工资核算、库存管理等...

Excel(WPS表格)Tocol函数应用技巧案例解读,建议收藏备用!

工作中,经常需要从多个单元格区域中提取唯一值,如体育赛事报名信息中提取唯一的参赛者信息等,此时如果复制粘贴然后去重,效率就会很低。如果能合理利用Tocol函数,将会极大地提高工作效率。一、功能及语法结...

Excel中的SCAN函数公式,把计算过程理清,你就会了

Excel新版本里面,除了出现非常好用的xlookup,Filter公式之外,还更新一批自定义函数,可以像写代码一样写公式其中SCAN函数公式,也非常强大,它是一个循环函数,今天来了解这个函数公式的计...

Excel(WPS表格)中多列去重就用Tocol+Unique组合函数,简单高效

在数据的分析和处理中,“去重”一直是绕不开的话题,如果单列去重,可以使用Unique函数完成,如果多列去重,如下图:从数据信息中可以看到,每位参赛者参加了多项运动,如果想知道去重后的参赛者有多少人,该...

Excel(WPS表格)函数Groupby,聚合统计,快速提高效率!

在前期的内容中,我们讲了很多的统计函数,如Sum系列、Average系列、Count系列、Rank系列等等……但如果用一个函数实现类似数据透视表的功能,就必须用Groupby函数,按指定字段进行聚合汇...

Excel新版本,IFS函数公式,太强大了!

我们举一个工作实例,现在需要计算业务员的奖励数据,右边是公司的奖励标准:在新版本的函数公式出来之前,我们需要使用IF函数公式来解决1、IF函数公式IF函数公式由三个参数组成,IF(判断条件,对的时候返...

Excel不用函数公式数据透视表,1秒完成多列项目汇总统计

如何将这里的多组数据进行汇总统计?每组数据当中一列是不同菜品,另一列就是该菜品的销售数量。如何进行汇总统计得到所有的菜品销售数量的求和、技术、平均、最大、最小值等数据?不用函数公式和数据透视表,一秒就...