Spring Cloud Stream使用详解及部分重点源码分析
moboyou 2025-07-13 21:09 3 浏览
环境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12
简介
Spring Cloud Stream是一个框架,用于构建与MQ连接的高度可伸缩的事件驱动微服务。其目的是为了简化消息在 Spring Cloud 应用程序中的开发。屏蔽了各种MQ之间的差异,使得在更换MQ的时候不需要修改代码。
Spring Cloud Stream支持多种绑定器实现,如下:
- RabbitMQ
- Apache Kafka
- Kafka Streams
- Amazon Kinesis
- Google PubSub (partner maintained)
- Solace PubSub+ (partner maintained)
- Azure Event Hubs (partner maintained)
- AWS SQS (partner maintained)
- AWS SNS (partner maintained)
- Apache RocketMQ (partner maintained)
详细查看官方文档,对应每一个MQ都有一个Github地址。
Spring Cloud Stream的核心构建块是:
- 目标绑定器(Destination Binders):负责与MQ集成的组件。
- 目标绑定(Destination Bindings):MQ中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
- 消息(Message):生产者和消费者用来与目标绑定器(以及通过MQ与其他应用程序)通信的规范数据结构。
快速入门
依赖:
<properties>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
应用配置:
spring:
rabbitmq:
host: localhost
virtual-host: bus
port: 5672
username: xxx
password: xxx
---
spring:
cloud:
stream:
bindings:
#自定义输入输出
myInput:
#指定输入通道对应的主题名
destination: demo
myOutput:
destination: demo
创建消息通道绑定的接口:
public interface StreamBinding {
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(StreamBinding.INPUT)
SubscribableChannel input();
@Output(StreamBinding.OUTPUT)
MessageChannel output();
}
通过 @Input和 @Output注解定义输入通道和输出通道名称,这里的名称与上面配置文件中的是对应的。
当定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方法;定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法。
这里的Input,Output两个方法容器会分别创建一个Bean对象
创建消费者:
@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
@StreamListener(StreamBinding.INPUT)
public void receive(String message) {
logger.info("接收到消息: {}", message);
}
}
@EnableBinding 注解用来指定一个或多个定义了 @Input 或 @Output 注解的接口,以此实现对消息通道(Channel)的绑定。上面我们通过 @EnableBinding(value = {StreamClient.class}) 绑定了 StreamClient 接口,该接口是我们自己实现的对输入输出消息通道绑定的定义
@StreamListener,主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。上面我们将 receive 方法注册为 myInput 消息通道的监听处理器,当我们往这个消息通道发送信息的时候,receiver 方法会执行。
消息发送接口:
@Resource
private StreamBinding streamBinding;
@GetMapping("/send")
public void send() {
streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}
启动服务:
查看RabbitMQ
自动为我们创建了一个队列,队列的名称是以我们在配置文件中配置的开头,后面是随机生成的。这个队列会自动删除AD,服务关闭后就自动删除队列;Excl:排他的,存在该队列就不会在创建了。
修改端口后,再启动一个服务:
创建了2个队列,使用其中一个发送消息:
两个服务都收到了消息。
消费者组
上面启动了2个服务都能收到消息,在集群的环境下这样肯定会带来问题,如果是业务方面的就会出现重复数据,这时候我们可以通过设置分组的解决此问题。修改配置:
spring:
cloud:
stream:
bindings:
myInput:
#指定输入通道对应的主题名
destination: demo
#指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
#多个实例会轮询的接收消息
group: g_test
myOutput:
destination: demo
再次启动服务后,两个服务会轮询的接收到消息。
启动服务后,两个服务都同时监听同一个队列。队列也不是随机生成的了,并且队列是持久化的,服务断开后队列也不会自动删除。
消息分区
通过消费组的设置,虽然能保证同一消息只被一个消费者进行接收和处理,但是对于特殊业务情况,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能被同一个实例消费,这个就可以使用 Spring Cloud Stream 提供的消息分区功能。修改配置
spring:
cloud:
stream:
bindings:
myInput:
#指定输入通道对应的主题名
destination: demo
#指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
#多个实例会轮询的接收消息
group: g_test
consumer:
#通过该参数开启消费者分区功能
partitioned: true
myOutput:
destination: demo
producer:
#这里的配置也可以是SpEL表达式,比如:headers['partition']通过消息header获取属性
#这里会通过表达式及消息对象进行计算得到一个Key,然后获取key的hashCode
# 得到hashCode以后会与partitionCount进行取模运算得到具体的分区
partitionKeyExpression: '1' #我这里给的值就是对应的instanceIndex的值,你希望谁接收就设置谁配置的值即可
partitionCount: 2
#实例总数
instanceCount: 2
#该参数设置了当前实例的索引号,从 0 开始
instanceIndex: 0
计算分区源码:
最后得到分区信息后会在消息头中放入一个scst_partition为key,partition为值的头信息。
启动多个实例后,测试发现所有的消息都只是同一个实例收到消息
交换机分别与每一个服务进行绑定使用不同的Routing Key这样在发送消息的时候就可以根据计算处理的分区进行定向发送消息了。
通过源码查看:
这里通过我们的配置交换机为demo。接着是获取路由key了
这里会从消息header中获取key = scst_partition的头信息。
这样针对使用RabbitMQ的中间件发送消息所需要的交换机及路由key就确定下来了。
完毕!!!
在Spring Cloud 中你还在使用Ribbon快来试试Load-Balancer
SpringCloud Alibaba 之 Nacos 服务
相关推荐
- 软件下载超级合集(软件大集合)
-
注:AutoCAD软件解压密码均为:www.cadzxw.com(网址就是解压密码)AutoCAD2004:链接:http://pan.baidu.com/s/1i5yL4UT密码:wpxcAutoC...
- Discuz! Database Error(discuzdatabaseerror怎么解决)
-
(1017)Can'tfindfile:'./xyw/common_syscache.frm'(errno:13)SELECT*FROMcommon_syscacheWHERE`...
- 想在天上赏月?最全攻略来了(形容在天上赏月)
-
“但愿人长久,千里共婵娟。”赏月,是中秋夜的传统习俗之一。在地上赏月,或许人们已经习以为常,但在天上赏月又是怎样一番景象?记者梳理发现,为了满足广大旅客“上九天摘星揽月”的需求,春秋航空、南方航空等多...
- APP检测:安卓系统四大组件介绍(安卓的四大组件是什么?分别有什么作用?)
-
1、Activity组件漏洞Activity是Android组件中*基本也是*为常见用的四大组件之一,是一个负责与用户交互的组件。Activity组件中存在以下常见的漏洞。(1)activity绑定b...
- Markdown + 文档管理 + 静态网页生成,集大成的 Markdown 应用:MWeb
-
上周给大家推荐了Typora,作为一款纯粹的Markdown应用来说,它的各种功能和细节可以说已经相当极致,然而,Ulysses用户表示:我们想要的不仅仅是Markdown。是的,Markdo...
- Istio多集群实践(多集群架构)
-
为了实现应用高并发和高可用,企业通常会选择将应用部署在多个地域的多个集群,甚至多云、混合云环境中。在这种情况下,如何在多个集群中部署和管理应用,成为了一个挑战,当然多集群方案也逐步成为了企业应用部署的...
- 源码建站的流程是什么(有源码怎么建站)
-
1.选择适合自己需求的源码:在进行源码建站前,需要根据自己的需求选定一款适合自己的源码,一般建议选择流行度较高、稳定性较好的开源程序,如WordPress、Discuz等。2.下载源码:根据选择的...
- 论坛站长福利!积分墙Discuz插件火爆上线!
-
一款新型的Discuz插件正在火爆袭来,克服种种插件的弊端,全新打造,让你成为最成功最轻松的赚钱能手,这就是积分墙Discuz插件。积分墙Discuz插件(http://www.jifenqiang....
- 2020年了,公司还有必要做企业网站吗?网站开发是否过时呢
-
作为一个以网站开发起步的程序员,回想起来,曾经为不少客户做了网站。而我自己的网站已经六七年没有更新了,本想重新设计升级,但一直忙于做客户的系统开发,自己的网站就一直不管了,反正也没什么用,做得好还经常...
- 放大招,这才是低代码真正的形态PHP工作流引擎
-
放大招,这才是低代码真正的形态。来点干货,今天上点重头戏。表单设计中其实相对还是比较复杂的,比如常见的脚本,比如要控制一个默认的数值,大家可以看平台能够做到页面可以想输,输出什么?添加的时候进行操作。...
- OA源码解析:深入研究企业办公自动化系统的核心代码
-
随着信息技术的迅速发展,企业办公自动化(OfficeAutomation,简称OA)系统已成为现代企业管理中不可或缺的一部分。这些系统通过集成各种办公功能,如文档管理、流程管理、协作与通信等,极大地...
- 用PHP写了个数据分析框架示例代码
-
下面是一个简单的PHP数据分析框架的示例:```php<?php//1.数据收集functioncollectData(){//从数据库或API获取数据//...}//2.数据清...
- 「2022/02/02」thinkphp源码详细阅读(一)
-
thinkphp源码详细阅读(一)请求流程1.从入口index.php开始2.实例化App,我们看一下实例化所做的工作3.设置thinkPath、rootPath、appPath、...
- 【源码】效果最好的网格Shader(迄今为止)
-
我一直都在写Shader,其中有一个特定的Shader我一直想写好,但我总是因为一些我无法完全理解的原因而失败。然后过了几年,我用新学到的知识再次尝试,越来越接近,然后又失败。是什么Shader?模拟...
- 干货来了,一夜加粉百万的柏拉图源码仍给你
-
相信大家这几天都看到过一些关于“柏拉图app”公众号被封号的文章,主要内容是由于“柏拉图APP”推送的一条图文,叫做《生成你的性格标签,为自己带盐》,然后再短短的数日,柏拉图APP公众号便涨粉百万,阅...
- 一周热门
- 最近发表
- 标签列表
-
- 外键约束 oracle (36)
- oracle的row number (32)
- 唯一索引 oracle (34)
- oracle in 表变量 (28)
- oracle导出dmp导出 (28)
- oracle两个表 (20)
- oracle 数据库 字符集 (20)
- oracle安装补丁 (19)
- matlab化简多项式 (20)
- 多线程的创建方式 (29)
- 多线程 python (30)
- java多线程并发处理 (32)
- 宏程序代码一览表 (35)
- c++需要学多久 (25)
- css class选择器用法 (25)
- css样式引入 (30)
- html5和css3新特性 (19)
- css教程文字移动 (33)
- php简单源码 (36)
- php个人中心源码 (25)
- 网站管理平台php源码 (19)
- php小说爬取源码 (23)
- github好玩的php项目 (18)
- 云电脑app源码 (22)
- js创建txt文件 (18)