您好!欢迎访问pg电子!
专注精密制造10载以上
专业点胶阀喷嘴,撞针,精密机械零件加工厂家
联系方式
0988-183618236
您当前的位置: 主页 > 检测设备 >

检测设备

降本增效利器!趣头条Spark RSS最佳实践

更新时间  2023-02-22 00:20 阅读
本文摘要:导读:阿里云 EMR 团队和趣头条的大数据团队配合研发了 RSS,解决 Spark on Yarn 层面提到的所有问题,并为 Spark 跑在 Kubernetes 上提供 Shuffle 基础组件。作者 | 王振华、曹佳清、范振业务场景与现状趣头条是一家依赖大数据的科技公司,在 2018~2019 年履历了业务的高速生长,主 App 和其他创新 App 的日活增加了 10 倍以上,相应的大数据系统也从最初的 100 台机械增加到了 1000 台以上规模。

pg电子入口

导读:阿里云 EMR 团队和趣头条的大数据团队配合研发了 RSS,解决 Spark on Yarn 层面提到的所有问题,并为 Spark 跑在 Kubernetes 上提供 Shuffle 基础组件。作者 | 王振华、曹佳清、范振业务场景与现状趣头条是一家依赖大数据的科技公司,在 2018~2019 年履历了业务的高速生长,主 App 和其他创新 App 的日活增加了 10 倍以上,相应的大数据系统也从最初的 100 台机械增加到了 1000 台以上规模。多个业务线依赖于大数据平台展开业务,大数据系统的高效和稳定成了公司业务生长的基石,在大数据的架构上我们使用了业界成熟的方案,存储构建在 HDFS 上、盘算资源调理依赖 Yarn、表元数据使用 Hive 治理、用 Spark 举行盘算,详细如图 1 所示:图 1 趣头条离线大数据平台架构图其中 Yarn 集群使用了单一大集群的方案,HDFS 使用了联邦的方案,同时基于成本因素,HDFS 和 Yarn 服务在 ECS 上举行了 DataNode 和 NodeManager 的混部。

在趣头条天天有 6W+ 的 Spark 任务跑在 Yarn 集群上,天天新增的 Spark 任务稳定在 100 左右,公司的迅速生长要求需求快速实现,积累了许多治理欠债,种种问题体现出来集群稳定性需要提升,其中 Shuffle 的稳定性越来越成为集群的桎梏,亟需解决。当前大数据平台的挑战与思考近半年大数据平台主要的业务指标是降本增效,一方面业务方希望离线平台天天能够承载更多的作业,另一方面我们自身有降本的需求,如何在降本的前提下支撑更多地业务量对于每个技术人都是很是大地挑战。熟悉 Spark 的同学应该很是清楚,在大规模集群场景下,Spark Shuffle 在实现上有比力大的缺陷,体现在以下的几个方面:Spark Shuffle Fetch 历程存在大量的网络小包,现有的 External Shuffle Service 设计并没有很是细致的处置惩罚这些 RPC 请求,大规模场景下会有许多connection reset 发生,导致 FetchFailed,从而导致 stage 重算。Spark Shuffle Fetch 历程存在大量的随机读,大规模高负载集群条件下,磁盘 IO 负载高、CPU 满载时常发生,极容易发生 FetchFailed,从而导致 stage 重算。

重算历程会放大集群的忙碌水平,抢占机械资源,导致恶性循环严重,SLA 完不成,需要运维人员手动将作业跑在空闲的Label集群。盘算和 Shuffle 历程架构不能拆开,不能把 Shuffle 限定在指定的集群内,不能使用部门 SSD 机械。M*N 次的 shuffle 历程:对于 10K mapper、5K reducer 级此外作业,基本跑不完。NodeManager 和 Spark Shuffle Service 是同一历程,Shuffle 历程太重,经常导致 NodeManager 重启,从而影响 Yarn 调理稳定性。

