首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

B站推荐模型数据流的一致性架构

  • 25-04-18 15:40
  • 2533
  • 10189
juejin.cn

01背景

推荐系统的模型,通过学习用户历史行为来达到个性化精准推荐的目的,因此模型训练依赖的样本数据,需要包括用户特征、服务端推荐的视频特征,以及用户在推荐视频上是否有一系列的消费行为。

推荐模型数据流,即为推荐模型提供带特征和优化目标的训练样本,包括两个模块,一是Label Join模块,负责用户行为的采集。二是feature extract模块,从原始日志中抽取特征,并基于用户行为计算模型优化的目标label。

在B站早期的推荐模型数据流架构中,如下图所示,采样两阶段特征补齐设计。Label Join模块除了完成用户行为的采集,还需要查询实时特征,补齐训练样本依赖的部分原始特征数据,一般是秒级更新的实时特征,存储在Redis中。而Feature Extract模块在计算样本之前,补齐另外一部分原始特征数据,一般是批量更新的特征数据,存储在KFC中(B站自研的KV系统)

图片

02问题分析

上述的推荐模型数据流架构,存在“不一致”问题,包括“数据不一致”和“计算不一致”

2.1 数据不一致

在这种自反馈系统中,推理输入的特征版本和训练输入的特征版本,如果有差异的话,会影响模型的准确性。数据不一致有3个原因:

  • 访问时间差异。推理服务和Label Join/Feature Extract模块访问同一个特征的时间是不一样的。特别是秒级更新的实时特征,这种数据差异会被放的更大

  • 新稿件问题。在线推理服务有稿件服务可获取新稿件的特征。而离线没有这个服务,所以离线缺少新稿件的特征,导致新稿件的推荐不准确

  • 特征穿越问题。LabelJoin有N分钟的固定时间窗,所以Feature Extract在查询批量特征的时候,可能批量特征版本已经更新,查到的是最新版本特征,从而导致特征穿越。一般需要有经验的算法工程师,在离线批量特征更新上加上时间延迟,规避穿越问题。但这种规避可能会进一步加剧数据不一致

2.2 计算不一致

这里“计算”指的是从原始数据,生成特征的过程。特征可以用于在线推理和离线训练。特征计算有3个地方:

  • 在线推理,一个c++的服务。特征计算使用c++实现
  • Online Feature Extract,实时的样本计算,基于FLINK,特征计算使用java实现
  • Offline Feature Extract,离线样本回溯,基于Spark,特征计算使用python实现

此外这3个地方的数据格式也是不一样的,不是简单的语言之间的转换。需要用户开发3种特征计算逻辑,并且要很小心的对齐

03.  一致性架构

为了解决上述的“数据不一致”和“计算不一致”,我们将B站推荐模型数据流升级成一致性架构:

  • 数据一致性:将在线推理的原始特征现场snapshot,dump到近线。近线基于snapshot做Label Join和Feature Extract。因为在离线用的同一份数据,可保证数据完全一致

  • 计算一致性:特征计算逻辑算子化,基于tenforflow实现一个c++ lib。推理服务直接调用特征抽取lib,离线Feature Extract通过java JNI调用c++ lib。特征计算都基于同一个c++ lib,用户只需要定义一次,可保证计算完全一致

3.1 整体架构

-推荐服务,将推理的原始特征现场,打包成snapshot,通过数据集成工具,从在线服务同步到近线Kafka

-在近线,基于flink latency join功能,实现Label Join,为每条请求join用户在稿件上的各种行为label,e.g. 点击、播放、后验等

-Label Join任务输出Shitu,同时写到kafka和hive

-在近线,基于flink实现实时样本计算,消费kafka Shitu,产出训练样本,写到kafka,再接入实时训练。产出模型用于线上推理

-在离线,基于flink/spark实现的批量样本计算,从hive表load Shitu,产出样本写到hive。训练任务依赖hive表作批量的模型训练

图片

3.2 Label Join

Label Join,使用flink letency join(延时拼接)功能,完成每个请求稿件上的用户行为采集。用户行为作为训练样本的label,即模型的优化目标。

flink letency join基于时间驱动,以一个固定时间窗做数据下发。e.g.固定时间窗是N分钟,即每条请求数据到flink任务后,等待N分钟后输出数据

在最新的一致性数据流上,我们在Label Join上增加了事件驱动,增加数据下发的时效性,将数据流的时效性提升60%

  • 定义下发事件:原则上按用户不会再看到视频作下发规则。对用户连续请求,按时间排序。如果最新的请求是清屏请求,那这次请求前的所有请求都可以下发。如果是普通请求,那这次请求往前第5个请求可以下发

  • 采用事件驱动+时间驱动的数据下发方式,优先事件驱动下发,没有被事件触发的请求,走时间驱动,超时下发

图片

3.3 样本计算

样本计算是基于Label Join产出的Shitu数据,计算训练样本,分两种模式:

-online extract:实时的样本计算,flink streaming计算引擎,读kafka写kafka

