首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

AdaptiveRecvByteBuAllocator 源码分析

  • 25-04-24 21:01
  • 2080
  • 5314
juejin.cn

名词定义 单次网络IO: 执行一次Netty的Unsafe.beginRead();(因为这个beginRead()中可能会从内核读取多次数据)

1、作用

用于动态调整接收从网络上获取的数据

2、类图

image.png

3、源码

本文基于4.1.38版本的AdaptiveRecvByteBufAllocator、Epoll模式进行源码分析

核心API介绍

AdaptiveRecvByteBufAllocator

核心变量
java
代码解读
复制代码
// 默认值 static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 1024; static final int DEFAULT_MAXIMUM = 65536; // 自适应变化的时候,一次扩容2^4=16倍,缩容2^1=2倍; private static final int INDEX_INCREMENT = 4; private static final int INDEX_DECREMENT = 1; // 每次读取的最大读取次数 private volatile int maxMessagesPerRead; // 希望一次读取更多的数据 private volatile boolean respectMaybeMoreData = true; // 最小读取的字节数的索引值 private int minIndex; // 最大读取的字节数的索引值 private int maxIndex; // 初始值 private int initial;
构造器
  1. 规格化minIndex、maxIndex、initial;
  2. 如果minIndex 对应的字节数小于minimum,那么就会minIndex+1
  3. 如果maxIndex对应的字节数大于maximum,就会maxIndex-1
  4. 默认参数是:minimum=64,initial=1024, maximum=64k
ini
代码解读
复制代码
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { ...... int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; }
getSizeTableIndex

通过二分法,获取size对应的索引值

ini
代码解读
复制代码
private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } } }

HandleImpl

io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl

关键变量
java
代码解读
复制代码
private ChannelConfig config; // 单次IO读取message的次数; private int maxMessagePerRead; // 读取message的次数 private int totalMessages; // 本次IO读取字节总量; private int totalBytesRead; // 单次期望读取的字节总量 private int attemptedBytesRead; // 最近一次从内核读取的字节数(真实数据量) private int lastBytesRead; private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData; private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { // 通过判断预计读取的字节数和真实读取的字节数相比较;若相同, 说明可能还有数据 // 如果不相等,说明真实读取的字节数小于预计,那么说明内核中没有可读数据了;那也就不用再读了; return attemptedBytesRead == lastBytesRead; } };
构造器
  1. 设置minIndex、maxIndex、index
  2. 比较重要的nextReceiveBufferSize变量,表示下一次recvBuffer的大小;它是如何动态变化呢?看后文
