RabbitMQ--05--Spring Cloud Stream(消息驱动)-程序员宅基地

技术标签: MQ消息队列  linq  rabbitmq  分布式  

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


1.Spring Cloud Stream

1. 基本介绍

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架

https://spring.io/projects/spring-cloud-stream#overview

在这里插入图片描述

Spring Cloud Stream 中,提供了一个微服务和消息中间件之间的一个粘合剂,这个粘合剂叫做Binder,Binder 负责与消息中间件进行交互。而我们开发者则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。

目前仅支持RabbitMQ、Kafka

  • 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
  • 通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。
  • 所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

2.Spring Cloud Stream 解决的痛点问题

MQ消息中间件广泛应用在应用解耦合、异步消息处理、流量削峰等场景中。

  • 不同的MQ消息中间件内部机制包括使用方式都会有所不同,比如RabbitMQ中有Exchange(交换机/交换器)这一概念,kafka有Topic、Partition分区这些概念,MQ消息中间件的差异性不利于我们上层的开发应用,当我们的系统希望从原有的RabbitMQ切换到Kafka时,我们会发现比较困难,很多要操作可能重来(因为应用程序和具体的某⼀款MQ消息中间件耦合在⼀起了)。
  • Spring Cloud Stream进行了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle一样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。目前Spring Cloud Stream支持RabbitMQ和Kafka。

本质:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低了学习、开发、维护MQ的成本

3.设计思想

Spring Cloud Stream 是⼀个构建消息驱动微服务的框架。应用程序通过inputs(相当于消息消费者consumer)或者outputs(相当于消息生产者producer)来与Spring Cloud Stream中的binder对象交互,而Binder对象是用来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。

  • 说白了:对于我们来说,只需要知道如何使⽤Spring Cloud Stream与Binder对象交互即可

在这里插入图片描述

Spring Cloud Stream 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动,为流行的消息中间件产品(Spring Cloud Stream 原生默认支持RabbitMQ,Kafka。阿里在官方基础上提供了RocketMQ的支持

提供了个性化的自动化配置实现,引用了发布-订阅模式,消费组,分区的三大核心概念。

Stream中的消息通信方式遵循了发布-订阅模式

通常的mq的实现思想

  • 生产者/消费者之间靠消息媒介传递信息内容—Message
  • 消息必须走特定的通道—消息通道MessageChannel
  • 消息通道里的消息如何被消费呢,谁负责收发处理----消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

Stream为什么可以统一底层差异

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

绑定器Binder的说明:

在没有绑定器这个概念的情况下,Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,可以完美地实现应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,实现微服务和具体消息中间件的解耦,使得微服务可以关注更多自己的业务流程。一个集成Spring Cloud Stream 程序的框架示意图,如下图所示:
在这里插入图片描述
在这里插入图片描述
Binder中的INPUT和OUTPUT针对Binder本身而言,INPUT对应于消费者,OUTPUT对应于生产者。 INPUT接收消息生产者发送的消息,OUTPUT发送消息给到消息消费者消费。

Stream的工作流程

在这里插入图片描述

  • binder: 目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka
    就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
  • Source和Sink:可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。Source用于获取数据(要发送到MQ的数据),Sink用于提供数据(要接收MQ发送的数据,提供数据给消息消费者)
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。用于存放source接收到的数据,或者是存放binder拉取的数据。
  • Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。

4.核心概念

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5. 常用注解

在这里插入图片描述

基础案例1

1.pom依赖

  • 核心组件
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

  • 项目依赖
<dependencies>
    <!--stream rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--eureka client-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--监控-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--热部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.yml配置文件

server:
  port: 8801
 
spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: 112.124.16.82  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
    port: 5673
    username: guest
    password: guest
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型
 
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
 
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)
    lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s
    instance-id: send-8801.com # 在信息列表时显示主机名称
    prefer-ip-address: true # 访问的路径变为IP地址

3.生产着

  • 新建service.IMessageProvider接口

 public interface IMessageProvider {
    
    public String send();
}
  • 在service下新建impl.IMessageProviderImpl实现类
