SpringCloudAlibaba学习-06-SpringCloud整合Kafka入门(一)_spring cloud kafka-程序员宅基地

技术标签: kafka  spring cloud  SpringCloudAlibaba  

参考: https://blog.csdn.net/u011019141/article/details/108743342
https://blog.csdn.net/qq12547345/article/details/119531607
https://blog.csdn.net/JinXYan/article/details/90813592
部署kafka服务, 使用docker-compose部署
docker-compose内容如下 (kafka依赖zookeeper所以会同时部署zookeeper)

version: "3.7" 
services:
    zookeeper_server:
        image: wurstmeister/zookeeper 
        container_name: zookeeper_server
        ports: 
            - 2181:2181
        volumes:
            - ./data:/data
        logging:
            options:
                max-size: "50M" # 最大文件上传限制
                max-file: "100"
            driver: json-file

            
         
    kafka_server:
        image: wurstmeister/kafka
        container_name: kafka_server
        ports: 
            - 9092:9092
        environment:
            KAFKA_ZOOKEEPER_CONNECT: zookeeper_server:2181
            KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
            KAFKA_ADVERTISED_LISTENERS: INSIDE://:9093,OUTSIDE://localhost:9092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
        volumes:
            - ./kafka-logs:/kafka
        logging:
            options:
                max-size: "50M" # 最大文件上传限制
                max-file: "100"
            driver: json-file     
        depends_on:
          - zookeeper_server   

可能会有启动项目后可能会有连接报错的可能 记得在hosts文件中添加(一般都会有的) 127.0.0.1 localhost

本来想弄一个publisher模块和subscriber模块,网上的东西复现下来各种问题,于是就先弄个能跑的demo

项目结构:
在这里插入图片描述
依赖文件pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.demo.springcloud_05_kafka</groupId>
    <artifactId>kafka_publisher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_publisher</name>
    <description>kafka</description>
    <properties>
        <spring.cloud.version>Hoxton.SR9</spring.cloud.version>
        <spring.boot.version>2.3.2.RELEASE</spring.boot.version>
        <spring.cloud.alibaba.version>2.2.6.RELEASE</spring.cloud.alibaba.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <optional>true</optional>
        </dependency>

        <!--kafka-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

    </dependencies>


    <dependencyManagement>
        <dependencies>
            <!--spring-boot-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- spring-cloud -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml配置文件

server:
  port: 8581
spring:
  application:
    name: kafka_app
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          autoCreateTopics: true
          requiredAcks: 1
          autoAddPartitions: true
      bindings:
        input:
          destination: stream-demo
        output:      #这里用stream给我们提供的默认output,后面会讲到自定义output
          destination: stream-demo    #消息发往的目的地
          content-type: text/plain    #消息发送的格式,接收端不用指定格式,但是发送端要

项目文件 KafkaPublisherApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
//@EnableBinding(Source.class)
public class KafkaPublisherApplication {
    

    public static void main(String[] args) {
    
        SpringApplication.run(KafkaPublisherApplication.class, args);
    }
}

消息接受端 KafkaMessageReceiveListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

import java.util.Date;

@Slf4j
@EnableBinding(value = Sink.class)
public class KafkaMessageReceiveListener {
    

