技术标签: kafka spring cloud SpringCloudAlibaba
参考: https://blog.csdn.net/u011019141/article/details/108743342
https://blog.csdn.net/qq12547345/article/details/119531607
https://blog.csdn.net/JinXYan/article/details/90813592
部署kafka服务, 使用docker-compose部署
docker-compose内容如下 (kafka依赖zookeeper所以会同时部署zookeeper)
version: "3.7"
services:
zookeeper_server:
image: wurstmeister/zookeeper
container_name: zookeeper_server
ports:
- 2181:2181
volumes:
- ./data:/data
logging:
options:
max-size: "50M" # 最大文件上传限制
max-file: "100"
driver: json-file
kafka_server:
image: wurstmeister/kafka
container_name: kafka_server
ports:
- 9092:9092
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper_server:2181
KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
volumes:
- ./kafka-logs:/kafka
logging:
options:
max-size: "50M" # 最大文件上传限制
max-file: "100"
driver: json-file
depends_on:
- zookeeper_server
可能会有启动项目后可能会有连接报错的可能 记得在hosts文件中添加(一般都会有的) 127.0.0.1 localhost
本来想弄一个publisher模块和subscriber模块,网上的东西复现下来各种问题,于是就先弄个能跑的demo
项目结构:
依赖文件pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.demo.springcloud_05_kafka</groupId>
<artifactId>kafka_publisher</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka_publisher</name>
<description>kafka</description>
<properties>
<spring.cloud.version>Hoxton.SR9</spring.cloud.version>
<spring.boot.version>2.3.2.RELEASE</spring.boot.version>
<spring.cloud.alibaba.version>2.2.6.RELEASE</spring.cloud.alibaba.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
<optional>true</optional>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!--spring-boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- spring-cloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml配置文件
server:
port: 8581
spring:
application:
name: kafka_app
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
autoCreateTopics: true
requiredAcks: 1
autoAddPartitions: true
bindings:
input:
destination: stream-demo
output: #这里用stream给我们提供的默认output,后面会讲到自定义output
destination: stream-demo #消息发往的目的地
content-type: text/plain #消息发送的格式,接收端不用指定格式,但是发送端要
项目文件 KafkaPublisherApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@SpringBootApplication
//@EnableBinding(Source.class)
public class KafkaPublisherApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaPublisherApplication.class, args);
}
}
消息接受端 KafkaMessageReceiveListener.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import java.util.Date;
@Slf4j
@EnableBinding(value = Sink.class)
public class KafkaMessageReceiveListener {
/**
* 从缺省通道接收消息
* @param message
*/
@StreamListener(Sink.INPUT)
public void receive(Message<String> message){
log.info("{}订阅告警消息:通道 = es_default_input,data = {}", new Date(), message);
}
}
消息发送端:KafkaMessageSender.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@EnableBinding(Source.class)
public class KafkaMessageSender {
@Autowired
private Source channel;
public void sendToDefaultChannel(String message) {
channel.output().send(MessageBuilder.withPayload(message).build());
}
}
发送消息端 KafkaSenderController.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class KafkaSenderController {
@Autowired
private KafkaMessageSender sender;
@GetMapping("/send")
public void testKafkaMessageSend(String message) {
log.info("message:{}",message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
}
}
然后启动项目浏览器访问 http://localhost:8581/send?message=%221234521313123
就可以看到接受到的消息了
文章浏览阅读442次。深度学习一部分矩阵求导知识的搬运总结_向量变元是什么
文章浏览阅读8次。近期,裁员的公司越来越多今天想和大家聊聊职场人的新出路。作为席卷全球的新概念ESG已然成为当前各个行业关注的最热风口目前,国内官方发布了一项ESG新证书含金量五颗星、中文ESG证书、完整ESG考试体系、名师主讲...而ESG又是与人力资源直接相关甚至在行业圈内成为大佬们的热门话题...当前行业下行,裁员的公司也越来越多大家还是冲一冲这个新兴领域01 ESG为什么重要?在双碳的大背景下,ESG已然成...
文章浏览阅读356次。云计算快速渗透到众多的行业,使中小企业受益于技术变革。最近微软SMB的一项研究发现,到今年年底,78%的中小企业将以某种方式使用云。企业希望投入少、收益高,来取得更大的发展机会。云计算将中小企业信息化的成本大幅降低,它们不必再建本地互联网基础设施,节省时间和资金,降低了企业经营风险。科技创新已成时代的潮流,中小企业上云是创新前提。云平台稳定、安全、便捷的IT环境,提升企业经营效率的同时,也为企业..._系统上云的前后对比
文章浏览阅读899次。出现选网卡的时候无法选中,这里应该是一个bug。3.保存退出,重启虚拟机即可。1.先随便选择一个网卡。2.勾先取消再重新勾选。_esxi虚拟机无法联网
文章浏览阅读1.7k次。参考链接:https://blog.csdn.net/fljhm/article/details/79281655一开始以为是内核的问题,所以一直在升级内核,thinkpad采用的是rtl的网卡,所以我的网卡一直没有加载出来,跟博客的问题很像,采用了很多方法的都没有解决好,期间遇到/boot的内存不足的问题,想升级到18.04是有点困难,解决好这个问题可以愉快的使用linux了。..._e580断开充电器后wifi无法连接
文章浏览阅读3.8k次。Java面试题系列——JavaSE面试题(集合二)_javase 面试题
文章浏览阅读2.2k次,点赞4次,收藏12次。高手请一笑而过。物理实验课别人已经做过3、4个了,自己一个还没做呢。不是咱不想做,而是咱不想起那么早,并且仅有的一次起得早,但是哈工大的服务器竟然超负荷,不停刷新还是不行,不禁感慨这才是真正的“万马争过独木桥“啊!服务器不给力啊……好了,废话少说。其实,我的想法很简单。写一个三重循环,不停地提交,直到所有的数据都accepted。其中最关键的是提交最后一个页面,因为提交用户名和密码后不需要再访问其..._哈尔滨工业大学抢课脚本
文章浏览阅读4.9k次。一些别人收集的英文站点 http://www.lifeinchina.cn (nice) http://www.huaren.us/ (nice) http://www.hindu.com (okay) http://www.italki.com www.talkdatalk.com (transfer)http://www.en8848.com.cn/yingyu/index._study english html
文章浏览阅读5.5k次,点赞19次,收藏78次。什么是栈?在谈M3堆栈之前我们先回忆一下数据结构中的栈。栈是一种先进后出的数据结构(类似于枪支的弹夹,先放入的子弹最后打出,后放入的子弹先打出)。M3内核的堆栈也不例外,也是先进后出的。栈的作用?局部变量内存的开销,函数的调用都离不开栈。了解了栈的概念和基本作用后我们来看M3的双堆栈栈cortex-M3内核使用了双堆栈,即MSP和PSP,这极大的方便了OS的设计。MSP的含义是Main..._stm32 msp psp
文章浏览阅读1.6k次。VC++的图形工作方式物理坐标系设备场景MFC中的设备场景CDC类的部分成员函数Windows的图形对象1.建立图形对象2.对该对象进行初始化3.将该对象选入当前设备场景,并保留初始对象4.画图5.恢复原始对象实例实验一 二维图形绘制金刚石一笔绘三种方法..._绘图程序计算机图形学
文章浏览阅读751次。★ 先说一个通俗的例子 考虑到证书体系的相关知识比较枯燥、晦涩。俺先拿一个通俗的例子来说事儿。 ◇ 普通的介绍信 想必大伙儿都听说过介绍信的例子吧?假设 A 公司的张三先生要到 B 公司去拜访,但是 B 公司的所有人都不认识他,他咋办捏?常用的办法是带公司开的一张介绍信,在信中说:兹有张三先生前往贵公司办理业务,请给予接洽......云云。然后在信上敲上A公司的公章。_rsa和ca的区别
文章浏览阅读467次。学C语言就是学内存int *p = NULL;这里的/*是在定义时使用的,说明p是指针变量,而不是普通变量printf("%d\n",*p);这里的/*的表示取p保存的地址编号对应空间的内容..._c语言a[4][4]