以上的这些问题对于 Spark 研发同学是很是痛苦的,很多多少作业天天运行时长方差会很是大,而且总有一些无法完成的作业,要么业务举行拆分,要么跑到独占的 Yarn 集群中。除了现有面临的挑战之外,我们也在努力构建下一代基础架构设施,随着云原生 Kubernetes 观点越来越火,Spark 社区也提供了 Spark on Kubernetes 版本,相比力于 Yarn 来说,Kubernetes 能够更好的使用云原生的弹性,提供越发富厚的运维、部署、隔离等特性。可是 Spark on Kubernetes 现在还存在许多问题没有解决,包罗容器内的 Shuffle 方式、动态资源调理、调理性能有限等等。我们针对 Kubernetes 在趣头条的落地,主要有以下几个方面的需求:实时集群、OLAP 集群和 Spark 集群之前都是相互独立的,怎样能够将这些资源形成统一大数据资源池。

通过 Kubernetes 的天生隔离特性,更好的实现离线业务与实时业务混部,到达降本增效目的。公司的在线业务都运行在 Kubernetes 集群中,如何使用在线业务和大数据业务的差别特点举行错峰调理,告竣 ECS 的总资源量最少。

希望能够基于 Kubernetes 来包容在线服务、大数据、AI 等基础架构,做到运维体系统一化。因为趣头条的大数据业务现在全都部署在阿里云上,阿里云 EMR 团队和趣头条的大数据团队举行了深入技术共创,配合研发了 Remote Shuffle Service(以下简称 RSS),旨在解决 Spark on Yarn 层面提到的所有问题,并为 Spark 跑在 Kubernetes 上提供 Shuffle 基础组件。Remote Shuffle Service 设计与实现Remote Shuffle Service 的配景早在 2019 年头我们就关注到了社区已经有相应的讨论,如 SPARK-25299。

该 Issue 主要希望解决的问题是在云原生情况下,Spark 需要将 Shuffle 数据写出到远程的服务中。可是我们经由调研后发现 Spark 3.0(之前的 master 分支)只支持了部门的接口,而没有对应的实现。

该接口主要希望在现有的 Shuffle 代码框架下,将数据写到远程服务中。如果基于这种方式实现,好比直接将 Shuffle 以流的方式写入到 HDFS 或者 Alluxio 等高速内存系统,会有相当大的性能开销,趣头条也做了一些相应的事情,并举行了部门的 Poc,性能与原版 Spark Shuffle 实现相差特别多,最差性能可下降 3 倍以上。同时我们也调研了一部门其他公司的实现方案,例如 Facebook 的 Riffle 方案以及 LinkedIn 开源的 Magnet,这些实现方案是首先将 Shuffle 文件写到当地,然后在举行 Merge 或者 Upload 到远程的服务上,这和后续我们的Kubernetes架构是不兼容的,因为 Kubernetes 场景下,当地磁盘 Hostpath 或者 LocalPV 并不是一个必选项,而且也会存在隔离和权限的问题。基于上述配景,我们与阿里云 EMR 团队配合开发了 Remote Shuffle Service。

RSS 可以提供以下的能力,完美的解决了 Spark Shuffle 面临的技术挑战,为我们集群的稳定性和容器化的落地提供了强有力的保证,主要体现在以下几个方面:高性能服务器的设计思路,差别于 Spark 原有 Shuffle Service,RPC 更轻量、通用和稳定。两副本机制,能够保证的 Shuffle fetch 极小概率(低于 0.01%)失败。合并 shuffle 文件,从 M*N 次 shuffle 酿成 N 次 shuffle,顺序读 HDD 磁盘会显著提升 shuffle heavy 作业性能。淘汰 Executor 盘算时内存压力,制止 map 历程中 Shuffle Spill。

盘算与存储分散架构,可以将 Shuffle Service 部署到特殊硬件情况中,例如 SSD 机械,可以保证 SLA 极高的作业。完美解决 Spark on Kubernetes 方案中对于当地磁盘的依赖。Remote Shuffle Service 的实现整体设计Spark RSS 架构包罗三个角色:Master、Worker、Client。

Master 和 Worker 组成服务端,Client 以不侵入的方式集成到 Spark ShuffleManager 里(RssShuffleManager 实现了 ShuffleManager 接口)。Master 的主要职责是资源分配与状态治理。Worker 的主要职责是处置惩罚和存储 Shuffle 数据。Client 的主要职责是缓存和推送 Shuffle 数据。

pg电子

