This note documents integrating the Spring Boot framework with a locally deployed, simulated 3-node Kafka cluster.
Language: Java, JDK 21.
1. Make sure the Kafka cluster is up
- The ZooKeeper cluster (ports 2181, 2182, 2183) and the Kafka cluster (ports 9092, 9093, 9094) must be running.
- Verify the cluster status:

# List the brokers and the API versions they support
bin\windows\kafka-broker-api-versions.bat --bootstrap-server localhost:9092
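The steps below produce to and consume from a topic named test02 (see KafkaConstants in step 7). If it does not exist yet, it can be created up front; the 3 partitions and replication factor 3 are assumed settings chosen so that the fault-tolerance test in step 6 has replicas to fail over to, not values taken from the original setup:

# Create the demo topic with 3 partitions, replicated across all 3 brokers (assumed settings)
bin\windows\kafka-topics.bat --create --topic test02 --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3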
2. Add the Kafka dependency
Add the Kafka dependency to pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
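No version is given because Spring Boot's dependency management supplies a compatible spring-kafka version. A minimal sketch, assuming the project inherits from spring-boot-starter-parent; the parent version shown is only an example:

<!-- Sketch: the spring-kafka version is managed by the Spring Boot parent (version here is illustrative) -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.5</version>
</parent>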
3. Configure the Kafka connection
Configure Kafka in application.yml (or application.properties):

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer    # serializer class for keys
      value-serializer: org.apache.kafka.common.serialization.StringSerializer  # serializer class for values
      retries: 3               # number of retries after a failed send
      acks: all                # 0 = fastest but not durable, 1 = wait for the leader, all = wait for all in-sync replicas
      batch-size: 30720000     # batch size in bytes (~30 MB)
      buffer-memory: 33554432  # producer buffer memory in bytes (32 MB)
    consumer:
      auto-offset-reset: earliest  # offset reset policy: earliest = start from the oldest records, latest = start from the newest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer    # deserializer class for keys
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # deserializer class for values
      enable-auto-commit: false    # commit offsets manually
      max-poll-records: 10         # maximum number of records returned by a single poll()
      properties:
        # consumer session timeout: if no heartbeat arrives within this window the consumer is considered dead (ms)
        session.timeout.ms: 300000 # 5 minutes
    listener:
      # whether the application should fail to start when a listened-to topic does not exist; false = no error
      missing-topics-fatal: false
      # listener mode: single = one record per call, batch = a batch of records per call
      type: batch
      # acknowledgment mode:
      # manual_immediate = acknowledge manually and commit the offset immediately
      ack-mode: manual_immediate
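To confirm from inside the application that all three brokers are reachable with this configuration, the auto-configured KafkaAdmin bean can be used to describe the cluster at startup. A minimal sketch; the class name ClusterCheck is hypothetical and the check is optional:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

import java.util.Collection;

/**
 * Hypothetical startup check: lists the brokers reachable through the
 * KafkaAdmin bean auto-configured from spring.kafka.bootstrap-servers.
 */
@Component
public class ClusterCheck implements CommandLineRunner {

    private final KafkaAdmin kafkaAdmin;

    public ClusterCheck(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }

    @Override
    public void run(String... args) throws Exception {
        try (AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            Collection<Node> nodes = admin.describeCluster().nodes().get();
            nodes.forEach(n -> System.out.println("Broker " + n.id() + " at " + n.host() + ":" + n.port()));
        }
    }
}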
4. Producer code

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * @author kangk
 * @description Message producer
 */
@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Without a key, the partition is chosen by the default partitioner rather than by key hashing
    public void sendMessage(String topic, String message) {
        CompletableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Sent to partition " +
                        result.getRecordMetadata().partition());
            } else {
                System.out.println("Failed: " + ex.getMessage());
            }
        });
    }

    // With a key, records sharing the same key always land on the same partition
    public void sendMessage(String topic, String key, String message) {
        CompletableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, key, message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Sent to partition " +
                        result.getRecordMetadata().partition());
            } else {
                System.out.println("Failed: " + ex.getMessage());
            }
        });
    }
}
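If a caller needs the broker acknowledgment (acks=all) before returning, the same CompletableFuture can be awaited with a timeout. A minimal sketch of a method that could be added to KafkaProducerService above; the method name and the 10-second timeout are arbitrary examples:

// Blocking variant: wait for the broker acknowledgment before returning
public SendResult<String, String> sendMessageSync(String topic, String key, String message)
        throws Exception {
    return kafkaTemplate.send(topic, key, message)
            .get(10, java.util.concurrent.TimeUnit.SECONDS); // fail if no ack arrives within 10 seconds
}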
5. Consumer code (manual commit)

import com.kangk.demo.comm.KafkaConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author kangk
 * @description Batch message consumer
 */
@Service
@Slf4j
public class KafkaBatchConsumer {

    // To listen to several topics: @KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")
    // autoStartup: "true" starts the listener together with the application; "false" leaves it stopped until started manually
    @KafkaListener(
            topics = {KafkaConstants.TOPIC},
            groupId = KafkaConstants.GROUP_ID,
            containerFactory = "batchFactory",
            autoStartup = "true"
    )
    public void batchListen(List<String> messages, Acknowledgment acknowledgment) {
        log.info("BatchConsumer - Received batch: {}", messages);
        acknowledgment.acknowledge(); // manually commit the offsets for this batch
    }
}
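For comparison, a single-record listener wired to the singleFactory bean defined in step 7 could look like the sketch below; the class name, the separate group id and the reuse of the same topic constant are assumptions for illustration:

import com.kangk.demo.comm.KafkaConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

/**
 * Hypothetical single-record consumer, acknowledging each record individually.
 */
@Service
@Slf4j
public class KafkaSingleConsumer {

    @KafkaListener(
            topics = {KafkaConstants.TOPIC},
            groupId = "my-demo-single-group",   // assumed separate group so both listeners receive all records
            containerFactory = "singleFactory"
    )
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("SingleConsumer - partition {} offset {} value {}",
                record.partition(), record.offset(), record.value());
        acknowledgment.acknowledge(); // commit this record's offset immediately
    }
}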
6. Test cluster fault tolerance
(1) Send a message

import com.kangk.demo.comm.KafkaConstants;
import com.kangk.demo.kafka.KafkaProducerService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.*;

import java.util.UUID;

/**
 * @author kangk
 * @description KafkaController
 */
@RestController
@RequestMapping("/api/1/kafka")
public class KafkaController {

    @Resource
    private KafkaProducerService producerService;

    @PostMapping("/send")
    public String sendMessage() {
        String key = UUID.randomUUID().toString();
        producerService.sendMessage(KafkaConstants.TOPIC, key, key);
        return "Message sent: " + key;
    }
}

Trigger it with:

curl -X POST "http://localhost:8080/api/1/kafka/send"

Example console output:

Sent to partition 1   # the message may land on a partition hosted by any broker
(2) Shut down one broker to simulate a failure
- Stop the Kafka process listening on localhost:9093.
- Send messages again and check that production and consumption still work (the remaining brokers should take over the traffic).
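To watch the failover on the broker side, describe the topic before and after stopping the broker; assuming test02 was created with replication factor 3 as above, leadership for the affected partitions should move to the surviving brokers and the ISR list should shrink:

# Describe the topic: shows the leader, replicas and in-sync replicas (ISR) per partition
bin\windows\kafka-topics.bat --describe --topic test02 --bootstrap-server localhost:9092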
7. Supporting configuration
(1) Defining the containerFactory

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

/**
 * @author kangk
 * @description Listener container factories
 */
@EnableKafka
@Configuration
public class KafkaConfig {

    // Single-record listener factory with manual offset commits
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    // Batch listener factory with manual offset commits
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);  // enable batch consumption
        factory.setConcurrency(10);      // number of listener threads; should not exceed the topic's partition count
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
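If the topic should be created by the application instead of via the command line in step 1, a NewTopic bean can be declared and the auto-configured KafkaAdmin will create it on startup. A minimal sketch of a bean that could be added to KafkaConfig; the partition and replica counts mirror the assumed CLI command above:

// Additional imports needed in KafkaConfig for this bean:
// import org.apache.kafka.clients.admin.NewTopic;
// import org.springframework.kafka.config.TopicBuilder;
// import com.kangk.demo.comm.KafkaConstants;

// Declaratively create the demo topic: 3 partitions, replicated across all 3 brokers (assumed settings)
@Bean
public NewTopic testTopic() {
    return TopicBuilder.name(KafkaConstants.TOPIC)
            .partitions(3)
            .replicas(3)
            .build();
}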
(2) Extracting constants

/**
 * @author kangk
 * @description Kafka constants
 */
public class KafkaConstants {

    private KafkaConstants() {} // prevent instantiation

    public static final String GROUP_ID = "my-demo-group";
    public static final String TOPIC = "test02";
}
8. Example directory layout

src/main/java/
├── com/kangk/demo/
│   ├── config/
│   │   └── KafkaConfig.java         # listener container factories
│   ├── comm/
│   │   └── KafkaConstants.java      # constants
│   ├── controller/
│   │   └── KafkaController.java
│   ├── kafka/
│   │   ├── KafkaBatchConsumer.java
│   │   └── KafkaProducerService.java
│   └── DemoApplication.java
src/main/resources/
└── application.yml                  # Kafka configuration