首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

  • 23-11-17 19:42
  • 3024
  • 13409
blog.csdn.net

目录

  • 引出
  • 点对点(simple)
  • Work queues 一对多
  • 发布订阅/fanout模式
    • 以登陆验证码为例
    • pom文件导包
    • application.yml文件
    • rabbitmq的配置
    • 生产者生成验证码,发送给交换机
    • 消费者消费验证码
  • topic模式
    • 配置类增加配置
    • 生产者发送信息
    • 进行发送
    • 控制台查看
  • rabbitmq回调确认
    • 配置类
    • 验证生产者发送是否成功
  • 延迟队列(死信)设计
    • java代码步骤
      • 创建正常+死信队列
      • 配置类+常量
    • 生产者到正常队列
    • 消费者进行延迟消费
  • 延迟队列插件安装
    • 访问官网
    • 进入rabbitmq docker容器
    • 上传到linux服务器
    • 拷贝插件到容器中
    • 进入容器安装插件
    • 打开管理页面
  • 总结

引出


1.rabbitmq队列方式的梳理,点对点,一对多;
2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;
3.topic模式,根据规则决定发送给哪个队列;
4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;
5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;

点对点(simple)

点对对方式传输

在这里插入图片描述

Work queues 一对多

1个生产者多个消费者

在这里插入图片描述

在这里插入图片描述

发布订阅/fanout模式

生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到。

在这里插入图片描述

以登陆验证码为例

pom文件导包


        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-mailartifactId>
        dependency>

        
        <dependency>
            <groupId>com.aliyungroupId>
            <artifactId>aliyun-java-sdk-coreartifactId>
            <version>4.5.3version>
        dependency>


        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-amqpartifactId>
        dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

application.yml文件

server:
  port: 9099

spring:
  # 模块的名字
  application:
    name: user-auth


  # 邮箱的配置
  mail:
    host: smtp.qq.com
    port: 587
    username: xxxx
    password: xxxxx

  # rabbitmq的配置
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123

logging:
  level:
    com.tianju.auth: debug
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

rabbitmq的配置

需要用到的常量

package com.tianju.auth.util;

/**
 * rabbitmq的常量
 */
public interface RabbitMqConstants {
    String MQ_MAIL_QUEUE="mq_email_queue";
    String MQ_PHONE_QUEUE="mq_phone_queue";
    String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";

