6> 测试
package com.nianxi.mybatisplus;
import com.nianxi.mybatisplus.mapper.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
注意:发送者和接收者的 queue name 必须一致,不然不能接收

2.RabbitTemplate
**RabbitTemplate
**是SpringAMQP提供的一个高级消息操作模板,**用于在与RabbitMQ进行交互时进行消息的发送和接收操作。**它是对底层AMQP协议的封装,简化了与RabbitMQ的交互过程, 是SpringAMQP中的核心类,提供声明式方式处理RabbitMQ,包括发送和接收消息、消息转换、属性设置及回调机制。通过配置和正确使用,简化了RabbitMQ的集成与操作。
1> 发送消息
**RabbitTemplate
**提供了多种发送消息的方法,包括同步发送和异步发送。通过指定交换机、路由键和消息体,我们可以将消息发送到 RabbitMQ 服务器上的指定位置。此外,RabbitTemplate
还支持消息的确认机制,以确保消息被成功发送和接收。
rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
2> 接收消息
除了发送消息外,**RabbitTemplate
**还提供了接收消息的功能。通过调用相关方法,我们可以从指定的队列中接收消息,并进行相应的处理。这通常涉及到监听队列、处理消息和确认消息接收等步骤。
Message receivedMessage = rabbitTemplate.receive("queueName");
MyMessage myMessage = rabbitTemplate.receiveAndConvert("queueName", MyMessage.class);
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
3> 消息转换
**RabbitTemplate
支持消息的自动转换。这意味着我们可以将 Java 对象作为消息体发送,而RabbitTemplate
会自动将其转换为可序列化的格式(如 JSON 或 XML)。同样地,当从队列中接收消息时,RabbitTemplate
**也可以自动将消息体转换回 Java 对象。
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(messageConverter);
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
4> 消息属性设置
在发送消息时,我们可以设置各种消息属性,如消息的优先级、持久化标志、过期时间等。这些属性可以通过**MessageProperties
对象进行设置,并在发送消息时传递给RabbitTemplate
**。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message, int priority, boolean persistent, int ttl) {
// 创建MessageProperties
MessageProperties properties = new MessageProperties();
// 设置优先级,值范围0-9,其中0为最低优先级,9为最高优先级
properties.setPriority(priority);
// 设置消息持久化
properties.setDeliveryMode(persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
// 设置消息的过期时间,单位为毫秒
properties.setExpiration(String.valueOf(ttl));
// 使用MessageBuilder构建Message对象
Message msg = MessageBuilder.withBody(message.getBytes())
.setContentEncoding("UTF-8")
.setContentType("text/plain")
.setMessageId(UUID.randomUUID().toString()) // 可选,设置消息ID
.setTimestamp(new Date()) // 可选,设置时间戳
.setHeaders(Collections.singletonMap("x-custom-header", "value")) // 可选,设置自定义头
.andProperties(properties)
.build();
// 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
5> 回调机制
**RabbitTemplate
**支持发送消息时的回调机制。这意味着在发送消息后,我们可以注册一个回调函数来处理发送结果或接收响应。这对于需要异步处理发送结果或接收响应的场景非常有用。
**setConfirmCallback
方法是RabbitTemplate
**类中的一个回调方法,用于处理消息的确认(acknowledgment)结果。当消息成功发送到RabbitMQ的交换机时,会触发确认回调,你可以在该回调中处理相应的逻辑。
-
correlationData
:关联数据,可以是任意类型的对象,通常用于唯一标识消息。
-
ack
:布尔值,表示消息是否成功发送到交换机。true
表示成功,false
表示失败。
-
cause
:失败的原因,当ack
为false
时,此参数会提供一个可选的异常信息。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息发送成功
System.out.println(“Message sent successfully”);
} else {
// 消息发送失败,进行处理
System.out.println("Message sent failed: " + cause);
}
});
6> 异步消息处理
RabbitTemplate
支持异步消息处理,你可以注册ConfirmCallback
和ReturnCallback
来处理消息的确认和返回结果。ConfirmCallback
用于确认消息是否成功发送到交换机,ReturnCallback
用于处理无法路由到队列的消息。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息发送成功
} else {
// 消息发送失败,进行处理
}
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 处理无法路由到队列的消息
});
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
3.使用 RabbitTemplate 的步骤
1> 配置 RabbitTemplate
在使用**RabbitTemplate
**之前,我们需要对其进行配置。这通常涉及到设置连接工厂、交换机、队列和绑定等。这些配置可以通过 XML 配置或 Java 配置完成。
2> 创建 RabbitTemplate 实例
一旦配置完成,我们可以创建一个**RabbitTemplate
**实例。这个实例将使用我们提供的配置来与 RabbitMQ 服务器进行交互。
3> 发送消息
使用**RabbitTemplate
**的发送方法,我们可以将消息发送到指定的交换机和路由键。我们可以指定消息体、消息属性和其他发送选项。
4> 接收消息
要接收消息,我们可以使用**RabbitTemplate
**的接收方法或结合监听器来监听指定的队列。当消息到达时,我们可以处理消息并执行相应的业务逻辑。
5> 处理异常和错误
在使用**RabbitTemplate
**时,我们还需要考虑异常和错误处理。例如,当发送消息失败或接收消息时发生异常时,我们需要有相应的处理机制来确保系统的稳定性和可靠性。
4.RabbitTemplate 的优势与注意事项
优势:
- 简化操作:
RabbitTemplate
封装了底层细节,使得开发者能够专注于业务逻辑的实现,而无需关心底层的消息传输细节。 - 灵活性:
RabbitTemplate
提供了丰富的配置选项和扩展点,使得开发者能够根据实际需求进行定制和优化。 - 性能优化:
RabbitTemplate
内部进行了性能优化,如连接池管理、消息缓存等,以提高消息传输的效率和可靠性。
注意事项:
- 配置正确性:确保
RabbitTemplate
的配置正确无误,包括连接工厂、交换机、队列和绑定等的设置。错误的配置可能导致消息无法正确发送或接收。 - 异常处理:在使用
RabbitTemplate
时,要充分考虑异常处理机制,确保在发生异常时能够及时发现并处理。 - 资源释放:在使用完
RabbitTemplate
后,要确保释放相关资源,如关闭连接、释放连接池中的连接等,以避免资源泄漏和性能问题。
评论记录:
回复评论: