Facebook 经常使用分析来进行数据驱动的决策。在过去的几年里,用户和产品都得到了增长,使得我们分析引擎中单个查询的数据量达到了数十TB。我们的一些批处理分析都是基于 Hive 平台(Apache Hive 是 Facebook 在2009年贡献给社区的)和 Corona( Facebook 内部的 MapReduce 实现)进行的。Facebook 还针对包括 Hive 在内的多个内部数据存储,继续增加了其 Presto 的 ANSI-SQL 查询的覆盖范围。Facebook 内部还支持其他类型的分析,如图计算、机器学习(Apache Giraph)和流处理(如 Puma、Swift 和 Stylus)。

尽管 Facebook 提供的服务涵盖了分析领域的广泛领域,但我们仍在不断地与开源社区互动,以分享我们的经验,并向他人学习。Apache Spark 于2009年由加州大学伯克利分校(UC-Berkeley)的 Matei Zaharia 创办,并于2013年贡献给 Apache。它是目前增长最快的数据处理平台之一,因为它能够支持流处理、批处理、命令式(RDD)、声明式(SQL)、图计算和机器学习用例,所有这些都在相同的 API 和底层计算引擎中。Spark 可以有效地利用大量内存,跨整个管道(pipelines)优化代码,并跨任务(tasks)重用 jvm 以获得更好的性能。Facebook 认为 Spark 已经成熟到可以在许多批处理用例中与 Hive 进行比较的地步。在本文的后面部分,将介绍 Facebook 使用 Spark 替代 Hive 的经验和教训。

用例:为实体排序(entity ranking)做特性准备

实时实体排名在 Facebook 有着多种使用场景。对于一些在线服务平台,原始的特性值是使用 Hive 离线生成的,并将生成的数据加载到这些实时关联查询系统中。这些 Hive 作业是数年前开发的,占用了大量的计算资源,并且难以维护,因为这些作业被拆分成数百个 Hive 小作业。为了使得业务能够使用到新的特征数据,并且让系统变得可维护,我们开始着手将这些作业迁移到 Spark 中。

以前的 Hive 作业实现

基于 Hive 的作业由三个逻辑阶段组成,每个阶段对应数百个由 entity_id 分割的较小 Hive 作业,因为为每个阶段运行较大的 Hive 作业不太可靠,并且受到每个作业的最大任务数限制。具体如下:

阿里云hadoop的功能(数据量的作业从)(1)

以上三个逻辑阶段可以概括如下:

基于 Hive 构建索引的作业大约需要运行三天。管理起来也很有挑战性,因为这条管道包含数百个分片作业,因此很难进行监控。没有简单的方法来衡量作业的整体进度或计算 ETA。考虑到现有 Hive 作业的上述局限性,我们决定尝试使用 Spark 来构建一个更快、更易于管理的作业。

Spark 实现

如果使用 Spark 全部替换上面的作业可能会很慢,并且很有挑战性,需要大量的资源。所以我们首先将焦点投入在 Hive 作业中资源最密集的部分:第二阶段。我们从50GB的压缩输入样本开始,然后逐步扩展到 300 GB、1 TB 和20 TB。在每次增加大小时,我们都解决了性能和稳定性问题,但是尝试 20 TB 时我们发现了最大改进的地方。

在运行 20 TB 的输入时,我们发现由于任务太多,生成了太多的输出文件(每个文件的大小大约为100 MB)。在作业运行的10个小时中,有3个小时用于将文件从 staging 目录移动到 HDFS 中的最终目录。最初,我们考虑了两个方案:要么改进 HDFS 中的批量重命名以支持我们的用例;要么配置 Spark 以生成更少的输出文件(这一阶段有大量的任务——70,000个)。经过认真思考,我们得到了第三种方案。由于我们在作业的第二步中生成的 tmp_table2 表是临时的,并且只用于存储作业的中间输出。最后,我们把上面 Hive 实现的三个阶段的作业用一个 Spark 作业表示,该作业读取 60 TB 的压缩数据并执行 90 TB的 shuffle 和排序,最后的 Spark job 如下:

阿里云hadoop的功能(数据量的作业从)(2)