整体流程如下所示(其中 ResourceManager 和 MetaService 是 Master 的组件),如图 2。图 2 RSS 整体架构图实现流程下面重点来讲一下实现的流程:RSS 接纳 Push Style 的 shuffle 模式,每个 Mapper 持有一个按 Partition 分界的缓存区,Shuffle 数据首先写入缓存区,每当某个 Partition 的缓存满了即触发 PushData。Driver 先和 Master 发生 StageStart 的请求,Master 接受到该 RPC 后,会分配对应的 Worker Partition 并返回给 Driver,Shuffle Client 获得这些元信息后,举行后续的推送数据。

Client 开始向主副本推送数据。主副本 Worker 收到请求后,把数据缓存到当地内存,同时把该请求以 Pipeline 的方式转发给从副本,从而实现了 2 副本机制。为了不阻塞 PushData 的请求,Worker 收到 PushData 请求后会以纯异步的方式交由专有的线程池异步处置惩罚。凭据该 Data 所属的 Partition 拷贝到事先分配的 buffer 里,若 buffer 满了则触发 flush。

RSS 支持多种存储后端,包罗 DFS 和 Local。若后端是 DFS,则主从副本只有一方会 flush,依靠 DFS 的双副本保证容错;若后端是 Local,则主从双方都市 flush。在所有的 Mapper 都竣事后,Driver 会触发 StageEnd 请求。

Master 吸收到该 RPC 后,会向所有 Worker 发送 CommitFiles 请求,Worker 收到后把属于该 Stage buffer 里的数据 flush 到存储层,close 文件,并释放 buffer。Master 收到所有响应后,记载每个 partition 对应的文件列表。若 CommitFiles 请求失败,则 Master 标志此 Stage 为 DataLost。

在 Reduce 阶段,reduce task 首先向 Master 请求该 Partition 对应的文件列表,若返回码是 DataLost,则触发 Stage 重算或直接 abort 作业。若返回正常,则直接读取文件数据。

总体来讲,RSS 的设计要点总结为 3 个层面:接纳 PushStyle 的方式做 shuffle,制止了当地存储,从而适应了盘算存储分散架构。根据 reduce 做聚合,制止了小文件随机读写和小数据量网络请求。做了 2 副本,提高了系统稳定性。

容错对于 RSS 系统,容错性是至关重要的,我们分为以下几个维度来实现:PushData 失败当 PushData 失败次数(Worker 挂了,网络忙碌,CPU忙碌等)凌驾 MaxRetry 后,Client 会给 Master 发消息请求新的 Partition Location,今后本 Client 都市使用新的 Location 地址,该阶段称为 Revive。若 Revive 是因为 Client 端而非 Worker 的问题导致,则会发生同一个 Partition 数据漫衍在差别 Worker 上的情况,Master 的 Meta 组件会正确处置惩罚这种情形。

若发生 WorkerLost,则会导致大量 PushData 同时失败,此时会有大量同一 Partition 的 Revive 请求打到 Master。为了制止给同一个 Partition 分配过多的 Location,Master 保证仅有一个 Revive 请求真正获得处置惩罚,其余的请求塞到 pending queue 里,待 Revive 处置惩罚竣事后返回同一个 Location。

Worker 宕机当发生 WorkerLost 时,对于该 Worker 上的副本数据,Master 向其 peer 发送 CommitFile 的请求,然后清理 peer 上的 buffer。若 Commit Files 失败,则记载该 Stage 为 DataLost;若乐成,则后续的 PushData 通过 Revive 机制重新申请 Location。数据去重Speculation task 和 task 重算会导致数据重复。

解决措施是每个 PushData的数据片里编码了所属的 mapId、attemptId 和 batchId,而且 Master 为每个 map task 记载乐成 commit 的 attemtpId。read 端通过 attemptId 过滤差别的 attempt 数据,并通过 batchId 过滤同一个 attempt 的重复数据。

多副本RSS 现在支持 DFS 和 Local 两种存储后端。在 DFS 模式下,ReadPartition 失败会直接导致 Stage 重算或 abort job。

在 Local 模式,ReadPartition 失败会触发从 peer location 读,若主从都失败则触发 Stage 重算或 abort job。高可用大家可以看到 RSS 的设计中 Master 是一个单点,虽然 Master 的负载很小,不会轻易地挂掉,可是这对于线上稳定性来说无疑是一个风险点。在项目的最初上线阶段,我们希望可以通过 SubCluster 的方式举行 workaround,即通过部署多套 RSS 来承载差别的业务,这样纵然 RSS Master 宕机,也只会影响有限的一部门业务。可是随着系统的深入使用,我们决议直面问题,引进高可用 Master。

