首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

3、kafka重要概念介紹及示例Apache Kafka系列文章一、概念二、Kafka生产者幂等性与事务三、分区的leader与follower四、消息不丢失机制五、Kafka配额限速机制(Quotas)

  • 23-09-04 15:57
  • 2272
  • 11786
blog.csdn.net

Apache Kafka系列文章

1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试
2、java调用kafka api
3、kafka重要概念介紹及示例
4、kafka分区、副本介绍及示例
5、kafka监控工具Kafka-Eagle介绍及使用


文章目录

  • Apache Kafka系列文章
  • 一、概念
    • 1、Kafka重要概念
      • 1)、broker
      • 2)、zookeeper
      • 3)、 producer(生产者)
      • 4)、consumer(消费者)
      • 5)、consumer group(消费者组)
      • 6)、分区(Partitions)
      • 7)、副本(Replicas)
      • 8)、主题(Topic)
      • 9)、偏移量(offset)
    • 2、消费者组验证
      • 1)、创建topic
      • 2)、生产者代码
      • 3)、消费者代码
      • 4)、验证
      • 5)、验证多于2个消费者情况
  • 二、Kafka生产者幂等性与事务
    • 1、Kafka生产者幂等性
      • 1)、配置幂等性
      • 2)、幂等性原理
    • 2、Kafka事务
      • 1)、简介
      • 2)、事务操作API
      • 3)、Kafka事务编程
      • 4)、属性配置说明
      • 5)、事务相关属性配置
        • 1、生产者
        • 2、消费者
    • 3、Kafka事务编程
      • 1)、需求
      • 2)、启动生产者控制台程序模拟数据
      • 3)、编写创建消费者代码
      • 4)、编写创建生产者代码
      • 5)、编写代码消费并生产数据
      • 6)、完整代码
      • 7)、测试功能正常与否
      • 8)、模拟异常测试事务
  • 三、分区的leader与follower
    • 1、Leader和Follower
    • 2、查看某个partition的leader
    • 3、AR、ISR、OSR
    • 4、Leader选举
      • 1)、如果leader崩溃,Kafka会如何?
      • 2)、Controller介绍
        • 1、Controller的选举
        • 2、找到当前Kafka集群的controller
        • 3、测试controller选举
        • 4、Controller选举partition leader
    • 5、leader负载均衡
      • 1)、Preferred Replica
      • 2)、确保leader在broker中负载均衡
  • 四、消息不丢失机制
    • 1、broker数据不丢失
    • 2、生产者数据不丢失
  • 五、Kafka配额限速机制(Quotas)
    • 1、限制producer端速率
    • 2、限制consumer端速率
    • 3、取消Kafka的Quota配置


本文介绍了kafka相关重要的概念及使用示例。
本文前提是kafka环境可用。
本文分为五部分,即概念、幂等与事务、分区的leader和follower、消息可靠机制和限速机制。

一、概念

1、Kafka重要概念

1)、broker

在这里插入图片描述

  • 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
  • broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
  • 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能

2)、zookeeper

在这里插入图片描述
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

3)、 producer(生产者)

生产者负责将数据推送给broker的topic

4)、consumer(消费者)

消费者负责从broker的topic中拉取数据,并自己进行处理

5)、consumer group(消费者组)

consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。
在这里插入图片描述

  • consumer group是kafka提供的可扩展且具有容错性的消费者机制
  • 一个消费者组可以包含多个消费者
  • 一个消费者组有一个唯一的ID(group Id)
  • 组内的消费者一起消费主题的所有分区数据

6)、分区(Partitions)

在这里插入图片描述
在Kafka集群中,主题被分为多个分区

7)、副本(Replicas)

在这里插入图片描述
副本可以确保某个服务器出现故障时,确保数据依然可用
在Kafka中,一般都会设计副本的个数>1

8)、主题(Topic)

在这里插入图片描述

  • 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
  • Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
  • 在主题中的消息是有结构的,一般一个主题包含某一类消息
  • 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)

9)、偏移量(offset)

在这里插入图片描述

  • offset记录着下一条将要发送给Consumer的消息的序号
  • 默认Kafka将offset存储在ZooKeeper中
  • 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
  • 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