ini
代码解读
复制代码
HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; }
lastBytesRead
scss
代码解读
复制代码
public void lastBytesRead(int bytes) { // 如果从网卡中上去的真实数据的数量和预期要读的数据的数量是一致的话,那么就进行一次记录 // 为动态扩缩容做准备 if (bytes == attemptedBytesRead()) { record(bytes); } // 给lastBytesRead变量赋值,将当次的字节数加入到totalBytesRead里面; super.lastBytesRead(bytes); }
record
ini
代码解读
复制代码
private void record(int actualReadBytes) { // 如果真实读到的字节数小于等于index-1-1对应的字节数, // 第一次小于,那么只是将decreaseNow设置为true // 第二次小于的时候,就会进行索引; // 扩展一下,也就是两次网络IO读取的数据都比较小,就会触发缩容 if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } // 如果真实读取的数据大于nextReceiveBufferSize,index从(index+4,maxIndex)中获取最小的那个参数,同时将decreaseNow设置为false // 最大recvBuf的大小是64k } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } }
readComplete
csharp
代码解读
复制代码
public void readComplete() { // 获取本次IO读取的总的字节数,进行做一个记录; // 这里的作用就是为了可能得缩容做准备 record(totalBytesRead()); }
guess
csharp
代码解读
复制代码
public int guess() { // 返回下一次recvBuffer的大小; return nextReceiveBufferSize; }
continueReading
typescript
代码解读
复制代码
public boolean continueReading() { return continueReading(defaultMaybeMoreSupplier); } public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { // 可读 && 预期还有数据 && 读取数据的次数小于maxMessagePerRead &&读取总字节数大于0 return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; }
reset

信息重置,主要是maxMessagePerRead, totalMessages、totalBytesRead

ini
代码解读
复制代码
public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; }

初始化AdaptiveRecvByteBufAllocator

创建一个EpollSocketChannel对象时,其内部会生成一个EpollSocketChannelConfig,在其父类DefaultChannelConfig中会创建AdaptiveRecvByteBufAllocator对象;

arduino
代码解读
复制代码
public EpollSocketChannel() { super(newSocketStream(), false); config = new EpollSocketChannelConfig(this); }

在创建DefaultChannelConfig时

  1. 创建一个AdaptiveRecvByteBufAllocator
  2. 获取channel的metadata对象,然后设置defaultMaxMessagePerRead
  3. 注:这个Metadata对象是EpollSocketChannel(在AbstractEpollChannel)的常量;
scss
代码解读
复制代码
public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); } protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; }

io.netty.channel.epoll.AbstractEpollChannel

  1. 这个ChannelMetadata中,defaultMaxMessagePerRead大小是16,也就是每次最多从网络IO中读取16次的数据量,防止由于单次响应结果数据量过大,导致这个EventLoop中Queue里面的其他任务无法执行,导致其他任务耗时增加的情况发生;
java
代码解读
复制代码
private static final ChannelMetadata METADATA = new ChannelMetadata(false);

使用AdaptiveRecvByteBufAllocator进行动态分配内存

epollInReady

源码位置: io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe 简单逻辑如下:(详细可看代码解释)

  1. 获取EpollRecvByteAllocatorHandle,这个是代理类,代理了AdaptiveRecvByteBufAllocator.HandleImpl对象;
  2. 设置handle的处理模式是Native.EPOLLET
  3. 从channelConfig中获取Allocator,准备开始分配数据了;
  4. 分配一块内存,这块内存的大小由guess()决定,
  5. 分配完成后,将内核中的数据写入到这个ByteBuf对象中,同时设置allocHandle的最近一次读取数据的字节数(lastBytesRead)
  6. incMessage() 将读取次数加1;
  7. 触发pipeline.fireChannelRead()
  8. 判断是否可读(continueReading):如果可读,则继续读;不可读,跳出循环;
ini
代码解读
复制代码
void epollInReady() { final ChannelConfig config = config(); if (shouldBreakEpollInReady(config)) { clearEpollIn0(); return; } // 1. 获取Handle对象,这块是对AdaptiveRecvByteBuf$HandlerImpl的包装、代理; final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); final ChannelPipeline pipeline = pipeline(); // 2. 获取Allocator,准备开始分配内存了; final ByteBufAllocator allocator = config.getAllocator(); allocHandle.reset(config); epollInBefore(); ByteBuf byteBuf = null; boolean close = false; try { Queue sQueue = null; do { ...... // we use a direct buffer here as the native implementations only be able // to handle direct buffers. // 分配内存,这块是一个重点,扩展讲下,他会从guess()中获取,guess()返回的是下一次可以申请的最大字节数; byteBuf = allocHandle.allocate(allocator); // 简单总结:从IO读取数据,设置lastBytesRead,attemptedBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 如果没有读取到数据,则返回 if (allocHandle.lastBytesRead() <= 0) { // nothing was read, release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } // 读取次数+1 allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; if (shouldBreakEpollInReady(config)) { ...... break; } // 可读 & lastBytesRead==attemptedBytesRead & 没有超过最大字节数 &&已读字节数>0 } while (allocHandle.continueReading()); // 说明本次IO读取完成,设置一下信息,为自适应扩缩容提供数据 allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { shutdownInput(false); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // 如果本次IO没有读取完成呢? 那么就需要将其封装后,在等下一次轮回~~ epollInFinally(config); } }

epollInFinally

scss
代码解读
复制代码
final void epollInFinally(ChannelConfig config) { // 判断是否还有更多数据,判断依据:期望读取的字节数=真实读取的字节数 maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); // 检测对端是否已经处于禁止写入状态(半关闭状态) , // || 是否在读挂起状态并且还有更多的数据要读 if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) { ....... executeEpollInReadyRunnable(config); } else if (!readPending && !config.isAutoRead()) { ....... clearEpollIn(); } }

executeEpollInReadyRunnable

  1. 判断是否处于活跃状态、是否执行挂起、是否已经关闭
  2. 将这个runnable放到EventLoop的Queue中,准备再次执行
ini
代码解读
复制代码
final void executeEpollInReadyRunnable(ChannelConfig config) { if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) { return; } epollInReadyRunnablePending = true; eventLoop().execute(epollInReadyRunnable); } private final Runnable epollInReadyRunnable = new Runnable() { @Override public void run() { epollInReadyRunnablePending = false; epollInReady(); } };

总结

  1. 经典代码的变量命名是很值得推敲的,变量名不怕长(但也不能太长。。。),就怕词不达意;举例 :方法名executeEpollInReadyRunnable就很能表达当前这个方法的含义,
  2. 在执行特定场景的时候,使用到公共处理类,仍然希望做一些定制的行为变动;此时可以考虑使用委托者模式,类似于EpollRecvByteAllocatorHandle包装了AdaptiveRecvByteBufAllocator$HandleImpl;
  3. 自适应变化:一次扩容16倍,避免频繁扩缩容,缩容2倍,避免缩容太多,导致容量不足
  4. 自适应变化:最好是需要足够的统计数据进行数据支撑,不然自适应容易成为性能瓶颈
  5. 阅读代码时,先梳理轮廓,再看细节,方能头脑清醒
注:本文转载自juejin.cn的用户9055584214805的文章"https://juejin.cn/post/7495703953666195456"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

103
后端
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top