    /**
     * 从缺省通道接收消息
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void receive(Message<String> message){
    
        log.info("{}订阅告警消息:通道 = es_default_input,data = {}", new Date(), message);
    }
}

消息发送端:KafkaMessageSender.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@EnableBinding(Source.class)
public class KafkaMessageSender {
    
    @Autowired
    private Source channel;

    public void sendToDefaultChannel(String message) {
    
        channel.output().send(MessageBuilder.withPayload(message).build());
    }

}

发送消息端 KafkaSenderController.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class KafkaSenderController {
    
    @Autowired
    private KafkaMessageSender sender;

    @GetMapping("/send")
    public void testKafkaMessageSend(String message) {
    
        log.info("message:{}",message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }
}

然后启动项目浏览器访问 http://localhost:8581/send?message=%221234521313123
就可以看到接受到的消息了
在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/xy3233/article/details/122084419

智能推荐

动手深度学习矩阵求导_向量变元是什么-程序员宅基地

文章浏览阅读442次。深度学习一部分矩阵求导知识的搬运总结_向量变元是什么

月薪已炒到15w?真心建议大家冲一冲数据新兴领域,人才缺口极大!-程序员宅基地

文章浏览阅读8次。近期,裁员的公司越来越多今天想和大家聊聊职场人的新出路。作为席卷全球的新概念ESG已然成为当前各个行业关注的最热风口目前,国内官方发布了一项ESG新证书含金量五颗星、中文ESG证书、完整ESG考试体系、名师主讲...而ESG又是与人力资源直接相关甚至在行业圈内成为大佬们的热门话题...当前行业下行,裁员的公司也越来越多大家还是冲一冲这个新兴领域01 ESG为什么重要?在双碳的大背景下,ESG已然成...

对比传统运营模式,为什么越拉越多的企业选择上云?_系统上云的前后对比-程序员宅基地

文章浏览阅读356次。云计算快速渗透到众多的行业,使中小企业受益于技术变革。最近微软SMB的一项研究发现,到今年年底,78%的中小企业将以某种方式使用云。企业希望投入少、收益高,来取得更大的发展机会。云计算将中小企业信息化的成本大幅降低,它们不必再建本地互联网基础设施,节省时间和资金,降低了企业经营风险。科技创新已成时代的潮流,中小企业上云是创新前提。云平台稳定、安全、便捷的IT环境,提升企业经营效率的同时,也为企业..._系统上云的前后对比

esxi网卡直通后虚拟机无网_esxi虚拟机无法联网-程序员宅基地

文章浏览阅读899次。出现选网卡的时候无法选中,这里应该是一个bug。3.保存退出,重启虚拟机即可。1.先随便选择一个网卡。2.勾先取消再重新勾选。_esxi虚拟机无法联网

thinkpad E580解决ubuntu下不能连接wifi的问题_e580断开充电器后wifi无法连接-程序员宅基地

文章浏览阅读1.7k次。参考链接:https://blog.csdn.net/fljhm/article/details/79281655一开始以为是内核的问题,所以一直在升级内核,thinkpad采用的是rtl的网卡,所以我的网卡一直没有加载出来,跟博客的问题很像,采用了很多方法的都没有解决好,期间遇到/boot的内存不足的问题,想升级到18.04是有点困难,解决好这个问题可以愉快的使用linux了。..._e580断开充电器后wifi无法连接

Java面试题系列——JavaSE面试题(集合二)_javase 面试题-程序员宅基地

文章浏览阅读3.8k次。Java面试题系列——JavaSE面试题(集合二)_javase 面试题

随便推点

大学抢课python脚本_用彪悍的Python写了一个自动选课的脚本 | 学步园-程序员宅基地

文章浏览阅读2.2k次,点赞4次,收藏12次。高手请一笑而过。物理实验课别人已经做过3、4个了,自己一个还没做呢。不是咱不想做,而是咱不想起那么早,并且仅有的一次起得早,但是哈工大的服务器竟然超负荷,不停刷新还是不行,不禁感慨这才是真正的“万马争过独木桥“啊!服务器不给力啊……好了,废话少说。其实,我的想法很简单。写一个三重循环,不停地提交,直到所有的数据都accepted。其中最关键的是提交最后一个页面,因为提交用户名和密码后不需要再访问其..._哈尔滨工业大学抢课脚本

english_html_study english html-程序员宅基地

文章浏览阅读4.9k次。一些别人收集的英文站点 http://www.lifeinchina.cn (nice) http://www.huaren.us/ (nice) http://www.hindu.com (okay) http://www.italki.com www.talkdatalk.com (transfer)http://www.en8848.com.cn/yingyu/index._study english html

Cortex-M3双堆栈MSP和PSP_stm32 msp psp-程序员宅基地

文章浏览阅读5.5k次,点赞19次,收藏78次。什么是栈?在谈M3堆栈之前我们先回忆一下数据结构中的栈。栈是一种先进后出的数据结构(类似于枪支的弹夹,先放入的子弹最后打出,后放入的子弹先打出)。M3内核的堆栈也不例外,也是先进后出的。栈的作用?局部变量内存的开销,函数的调用都离不开栈。了解了栈的概念和基本作用后我们来看M3的双堆栈栈cortex-M3内核使用了双堆栈,即MSP和PSP,这极大的方便了OS的设计。MSP的含义是Main..._stm32 msp psp

计算机图形学绘图程序设计_绘图程序计算机图形学-程序员宅基地

文章浏览阅读1.6k次。VC++的图形工作方式物理坐标系设备场景MFC中的设备场景CDC类的部分成员函数Windows的图形对象1.建立图形对象2.对该对象进行初始化3.将该对象选入当前设备场景,并保留初始对象4.画图5.恢复原始对象实例实验一 二维图形绘制金刚石一笔绘三种方法..._绘图程序计算机图形学

数字证书及CA的扫盲介绍_rsa和ca的区别-程序员宅基地

文章浏览阅读751次。★ 先说一个通俗的例子  考虑到证书体系的相关知识比较枯燥、晦涩。俺先拿一个通俗的例子来说事儿。  ◇ 普通的介绍信  想必大伙儿都听说过介绍信的例子吧?假设 A 公司的张三先生要到 B 公司去拜访,但是 B 公司的所有人都不认识他,他咋办捏?常用的办法是带公司开的一张介绍信,在信中说:兹有张三先生前往贵公司办理业务,请给予接洽......云云。然后在信上敲上A公司的公章。_rsa和ca的区别

B站C语言——指针_c语言a[4][4]-程序员宅基地

文章浏览阅读467次。学C语言就是学内存int *p = NULL;这里的/*是在定义时使用的,说明p是指针变量,而不是普通变量printf("%d\n",*p);这里的/*的表示取p保存的地址编号对应空间的内容..._c语言a[4][4]

推荐文章

热门文章

相关标签