2、消费者组验证

Kafka支持有多个消费者同时消费一个主题中的数据。

1)、创建topic

由于每个分区数据只能是一个消费者进行消费,故该topic至少需要2个分区才能验证消费者组。

kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_consumerGroup --partitions 2 --replication-factor 1
  • 1

2)、生产者代码

// 生产40条数据

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


public class KafkaProducerTest {
    // 只有主题分区超过一个以上的,才会由不同的消费者进行消费,一个分区的数据只能由组内的一个消费者进行消费,本示例的topic有2个分区
    public final static String TOPIC_NAME = "t_consumerGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "server1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        boolean flag = true;
        int count = 0;
        while (flag) {
            count++;

            for (int i = 0; i < 10; ++i) {
                try {
                    // 获取返回值Future,该对象封装了返回值
                    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, i + "", i + ""));
                    // 调用一个Future.get()方法等待响应
                    future.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }

            if (count > 3)
                flag = false;
        }
        // 5. 关闭生产者
        producer.close();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

3)、消费者代码

// 设置消费者组的属性
props.setProperty("group.id", TOPIC_NAME);
  • 1
  • 2
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author alanchan
 *
 */
public class KafkaConsumerTest {
    public final static String TOPIC_NAME = "t_consumerGroup";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
        // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        props.setProperty("group.id", TOPIC_NAME);
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // keyvalue
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                int partition = consumerRecord.partition();
                System.out.println("topic: " + topic + "  partition:" + partition + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

4)、验证

1、启动2个消费者程序
2、启动生产者程序
3、查看日志和kafka数据存储情况
在这里插入图片描述
在这里插入图片描述
消费者程序1运行输出

topic: t_consumerGroup partition:0 offset:0 key:0 value:0 topic:
t_consumerGroup partition:0 offset:1 key:2 value:2 topic:
t_consumerGroup partition:0 offset:2 key:5 value:5 topic:
t_consumerGroup partition:0 offset:3 key:6 value:6 topic:
t_consumerGroup partition:0 offset:4 key:0 value:0 topic:
t_consumerGroup partition:0 offset:5 key:2 value:2 topic:
t_consumerGroup partition:0 offset:6 key:5 value:5 topic:
t_consumerGroup partition:0 offset:7 key:6 value:6 topic:
t_consumerGroup partition:0 offset:8 key:0 value:0 topic:
t_consumerGroup partition:0 offset:9 key:2 value:2 topic:
t_consumerGroup partition:0 offset:10 key:5 value:5 topic:
t_consumerGroup partition:0 offset:11 key:6 value:6 topic:
t_consumerGroup partition:0 offset:12 key:0 value:0 topic:
t_consumerGroup partition:0 offset:13 key:2 value:2 topic:
t_consumerGroup partition:0 offset:14 key:5 value:5 topic:
t_consumerGroup partition:0 offset:15 key:6 value:6

消费者程序2运行输出

topic: t_consumerGroup partition:1 offset:0 key:1 value:1 topic:
t_consumerGroup partition:1 offset:1 key:3 value:3 topic:
t_consumerGroup partition:1 offset:2 key:4 value:4 topic:
t_consumerGroup partition:1 offset:3 key:7 value:7 topic:
t_consumerGroup partition:1 offset:4 key:8 value:8 topic:
t_consumerGroup partition:1 offset:5 key:9 value:9 topic:
t_consumerGroup partition:1 offset:6 key:1 value:1 topic:
t_consumerGroup partition:1 offset:7 key:3 value:3 topic:
t_consumerGroup partition:1 offset:8 key:4 value:4 topic:
t_consumerGroup partition:1 offset:9 key:7 value:7 topic:
t_consumerGroup partition:1 offset:10 key:8 value:8 topic:
t_consumerGroup partition:1 offset:11 key:9 value:9 topic:
t_consumerGroup partition:1 offset:12 key:1 value:1 topic:
t_consumerGroup partition:1 offset:13 key:3 value:3 topic:
t_consumerGroup partition:1 offset:14 key:4 value:4 topic:
t_consumerGroup partition:1 offset:15 key:7 value:7 topic:
t_consumerGroup partition:1 offset:16 key:8 value:8 topic:
t_consumerGroup partition:1 offset:17 key:9 value:9 topic:
t_consumerGroup partition:1 offset:18 key:1 value:1 topic:
t_consumerGroup partition:1 offset:19 key:3 value:3 topic:
t_consumerGroup partition:1 offset:20 key:4 value:4 topic:
t_consumerGroup partition:1 offset:21 key:7 value:7 topic:
t_consumerGroup partition:1 offset:22 key:8 value:8 topic:
t_consumerGroup partition:1 offset:23 key:9 value:9

经过以上数据核对,验证结果符合预期。

5)、验证多于2个消费者情况