主要的实现如下:首先,Master 现在的元数据比力多,我们可以将一部门与 ApplD+ShuffleId 自己相关的元数据下沉到 Driver 的 ShuffleManager 中,由于元数据并不会许多,Driver 增加的内存开销很是有限。另外,关于全局负载平衡的元数据和调理相关的元数据,我们使用 Raft 实现了 Master 组件的高可用,这样我们通过部署 3 或 5 台 Master,真正的实现了大规模可扩展的需求。实际效果与分析性能与稳定性团队针对 TeraSort、TPC-DS 以及大量的内部作业举行了测试,在 Reduce 阶段淘汰了随机读的开销,任务的稳定性和性能都有了大幅度提升。

图 3 是 TeraSort 的 benchmark,以 10T Terasort 为例,Shuffle 量压缩后约莫 5.6T。可以看出该量级的作业在 RSS 场景下,由于 Shuffle read 变为顺序读,性能会有大幅提升。图 3 TeraSort 性能测试(RSS 性能更好)图 4 是一个线上实际脱敏后的 Shuffle heavy 大作业,之前在混部集群中很小概率可以跑完,天天任务 SLA 不能定时告竣,分析原因主要是由于大量的 FetchFailed 导致 stage 举行重算。

使用 RSS 之后天天可以稳定的跑完,2.1T 的 shuffle 也不会泛起任何 FetchFailed 的场景。在更大的数据集性能和SLA体现都更为显著。图 4 实际业务的作业 stage 图(使用 RSS 保障稳定性和性能)业务效果在大数据团队和阿里云 EMR 团队的配合努力下,经由近半年的上线、运营 RSS,以及和业务部门的长时间测试,业务价值主要体现在以下方面:降本增效效果显着,在集群规模小幅下降的基础上,支撑了更多的盘算任务,TCO 成本下降 20%。SLA 显著提升,大规模 Spark Shuffle 任务从跑不完到能跑完,我们能够将差别 SLA 级别作业合并到同一集群,减小集群节点数量,到达统一治理,缩小成本的目的。

原本业务方有一部门 SLA比 较高的作业在一个独占的 Yarn 集群 B 中运行,由于主 Yarn 集群 A 的负载很是高,如果跑到集群 A 中,会经常的挂掉。使用 RSS 之后可以放心的将作业跑到主集群 A 中,从而释放掉独占 Yarn 集群 B。作业执行效率显著提升,跑的慢→跑的快。

pg电子

我们比力了几个典型的 Shuffle heavy 作业,一个重要的业务线作业原本需要 3 小时,RSS 版本需要 1.6 小时。抽取线上 5~10 个作业,大作业的性能提升相当显着,差别作业平均下来有 30% 以上的性能提升,纵然是 shuffle 量不大的作业,由于比力稳定不需要 stage 重算,恒久运行平均时间也会淘汰 10%-20%。

架构灵活性显著提升,升级为盘算与存储分散架构。Spark 在容器中运行的历程中,将 RSS 作为基础组件,可以使得 Spark 容器化能够大规模的落地,为离线在线统一资源、统一调理打下了基础。未来展望趣头条大数据平台和阿里云 EMR 团队后续会继续保持深入共创,将探索更多的偏向。

主要有以下的一些思路:RSS 存储能力优化,包罗将云的工具存储作为存储后端。RSS 多引擎支持,例如 MapReduce、Tez 等,提升历史任务执行效率。加速大数据容器化落地,配合 RSS 能力,解决 K8s 调理器性能、调理计谋等一系列挑战。连续优化成本,配合 EMR 的弹性伸缩功效,一方面 Spark 可以使用更多的阿里云 ECS/ECI 抢占式实例来进一步压缩成本,另一方面将已有机械包罗阿里云 ACK、ECI 等资源形成统一大池子,将大数据的盘算组件和在线业务举行错峰调理以及混部。


本文关键词:降本,增效,利器,趣,头条,Spark,RSS,最佳,pg电子,实践

本文来源:pg电子-www.jukecm.com