    // 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

RabbitMqConfig.java配置

邮箱队列,电话队列,交换机;

邮箱绑定交换机,电话绑定交换机;

创建队列参数说明:

参数说明
name字符串值,queue的名称。
durable布尔值,表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 queue 是否会恢复/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。
exclusive布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。
autoDelete布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。

不指定 durable、exclusive 和 autoDelete 时,默认为 true 、 false 和 false 。表示持久化、非排它、不用自动删除。

创建交换机参数说明

参数说明
name字符串值,exchange 的名称。
durable布尔值,表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 exchange 是否会恢复/仍存在。
autoDelete布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。

不指定 durable 和 autoDelete 时,默认为 true 和 false 。表示持久化、不用自动删除

package com.tianju.auth.config;

import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean // 邮箱的队列
    public Queue mailQueue(){
        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // 电话的队列
    public Queue phoneQueue(){
        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }
    @Bean // 交换机
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean
    public Binding mailBinding(){
        return BindingBuilder.bind(mailQueue())
                .to(fanoutExchange());
    }

    @Bean
    public Binding phoneBinding(){
        return BindingBuilder.bind(phoneQueue())
                .to(fanoutExchange());
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

生产者生成验证码,发送给交换机

接口

package com.tianju.auth.service;

public interface IUserService {

    /**
     * 生产者生成信息发送给交换机
     * @param msg 信息,这里是验证码
     */
    void sendCode(String msg);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

实现

package com.tianju.auth.service.impl;


import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout",
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

测试类生成验证码,发给交换机

在这里插入图片描述

package com.tianju.auth.service.impl;

import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;



@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)


public class UserServiceImplTest {


    @Autowired
    private IUserService userService;

    @Test
    public void sendCode() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendCode(code);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

消费者消费验证码

package com.tianju.auth.consumer;


import com.tianju.auth.service.IEmailService;
import com.tianju.auth.util.RabbitMqConstants;
import com.tianju.auth.util.SMSUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class UserConsumer {
    @Autowired
    private IEmailService emailService;

    @RabbitListener(queues = RabbitMqConstants.MQ_MAIL_QUEUE)
    public void emailConsumer(String msg){
        log.debug("[email消费者:]消费{}",msg);
        emailService.sendEmail("[email protected]", "登陆验证码", msg);
    }

    @RabbitListener(queues = RabbitMqConstants.MQ_PHONE_QUEUE)
    public void phoneConsumer(String msg){
        log.debug("[phone消费者:]消费{}",msg);
        SMSUtil.send("xxxx", msg);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

在这里插入图片描述

topic模式

在这里插入图片描述

例如: routingkey: my.orange.rabbit —-> Q1,Q2

在这里插入图片描述

配置类增加配置

package com.tianju.auth.util;

/**
 * rabbitmq的常量
 */
public interface RabbitMqConstants {
    String MQ_MAIL_QUEUE="mq_email_queue";
    String MQ_PHONE_QUEUE="mq_phone_queue";
    String MQ_FANOUT_EXCHANGE="mq_fanout_exchange";
    
    String MQ_TOPIC_EXCHANGE="mq_topic_exchange";

    String MQ_TOPIC_QUEUE_A = "mq_topic_queue_a";
    String MQ_TOPIC_QUEUE_B = "mq_topic_queue_b";

    // 参数 String name, boolean durable, boolean exclusive, boolean autoDelete
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
package com.tianju.auth.config;

import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean // 邮箱的队列
    public Queue mailQueue(){
        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // 电话的队列
    public Queue phoneQueue(){
        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }
    @Bean // 交换机
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean
    public Binding mailBinding(){
        return BindingBuilder.bind(mailQueue())
                .to(fanoutExchange());
    }

    @Bean
    public Binding phoneBinding(){
        return BindingBuilder.bind(phoneQueue())
                .to(fanoutExchange());
    }



    @Bean // A队列
    public Queue topicAQueue(){
        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_A,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    /**
     * topic模式相关配置
     */

    @Bean // B队列
    public Queue topicBQueue(){
        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_B,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // topic的交换机
    public TopicExchange topicMyExchange(){
        return new TopicExchange(RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }


    @Bean
    public Binding topicAQueueBinding(){
        return BindingBuilder
                .bind(topicAQueue())
                .to(topicMyExchange())
                .with("topic.xxx"); // 规则 topic.xxx
    }

    @Bean
    public Binding topicBQueueBinding(){
        return BindingBuilder
                .bind(topicBQueue())
                .to(topicMyExchange())
                .with("topic.*"); // 规则 topic.xxx
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92

生产者发送信息

在这里插入图片描述

    /**
     * topic模式下,生产者发送信息给交换机,可以决定给哪个队列发信息
     * @param msg 发送的信息
     * @param routingKey 类似正则表达式,决定给谁发
     *                   .with("topic.xxx"); // 规则 topic.xxx ---- A队列
     *                   .with("topic.*"); // 规则 topic.xxx   ---- B队列
     *                   在配置类中,如上所述配置,则如果输入的routingKey为 topic.xxx则给A和B发;
     *                                      如果输入的routingKey为 topic.yyy 则 只给B队列发;
     */
    void sendMsg(String msg,String routingKey);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

实现

package com.tianju.auth.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tianju.auth.entity.UserPrivs;
import com.tianju.auth.mapper.UserMapper;
import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout",
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }

    @Override
    public void sendMsg(String msg,String routingKey) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                routingKey, // "topic.yyy",此时只有B队列有信息
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

进行发送

package com.tianju.auth.service.impl;

import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;



@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)


public class UserServiceImplTest {

    @Autowired
    private IUserService userService;

    @Test
    public void sendCode() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendCode(code);
    }

    @Test
    public void sendTopic() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendMsg(code,"topic.yyy");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

在这里插入图片描述

控制台查看

在这里插入图片描述

rabbitmq回调确认

配置类

spring:
  # rabbitmq的配置
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    # 确认收到
    publisher-confirm-type: correlated
    publisher-returns: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

验证生产者发送是否成功

使用RabbitTemplate的回调方法。

先设置

  • setConfirmCallback
  • setReturnsCallback

在这里插入图片描述

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout",
                msg);
        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }

    @Override
    public void sendMsg(String msg,String routingKey) {

        // 如果发到交换机,看一下有没有反馈
        rabbitTemplate.setConfirmCallback((c,ack,message)->{
            log.debug("***** setConfirmCallback:ack--{}", ack); // 是否发送到交换机
            log.debug("***** setConfirmCallback:c-->{}",c);
            // channel error; protocol method: #method(reply-code=404,
            // reply-text=NOT_FOUND - no exchange 'aaaa' in vhost '/', class-id=60, method-id=40)
            log.debug("***** setConfirmCallback:m-->{}",message);
            if (ack){
                log.debug("[生产者:] 发送信息到交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
            }else {
                log.debug(message);
            }
        });

        rabbitTemplate.setReturnsCallback(r->{
            log.debug("返回文字{}", r.getReplyText());
            log.debug("返回code{}", r.getReplyCode());
            log.debug("返回Exchange{}", r.getExchange());
            log.debug("返回RoutingKey{}", r.getRoutingKey());
        });


        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_TOPIC_EXCHANGE,
//                "aaaa",// 失败的情况
                routingKey, // "topic.yyy",此时只有B队列有信息
                msg);

        log.debug("[生产者向交换机:] 发送一条信息:{}",msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

在这里插入图片描述

rabbitTemplate.setConfirmCallback((c,ack,message)->{
    log.debug("******* setConfirmCallback:ack->{}",ack);
    log.debug("******* setConfirmCallback:c->{}",c);
    log.debug("******* setConfirmCallback:chanel->{}",message);
    if(ack){
        log.debug("[生产者]发送信息到达交换机{}","RabbitMqConstants.MQ_TOPIC_EXCHANGE");
    }else {
        log.debug(message);
    }
});
rabbitTemplate.setReturnsCallback(r->{
    log.debug("返回文字:{}",r.getReplyText());
    log.debug("返回code:{}",r.getReplyCode());
    log.debug("返回Exchange:{}",r.getExchange());
    log.debug("返回RoutingKey:{}",r.getRoutingKey());
});
rabbitTemplate.convertAndSend(
        RabbitMqConstants.MQ_TOPIC_EXCHANGE,
        "abc.xxx",
        msg
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
    @Test
    public void sendTopic() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendMsg(code,"topic.rrr");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

延迟队列(死信)设计

Documentation: Table of Contents — RabbitMQ

在这里插入图片描述

在这里插入图片描述

java代码步骤

创建正常+死信队列

package com.tianju.mq.config;

import com.tianju.mq.constants.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }
    @Bean
    public Queue normalQueue(){
        Map<String, Object> map = new HashMap<>(2);
        map.put("x-dead-letter-exchange",RabbitMqConstants.MQ_DELAY_EXCHANGE);
        map.put("x-dead-letter-routing-key",RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
        return new Queue(
                RabbitMqConstants.MQ_NORMAL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete,
                map);
    }
    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with(RabbitMqConstants.MQ_NORMAL_ROUTING_KEY);
    }


    //------------------死信队列设计--------------------------
    /**
     * 死信(延迟)队列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        return new Queue(RabbitMqConstants.MQ_DELAY_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }
    /**
     * 死信交换机
     * @return
     */
    @Bean
    public DirectExchange delayExchange(){
        return new DirectExchange(RabbitMqConstants.MQ_DELAY_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }
    /**
     * 死信交换机队列绑定
     * @return
     */
    @Bean
    public Binding delayBinding(){
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with(RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

配置类+常量

package com.tianju.mq.constants;

public interface RabbitMqConstants {
    String MQ_DELAY_QUEUE = "mq_delay_queue"; // 延迟队列,死信队列
    String MQ_DELAY_EXCHANGE = "mq_delay_exchange"; // 死信交换机
    String MQ_DELAY_ROUTING_KEY = "mq_delay_routing_key"; // 死信路由

    // 正常的队列,交换机,路由
    String MQ_NORMAL_QUEUE = "mq_normal_queue";
    String MQ_NORMAL_EXCHANGE = "mq_normal_exchange";
    String MQ_NORMAL_ROUTING_KEY = "mq_normal_routing_key";

    // 参数
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
server:
  port: 9099

spring:

  # 邮箱的配置
  mail:
    host: smtp.qq.com
    port: 587
    username: xxxxx.com
    password: xxxxx

  # rabbitmq的配置
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    # 确认收到
    publisher-confirm-type: correlated
    publisher-returns: true


logging:
  level:
    com.tianju.mq: debug

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

生产者到正常队列

package com.tianju.mq.service;

public interface IUserService {
    /**
     * 延迟队列的生产者
     * @param msg 发送的信息
     * @param delayTime 延迟的时间,毫秒
     */
    void sendDelay(String msg,int delayTime);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
package com.tianju.mq.service.impl;

import com.tianju.mq.constants.RabbitMqConstants;
import com.tianju.mq.service.IUserService;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


import java.util.Date;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendDelay(String msg, int delayTime) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                RabbitMqConstants.MQ_NORMAL_ROUTING_KEY,
                msg,
                process->{
                    process.getMessageProperties().setExpiration(String.valueOf(delayTime));
                    return process;
                }
        );
        log.debug("[生产者:]发送消息:{},时间{},延迟{}秒",msg,new Date(),delayTime/1000);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

在这里插入图片描述

消费者进行延迟消费

package com.tianju.mq.consumer;

import com.tianju.mq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class UserConsumer {

    @RabbitListener(queues = RabbitMqConstants.MQ_DELAY_QUEUE)
    public void delayConsume(String msg){
        log.debug("[消费者消费信息:{},时间:{}",msg,new Date());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

在这里插入图片描述

延迟队列插件安装

访问官网

Community Plugins — RabbitMQ

在这里插入图片描述

在这里插入图片描述

进入rabbitmq docker容器

[root@localhost ~]# docker exec -it rabbitmq bash
  • 1

查询插件列表是否存在延迟插件

root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@6d2342d51b11
 |/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

下载支持3.9.x的插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases?after=rabbitmq_v3_6_12

在这里插入图片描述

在这里插入图片描述

退出容器:

root@6d2342d51b11:/plugins# exit
exit
  • 1
  • 2

上传到linux服务器

在/usr/local/software/下创建文件夹rabbitmq/plugins

[root@localhost software]# mkdir -p rabbitmq/plugins
  • 1

在这里插入图片描述

拷贝插件到容器中

[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
  • 1

进入容器安装插件

[root@localhost plugins]# docker  exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1
  • 2

在这里插入图片描述

打开管理页面

进入Exchange页面,下拉Type看是否已经安装成功。

在这里插入图片描述


总结

1.rabbitmq队列方式的梳理,点对点,一对多;
2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例;
3.topic模式,根据规则决定发送给哪个队列;
4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback;
5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间;

注:本文转载自blog.csdn.net的Perley620的文章"https://blog.csdn.net/Pireley/article/details/132204466"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top