package service.impl;
 
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import service.IMessageProvider;
 
import javax.annotation.Resource;
import java.util.UUID;
 
@EnableBinding(Source.class)    // 定义消息的推送管道(Source是spring的)
public class IMessageProviderImpl implements IMessageProvider {
    
 
    @Resource
    private MessageChannel output;  // 消息发送管道
 
    @Override
    public String send() {
    
        String serial = UUID.randomUUID().toString();
        // MessageBuilder是spring的integration.support.MessageBuilder
        output.send(MessageBuilder.withPayload(serial).build());    
        System.out.println("*******serial: " + serial);
        return null;
    }
}

消息发送管道 MessageChannel

在这里插入图片描述

MessageBuilder是spring的integration.support.MessageBuilder

在这里插入图片描述

  • 新建controller.SendMessageController
package com.IT.springcloud.controller;
 
import com.IT.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
import javax.annotation.Resource;
 
@RestController
public class SendMessageController {
    
 
    @Resource
    private IMessageProvider iMessageProvider;
 
    @GetMapping("/sendMessage")
    public String sendMessage(){
    
        return iMessageProvider.send();
    }
 
}
 

4.消费者

  • pom文件与8801相同
  • yml文件
server:
  port: 8802
 
spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: 112.124.16.82  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
    port: 5673
    username: guest
    password: guest
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型
 
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
 
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)
    lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s
    instance-id: send-8802.com # 在信息列表时显示主机名称
    prefer-ip-address: true # 访问的路径变为IP地址
  • 新建controller.ReceiveMessageListenerController
@StreamListener注解

@StreamListener注解的作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。

package com.IT.springcloud.controller;
 
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Controller;
 
 
@EnableBinding(Sink.class)
@Controller
public class ReceiveMessageListenerController {
    
 
    @Value("${server.port}")
    private String serverPort;
 
    @StreamListener(Sink.INPUT) // 监听
    public void input(Message<String> message){
    
        System.out.println("消费者1号------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);
    }
 
}

5.测试,启动7001,8801,8802

浏览器地址栏输入:localhost:8801/sendMessage

  • 发送方8801

在这里插入图片描述

  • 接收方8802

在这里插入图片描述

6.分组消费与持久化

  1. 按照8802克隆一个新模块8803

  2. 将8802/8803实现轮询分组,每次只有一个消费者收到消息,也就是说,8801发出一条消息,只能被8802和8803中的其中一个接收到,不能同时被接收,这样就可以避免重复消费,只需要在8802和8803的yml文件中:bindings/input下设置为同一个分组即可

bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
          group: ITA # 设置分组
  1. 测试,现在我们发送两条消息
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

案例2----自定义消息通道

1.生产者

创建 StreamClient 接口,通过 @Input和 @Output注解定义输入通道和输出通道

  • @Input 和 @Output 注解都还有一个 value 属性,该属性可以用来设置消息通道的名称,这里指定的消息通道名称分别是 myInput 和 myOutput。如果直接使用两个注解而没有指定具体的 value 值,则会默认使用方法名作为消息通道的名称。
public interface StreamClient {
    
 
    String INPUT = "myInput";
    String OUTPUT = "myOutput";
 
    @Input(StreamClient.INPUT)
    SubscribableChannel input();
 
    @Output(StreamClient.OUTPUT)
    MessageChannel output();
}
  • SubscribableChannel
    定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法。
    在这里插入图片描述

  • MessageChannel
    当定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方
    在这里插入图片描述

  • 创建一个接口用于测试发送消息


@RestController
public class IMessageProvider{
    
 
    @Autowired
    private StreamClient streamClient;
 
    @GetMapping("/send")
    public void send() {
    
        streamClient.output().send(MessageBuilder.withPayload("Hello World...").build());
    }
  
}

2.消费者

在完成了消息通道绑定的定义后,这些用于定义绑定消息通道的接口则可以被 @EnableBinding 注解的 value 参数指定,从而在应用启动的时候实现对定义消息通道的绑定,Spring Cloud Stream 会为其创建具体的实例,而开发者只需要通过注入的方式来获取这些实例并直接使用即可。下面就来创建用于接收来自 RabbitMQ 消息的消费者 StreamReceiver## 3.验证