1、启动3个消费者程序
2、启动生产者程序
3、查看日志和kafka数据存储情况
在这里插入图片描述
在这里插入图片描述
消费者1程序运行结果
在这里插入图片描述
消费者2程序运行结果
在这里插入图片描述
消费者3程序运行结果
在这里插入图片描述
经过以上数据核对,验证结果可以得出结论是每个数据分区只会分配给一个消费者,即便有多个消费者。

二、Kafka生产者幂等性与事务

1、Kafka生产者幂等性

在这里插入图片描述

1)、配置幂等性

props.put("enable.idempotence",true);
  • 1

2)、幂等性原理

为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和Sequence Number的概念。

  • PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
  • Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。
    在这里插入图片描述

2、Kafka事务

1)、简介

Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。
在这里插入图片描述

2)、事务操作API

Producer接口中定义了以下5个事务相关方法:

  • initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
  • beginTransaction(开始事务):启动一个Kafka事务
  • sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
  • commitTransaction(提交事务):提交事务
  • abortTransaction(放弃事务):取消事务
//producer提供的事务方法
/**
* 初始化事务。需要注意的有:
* 1、前提
*   需要保证transation.id属性被配置。
* 2、这个方法执行逻辑是:
*   (1)Ensures any transactions initiated by previous instances of the producer with the same
 transactional.id are completed. If the previous instance had failed with a transaction in
*      progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion.
*    (2)Gets the internal producer id and epoch, used in all future transactional messages issued by the producer.

*/
    public void initTransactions();
 
/**
 * 开启事务
 */
    public void beginTransaction() throws ProducerFencedException ;
 
/**
 * 为消费者提供的在事务内提交偏移量的操作
 */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException ;
 
/**
 * 提交事务
 */
    public void commitTransaction() throws ProducerFencedException;
 
/**
 * 放弃事务,类似回滚事务的操作
 */
    public void abortTransaction() throws ProducerFencedException ;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

3)、Kafka事务编程

在一个原子操作中,根据包含的操作类型,可以分为三种情况:

  • 只有Producer生产消息;
  • 消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的“consume-transform-produce ”模式
  • 只有consumer消费消息

前两种情况是事务引入的场景,最后一种情况没有使用价值(跟使用手动提交效果一样)。

4)、属性配置说明

使用kafka的事务api时的一些注意事项

  • 需要消费者的自动模式设置为false,并且不能再手动的执行consumer#commitSync或者consumer#commitAsyc
  • 生产者配置transaction.id属性
  • 生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
  • 消费者需要配置Isolation.level。在consume-trnasform-produce模式下使用事务时,必须设置为READ_COMMITTED。
    在这里插入图片描述
    在这里插入图片描述

5)、事务相关属性配置

1、生产者

// 配置事务的id,开启了事务会默认开启幂等性
props.put("transactional.id", "first-transactional");
  • 1
  • 2

2、消费者

// 1. 消费者需要设置隔离级别
props.put("isolation.level","read_committed");
//  2. 关闭自动提交
 props.put("enable.auto.commit", "false");
  • 1
  • 2
  • 3
  • 4

3、Kafka事务编程

1)、需求

在Kafka的topic ods_user 中有一些用户数据,数据格式如下:

姓名,性别,出生日期
张三,1,1980-10-09
李四,0,1985-11-01
将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic dwd_user 中。
要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

2)、启动生产者控制台程序模拟数据