我们如何扩展 Spark 来完成这项工作?

当然,在如此大的数据量上运行单个 Spark 作业在第一次尝试甚至第十次尝试时都不会起作用。据我们所知,这是生产环境中 shuffle 数据量最大的 Spark 作业(Databricks 的 PB 级排序是在合成数据上进行的)。我们对 Spark 内核和应用程序进行了大量的改进和优化,才使这项工作得以运行。这项工作的好处在于,其中许多改进都适用于 Spark 的其他大型工作负载,并且我们能够将所有工作重新贡献给开源 Apache Spark 项目 - 有关更多详细信息,请参见下面相关的 JIRA。下面我们将重点介绍将一个实体排名作业部署到生产环境的主要改进。

可靠性修复(Reliability fixes)

处理节点频繁重启

为了可靠地执行长时间运行的作业,我们希望系统能够容错并从故障中恢复(主要是由于正常维护或软件错误导致的机器重新启动)。虽然 Spark 最初的设计可以容忍机器重动,但我们还是发现了各种各样的 bug/问题,我们需要在系统正式投入生产之前解决这些问题。

其他可靠性修复性能提升

在实现了上述可靠性改进之后,我们能够可靠地运行 Spark 作业。此时,我们将工作重心转移到与性能相关的问题上,以最大限度地利用 Spark。我们使用Spark 的指标和 profilers 来发现一些性能瓶颈。

我们用来发现性能瓶颈的工具

性能优化

在所有这些可靠性和性能改进之后,我们的实体排名系统变成了一个更快、更易于管理的管道,并且我们提供了在 Spark 中运行其他类似作业的能力。

使用 Spark 和 Hive 运行上面实体排名程序性能比较

我们使用以下性能指标来比较 Spark 和 Hive 运行性能。

CPU time:这是从操作系统的角度来看 CPU 使用情况。例如,如果您的作业在32核机器上仅运行一个进程,使用所有 CPU 的50%持续10秒,那么您的 CPU 时间将是 32 0.5 10 = 160 CPU 秒。

阿里云hadoop的功能(数据量的作业从)(3)

CPU reservation time:从资源管理框架的角度来看,这是 CPU 预留(CPU reservation)。例如,如果我们将32核机器预留10秒来运行这个作业,那么 CPU 预留时间是 32 * 10 = 320 CPU秒。CPU 时间与 CPU 预留时间的比率反映了我们集群预留 CPU 资源的情况。准确地说,当运行相同的工作负载时,与 CPU 时间相比,预留时间可以更好地比较执行引擎。例如,如果一个进程需要1个 CPU 秒来运行,但是必须保留100个 CPU 秒,那么根据这个指标,它的效率低于需要10个 CPU 秒但只预留10个 CPU 秒来做相同数量的工作的进程。我们还计算了内存预留时间,但这里没有列出来,因为这些数字与 CPU 预留时间类似,而且使用 Spark 和 Hive 运行这个程序时都没有在内存中缓存数据。Spark 有能力在内存中缓存数据,但由于集群内存的限制,我们并没有使用这个功能。

阿里云hadoop的功能(数据量的作业从)(4)

Latency:作业从开始到结束运行时间。

阿里云hadoop的功能(数据量的作业从)(5)

结论和未来工作

Facebook 使用高性能和可扩展的分析引擎来帮助产品开发。Apache Spark 提供了将各种分析用例统一到单个 API ,并且提供了高效的计算引擎。我们将分解成数百个 Hive 作业管道替换为一个 Spark 作业,通过一系列的性能和可靠性改进,我们能够使用 Spark 来处理生产中的实体数据排序的用例。在这个特殊的用例中,我们展示了 Spark 可以可靠地 shuffle 并排序 90 TB 以上的中间数据,并在一个作业中运行 250,000个 tasks。与旧的基于 Hive 计算引擎管道相比,基于 Spark 的管道产生了显著的性能改进(4.5-6倍 CPU性能提升、节省了 3-4 倍资源的使用,并降低了大约5倍的延迟),并且已经在生产环境中运行了几个月。

作者:过往记忆大数据

本文为阿里云原创内容,未经允许不得转载。

,