分布式机器学习平台架构设计

图片

文章作者:赵喜生 机器学习平台架构师

导读:用户数据大规模积累、用户体验需求升级、算力革新和计算模型的演进,这三大核心要素及其相互作用成为现在和未来基于网络互联用户活动的主要组成部分。机器学习平台作为使数据、计算和用户体验三者相互作用的关键基础设施发挥着作用。

▶ 数据

用户在 APP 上每个点击,划过,从一个 APP 切换到另一 APP,在每个页面的停留,甚至浏览的注意力和速度;屏幕、电池、GPS 定位、运动传感器......等设备的运行数据。人们的各种数据无时不刻的被记录、存储和计算。

数据的用途也从事务交易、统计分析到体验优化进行了一个完整的演进迭代。

▶ 算力&计算模型

从单核 CPU 到 HPC,再到基于微内核的 GPU,外存和内存容量和速度的不断增长,万兆网络、InfiniBand、RDMA 网络技术的发展和应用。我们走过的短短数十年中,算力的革新已经是有很大的变化,同时适配于计算硬件的新的计算模型的发展:从分布式的 RPC,到基于低 I/O 成本的 MapReduce,发展到 GPU 的矩阵运算等,计算模型也在影响着软件开发的变革。

图片

▶ 用户体验

最开始用户是在基于命令行/桌面进行指令性交互,随后门户时代是基于类目组织的信息浏览,搜索引擎给我们带来的是用户主动信息检索体验,而今各种 Feed 大行其道。用户在“懒”的道路上一路向前,用户参与信息交互更广、更深,信息获取更加简单高效。

用户、数据和计算正在以前所未有的程度参与并加速重构人类生活方式。

01 全栈机器学习平台

一个良好的机器学习平台产品需要满足整个数据科学的完整生命周期的各个环节。在产品设计方面需要考虑到:

▶ 全栈用户体验

用户数据科学的所有活动需要在一个产品中无缝且无差别体验地完成,这是解放数据科学家生产力,服务数据科学家聚焦于其核心工作的基础

▶ 工程能力和数据科学解耦

完成一个完整的数据接入、特征处理、模型训练优化及模型上线服务的整体工作,尤其在面向大规模/超大规模数据和模型的情况下,数据存储格式,多机多卡协同计算,资源调度管理,模型弹性伸缩等工程化工作是必不可少的。让用户对无感知或者提供易用的自定义完成对复杂工程的应用

▶ 开放性

通常一个团队中的数据科学具有不同的技术背景,不同的计算框架有其特定,通用的机器学习平台面向的场景和任务具有多样性;平台要对变化开发,以高效的方式支持不同用户的需求

图片

02 关键架构因素

构建机器学习平台需要的技术包括了存储、通信、计算、分布式、资源管理等计算机体系中的很多技术,在项目周期及成本条件下设计一个面向业务且具有生命力的机器学习平台架构,需要考虑诸多因素:

▶ 遗留系统集成

通常在组织中已经存在数据存储、资源调度和任务调度系统,架构设计需要拥抱这些遗留系统。对遗留系统良好的集成不仅可以提升用户体验,降低系统复杂程度,还可以缩短实现周期。

▶ 数据和模型规模

数据量、模型的复杂程度和规模以及用户对模型训练周期的容忍度决定了技术选型和核心能力的设计方式。

*▶ 资源管理调度

依托于 Kubernetes 或者 YARN 设计资源队列,进行异构资源管理调度,确保资源的合理高效利用。

▶ Pipeline

一方面对数据接入、特征工程、训练及上线等环节通过 Pipeline 组织,驱动数据在不同计算组件中运转起来,合理利用内存共享,避免低效 I/O,从而系统性降低数据处理周期。

▶ Serving 系统

Serving 是一个相对独立的子系统,需要考虑 E2E 预测延时,弹性伸缩能力,模型版本管理,监控,A/B 分流等能力,一般 Serving 都是无状态的,适合用微服务的架构来实现。

▶ 数据存储和表示

在数据源,特征工程及模型训练阶段数据可能以不同形式存储在不同的介质上;同时数据有 Dense 或者 Sparse 不同特点,以及数据可能存储在单机或者分布式多节点上。需要考虑数据存储格式,存储介质,分布式表示方式。

▶ 并行化

并行化不仅仅存在于训练阶段,同样存在于数据访问,特征处理甚至在 Serving 阶段。

▶ 计算加速

除了基于 GPU 的 CUDA 加速,我们同样可以在 Intel CPU 上用合适的计算库来实现加速。

▶ 网络通信

尤其在训练复杂网络的情况下,通信可能是主要的性能瓶颈;合理的网络拓扑和连接方式,以及采用高效的通信协议会降低计算等待。

03 架构设计