# 创建名为ods_user和dwd_user的主题
kafka-topics.sh --create --bootstrap-server server1:9092 --topic ods_user --partitions 1 --replication-factor 1
kafka-topics.sh --create --bootstrap-server server1:9092 --topic dwd_user --partitions 1 --replication-factor 1
# 生产数据到 ods_user
kafka-console-producer.sh --broker-list server1:9092 --topic ods_user
# 从dwd_user消费数据
kafka-console-consumer.sh --bootstrap-server server1:9092 --topic dwd_user --from-beginning  --isolation-level  read_committed
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3)、编写创建消费者代码

编写一个方法 createConsumer,该方法中返回一个消费者,订阅 ods_user 主题。
需要配置事务隔离级别、关闭自动提交。
实现步骤:
1、创建Kafka消费者配置
2、创建消费者,并订阅 ods_user 主题
3、代码

// 一、创建一个消费者来消费ods_user中的数据
public Consumer buildConsumer() {
    Properties props = new Properties();
    // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
    props.put("bootstrap.servers", KAFKA_SERVERS);
    // 消费者群组
    props.put("group.id", TOPIC_ODS_USER);
    // 设置隔离级别
    // 1、关闭自动提交 enable.auto.commit
    // 2、isolation.level为read_committed
    props.put("isolation.level", "read_committed");
    // 关闭自动提交
    // 在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    consumer.subscribe(Arrays.asList(TOPIC_ODS_USER));
    return consumer;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

4)、编写创建生产者代码

编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。
1、创建生产者配置
2、创建生产者对象
3、代码

// 二、编写createProducer方法,用来创建一个带有事务配置的生产者
    // 1、设置transactional.id
    // 2、设置enable.idempotence
    private Producer buildProducer() {
        // create instance for properties to access producer configs
        Properties props = new Properties();
        // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
        props.put("bootstrap.servers", KAFKA_SERVERS);
        // 设置事务id
        props.put("transactional.id", TOPIC_DWD_USER);
        // 设置幂等性
        // 生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
        props.put("enable.idempotence", true);
        // Set acknowledgements for producer requests.
        props.put("acks", "all");
        // If the request fails, the producer can automatically retry,
        props.put("retries", 1);
        // Specify buffer size in
        // config,这里不进行设置这个属性,如果设置了,还需要执行producer.flush()来把缓存中消息发送出去
        // props.put("batch.size", 16384);
        // Reduce the no of requests less than 0
        props.put("linger.ms", 1);
        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);
        // Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        return producer;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

5)、编写代码消费并生产数据

实现步骤:
1、调用之前实现的方法,创建消费者、生产者对象
2、生产者调用initTransactions初始化事务
3、编写一个while死循环,在while循环中不断拉取数据,将 TOPIC_ODS_USER 队列中user信息中的0/1转换成女/男,并写入 TOPIC_DWD_USER 队列
(1) 生产者开启事务
(2) 消费者拉取消息
(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
(4) 生产消息到dwd_user topic中
(5) 提交偏移量到事务中
(6) 提交事务
(7) 捕获异常,如果出现异常,则取消事务

6)、完整代码

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * 功能: 
 1、消费 ods_user 队列中的数据 
 2、在消费ods_user 队列中的数据同时,完成转换,同时将数据写入 dwd_user 队列中 
 验证:
 1、通过客户端向ods_user中写入业务数据 
 2、查看dwd_user中的数据是否按照要求进行转换
 * 
 * @author alanchan
 *
 */
public class ConsumeTransferProduceTransactionTest {
    private final static String TOPIC_ODS_USER = "ods_user";
    private final static String TOPIC_DWD_USER = "dwd_user";
    private final static String KAFKA_SERVERS = "server1:9092";

    // 一、创建一个消费者来消费ods_user中的数据
    public Consumer buildConsumer() {
        Properties props = new Properties();
        // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
        props.put("bootstrap.servers", KAFKA_SERVERS);
        // 消费者群组
        props.put("group.id", TOPIC_ODS_USER);
        // 设置隔离级别
        // 1、关闭自动提交 enable.auto.commit
        // 2、isolation.level为read_committed
        props.put("isolation.level", "read_committed");
        // 关闭自动提交
        // 在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(TOPIC_ODS_USER));
        return consumer;
    }

