Fork me on GitHub

【干货篇】bilibili:基于 Flink 的机器学习工作流平台在 B 站的应用

分享嘉宾: 张杨,B站资深开发工程师
整理出品: AICUG人工智能社区

导读 : 整个机器学习的过程,从数据上报,到特征计算,到模型训练,再到线上部署,最终效果评估,整个流程非常冗长,在b站,多个团队都会搭建自己的机器学习链路,来完成各自的机器学习需求,工程效率和数据质量都难以保证。我们基于flink社区的aiflow项目,构建了整套机器学习的标准工作流平台,加速机器学习流程构建,提升多个场景的数据实效和准确性。本次分享将介绍b站的机器学习工作流平台ultron在b站多个机器学习场景上的应用。

嘉宾介绍: 张杨,17年入职b站,从事大数据方面工作。

01 机器学习实时化

图片

首先讲下机器学习的实时化,主要是分为三部分:第一是样本的实时化。传统的机器学习,样本全部都是t+1,就是今天模型用的是昨天的训练数据,每天早上使用昨天的全天数据训练一次模型。

第二是特征的实时化,以前特征基本全部也是t+1,这样就会带来一些推荐的问题,很不准,可能今天我看到很多新的视频,但是给我推荐的还是一些我昨天看到或者更久之前的一些东西。

第三就是模型训练的实时化,我们有了样本的实时化和特征的实时化之后,那模型训练也是完全可以做到在线训练实时化的,能带来更实时的推荐效果。

图片

第二部分是传统的离线链路图,首先APP产生日志或者服务端产生log,通过数据管道整个数据会落到HDFS上,然后每天t+1做一些特征生成和模型训练,特征生成会放到特征存储里面,可能是redis或者是一些其他的kv存储,再给到上面的inference给在线服务用。

图片

那它有什么问题呢?第一个是t+1数据模型的特征时效性都很低,很难做到特别高时效性的更新。第二是整个模型训练或者一些特征生产的过程中,每天都要用天级的数据,整个训练或者特征生产的时间非常长,对集群的算力要求非常高。

图片

这是我们进行优化之后整个实时链路的过程,红叉的部分是被去掉的。整个数据上报后通过pipeline直接落到实时的kafka,之后会做一个实时特征的生成,还有实时样本的生成,特征结果的会写到 feature store里面去,样本的生成也需要从feature store里面去读区一些特征。生成完样本之后我们直接进行实时训练。整个右边的一个很长的链路已经去掉了,但是离线特征的部分我们还是保存了。因为一些特殊特征我们还是要做一些离线计算,比如一些特别复杂的不好实时化的或者没有实时化需求的。

02 Flink在b站机器学习的使用

图片

下面讲下我们是怎么做到实时样本、实时特征和实时效果评估的。

第一个是实时样本,flink目前托管b站所有推荐业务样本数据生产流程。

第二个是实时特征,目前相当一部分特征都使用了flink进行实时计算,时效性非常高。有很多特征是使用离线+实时组合的方式得出结果,历史数据用离线算,实时数据用flink,读取特征的时候就行拼接,这两套计算逻辑可能有的时候不能复用,这块我们也在尝试使用flink做批流一体,特征的定义全部用flink来做,根据业务需要,实时算或者离线算,底层的计算引擎全部是flink。

第三是实时效果的一个评估,我们使用了flink+olap来打通整个实时计算+实时分析链路,进行最终的模型效果评估。

图片

这个图是目前实时样本生成的主要的图,是针对整个推荐业务链路的。日志数据落入kafka后,首先我们做一个flink的label-join,把点击和展现进行拼接,结果继续落入kafka,再接一个flink任务进行特征join,特征join会拼接多个特征,有些特征是公域特征,有些是业务方的私域特征。特征的来源比较多样,有离线有实时。特征全部补全之后,就会生成一个instance样本数据落到kafka,给后面的训练模型使用。

图片

这个图是实时特征,这边列的是一个比较复杂的特征的过程,整个计算流程涉及到了5个任务,第一个任务是离线任务,后面有4个flink任务,一系列复杂计算后生成的一个特征落到kafka里面,再写入feature-store,然后被在线预测或者实时训练所用到。

图片

这是效果的一个评估,推荐算法关注的一个非常核心的指标就是ctr点击率,做完label-join之后,就可以算出ctr数据了,除了进行下一步的样本生成之外,同时会导一份到clickhouse里面,报表系统对接后就可以看到非常实时的效果。数据本身会带上实验标签,在clickhouse里面可以根据标签进行实验区分,看出对应的实验效果。

03 机器学习工作流平台构建

图片

机器学习的整个链路里面是有样本生成,特征生成,训练,预测,效果评估,每个部分都要配置开发很多任务,一个模型的上线最终需要横跨多个任务,链路非常长。新的算法同学也很难理解这个复杂链路的全貌,学习成本极高,而且整个链路的改动牵一发而动全身,非常容易出故障。计算层用到多个引擎,批流混用,语义很难保持一致,同样的逻辑要开发两套,保持没有gap也很困难。整个实时化成本门槛也比较高,需要有很强的实时离线能力,很多小的业务团队在没有平台支持下难以完成。

图片

右图是一个模型从数据准备到训练的大概过程,中间涉及到了七八个节点,那我们能不能在一个平台上完成所有的流程操作?我们为什么要用flink?第一是因为我们团队实时计算平台是基于flink来做的,我们也看到了flink在批流一体上的潜力以及在实时模型训练和部署上一些未来发展路径。

