首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

2、java调用kafka apiApache Kafka系列文章一、生产消息到Kafka中1、POM依赖2、开发步骤3、代码4、验证二、从Kafka的topic中消费消息1、开发步骤2、代码三、异步使用带有回调函数方法生产消息1、需求2、开发步骤3、代码四、kafka中发送和消费复杂类型五、消费kafka topic中的历史数据

  • 23-09-04 15:57
  • 2036
  • 5755
blog.csdn.net

Apache Kafka系列文章

1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试
2、java调用kafka api
3、kafka重要概念介紹及示例
4、kafka分区、副本介绍及示例
5、kafka监控工具Kafka-Eagle介绍及使用


文章目录

  • Apache Kafka系列文章
  • 一、生产消息到Kafka中
  • 1、POM依赖
  • 2、开发步骤
  • 3、代码
  • 4、验证
  • 二、从Kafka的topic中消费消息
  • 1、开发步骤
  • 2、代码
  • 三、异步使用带有回调函数方法生产消息
  • 1、需求
  • 2、开发步骤
  • 3、代码
  • 四、kafka中发送和消费复杂类型
    • 1、需求
    • 2、代码
      • 1)、创建topic
      • 2)、创建kafka序列化和反序列化方法
        • 1、将复杂类型和字节数字相互转换
        • 2、实现kafka的序列化接口
        • 3、实现kafka的反序列化接口
      • 3)、生产者发送数据
      • 4)、消费者消费数据
  • 五、消费kafka topic中的历史数据
    • 1、代码


本分介绍java调用kafka api。
本文前置条件是kafka环境搭建好。
本分五部分,即简单的写数据到kafka、从topic中消费数据、异步回调、读写kafka中复杂数据类型和读取历史数据。

一、生产消息到Kafka中

将1-100的数字消息写入到Kafka中。

1、POM依赖

导入Maven Kafka POM依赖

        
        <dependency>
            <groupId>org.apache.kafkagroupId>
            <artifactId>kafka-clientsartifactId>
            <version>2.4.1version>
        dependency>

        
        <dependency>
            <groupId>org.apache.commonsgroupId>
            <artifactId>commons-ioartifactId>
            <version>1.3.2version>
        dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2、开发步骤

  1. 创建用于连接Kafka的Properties配置
  2. 创建一个生产者对象KafkaProducer
  3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
  4. 再调用一个Future.get()方法等待响应
  5. 关闭生产者

3、代码

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