    // 二、编写createProducer方法,用来创建一个带有事务配置的生产者
    // 1、设置transactional.id
    // 2、设置enable.idempotence
    private Producer buildProducer() {
        // create instance for properties to access producer configs
        Properties props = new Properties();
        // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
        props.put("bootstrap.servers", KAFKA_SERVERS);
        // 设置事务id
        props.put("transactional.id", TOPIC_DWD_USER);
        // 设置幂等性
        // 生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
        props.put("enable.idempotence", true);
        // Set acknowledgements for producer requests.
        props.put("acks", "all");
        // If the request fails, the producer can automatically retry,
        props.put("retries", 1);
        // Specify buffer size in
        // config,这里不进行设置这个属性,如果设置了,还需要执行producer.flush()来把缓存中消息发送出去
        // props.put("batch.size", 16384);
        // Reduce the no of requests less than 0
        props.put("linger.ms", 1);
        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);
        // Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        return producer;
    }

    // 三、将 TOPIC_ODS_USER 队列中user信息中的0/1转换成女/男,并写入 TOPIC_DWD_USER 队列
    //在一个事务内,即有生产消息又有消费消息,即常说的Consume-tansform-produce模式
    public void consumeTransferProduce() {
        // 1.构建生产者
        Producer producer = buildProducer();
        // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
        producer.initTransactions();
        // 3.构建消费者和订阅主题
        Consumer consumer = buildConsumer();

        while (true) {
            // 4.开启事务
            producer.beginTransaction();
            // 5.1 接受消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            try {
                // 5.2 do业务逻辑;
                Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();

                for (ConsumerRecord<String, String> record : records) {
                    // 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
                    System.out.printf("接收消息信息:offset = %d, key = %s, value = %s
", record.offset(), record.key(), record.value());
                    String msg = trans(record.value());
                    System.out.println("转换后的消息信息:" + msg);

                    // 5.2.2 记录提交的偏移量
                    // offset + 1:offset是当前消费的记录(消息)对应在partition中的offset
                    // 必须要+1,解决服务重启后,消息消费不重复,即消费下一条消息
                    commits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));

                    // 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
                    producer.send(new ProducerRecord<String, String>(TOPIC_DWD_USER, msg));
                    
                }

                // 7.提交偏移量
                producer.sendOffsetsToTransaction(commits, TOPIC_ODS_USER);
//                int i = 1 / 0;
                // 8.事务提交
                producer.commitTransaction();

            } catch (Exception e) {
                e.printStackTrace();
                // 7.放弃事务
                producer.abortTransaction();
            }
        }
    }

    public static void main(String[] args) {
        ConsumeTransferProduceTransactionTest ctp = new ConsumeTransferProduceTransactionTest();
        ctp.consumeTransferProduce();
    }

    private String trans(String message) {
        // 将1转换为男,0转换为女
        String[] fieldArray = message.split(",");

        // 将字段进行替换
        if (fieldArray != null && fieldArray.length > 2) {
            String sexField = fieldArray[1];
            if (sexField.equals("1")) {
                fieldArray[1] = "男";
            } else if (sexField.equals("0")) {
                fieldArray[1] = "女";
            }
        }

        // 重新拼接字段
        return fieldArray[0] + "," + fieldArray[1] + "," + fieldArray[2];
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161

7)、测试功能正常与否

测试步骤
1)、创建ods_user、dw_user topic
2)、启动ConsumeTransferProduceTransactionTest.java
3)、在kafka客户端创建ods_user消息命令行,并生产消息

[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic ods_user
>张三,1,1980-10-09
>
  • 1
  • 2
  • 3

4)、在kafka客户端消费dwd_user消息命令,并查看其转换结果

[alanchan@server3 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic dwd_user --from-beginning  --isolation-level  read_committed
张三,男,1980-10-09
  • 1
  • 2

5)、同时查看ConsumeTransferProduceTransactionTest.java运行结果

接收消息信息:offset = 4, key = null, value = 李四,0,1985-11-01

转换后的消息信息:张三,男,1980-10-09

