030-从零搭建微服务-消息队列(二)

news/2024/7/4 7:48:03 标签: 微服务, 架构, 云原生

写在最前

如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。

源码地址(后端):mingyue: 🎉 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心

源码地址(前端):mingyue-ui: 🎉 基于 Vue3 + TS + Vite + Element plus 等技术,适配 MingYue 后台微服务

文档地址:Wiki - Gitee.com

mingyue-common-mq

添加依赖

根据需要在 mingyue-common-mq 模块中添加所需的 MQ 中间件,例如:RocketMQ、Kafka。

<dependencies>
    <!-- RocketMQ -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
​
    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

集成 RocketMQ

引入依赖

<!-- MQ工具 -->
<dependency>
    <groupId>com.csp.mingyue</groupId>
    <artifactId>mingyue-common-mq</artifactId>
</dependency>

Nacos 配置

spring:
  cloud:
    stream:
      function:
        # 重点配置 与 binding 名与消费者对应
        definition: rocketmqDemo
      rocketmq:
        binder:
            # rocketmq 地址
            name-server: 192.168.21.32:9876
        bindings:
            rocketmqDemo-out-0:
                producer:
                    # 必须得写
                    group: default
      bindings:
        rocketmqDemo-out-0:
            content-type: application/json
            destination: stream-rocketmq-demo-topic
            group: demo-group
            binder: rocketmq
        rocketmqDemo-in-0:
            content-type: application/json
            destination: stream-rocketmq-demo-topic
            group: demo-group
            binder: rocketmq

RocketMQ 生产者

@Component
public class RocketMqProducer {
​
    @Resource
    private StreamBridge streamBridge;
​
    public void rocketMqDemoMsg(String msg) {
        // 构建消息对象
        MqMessageDto messageDto = new MqMessageDto()
                .setMsgId(IdUtil.fastSimpleUUID())
                .setMsgText(msg);
​
        streamBridge.send("rocketmqDemo-out-0", MessageBuilder.withPayload(messageDto).build());
    }
​
}

RocketMQ 消费者

@Slf4j
@Component
public class RocketMqConsumer {
​
    @Bean
    Consumer<MqMessageDto> rocketmqDemo() {
        log.info("Rocket MQ 初始化订阅");
        return msg -> {
            log.info("通过 Rocket MQ 消费到消息 => {}", msg.toString());
        };
    }
​
}

推送消息到 RocketMQ

@GetMapping("/sendRocketMq")
@Operation(summary = "发送消息到RocketMQ", parameters = { @Parameter(name = "msg", description = "推送的消息体", required = true) })
public R<Void> sendRocketMq(String msg) {
    rocketMqProducer.rocketMqDemoMsg(msg);
    return R.ok();
}

集成 Kafka

引入依赖

<!-- MQ工具 -->
<dependency>
    <groupId>com.csp.mingyue</groupId>
    <artifactId>mingyue-common-mq</artifactId>
</dependency>

Nacos 配置

spring:
  cloud:
    stream:
      function:
        # 重点配置 与 binding 名与消费者对应
        definition: kafkaDemo
      kafka:
        binder:
            brokers: 192.168.21.32:9092
      bindings:
        kafkaDemo-out-0:
            destination: stream-kafka-demo-topic
            contentType: application/json
            group: demo-group
            binder: kafka
        kafkaDemo-in-0:
            destination: stream-kafka-demo-topic
            contentType: application/json
            group: demo-group
            binder: kafka

Kafka 生产者

@Component
public class KafkaProducer {
​
    @Resource
    private StreamBridge streamBridge;
​
    public void kafkaDemoMsg(String msg) {
        // 构建消息对象
        MqMessageDto messageDto = new MqMessageDto()
                .setMsgId(IdUtil.fastSimpleUUID())
                .setMsgText(msg);
​
        streamBridge.send("kafkaDemo-out-0", MessageBuilder.withPayload(messageDto).build());
    }
​
}

Kafka 消费者

@Slf4j
@Component
public class KafkaConsumer {
​
    @Bean
    Consumer<MqMessageDto> kafkaDemo() {
        log.info("Kafka 初始化订阅");
        return msg -> {
            log.info("通过 Kafka 消费到消息 => {}", msg.toString());
        };
    }
​
}

推送消息到 Kafka

@GetMapping("/sendKafka")
@Operation(summary = "发送消息到Kafka", parameters = { @Parameter(name = "msg", description = "推送的消息体", required = true) })
public R<Void> sendKafka(String msg) {
    kafkaProducer.kafkaDemoMsg(msg);
    return R.ok();
}

拓展 RabbitMQ

mingyue-common-mq 添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Nacos 配置

--- # rabbitmq 配置
spring:
  rabbitmq:
    host: rabbitmqIp
    port: 5672
    username: root
    password: root
  cloud:
    stream:
      function:
          # 重点配置 与 binding 名与消费者对应
          definition: rabbitmqDemo
      rabbit:
        bindings:
          rabbitmqDemo-in-0:
            consumer:
              delayedExchange: true
          rabbitmqDemo-out-0:
            producer:
              delayedExchange: true
      bindings:
        rabbitmqDemo-in-0:
          destination: delay.exchange.demo
          content-type: application/json
          group: delay-group
          binder: rabbit
        rabbitmqDemo-out-0:
          destination: delay.exchange.demo
          content-type: application/json
          group: delay-group
          binder: rabbit

