首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

浅聊MQ之Kafka、RabbitMQ、ActiveMQ、RocketMQ持久化策略

  • 25-03-03 00:44
  • 3987
  • 11235
blog.csdn.net

以下是主流消息队列(Kafka、RabbitMQ、ActiveMQ、RocketMQ)的持久化策略详解及实际场景示例:

1. Kafka持久化策略

核心机制:
  • 日志分段存储:

    • 每个Topic分区对应一个物理日志文件(顺序写入)

    • 分段策略:默认每1GB或7天生成新Segment(log.segment.bytes/log.roll.hours)

    • 索引文件:.index(偏移量索引)和.timeindex(时间戳索引)

  • 刷盘策略:

    1. # 异步刷盘(高性能)
    2. log.flush.interval.messages=10000 # 每1万条刷盘
    3. log.flush.interval.ms=1000 # 每秒刷盘
    4. # 同步刷盘(高可靠)
    5. log.flush.interval.messages=1
    6. log.flush.interval.ms=0
  • 副本同步:

    1. # 配置ISR最小同步副本数
    2. min.insync.replicas=2
实战案例:
  • 场景:某电商平台订单日志采集

    1. # Topic配置
    2. bin/kafka-topics.sh --create \
    3. --topic order_logs \
    4. --partitions 6 \
    5. --replication-factor 3 \
    6. --config retention.ms=604800000 # 保留7天
    • 使用LZ4压缩(compression.type=lz4)降低存储成本

    • 通过kafka-reassign-partitions.sh实现跨机架存储

特点:
  • 优势:顺序写盘+零拷贝技术实现百万级TPS

  • 缺陷:单个大消息可能影响整体吞吐


2. RabbitMQ持久化策略

核心机制:
  • 消息存储:

    • 持久化消息:同时写入内存和磁盘(delivery_mode=2)

    • 非持久化消息:仅存内存(重启丢失)

  • 队列存储:

    1. # 声明持久化队列
    2. channel.queue_declare(queue='payment', durable=True)
  • 消息日志:

    • 使用消息存储(msg_store)和队列索引(queue_index)分离存储

    • 默认存储位置:

      /var/lib/rabbitmq/mnesia
刷盘策略:
  1. # 配置刷盘频率(rabbitmq.conf)
  2. disk_free_limit.absolute = 5GB
  3. queue_index_embed_msgs_below = 4096 # 小于4KB的消息嵌入索引
实战案例:
  • 场景:银行转账系统

    1. // 发送持久化消息
    2. MessageProperties props = new MessageProperties();
    3. props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    4. channel.basicPublish("", "transfer", props, message.getBytes());
    • 使用镜像队列实现高可用:

      rabbitmqctl set_policy ha-all "^transfer" '{"ha-mode":"all"}'
特点:
  • 优势:灵活的消息路由与ACK机制

  • 缺陷:海量持久化消息时性能显著下降


3. ActiveMQ持久化策略

存储方案对比:
存储类型原理适用场景配置示例
KahaDB基于事务日志的存储常规消息持久化
LevelDB基于LSM-Tree的高性能存储高写入吞吐场景
JDBC数据库存储(MySQL/Oracle)强事务需求
KahaDB深度配置:
  1. directory="activemq-data"
  2. indexWriteBatchSize="1000"
  3. journalMaxFileLength="32mb"
  4. enableIndexWriteAsync="true"/>

运行 HTML

实战案例:
  • 场景:航空订票系统

    1. -- 使用MySQL存储消息
    2. CREATE TABLE activemq_msgs (
    3. ID BIGINT PRIMARY KEY,
    4. CONTAINER VARCHAR(250),
    5. MSGID_PROD VARCHAR(250),
    6. MSGID_SEQ BIGINT,
    7. EXPIRATION BIGINT,
    8. MSG BLOB
    9. );
    • 配置每100条消息批量提交(jdbcPersistenceAdapter batchSize=100)

特点:
  • 优势:支持多种存储后端

  • 缺陷:LevelDB官方已停止维护


4. RocketMQ持久化策略

存储架构:
  • CommitLog:

    • 所有Topic消息顺序写入单个文件

    • 默认每1GB分新文件(mapedFileSizeCommitLog=1073741824)

  • ConsumeQueue:

    • 逻辑队列索引(存储CommitLog物理偏移)

    • 异步构建(flushIntervalCommitLog=1000)

刷盘模式:
模式配置特点适用场景
同步刷盘flushDiskType=SYNC_FLUSH每条消息写盘确认金融交易
异步刷盘flushDiskType=ASYNC_FLUSH批量刷盘(默认)常规业务
实战案例:
  • 场景:物流状态更新

    1. # broker.conf
    2. brokerRole=SYNC_MASTER # 同步主从复制
    3. flushDiskType=SYNC_FLUSH
    4. mappedFileSizeConsumeQueue=6000000 # ConsumeQueue文件大小
    • 使用Dledger实现自动选主:

      sh mqadmin updateBrokerConfig -b broker-a:10911 -n localhost:9876 -k enableDledger -v true
特点:
  • 优势:CommitLog顺序写+ConsumeQueue随机读优化

  • 缺陷:单机海量Topic时性能下降


5. 持久化策略对比总结

MQ存储模型写入方式可靠性典型吞吐适用场景
Kafka分区日志分段顺序追加极高百万级TPS日志流处理
RabbitMQ队列独立存储随机写入高万级TPS复杂路由系统
ActiveMQ统一日志存储混合模式中万级TPS传统企业应用
RocketMQCommitLog统一存储顺序写入极高十万级TPS金融交易系统

6. 生产环境配置建议

Kafka高可靠配置:
  1. # server.properties
  2. acks=all
  3. min.insync.replicas=2
  4. unclean.leader.election.enable=false
RabbitMQ防丢失配置:
  1. # 启用镜像队列
  2. rabbitmqctl set_policy ha-all "^critical." '{"ha-mode":"exactly","ha-params":3}'
  3. # 持久化交换机
  4. channel.exchangeDeclare("orders", "direct", true)
RocketMQ事务消息示例:
  1. // 发送事务消息
  2. TransactionSendResult result = producer.sendMessageInTransaction(msg, localExecuter, arg);
  3. if(result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
  4. // 执行成功逻辑
  5. }

7. 故障恢复案例

案例1:Kafka日志损坏:
  1. # 使用DumpLog工具恢复
  2. bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  3. --files 00000000000000000123.log \
  4. --print-data-log
案例2:RabbitMQ数据迁移:
  1. # 使用Federation插件跨集群同步
  2. rabbitmqctl set_parameter federation-upstream orders-upstream \
  3. '{"uri":"amqp://user:pass@old-server"}'

通过理解各MQ的持久化机制,开发者可根据业务特性(如吞吐量要求、数据重要性、运维复杂度)做出合理选择。例如在证券交易系统中,RocketMQ的同步刷盘+主从同步能完美满足毫秒级延迟与零数据丢失的要求。

(望各位潘安、各位子健/各位彦祖、于晏不吝赐教!多多指正!?)

注:本文转载自blog.csdn.net的天天向上杰的文章"https://blog.csdn.net/qq_37679639/article/details/145648069"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top