-offline extract:离线的批量样本计算,支持flink batch/spark batch两种计算引擎。offline extract支持两种样本计算模式:1) 无新增特征的样本计算,直接读Shitu hive table产也训练样本。2) 有新增特征的样本计算,用户挖掘的新特征,不在Shitu里。训练样本依赖Shitu和新挖掘特征

目前一致性的样本计算框架支持两种模型:

(1)直接计算:一般用于精排模型。整个样本计算过程抽象成几个算子:

-selector:数据筛选。过滤请求或者稿件

-calculate label : 通过用户行为label,计算每个视频的train label

-刷内item采样:在一刷请求内,对稿件进行采样,e.g. 按正负例

-pyfe:调用fealib,生成模型特征

每个算子,都可以支持算法同学根据业务需求自定义

图片

(2)有外部采样的样本计算:一般在召回模型上使用

-calculate label : 通过用户行为label,计算每个视频的train label

-外接一个采样稿件候选池,根据稿件的train label,进行采样。采样逻辑按算法需求可定制

-从KFC查询采样稿件的特征,并组装一条完整的snapshot

-pyfe:调用fealib,生成模型特征

图片

3.4 BackFill

BackFill特征回填,指的是算法同学调研新特征在模型上的收益,流程如下:

  • 对于NoDelta模式,直接读Base Shitu,生成全量的训练样本
  • 对HasDelta模式,用户挖掘一批新增的特征(delta snapshot)
  • 基线Shitu join delta snapshot,生成一份新Shitu
  • 基于新Shitu,作全量的特征计算,生成全量训练样本
  • 模型训练样本并评估auc,效果不符合预期重新设计数据和特征

同时我们提供了一套python sdk,支持用户在镜像或者jupyter上自己订制特征回填特征的逻辑和流程

图片

3.5 基于protobuf wireformat的partial decode优化

对于在线推理现场snapshot,采用了protobuf组织数据,包含了模型特征需要所有原始数据,单条数据超过250KB,有上千个字段。在样本计算阶段,对snapshot有两个处理逻辑:

调用protobuf ParseFrom接口,将snapshot bytes 反序列化成Message,平均耗时7~8ms

将snapshot所有稿件类的特征做裁剪:一刷请求n个稿件,其中m个稿件参与训练,平均耗时5~6ms

通过性能分析,样本计算中有50%的时间消耗在上述snapshot protobuf解析和处理上。但实际样本计算相关逻辑上,并不需要所有snapshot字段,所以我们使用protobuf wireformat,对snapshot做partial decode,只解析需要的field。最终将snapshot处理的性能从14ms优化到1.5ms,样本计算的cpu资源降低了30%+

04 未来工作

4.1 基于Iceberg批流一体的训练样本计算框架

如3.1章节的数据流架构中,通过FLINK实时计算产出的训练样本,会同时写到Kafka和Hive表,分别用于实时训练和批量训练。同时离线回溯也可以产出训练样本写到Hive表。这种架构存在两个问题:

(1) 需要额外的FLINK资源,把Kafka中的样本备份到Hive表中,即一个实验样本流,需要搭建两个FLINK任务

(2) 实时样本和离线样本,输入输出的介质不同,框架层面需要适配。下游训练模块也需要适配不同的样本源,无法做到批流一体

未来我们计划引入iceberg实现样本计算框架的批流一体,解决上述问题。‌Apache Iceberg‌ 是一种用于大型分析表的高性能格式,旨在解决数据存储和计算引擎之间的适配问题,其核心特性之一是支持同时处理流数据和批数据,提供统一的读写接口

框架如下图所示:

  • Label Join产出数据,实时写到iceberg Shitu表

  • 样本计算框架,从iceberg Shitu读数据,可以实时计算或批量计算,产出数据写到iceberg样本表

  • 训练框架读iceberg样本表,可online training或者batch training

图片

4.2 基于Iceberg MOR的增量特征回填优化

如3.4节的BackFill功能,将全量Shitu和Delta Snapshot拼接之后,再进行样本计算。这个逻辑存在2个问题,一是Shitu数据量比较多,拼接效率低。二是每次都需要全量计算所有特征,性能开销大。当然可以做增量特征计算,在和基线样本拼接。但样本数据量比较大,Hive表拼接性能较差,在某些情况下,可能比全量计算特征慢。

为此我们计划在4.1工作基础上,利用iceberg的MOR技术,优化BackFill的性能:

  • 维护一份基线样本的iceberg表

  • 在基线样本iceberg表新建一个branch,增加新特征列

  • 基于Shitu和delta snapshot,只做增量特征计算,并将增量特征写到新特征列。这一步只计算增量特征,不需要join,可极大提升性能

  • 训练模块读样本表,利用Iceberg MOR的能力,读基线特征+增量特征,再merge成完整的特征列表,完成训练

-End-

作者丨lixiaowei、正鼎

注:本文转载自juejin.cn的哔哩哔哩技术的文章"https://juejin.cn/post/7452892770288025609"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

143
阅读
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top