8)、模拟异常测试事务

测试步骤与上面的基本上一致,只是需要在提交事务前增加个认为异常,进行验证

            // 7.提交偏移量
            producer.sendOffsetsToTransaction(commits, TOPIC_ODS_USER); //                int i = 1 / 0;
            // 8.事务提交
            producer.commitTransaction();
  • 1
  • 2
  • 3
  • 4

通过上述验证可以发现,如果该程序运行出现异常,ods_user消息队列的数据不会消费,也不会写入dw_user队列中,一旦该程序运行正常,则ods_user的数据可以正常消费、且会写入dw_user队列中。需要注意的是程序运行异常期间,ods_user队列中未消费数据,在程序运行正常后也会继续消费掉。

三、分区的leader与follower

1、Leader和Follower

在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上。正常使用kafka是感觉不到leader、follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。

  • Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步
  • 如果leader出现故障,其他follower会被重新选举为leader
  • follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中
    在这里插入图片描述

2、查看某个partition的leader

使用Kafka-eagle查看某个topic的partition的leader在哪个服务器中。为了方便观察,我们创建一个名为test_query_partition的3个分区、2个副本的topic。
在这里插入图片描述

3、AR、ISR、OSR

Kafka中,把follower可以按照不同状态分为三类——AR、ISR、OSR。
分区的所有副本称为AR(Assigned Replicas——已分配的副本)
所有与leader副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas——在同步中的副本)
由于follower副本同步滞后过多的副本(不包括 leader 副本)组成 OSR(Out-of-Sync Replias)
AR = ISR + OSR
正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISR,OSR集合为空。

1、查看分区的ISR
使用Kafka Eagle查看某个Topic的partition的ISR有哪几个节点。
在这里插入图片描述
2、尝试关闭id为0的broker(杀掉该broker的进程),参看topic的ISR情况。
在这里插入图片描述

4、Leader选举

leader对于消息的写入以及读取是非常关键的,此时有两个疑问:
1、Kafka如何确定哪个partition是leader、哪个partition是follower呢?
2、某个leader崩溃了,如何快速确定另外一个leader呢?因为Kafka的吞吐量很高、延迟很低,所以选举leader必须非常快

1)、如果leader崩溃,Kafka会如何?

使用Kafka Eagle找到某个partition的leader,再找到leader所在的broker。在Linux中强制杀掉该Kafka的进程,然后观察leader的情况。
在这里插入图片描述
通过观察,我们发现,leader在崩溃后,Kafka又从其他的follower中快速选举出来了leader。

2)、Controller介绍

Kafka启动时,会在所有的broker中选择一个controller

  • 前面leader和follower是针对partition,而controller是针对broker的
  • 创建topic、或者添加分区、修改副本数量之类的管理任务都是由controller完成的
  • Kafka分区leader的选举,也是由controller决定的

1、Controller的选举

在Kafka集群启动的时候,每个broker都会尝试去ZooKeeper上注册成为Controller(ZK临时节点)
但只有一个竞争成功,其他的broker会注册该节点的监视器
一点该临时节点状态发生变化,就可以进行相应的处理
Controller也是高可用的,一旦某个broker崩溃,其他的broker会重新注册为Controller

2、找到当前Kafka集群的controller

点击Kafka Tools的Tools菜单,找到ZooKeeper Brower… 点击左侧树形结构的controller节点,就可以查看到哪个broker是controller了。
在这里插入图片描述

3、测试controller选举

通过kafka tools找到controller所在的broker对应的kafka进程,杀掉该进程,重新打开ZooKeeper brower,观察kafka是否能够选举出来新的Controller。
在这里插入图片描述

4、Controller选举partition leader

  • 所有Partition的leader选举都由controller决定
  • controller会将leader的改变直接通过RPC的方式通知需为此作出响应的Broker
  • controller读取到当前分区的ISR,只要有一个Replica还幸存,就选择其中一个作为leader否则,则任意选这个一个Replica作为leader
  • 如果该partition的所有Replica都已经宕机,则新的leader为-1