小结

MQ 基础搭建已经完成,后续会编写一些实际开发中使用到队列的场景,如:

  1. 订单处理:

    • 电子商务平台可以使用消息队列来处理订单,确保订单的创建、支付、发货和通知等各个步骤都能按顺序和可靠地执行。

  2. 通知和提醒:

    • 网站或应用程序可以使用消息队列来发送通知和提醒,如电子邮件通知、短信通知、推送通知等,以便与用户互动。

  3. 用户注册和身份验证:

    • 当用户注册或请求密码重置时,消息队列可以用于生成和发送验证链接或令牌,确保用户身份验证的安全性和可扩展性。

  4. 数据同步:

    • 在多个系统之间同步数据,以确保数据的一致性,例如将用户配置信息从一个微服务同步到另一个微服务

  5. 事件日志和审计:

    • 记录应用程序事件、用户活动和系统操作,以进行审计、监视和故障排除。

  6. 批量处理:

    • 处理大量数据导入、数据清洗、ETL(提取、转换、加载)操作等批处理任务,以提高性能和可维护性。

  7. 异步任务处理:

    • 处理后台任务,如图像处理、视频编码、生成报告等,以减少响应时间和提高系统的吞吐量。

  8. 队列服务:

    • 提供队列服务以支持其他应用程序或团队的异步通信需求,例如云服务提供商的消息队列服务。

  9. 数据分发:

    • 将数据从生产者分发给多个消费者,以实现发布-订阅模式,例如新闻订阅、市场报价和天气预报。

  10. 错误处理和重试:

    • 处理意外错误和故障,将失败的操作或任务放入队列,以便进行重试或错误处理。

这些业务使用场景只是消息队列的一些示例。消息队列有助于提高系统的可扩展性、弹性和可靠性,允许异步处理和解耦合组件,从而改善了应用程序的整体性能和用户体验。不同的业务需求可能需要不同类型的消息队列系统和配置。


http://www.niftyadmin.cn/n/5055246.html

相关文章

【Java 进阶篇】使用 SQL 进行排序查询

在数据库中&#xff0c;我们经常需要对查询的结果进行排序&#xff0c;以便更容易地理解和分析数据。SQL&#xff08;Structured Query Language&#xff09;提供了强大的排序功能&#xff0c;允许我们按照指定的列对数据进行升序或降序排序。本文将详细介绍如何使用 SQL 进行排…

【力扣2057】值相等的最小索引

&#x1f451;专栏内容&#xff1a;力扣刷题⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、题目描述二、题目分析 一、题目描述 题目链接&#xff1a;值相等的最小索引 给你一个下标从 0 开始的整数数组 nums …

第3章-指标体系与数据可视化-3.2-描述性统计分析与绘图

目录 3.2.1 描述性统计进行数据探索 1. 变量度量类型与分布类型 度量类型 分布类型

【图论C++】树的直径(DFS 与 DP动态规划)

》》》算法竞赛 /*** file * author jUicE_g2R(qq:3406291309)————彬(bin-必应)* 一个某双流一大学通信与信息专业大二在读 * * brief 一直在竞赛算法学习的路上* * copyright 2023.9* COPYRIGHT 原创技术笔记&#xff1a;转载…

Spring Controller内存马

获取当前上下文运行环境 getCurrentWebApplicationContext WebApplicationContext context ContextLoader.getCurrentWebApplicationContext(); 在SpringMVC环境下获取到的是一个XmlWebApplicationContext类型的Root WebApplicationContext&#xff1a; 在Spring MVC环境中…

浅谈安科瑞ADL系列导轨式多功能仪表在迪拜楼宇EMS中的应用

摘要&#xff1a;用户端消耗着整个电网80%的电能&#xff0c;用户端智能化用电管理对用户可靠、安全、节约用电有十分重要的意义。构建智能用电服务体系&#xff0c;推广用户端智能多功能仪表、智能用电管理终端等设备用电管理解决方案&#xff0c;实现电网与用户的双向良性互动…

【VUE复习·5】插值语法(使用 计算属性 的插值语法)

总览 1.计算属性 2.使用 methods 的 插值语法 3.使用 computed 的 插值语法 一、计算属性 1.解释 首先&#xff0c;如果我们要写一个插值语法&#xff0c;而 {{ }} 内的内容&#xff0c;是一个经过计算的值&#xff0c;那么按照原本 JS 的写法&#xff0c;应该是这样的&…

免费录音软件推荐,告别杂音,音质更清晰!

“求推荐一款免费的录音软件&#xff01;最近下载了好多的录音软件&#xff0c;不是音质太差&#xff0c;就是需要收费解锁新的功能&#xff0c;根本不好用&#xff0c;有没有人知道一款免费优秀的录音软件呀&#xff0c;告诉我一下。” 录音已成为现代人们学习和工作中的一项…