此处我们以面向:存储在 HDFS 上 TB 级用户和物品数据训练面向 CTR 场景的 Embedding+MLP 结构的模型,提供在线低延时且可弹性伸缩的排序服务,这样一个具有通用场景展开机器学习平台的架构设计。

▶ Hadoop 集成

数据的传输和存储成本很高,利用已有 Hadoop 体系的完成机器学习平台中合适的组件将有效降存储和传输成本,同时也在一定程度上让平台变的轻量。利用 Hadoop Kerberos 安全机制可以将数据和资源调度基于租户隔离开。

在计算方面可以用 Data Locality 特性将计算和数据调度到同一结点上,业界的 TensorFlowOnSparkAngel 都用这个思路实现 Hadoop 体系和机器学习的结合。简单来说该方案是:

图片

  • 通过 YARN 调度提交一个 Spark 作业
  • 在 Spark 中调用 foreachPartition,这样每个结点上将会在内存中持有 RDD 的一个分区
  • 在每个 Worker 中通过 JNI 调用或者本地进程的方式调用 Tensorflow 或者 Torch 进行模型计算

这样相当于 Spark 变成 Container 进行计算资源的调度,而充分利用了已有的大数据体系的数据和计算特性。

▶ 稀疏特征压缩&向量/矩阵分布式表示

在分布式机器学习计算中对分布式向量和矩阵的分布式表示以及对稀疏特征的压缩处理可以提高数据的并行计算度和存储压缩比,Spark 中会采用 RowMatrix, CoordinateMatrix 和 BlockMatrix 等存储来对不同形式的数据类型进行矩阵进表示。

业界也有很多标准的稀疏矩阵和分布式矩阵表示方法。

图片

▶ 特征处理 Pipeline

在特征处理过程中,需要对离散和连续的特征进行 分桶标准化Onehot 编码等特征处理。Berkeley Data Analytics Stack 中的 Spark 是非常适用于特征处理,采用 Spark 进行特征处理可以帮我们解决三个主要的问题:

1. 内存计算,加速特征处理

由于特征处理过程中大部分情况不需要进行数据 Shuffle,这样我们可以很好的利用 Spark 内存计算,将多个特征处理用 Spark ML Pipeline 串联起来,这样可以在分布式的将多个特征处理环节在一个 Job 中完成。

图片

2. Feature Map 一致性

Pipeline 中的每个算子都是一个 Estimator,我们在 fit 阶段计算特征的 Feature Map(例如 Onehot 编码中离散值对应 Index),然后在模型 save 阶段对 Feature Map 进行序列化存储,在 transform 阶段对特征进行编码。fit 时可以尽可能加载全量数据,这样在特征处理时就降低没有出现过样本值出现的机率,对于 Feature Map 的更新可以根据实时性要求进行 Stream 更新或者 Batch 全量更新。

特征处理阶段生成的 Feature Map 也会在 Oneline 阶段被加载。

3. 特征格式



在进行模型训练阶段,可能 CSR, TFRecordn 或者其他数据格式,可以实现自定义数据类型,让 Spark 来读写自定义的数据格式。

首先实现自定义 FileReaderOutputWriter

在自定义数据源中用自定义 FileReaderOutputWriter 实现 readwrite

def write(kryo: Kryo, out: Output): Unit = {}def read(kryo: Kryo, in: Input): Unit = {}

首先在 org.apache.spark.sql.sources.DataSourceRegister 中添注册自定义数据源类型

然后就可以像处理 CSV 格式数据一样 spark.read.format("csv"),处理自定义数据格式

▶ 并行计算

在大规模数据和模型训练的场景中,由于训练样本规模大或者网络参数大,通常单个节点不能完成对模型的训练,这时就需要多节点协同的方式完成模型的训练,采用分而治之的思想。一般会有数据并行和模型并行两种思路来实现任务的分解和并行训练。

1. 数据并行

为不同的计算节点保留同一个模型的副本,每个节点分配到不同的数据,每个节点在本地计算持有数据的模型参数,然后将所有计算节点的计算结果按照某种方式合并生成最终的模型。

在这个过程中数据拆分的方式可以是随机的方式,也可以采用 shuffle 机制保证数据样本的均衡;而参数同步和合并方式主流的有 Parameter ServerRing All-Reduce 方式。

图片

Parameter Server:

在 Parameter Server 架构中,集群中的节点被分为两类:Parameter Server 和 Worker。其中 Parameter Server 存放模型的参数,而 Worker 负责计算参数的梯度。在每个迭代过程,Worker 从 Parameter Sever 中获得参数,然后将计算的梯度返回给 Parameter Server,Parameter Server 聚合从 Worker 传回的梯度,然后更新参数,并将新的参数广播给 Worker。