图片

Aiflow是阿里的flink生态团队开源的一套机器学习工作流平台,专注于流程和整个机器学习链路的标准化,去年八九月的时候,我们和他们接触后,引入了这样一套系统,一起共建完善,并开始逐渐在b站落地。它把整个机器学习抽象成图上的example、transform 、Train、validation、inference这些过程。在项目架构上非常核心的能力调度就是支持流批混合依赖,元数据层支持模型管理,非常方便的进行模型的迭代更新。我们基于此搭建了我们的机器学习工作流平台。

图片

接下来讲一下平台特性。第一是使用Python定义工作流,因为在ai方向大家用Python还是比较多的,我们也参考了一些外部的,像 Netflix也是使用Python来定义这种机器学习的工作流。

第二是支持批流任务混合依赖。在一个完整链路里面,涉及到的实时离线过程都可以加入到里面,并且批流任务之间可以通过信号就行互相依赖。

第三是支持一键克隆整个实验过程。从原始log到最终整个实验拉起训练这块,我们是希望能够一键整体链路克隆,快速拉起一个全新的实验链路。

第四是一些性能方面的优化,支持资源共享。

第五是支持特征回溯批流一体。很多特征的冷启动需要计算历史很长时间的数据,专门为冷启动写一套离线特征计算逻辑成本非常高,而且很难和实时特征计算结果对齐,我们支持直接在实时链路上来回溯离线特征。

图片

这个是一个基本架构,上面是一个业务,最下面是一个引擎。目前支持的引擎也比较多,flink、spark、Hive、kafka、Hbase、Redis,有计算引擎,也有存储引擎。以aiflow作为中间的工作流程管理,flink作为核心的计算引擎,来设计整个工流平台。

图片

整个工作流是用Python来描述的,在python里面用户只需要定义计算节点和资源节点,以及这些节点之间的依赖关系即可,语法有点像调度框架airflow。

图片

批流的依赖关系主要有4种:流到批,流到流,批到流,批到批,基本可以满足目前我们业务上的所有需求。

图片

资源共享主要是用来做性能方面,因为很多时候一个机器学习链路非常长,比如刚刚那个图里面我经常改动的可能只有五六个节点,那我想重新拉起整个实验流程,把整个图克隆一遍,但中间我只需要改动其中的部分节点或者大部分节点,上游节点是可以做数据共享的。

图片

这个是技术上的实现,克隆之后对共享节点做了一个状态追踪。

图片

这个是实时训练的过程,特征穿越是一个非常常见的问题,多个计算任务的进度不一致就会发生,在工作流平台里面,我们可以定义好各个节点的依赖关系即可么,一旦节点之间发生了依赖,处理进度就会进行同步,通俗来说就是快的等慢的,避免特征穿越。在flink里面我们是使用watermark来定义处理进度。

图片

这个是特征回溯的过程,我们使用实时链路,直接去回溯它历史数据。离线和实时数据毕竟不同,这中间有很多问题需要解决,因此也用到了spark,后面这块我们会改成flink。

图片

特征回溯有一个比较大的问题,就是如何保证数据的顺序性。实时数据有个隐含的语义就是数据是顺序进来的,生产出来立马处理,天然有一定的顺序性。但是离线的HDFS不是,HDFS是有分区的,分区内的数据完全乱序,实际业务里面大量计算过程是依赖时序的,如何解决离线数据的乱序问题。

第二是如何保证特征和样本版本的一致性。这有两条链路,一条是特征的生产,一条是样本生产,样本生产依赖特征生产,如何它们之间版本的一致性,没有穿越?

第三就是如何保证实时链路和回溯链路计算逻辑的一致?这个问题其实没有,我们是直接在实时链路上回溯离线数据。

第四是一些性能方面的问题,怎么快速的算完大量的历史数据。

图片

为了数据的顺序性,我们HDFS的离线数据进行kafka化处理,这里不是把它灌到kafka里面去,而是模拟kafka的数据架构,分区并且分区内有序,我们把HDFS数据也处理成类似的架构,模拟成逻辑上的分区,并且逻辑分区内有序,flink读取的hdfssource也进行了对应的开发支持这种模拟的数据架构。这块的模拟计算目前是使用spark做的,后面我们会改成flink。

第二个问题分为两部分。实时特征部分的解决依赖于hbase存储,Hbase支持根据版本查询。特征计算完后直接按照版本写入hbase,样本生成的时候去查hbase带上对应的版本号即可,这里面的版本通常是数据时间。离线特征部分,因为不需要重新计算了,离线存储hdfs都有,但是不支持点查,这块进行kv化处理就好,为了性能我们做了异步预加载。

图片

异步预加载的过程如图。

04 未来规划

图片

接下来介绍下我们后面规划。一个是数据质量保证。现在整个链路越来越长,可能有10个节点20个节点,那怎么在整个链路出问题的时候快速发现问题点。这里我们是想针对节点集来做dpc,对每个节点我们可以自定义一些数据质量校验规则,数据通过旁路到统一的dqc-center进行规则运算告警。

图片

第二是全链路的exactly once,工作流节点之间如何保证精确一致,这块目前还没有想清楚。

图片

第三是我们会在工作流里面加入模型训练和部署的节点。训练和部署可以是连接到别的平台,也能是flink本身支持的训练模型和部署服务。


本文地址:https://www.6aiq.com/article/1621989530592
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出