技术标签: MQ消息队列 linq rabbitmq 分布式
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架
https://spring.io/projects/spring-cloud-stream#overview
Spring Cloud Stream 中,提供了一个微服务和消息中间件之间的一个粘合剂,这个粘合剂叫做Binder,Binder 负责与消息中间件进行交互。而我们开发者则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。
目前仅支持RabbitMQ、Kafka
MQ消息中间件广泛应用在应用解耦合、异步消息处理、流量削峰等场景中。
本质:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低了学习、开发、维护MQ的成本
Spring Cloud Stream 是⼀个构建消息驱动微服务的框架。应用程序通过inputs(相当于消息消费者consumer)或者outputs(相当于消息生产者producer)来与Spring Cloud Stream中的binder对象交互,而Binder对象是用来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。
Spring Cloud Stream 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动,为流行的消息中间件产品(Spring Cloud Stream 原生默认支持RabbitMQ,Kafka。阿里在官方基础上提供了RocketMQ的支持)
提供了个性化的自动化配置实现,引用了发布-订阅模式,消费组,分区的三大核心概念。
Stream中的消息通信方式遵循了发布-订阅模式
通常的mq的实现思想
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
在没有绑定器这个概念的情况下,Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,可以完美地实现应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,实现微服务和具体消息中间件的解耦,使得微服务可以关注更多自己的业务流程。一个集成Spring Cloud Stream 程序的框架示意图,如下图所示:
Binder中的INPUT和OUTPUT针对Binder本身而言,INPUT对应于消费者,OUTPUT对应于生产者。 INPUT接收消息生产者发送的消息,OUTPUT发送消息给到消息消费者消费。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependencies>
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--监控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--热部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
server:
port: 8801
spring:
application:
name: cloud-stream-provider
rabbitmq:
host: 112.124.16.82 # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
port: 5673
username: guest
password: guest
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息
defaultRabbit: # 表示定义的名称,用于binding整合
type: rabbit # 消息组件类型
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)
lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
public interface IMessageProvider {
public String send();
}
package service.impl;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import service.IMessageProvider;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class) // 定义消息的推送管道(Source是spring的)
public class IMessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
// MessageBuilder是spring的integration.support.MessageBuilder
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*******serial: " + serial);
return null;
}
}
消息发送管道 MessageChannel
MessageBuilder是spring的integration.support.MessageBuilder
package com.IT.springcloud.controller;
import com.IT.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return iMessageProvider.send();
}
}
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: 112.124.16.82 # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
port: 5673
username: guest
password: guest
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息
defaultRabbit: # 表示定义的名称,用于binding整合
type: rabbit # 消息组件类型
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)
lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点 默认是90s
instance-id: send-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
@StreamListener注解
@StreamListener注解的作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。
package com.IT.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Controller;
@EnableBinding(Sink.class)
@Controller
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT) // 监听
public void input(Message<String> message){
System.out.println("消费者1号------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);
}
}
浏览器地址栏输入:localhost:8801/sendMessage
按照8802克隆一个新模块8803
将8802/8803实现轮询分组,每次只有一个消费者收到消息,也就是说,8801发出一条消息,只能被8802和8803中的其中一个接收到,不能同时被接收,这样就可以避免重复消费,只需要在8802和8803的yml文件中:bindings/input下设置为同一个分组即可!
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次为json,本文要设置为“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
group: ITA # 设置分组
创建 StreamClient 接口,通过 @Input和 @Output注解定义输入通道和输出通道,
public interface StreamClient {
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(StreamClient.INPUT)
SubscribableChannel input();
@Output(StreamClient.OUTPUT)
MessageChannel output();
}
SubscribableChannel
定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法。
MessageChannel
当定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方
创建一个接口用于测试发送消息
@RestController
public class IMessageProvider{
@Autowired
private StreamClient streamClient;
@GetMapping("/send")
public void send() {
streamClient.output().send(MessageBuilder.withPayload("Hello World...").build());
}
}
在完成了消息通道绑定的定义后,这些用于定义绑定消息通道的接口则可以被 @EnableBinding 注解的 value 参数指定,从而在应用启动的时候实现对定义消息通道的绑定,Spring Cloud Stream 会为其创建具体的实例,而开发者只需要通过注入的方式来获取这些实例并直接使用即可。下面就来创建用于接收来自 RabbitMQ 消息的消费者 StreamReceiver## 3.验证
@Component
@EnableBinding(value = {
StreamClient.class})
public class StreamReceiver {
private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
@StreamListener(StreamClient.INPUT)
public void receive(String message) {
logger.info("StreamReceiver: {}", message);
}
}
启动 StreamApplication,访问 http://localhost:9898/send 接口发送消息,通过控制台,可以看到,消息已成功被接收
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
* 事件主题流引擎处理
*
*/
public interface EventTopicStreams {
/**
* 事件合并监听方法
*
* @return 监听通道
*/
@Input(TopicConst.EVENT_TOPIC_INPUT)
SubscribableChannel eventInput();
/**
* 事件合并发送方法
*
* @return 发送渠道
*/
@Output(TopicConst.EVENT_TOPIC_OUTPUT)
MessageChannel eventOutput();
}
@Component
@Slf4j
@EnableBinding(EventTopicStreams.class)
public class EventTopicProducer {
private final EventTopicStreams eventTopicStreams;
public EventTopicProducer(EventTopicStreams eventTopicStreams) {
this.eventTopicStreams = eventTopicStreams;
}
/**
* 发送事件合并消息
*
* @param audienceMergeReqDTO 源档案audid和目标档案audid
*/
public void sendEventMergeMsg(AudienceMergeReqDTO audienceMergeReqDTO) {
eventTopicStreams.eventOutput()
.send(MessageBuilder.withPayload(audienceMergeReqDTO)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
}
@Component
@Slf4j
@EnableBinding(EventTopicStreams.class)
public class EventTopicListener {
private final EventEsService eventEsService;
public EventTopicListener(EventEsService eventEsService) {
this.eventEsService = eventEsService;
}
@StreamListener(TopicConst.EVENT_TOPIC_INPUT)
public void receive(@Payload String payload) {
eventEsService.disposeEventMergeMqMsg(payload);
}
}
文章浏览阅读4.3k次。本人所搭建的平台包括电源,STM32F103开发板,copley驱动器和maxon伺服直流电机,开发板通过驱动器读取电机编码器的信号,所用定时器为定时器8,同时读取编码器AB相,自动重装载值为3999,故电机轴旋转一圈,定时器从0计数到3999,但由于电机前端安装有减速箱,即输出轴为减速器的轴,减速比为128:1,故输出轴旋转一圈,定时器8的计数值应为128X4000=512000。代码如下:..._stm32采集编码器信号
文章浏览阅读1k次。文章目录1、自我介绍(成绩、参与过的活动、爱好等)2、课程没有前端,怎么会选3、之前前端的哪些东西、知识4、MVVM5、浏览器渲染6、浏览器内核7、不同内核渲染有什么差别8、vue生命周期9、双向绑定原理,原生js实现10、总结1、自我介绍(成绩、参与过的活动、爱好等)2、课程没有前端,怎么会选3、之前前端的哪些东西、知识4、MVVM5、浏览器渲染6、浏览器内核7、不同内核渲染有什么..._前端群面是怎么面试的
文章浏览阅读526次。1.进入公众号后台,进入首页 – 新建群发2.选择群发消息,点击群发即可_公众号 群发文字消息 什么样
文章浏览阅读129次。题目链接:HDU3290 The magic apple tree题意:给你一个树,起始叶子结点的值为他的编号本身,开始更新后,如果一个非叶子结点有K个子节点有值,那么他的值更新为这些值中第(k+1)/2小,求根节点的最后值;分析:从根节点向下DFS,到叶子结点之后向上回溯,裸题;#include<bits/stdc++.h>#define pb push_backu..._the magic apple tree
文章浏览阅读143次。我了解到的通知,可以有三种展示类型,分别为普通通知、悬挂通知和折叠通知。而我们可以常用自定义视图、触发事件和权限等对它进行设置,个性化。下面介绍一个普通的通知的构建。示例代码:普通通知//触发事件 val mIntent:Intent = Intent(Intent.ACTION_VIEW, Uri.parse("http://www.baidu.com")) //通知管理器 val manager:Notification_notificationchannel setfullscreenintent
文章浏览阅读888次。原文地址:http://www.cnblogs.com/ShanFish/p/6531365.html参考地址:http://blog.csdn.net/1099564863/article/details/51622709http://blog.csdn.net/wendi_0506/article/details/39478369http://www.cnblogs.c_源码安装mysql5.5.36
文章浏览阅读223次。YOLOv5的onnxruntime推理_onnxruntime推理yolov5
文章浏览阅读3.5k次。根据 https://smartkeyerror.com/Linux-Use-EasyConnect 文章所述,尝试安装了一下,确实可行。虽然每次使用有点麻烦,但依然是一个选项:虚机启动后必须登录 EasyConnect,这客户端登录时还有个验证图片,也没个证书,所以没有测试自动登录。不过第二条就已经打消了自动建立连接的想法了。被共享的连接必须重建,否则192.168.137.1网卡无法转发数据包。我猜使用管理员控制台用命令可以解决,有空再折腾吧。Linux主机使用了 remmina,没找到哪里可_easyconnect 虚拟机
文章浏览阅读512次。sqlmap参数详解:Usage: python sqlmap.py [options] Options(选项): -h, --help Show basic help message and exit 展示帮助文档 参数-hh Show advanced help message..._sqlmap详细大全
文章浏览阅读615次,点赞2次,收藏2次。Kafka-flinkSQL-hudi测试案例:采用flink-generateConnector模拟生成流式数据到kafka中,然后通过flink-kafka-connect实时读取kafka中数据进行处理,实时将数据存入hudi中,最后又通过flink读取hudi的数据进行统计_kafka connect.s3.kcq
文章浏览阅读8.8k次,点赞4次,收藏8次。Sublime text 软件中出现中文乱码,大多是因为编码格式不支持,只需要安装一个插件就可以解决中文乱码问题。_sublime中文乱码
文章浏览阅读744次,点赞6次,收藏13次。STM32系列是为要求高性能、低成本、低功耗的嵌入式应用专门设计的ARM Cortex-M3内核。按性能分成两个不同的系列:“增强型”STM32F103系列和“基本型”STM32F101系列。增强型系列的时钟频率能达到72MHz,是同类产品中频率最高的;基本型的时钟频率为36MHz,用16位产品一样的价格得到比16位产品更大的性能,是16位产品的最好选择。两个系列都有内置的32K到128K的闪存,不同的是SRAM的最大容量和外设接口的组合。_stm32 dac dma proteus 仿真