5. Testing

Create a controller (or a test class) to exercise the producer and the consumer:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String send(@RequestParam("message") String message) {
        kafkaProducer.sendMessage("my-topic", message);
        return "Message sent to the Kafka Topic";
    }
}

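6. Start the application

Start the Spring Boot application and call the /send endpoint with a message parameter (for example, /send?message=hello) to send a message. Then check the console output to confirm that the message was consumed.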
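
The request to /send can also be issued from code rather than a browser. The following is a minimal sketch using only the JDK (Java 11+). It assumes the application runs on localhost at Spring Boot's default port 8080; the class name SendEndpointClient and its methods are hypothetical helpers, not part of the original project.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class SendEndpointClient {

    // Assumption: the application listens on localhost:8080 (Spring Boot's default port).
    static final String BASE_URL = "http://localhost:8080";

    // Builds the /send URI, URL-encoding the message so spaces and symbols survive.
    static URI buildSendUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "/send?message=" + encoded);
    }

    // Issues the GET request and returns the response body.
    // Only works while the Spring Boot application is running.
    static String send(String message) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildSendUri(message)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Prints the URI that would be requested; call send(...) once the application is up.
        System.out.println(buildSendUri("hello kafka"));
    }
}
```

Encoding the message matters because the controller reads it from the query string: an unencoded space or & would truncate or split the parameter.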

7. Exception handling

To handle failures more gracefully, add exception-handling logic to both the producer and the consumer:

Producer exception handling

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Note: KafkaTemplate.send returns a ListenableFuture in Spring for Apache Kafka 2.x.
    // In 3.x it returns a CompletableFuture, so this callback style applies to 2.x only.
    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(new ProducerRecord<>(topic, message));
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            // Called once the broker has acknowledged the record.
            @Override
            public void onSuccess(SendResult<String, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
            }

            // Called when the send fails (for example, the broker is unreachable).
            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Unable to send message due to: " + ex.getMessage());
            }
        });
    }
}
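
On Spring for Apache Kafka 3.x, where KafkaTemplate.send returns a CompletableFuture, the success and failure branches are usually folded into a single two-argument callback. The sketch below shows that shape using only the JDK. fakeSend is a hypothetical stand-in for kafkaTemplate.send, and the "topic-partition@offset" result string is made up for illustration; it is not the real SendResult type.

```java
import java.util.concurrent.CompletableFuture;

class SendCallbackSketch {

    // Hypothetical stand-in for kafkaTemplate.send(...): completes with a made-up
    // "topic-partition@offset" string, or fails when the message is empty.
    static CompletableFuture<String> fakeSend(String topic, String message) {
        if (message == null || message.isEmpty()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("empty message"));
        }
        return CompletableFuture.completedFuture(topic + "-0@0");
    }

    // One callback covers both outcomes: ex is non-null on failure,
    // otherwise result holds the value the send completed with.
    static CompletableFuture<String> sendWithLogging(String topic, String message) {
        return fakeSend(topic, message).handle((result, ex) -> {
            if (ex != null) {
                return "Unable to send message due to: " + ex.getMessage();
            }
            return "Message sent successfully: " + result;
        });
    }

    public static void main(String[] args) {
        System.out.println(sendWithLogging("my-topic", "hello").join());
        System.out.println(sendWithLogging("my-topic", "").join());
    }
}
```

The sketch uses handle so that the outcome can be returned and inspected. With a real template, whenComplete takes the same (result, ex) pair and is the more natural choice when the callback only logs.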

Consumer exception handling

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record) {
        try {
            // Business logic for the record goes here.
            System.out.println("Received Message: " + record.value());
        } catch (Exception e) {
            // The exception is caught and logged rather than rethrown, so the
            // listener returns normally and the record is treated as processed.
            System.err.println("Error processing message: " + e.getMessage());
        }
    }
}

With the steps above, Kafka is integrated into the Spring Boot project, and messages can be produced and consumed.

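Note: This article is reproduced from PlutoZuo's article on blog.csdn.net, "https://blog.csdn.net/PlutoZuo/article/details/133824926". Copyright belongs to the original author; this blog does not own the copyright and assumes no related legal liability. In case of infringement, please contact us for removal.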