首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Springboot整合kafka记录

  • 25-04-18 16:21
  • 3737
  • 12023
juejin.cn

本记录为Springboot框架整合本地成功部署的模拟3节点kafka集群

编码语言为Java

Java版本为jdk21

1. 确保 Kafka 集群已正确启动​​

  • ZooKeeper 集群(端口 2181、2182、2183)和 Kafka 集群(端口 9092、9093、9094)已正常运行。

  • 验证集群状态:

    python
    代码解读
    复制代码
    # 查看 Broker 列表 bin\windows\kafka-broker-api-versions.bat --bootstrap-server localhost:9092

​2. 添加 Kafka 依赖​​

在 pom.xml 中添加 Kafka 依赖:

xml
代码解读
复制代码
<dependency> <groupId>org.springframework.kafkagroupId> <artifactId>spring-kafkaartifactId> dependency>

3. 配置 Kafka 连接​​

在 application.yml(或 application.properties)中配置 Kafka:

yaml
代码解读
复制代码
spring: kafka: bootstrap-servers: localhost:9092,localhost:9093,localhost:9093 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer # Key的序列化器类 value-serializer: org.apache.kafka.common.serialization.StringSerializer # Value的序列化器类 retries: 3 # 生产失败重试次数 acks: all # 确保所有副本确认 0 最快但不持久 1 等待leader确认 all 等待所有副本确认 batch-size: 30720000 # 30MB # 批量发送时的批次大小(字节) buffer-memory: 33554432 # 32MB # 生产者的内存缓冲区大小(字节) consumer: auto-offset-reset: earliest # 偏移量重置策略: earliest:从最早的记录开始消费 latest:从最新的记录开始消费 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Key的反序列化器类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Value的反序列化器类 enable-auto-commit: false # 手动提交 Offset # 每次poll()调用返回的最大消息条数 max-poll-records: 10 session: # 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒) timeout: ms: 300000 # 5分钟 listener: # 如果指定的主题不存在,是否让应用启动失败,false表示不会报错 missing-topics-fatal: false # 消费模式:single=单条消息,batch=批量消费 type: batch # 消费确认模式: # manual_immediate:手动确认消息,立即提交offset ack-mode: manual_immediate

4. 生产者代码​​

typescript
代码解读
复制代码
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; /** * @author kangk * @description 消息生产者 */ @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; // 没有key,在集群下,也只会到默认分区 public void sendMessage(String topic, String message) { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.whenComplete((result, ex) -> { if (ex == null) { System.out.println("Sent to partition " + result.getRecordMetadata().partition()); } else { System.out.println("Failed: " + ex.getMessage()); } }); } public void sendMessage(String topic, String key, String message) { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message); future.whenComplete((result, ex) -> { if (ex == null) { System.out.println("Sent to partition " + result.getRecordMetadata().partition()); } else { System.out.println("Failed: " + ex.getMessage()); } }); } }

5. 消费者代码(手动提交)​​

ini
代码解读
复制代码
import com.kangk.demo.comm.KafkaConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import java.util.List; /** * @author kangk * @description 批量消息消费者 */ @Service @Slf4j public class KafkaBatchConsumer { //监听多个 Topic @KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1") // autoStartup参数:是是否自动启动;=”true“:自动启动,即生产者启动,该消费者将会开始消费;=”false":不自动启动,不开该模式的消费。 @KafkaListener( topics = {KafkaConstants.TOPIC}, groupId = KafkaConstants.GROUP_ID, containerFactory = "batchFactory", autoStartup = "true" ) public void batchListen(List messages, Acknowledgment acknowledgment) { log.info("BatchConsumer - Received batch: " + messages); acknowledgment.acknowledge(); } }

6. 测试集群容错性​​

​​(1) 发送消息​​

kotlin
代码解读
复制代码
import com.kangk.demo.comm.KafkaConstants; import com.kangk.demo.kafka.KafkaProducerService; import jakarta.annotation.Resource; import org.springframework.web.bind.annotation.*; import java.util.UUID; /** * @author kangk * @description KafkaController */ @RestController @RequestMapping("/api/1/kafka") public class KafkaController { @Resource private KafkaProducerService producerService; @PostMapping("/send") public String sendMessage() { String key = UUID.randomUUID().toString(); producerService.sendMessage(KafkaConstants.TOPIC, key, key); return "Message sent: " + message; } }
arduino
代码解读
复制代码
curl -X POST "http://localhost:8080/api/1/kafka/send"

控制台输出示例:

bash
代码解读
复制代码
Sent to partition 1 # 消息可能被发送到任意 Broker 的分区

​​(2) 关闭一个 Broker 模拟故障​​

  • 停止 localhost:9093 的 Kafka 进程。
  • 再次发送消息,观察是否仍能正常生产/消费(其他 Broker 应接管流量)。

7. 代码补充配置​​

​​(1) containerFactory编写​​

typescript
代码解读
复制代码
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; /** * @author kangk * @description 消费监听器工厂 */ @EnableKafka @Configuration public class KafkaConfig { // 单条消费监听器工厂,手动提交offset @Bean public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } // 批量消费监听器工厂,手动提交offset @Bean public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); // 启用批量消费 factory.setConcurrency(10); factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }

​​(2) 常量提取​​

java
代码解读
复制代码
/** * @author kangk * @description kafka常量类 */ public class KafkaConstants { private KafkaConstants() {} // 防止实例化 public static final String GROUP_ID = "my-demo-group"; public static final String TOPIC = "test02"; }

​8.完整目录结构示例​​

bash
代码解读
复制代码
src/main/java/ ├── com/kangk/demo/ │ ├── config/ │ │ └── KafkaConfig.java # 消费监听器工厂 │ ├── constants/ │ │ └── KafkaConstants.java # 常量类 │ ├── controller/ │ │ ├── KafkaController.java │ ├── kafka/ │ │ ├── KafkaBatchConsumer.java │ │ └── KafkaConsumerService.java │ └── DemoApplication.java src/main/resources/ ├── application.yml # Kafka 配置
注:本文转载自juejin.cn的精神内耗中的钙奶饼干的文章"https://juejin.cn/post/7494067439241740314"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

103
后端
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top