  • 创建用于接收来自 RabbitMQ 消息的消费者 StreamReceiver 类
@Component
@EnableBinding(value = {
    StreamClient.class})
public class StreamReceiver {
    
 
    private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
 
    @StreamListener(StreamClient.INPUT)
    public void receive(String message) {
    
        logger.info("StreamReceiver: {}", message);
    }
}

3.测试

启动 StreamApplication,访问 http://localhost:9898/send 接口发送消息,通过控制台,可以看到,消息已成功被接收
在这里插入图片描述

案例3----自定义消息通道

1.生产着

import  org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 事件主题流引擎处理
 *
 */
public interface EventTopicStreams {
    

    /**
     * 事件合并监听方法
     *
     * @return 监听通道
     */
    @Input(TopicConst.EVENT_TOPIC_INPUT)
    SubscribableChannel eventInput();

    /**
     * 事件合并发送方法
     *
     * @return 发送渠道
     */
    @Output(TopicConst.EVENT_TOPIC_OUTPUT)
    MessageChannel eventOutput();

}
@Component
@Slf4j
@EnableBinding(EventTopicStreams.class)
public class EventTopicProducer {
    

    private final EventTopicStreams eventTopicStreams;

    public EventTopicProducer(EventTopicStreams eventTopicStreams) {
    
        this.eventTopicStreams = eventTopicStreams;
    }

    /**
     * 发送事件合并消息
     *
     * @param audienceMergeReqDTO 源档案audid和目标档案audid
     */
    public void sendEventMergeMsg(AudienceMergeReqDTO audienceMergeReqDTO) {
    
        eventTopicStreams.eventOutput()
                .send(MessageBuilder.withPayload(audienceMergeReqDTO)
                        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                        .build());
    }

}

2.消费者

@Component
@Slf4j
@EnableBinding(EventTopicStreams.class)
public class EventTopicListener {
    

    private final EventEsService eventEsService;

    public EventTopicListener(EventEsService eventEsService) {
    
        this.eventEsService = eventEsService;
    }