其中参数在 Parameter Server 和 Worker 之间的同步既可以是同步的 ( Synchronous ),也可以是异步的 ( Asynchronous )。

图片

Ring AllReduce:

在 Ring-Allreduce 架构中,各个节点都是 Worker,没有中心节点来聚合所有 Worker 计算的梯度。在一个迭代过程,每个 Worker 完成自己的 mini-batch 训练,计算出梯度,并将梯度传递给环中的下一个 Worker,同时它也接收从上一个 Worker 的梯度。对于一个包含 N 个 Worker 的环,各个 Worker 需要收到其它 N-1 个 worker 的梯度后就可以更新模型参数。

图片

2. 模型并行

如果训练模型的规模很大,不能在每个计算节点的本地内存中完全存储,那么就可以对模型进行划分,然后每个计算节点负责对本地局部模型的参数进行更新。通常对线性可分的模型和非线性模型(神经网络),模型并行的方法也会有所不同。

线性模型:

把模型和数据按照特征维度进行划分,分配到不同的计算节点上,在每个计算节点上采用梯度下降优化算法进行优化,局部的模型参数计算不依赖于其他维度的特征,相对独立,那么就不需要与其他节点进行参数交换。

在 Angel 的实现中就主要使用模型并行的方法使用 ModelPartitioner 实现模型分布式计算。

图片

神经网络:

神经网络中模型具有很强的非线性性,参数之间有较强的关联依赖,通常可以横向按层划分或纵向跨层划分进行网络划分。每个计算节点计算局部参数然后通过 RPC 将参数传递到其他节点上进行参数的合并,复杂的神经网络需要较高的网络带宽来完成节点之间的通信。

▶ CTR 模型并行训练

通常典型深度 CTR 模型是 Embedding Layer + MLP 结构;对于 10 亿特征,Embedding Size 为 16 的 CTR 模型来说 Embedding 模型的大小为 10^9 * 16 * 4B ≈ 60GB,而 MLP 的大小只有几个 MB。训练 Embedding 网络的效率将会是 CTR 模型训练的瓶颈所在。如果在网络中传输 Embedding 模型参数,整个时延和成本将是不可接受的。如何解决模型的存储及减少网路传输是关键,华为 Mindspore 和 NVIDIAHugeCTR 分别给出 Host-Device 和 Embedding Hashtable 的方案。

Host-Device:

图片

根据模型大小,选择将模型放在 Device 或者 Host 内存中,Device 计算梯度后更新 Embedding 模型,如果模型在 Host 内存中,只需要在 Device 和 Host 之间进行内存拷贝,这个速度是远远大于网络传输的。

Embedding Hash Table:

图片

HugeCTR 介绍中详细描述其设计方案:通过 open addressing hash 算法将所有的特征平均地分在所有 Device 上,每个 Device 内存中存储 Embedding 的一部分,通过实现 reduce_scatter 算子实现模型传输,all_gather 进行模型合并。

▶ 在线 Serving

在实际场景中,业务对模型的要求是高复杂,低延时,大批量,高吞吐量和弹性伸缩。

对于高吞吐量和弹性伸缩的处理我们可以交给 kubernetes 来实现,在容器基础设施具备的前提下并不会成为瓶颈,而对于复杂模型在大 batch 的请求和低延时的矛盾中还是需要通过设计和优化来实现。

而在更进一步的模型在线优化中可以考虑通过模型量化和硬件加速的策略提升模型 Serving 性能。

合理使用缓存和多级缓存结合的设计方法将有效降低高延时请求处理,从而提升系统的整体延时表现。缓存的设计主要考虑缓存介质的成本、容量、读写速度以及缓存更新策略 。

参考资料:

Hidden Technical Debt in Machine Learning Systems https://papers.nips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf

Data locality in Hadoop: The Most Comprehensive Guide

https://data-flair.training/blogs/data-locality-in-hadoop-mapreduce

TensorFlowOnSpark

https://github.com/yahoo/TensorFlowOnSpark

Angel

https://github.com/Angel-ML/angel

Mindspore

https://zhuanlan.zhihu.com/p/164683221

HugeCTR

https://www.nvidia.cn/content/dam/en-zz/zh_cn/assets/webinars/nov19/HugeCTR_Webinar_1.pdf

Spark Data Types - RDD-based API

https://spark.apache.org/docs/latest/mllib-data-types.html

Spark ML Pipelines

https://spark.apache.org/docs/latest/ml-pipeline.html

PERFORMANCE COMPARISON OF STORAGE FORMATS FOR SPARSE MATRICES

http://facta.junis.ni.ac.rs/mai/mai24/fumi-24_39_51.pdf

BDAS, the Berkeley Data Analytics Stack

https://amplab.cs.berkeley.edu/software


本文地址:https://www.6aiq.com/article/1612618273866
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出