为什么不能通过ZK的方式来选举partition的leader?
Kafka集群如果业务很多的情况下,会有很多的partition
假设某个broker宕机,就会出现很多的partiton都需要重新选举leader
如果使用zookeeper选举leader,会给zookeeper带来巨大的压力。所以,kafka中leader的选举不能使用ZK来实现

5、leader负载均衡

1)、Preferred Replica

Kafka中引入了一个叫做preferred-replica的概念,意思就是:优先的Replica
在ISR列表中,第一个replica就是preferred-replica
第一个分区存放的broker,就是preferred-replica
执行以下脚本可以将preferred-replica设置为leader,均匀分配每个分区的leader。

kafka-leader-election.sh
--bootstrap-server server1:9092 
--topic 主题 
--partition=1
--election-type preferred
  • 1
  • 2
  • 3
  • 4
  • 5

2)、确保leader在broker中负载均衡

杀掉test主题的某个broker,这样kafka会重新分配leader。等到Kafka重新分配leader之后,再次启动kafka进程。观察test主题各个分区leader的分配情况。

在这里插入图片描述
此时,会造成leader分配是不均匀的,所以可以执行以下脚本来重新分配leader:

kafka-leader-election.sh
--bootstrap-server server1:9092 
--topic test 
--partition=2
--election-type preferred

# --partition:指定需要重新分配leader的partition编号
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

四、消息不丢失机制

1、broker数据不丢失

生产者通过分区的leader写入数据后,所有在ISR中follower都会从leader中复制数据,这样可以确保即使leader崩溃了,其他的follower的数据仍然是可用的

2、生产者数据不丢失

1、生产者连接leader写入数据时,可以通过ACK机制来确保数据已经成功写入。ACK机制有三个可选配置

  • 配置ACK响应要求为 -1 时 —— 表示所有的节点都收到数据(leader和follower都接收到数据)
  • 配置ACK响应要求为 1 时 —— 表示leader收到数据
  • 配置ACK影响要求为 0 时 —— 生产者只负责发送数据,不关心数据是否丢失(这种情况可能会产生数据丢失,但性能是最好的)

2、生产者可以采用同步和异步两种方式发送数据

  • 同步:发送一批数据给kafka后,等待kafka返回结果
  • 异步:发送一批数据给kafka,只是提供一个回调函数。
    如果broker迟迟不给ack,而buffer又满了,开发者可以设置是否直接清空buffer中的数据。

3、消费者数据不丢失
在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失。

五、Kafka配额限速机制(Quotas)

生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。通过配额(Quotas)设置可以避免这些问题。
Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务过载。

1、限制producer端速率

为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,
命令如下:

kafka-configs.sh
--bootstrap-server server1:9092
--alter 
--add-config 'producer_byte_rate=1048576' 
--entity-type clients 
--entity-default
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

运行基准测试,观察生产消息的速率

kafka-producer-perf-test.sh 
--topic test
--num-records 500000 
--throughput -1 
--record-size 1000 
--producer-props
bootstrap.servers=server1:9092,server2:9092,server3:9092
acks=1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结果:50000 records sent, 1108.156028 records/sec (1.06 MB/sec)

2、限制consumer端速率

对consumer限速与producer类似,只不过参数名不一样。
为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。
命令如下:

kafka-configs.sh
--bootstrap-server server1:9092 
--alter 
--add-config 'consumer_byte_rate=1048576' 
--entity-type clients 
--entity-default
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

运行基准测试

kafka-consumer-perf-test.sh 
--broker-list server1:9092,server2:9092,server3:9092
--topic test
--fetch-size 1048576 
--messages 500000
  • 1
  • 2
  • 3
  • 4
  • 5

结果为:MB.sec:1.0743

3、取消Kafka的Quota配置

使用以下命令,删除Kafka的Quota配置

kafka-configs.sh 
--bootstrap-server server1:9092  
--alter 
--delete-config 'producer_byte_rate' 
--entity-type  clients 
--entity-default

kafka-configs.sh 
--bootstrap-server server1:9092  
--alter 
--delete-config 'consumer_byte_rate' 
--entity-type  clients 
--entity-default
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

以上,完成了kafka相关重要概念的介绍、示例等内容。

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览126567 人正在系统学习中
注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/130577766"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top