    @StreamListener(TopicConst.EVENT_TOPIC_INPUT)
    public void receive(@Payload String payload) {
    
        eventEsService.disposeEventMergeMqMsg(payload);
    }

}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_48052161/article/details/136911465

智能推荐

STM32读取伺服电机编码器信号_stm32采集编码器信号-程序员宅基地

文章浏览阅读4.3k次。本人所搭建的平台包括电源,STM32F103开发板,copley驱动器和maxon伺服直流电机,开发板通过驱动器读取电机编码器的信号,所用定时器为定时器8,同时读取编码器AB相,自动重装载值为3999,故电机轴旋转一圈,定时器从0计数到3999,但由于电机前端安装有减速箱,即输出轴为减速器的轴,减速比为128:1,故输出轴旋转一圈,定时器8的计数值应为128X4000=512000。代码如下:..._stm32采集编码器信号

恒生 信托事业部-Web前端-现场群面_前端群面是怎么面试的-程序员宅基地

文章浏览阅读1k次。文章目录1、自我介绍(成绩、参与过的活动、爱好等)2、课程没有前端,怎么会选3、之前前端的哪些东西、知识4、MVVM5、浏览器渲染6、浏览器内核7、不同内核渲染有什么差别8、vue生命周期9、双向绑定原理,原生js实现10、总结1、自我介绍(成绩、参与过的活动、爱好等)2、课程没有前端,怎么会选3、之前前端的哪些东西、知识4、MVVM5、浏览器渲染6、浏览器内核7、不同内核渲染有什么..._前端群面是怎么面试的

微信公众号群发文字_公众号 群发文字消息 什么样-程序员宅基地

文章浏览阅读526次。1.进入公众号后台,进入首页 – 新建群发2.选择群发消息,点击群发即可_公众号 群发文字消息 什么样

HDU3290 The magic apple tree【DFS】-程序员宅基地

文章浏览阅读129次。题目链接:HDU3290 The magic apple tree题意:给你一个树,起始叶子结点的值为他的编号本身,开始更新后,如果一个非叶子结点有K个子节点有值,那么他的值更新为这些值中第(k+1)/2小,求根节点的最后值;分析:从根节点向下DFS,到叶子结点之后向上回溯,裸题;#include<bits/stdc++.h>#define pb push_backu..._the magic apple tree

Android初步进阶之通知Notification_notificationchannel setfullscreenintent-程序员宅基地

文章浏览阅读143次。我了解到的通知,可以有三种展示类型,分别为普通通知、悬挂通知和折叠通知。而我们可以常用自定义视图、触发事件和权限等对它进行设置,个性化。下面介绍一个普通的通知的构建。示例代码:普通通知//触发事件 val mIntent:Intent = Intent(Intent.ACTION_VIEW, Uri.parse("http://www.baidu.com")) //通知管理器 val manager:Notification_notificationchannel setfullscreenintent

CentOS6.5源码安装MySQL5.6.35_源码安装mysql5.5.36-程序员宅基地

文章浏览阅读888次。原文地址:http://www.cnblogs.com/ShanFish/p/6531365.html参考地址:http://blog.csdn.net/1099564863/article/details/51622709http://blog.csdn.net/wendi_0506/article/details/39478369http://www.cnblogs.c_源码安装mysql5.5.36

随便推点

onnxruntime推理YOLOv5-程序员宅基地

文章浏览阅读223次。YOLOv5的onnxruntime推理_onnxruntime推理yolov5

Linux主机通过虚机中的EasyConnect连接内网_easyconnect 虚拟机-程序员宅基地

文章浏览阅读3.5k次。根据 https://smartkeyerror.com/Linux-Use-EasyConnect 文章所述,尝试安装了一下,确实可行。虽然每次使用有点麻烦,但依然是一个选项:虚机启动后必须登录 EasyConnect,这客户端登录时还有个验证图片,也没个证书,所以没有测试自动登录。不过第二条就已经打消了自动建立连接的想法了。被共享的连接必须重建,否则192.168.137.1网卡无法转发数据包。我猜使用管理员控制台用命令可以解决,有空再折腾吧。Linux主机使用了 remmina,没找到哪里可_easyconnect 虚拟机

sqlmap用法大全-程序员宅基地

文章浏览阅读512次。sqlmap参数详解:Usage: python sqlmap.py [options] Options(选项): -h, --help Show basic help message and exit 展示帮助文档 参数-hh Show advanced help message..._sqlmap详细大全

Kafka-flinkSQL-hudi测试案例:_kafka connect.s3.kcq-程序员宅基地

文章浏览阅读615次,点赞2次,收藏2次。Kafka-flinkSQL-hudi测试案例:采用flink-generateConnector模拟生成流式数据到kafka中,然后通过flink-kafka-connect实时读取kafka中数据进行处理,实时将数据存入hudi中,最后又通过flink读取hudi的数据进行统计_kafka connect.s3.kcq

解决Sublime出现中文乱码的情况_sublime中文乱码-程序员宅基地

文章浏览阅读8.8k次,点赞4次,收藏8次。Sublime text 软件中出现中文乱码,大多是因为编码格式不支持,只需要安装一个插件就可以解决中文乱码问题。_sublime中文乱码

350、仿真-基于STM32单片机温湿度光照CO2浓度检测报警proteus仿真设计(程序+原理图+Proteus仿真+配套资料等)_stm32 dac dma proteus 仿真-程序员宅基地

文章浏览阅读744次,点赞6次,收藏13次。STM32系列是为要求高性能、低成本、低功耗的嵌入式应用专门设计的ARM Cortex-M3内核。按性能分成两个不同的系列:“增强型”STM32F103系列和“基本型”STM32F101系列。增强型系列的时钟频率能达到72MHz,是同类产品中频率最高的;基本型的时钟频率为36MHz,用16位产品一样的价格得到比16位产品更大的性能,是16位产品的最好选择。两个系列都有内置的32K到128K的闪存,不同的是SRAM的最大容量和外设接口的组合。_stm32 dac dma proteus 仿真