public class KafkaProducerTest {
    public static void main(String[] args) {
//            1. 创建用于连接Kafka的Properties配置
//            2. 创建一个生产者对象KafkaProducer
//            3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
//            4. 再调用一个Future.get()方法等待响应
//            5. 关闭生产者
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "server1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 3. 调用send发送1-100消息到指定Topic test
        for (int i = 0; i < 100; ++i) {
            try {
                // 获取返回值Future,该对象封装了返回值
                Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                // 调用一个Future.get()方法等待响应
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 5. 关闭生产者
        producer.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

4、验证

通过客户端查看写入的内容
在这里插入图片描述

二、从Kafka的topic中消费消息

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

1、开发步骤

  1. 创建Kafka消费者配置
    
    • 1
  2. 创建Kafka消费者
  3. 订阅要消费的主题
  4. 使用一个while循环,不断从Kafka的topic中拉取消息
  5. 将将记录(record)的offset、key、value都打印出来

2、代码


import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class KafkaConsumerTest {

    /**
     * @param args
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException {
//        1.    创建Kafka消费者配置
//        2.    创建Kafka消费者
//        3.    订阅要消费的主题
//        4.    使用一个while循环,不断从Kafka的topic中拉取消息
//        5.    将将记录(record)的offset、key、value都打印出来

        // 1.创建Kafka消费者配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
        // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        props.setProperty("group.id", "test");
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2.创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. 订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList("test"));

        // 4.使用一个while循环,不断从Kafka的topic中拉取消息
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // keyvalue
                String key = consumerRecord.key();
                String value = consumerRecord.value();

                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

运行结果
在这里插入图片描述

三、异步使用带有回调函数方法生产消息

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

1、需求

  1. 在发送消息出现异常时,能够及时打印出异常信息
    
    • 1
  2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset
    
    • 1

2、开发步骤

  1. 创建用于连接Kafka的Properties配置
  2. 创建一个生产者对象KafkaProducer
  3. 调用send发送1-100消息到指定Topic test,并在回调方法中打印异常信息或者打印topic、分区和offset
  4. 关闭生产者

3、代码

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


public class KafkaProducerTest2 {
    public static void main(String[] args) {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "server1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 3. 调用send发送1-100消息到指定Topic test
        for (int i = 0; i < 100; ++i) {
            // 一、同步方式
            // 获取返回值Future,该对象封装了返回值
            // Future future = producer.send(new ProducerRecord
            // String>("test", null, i + ""));
            // 调用一个Future.get()方法等待响应
            // future.get();

            // 二、带回调函数异步方式
            producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("发送消息出现异常");
                    } else {
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();

                        System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
                    }
                }
            });
        }

        // 5. 关闭生产者
        producer.close();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

四、kafka中发送和消费复杂类型

1、需求

1、往kafka中写入user数据
2、消费kafka中user的数据
3、kafka的topic是userinfo

2、代码

1)、创建topic

kafka-topics.sh --create --bootstrap-server server1:9092 --topic userinfo --partitions 1 --replication-factor 1
  • 1

2)、创建kafka序列化和反序列化方法

kafka本身已经实现了基本类型的序列化与反序列化,复杂类型则需要自己实现。
http://iyenn.com/index/link?url=https://kafka.apache.org/31/javadoc/org/apache/kafka/common/serialization/package-summary.html
实现序列化与反序列化,主要是以字节流的形式读取和写入数据,然后实现kafka的序列化与反序列化的方法,最后在生产者或消费者中设置key、value的序列化与反序列化的类。

示例:

  • 生产者
// key的序列化,其类型是Integer
 props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
 
// value的序列化,其类型是Object
props.put("value.serializer", "org.kafka.objectexample.util.EncodeingKafka");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 消费者
// key的反序列化,其类型是Integer 
 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
// value的反序列化,其类型是Object
props.setProperty("value.deserializer", "org.kafka.objectexample.util.DecodeingKafka");
  • 1
  • 2
  • 3
  • 4

1、将复杂类型和字节数字相互转换

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

  
public class BeanConversion {
    /**
     * 对象转字节数组
     * 
     * @param obj
     * @return
     */
    public static byte[] ObjectToBytes(Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bo = null;
        ObjectOutputStream oo = null;
        try {
            bo = new ByteArrayOutputStream();
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            bytes = bo.toByteArray();

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (bo != null) {
                    bo.close();
                }
                if (oo != null) {
                    oo.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return bytes;
    }

    /**
     * 字节数组转对象
     * 
     * @param bytes
     * @return
     */
    public static Object BytesToObject(byte[] bytes) {
        Object obj = null;
        ByteArrayInputStream bi = null;
        ObjectInputStream oi = null;
        try {
            bi = new ByteArrayInputStream(bytes);
            oi = new ObjectInputStream(bi);
            obj = oi.readObject();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (bi != null) {
                    bi.close();
                }
                if (oi != null) {
                    oi.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return obj;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

2、实现kafka的序列化接口

import org.apache.kafka.common.serialization.Serializer;

public class EncodeingKafka implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        return BeanConversion.ObjectToBytes(data);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3、实现kafka的反序列化接口


import org.apache.kafka.common.serialization.Deserializer;

public class DecodeingKafka implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data) {
        return BeanConversion.BytesToObject(data);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3)、生产者发送数据

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerUser {
    public final static String TOPIC_NAME = "userinfo";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "server1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.kafka.objectexample.util.EncodeingKafka");

        KafkaProducer producer = new KafkaProducer<Integer, User>(props);

        for (int i = 0; i < 100; i++) {
            User u = new User();
            u.setId(i * 3);
            u.setName("name" + i);
            if (i < 20) {
                u.setAge(i + 20);
            } else {
                u.setAge(i);
            }
   
            // public ProducerRecord(String topic, K key, V value)
            ProducerRecord<Integer, User> producerRecord = new ProducerRecord<Integer, User>(TOPIC_NAME, u.getId(), u);

            // send() 方法会返回一个包含 RecordMetadata 的
            // Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。
            Future<RecordMetadata> future = producer.send(producerRecord);
            // send() 方住先返回一个 Future对象,然后调用 Future对象的 get() 方法等待 Kafka 响应。如果服务器返回错误,
            // get()方怯会抛出异常。如果没有发生错误,我们会得到一个
            // RecordMetadata对象,可以用它获取消息的偏移量。如果在发送数据之前或者在发送过程中发生了任何错误 ,比如 broker返回
            // 了一个不允许重发消息的异常或者已经超过了重发的次数 ,那么就会抛出异常。

            RecordMetadata rm = future.get();
            System.out.println(rm.topic() + "  分区:" + rm.partition() + "  偏移量:" + rm.offset());
        }
        producer.close();

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

4)、消费者消费数据

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class kafkaConsumerUser {

    public final static String TOPIC_NAME = "userinfo";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092");

        props.setProperty("group.id", TOPIC_NAME);
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer", "org.kafka.objectexample.util.DecodeingKafka");

        // 2.创建Kafka消费者
        KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. 订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 4.使用一个while循环,不断从Kafka的topic中拉取消息
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // keyvalue
                Integer key = consumerRecord.key();
                User value = consumerRecord.value();

                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

五、消费kafka topic中的历史数据

消费topic已经存在的数据。类似命令中–from-beginning 参数。

1、代码

在该服务启动前,如果topic中存在数据,是可以全部读出来,但如果topic数据部分已经被消费了,也会被读出来。

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;


public class KafkaConsumerHistoryOfUser {
    public final static String TOPIC_NAME = "userinfo";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
        props.setProperty("group.id", TOPIC_NAME);
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer", "org.kafka.objectexample.util.DecodeingKafka");

        KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(props);

        // 基于再均衡监听器,在给消费者分配分区的时候将消息偏移量跳转到起始位置
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                Map<TopicPartition, Long> beginningOffset = kafkaConsumer.beginningOffsets(collection);

                // 读取历史数据 --from-beginning
                for (Map.Entry<TopicPartition, Long> entry : beginningOffset.entrySet()) {
                    // 基于seek方法
                    // TopicPartition tp = entry.getKey();
                    // long offset = entry.getValue();
                    // consumer.seek(tp,offset);

                    // 基于seekToBeginning方法
                    kafkaConsumer.seekToBeginning(collection);
                }
            }
        });

        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // keyvalue
                Integer key = consumerRecord.key();
                User value = consumerRecord.value();

                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

以上示例简单的介绍了几种读写kafka数据的情况。

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览126555 人正在系统学习中
注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/130577664"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top