介绍

在我之前的角色中,我使用 CDC(变更数据捕获)、数据复制、ETL(提取-转换-加载)和 RDBMS(关系数据库管理软件)组件的专有技术开发和管理了一个大型近实时数据仓库。准确地说,我们的流程是 E-L-T,这意味着对于实时数据仓库,数据库不断运行混合工作负载,激烈竞争系统资源,只是为了保持维度模型最新。有时因次优性能、延迟挑战和供应商锁定而感到沮丧,我经常考虑迁移到使用开源/大数据技术(称为 ETL 卸载)构建的 ETL 过程,以及这是否会提供真正的水平可扩展性。当然,我对技术、所涉及的学习曲线和所需的开发时间之间的根本差异持谨慎态度。

然而,最近我有幸在一个 POC 上工作,它需要使用 Azure Databricks 的流 ETL 管道,所以我罕见地瞥见了这个迁移挑战可能是什么样的,主要差异以及是否会有任何重大交易 -关闭。

在这篇博客中,我想通过使用 Azure Databricks 中最新最好的东西来构建一个更“传统”但实时的 ETL 管道来分享我的经验。我说“传统”是因为结果应该代表数据仓库中的星型架构,特别是 Azure SQL 数据仓库,尽管在流模式下源和目标之间的延迟很低。我还想考虑纳入数据湖的流行概念,这是一种经济高效的可扩展存储选项。幸运的是,Azure Data Lake Storage gen 2 在本月初正式发布,并且完全支持 Databricks 连接器。

本博客的第一部分将介绍主要概念和组件,以及用于演示的整体架构。第二部分介绍了 Azure 中的完整设置,第三部分介绍了在一系列 Databricks 笔记本中开发的 ETL 管道。但是,如果您渴望深入了解细节,您将能够在 Azure 中以最少的设置运行管道。

但是,如果您在我们开始之前感觉像是一个轻松的消遣,那么从 ETL 开发人员的角度来看,这是一段半虚构的时间之旅……

Azure 数据块

Databricks 由 Apache Spark 的创建者创立,提供了一个统一平台,旨在提高数据工程师、数据科学家和业务分析师的工作效率。 Databricks 平台提供开箱即用的交互式和协作笔记本体验,并且由于其优化的 Spark 运行时,经常优于云中的其他大数据 SQL 平台。

Azure Databricks 是一项完全托管的服务,可提供强大的 ETL、分析和机器学习功能。与其他供应商不同,它是 Azure 上的第一方服务,可与其他 Azure 服务(例如事件中心和 Cosmos DB)无缝集成。

正如您对此类服务所期望的那样,启动可以自动扩展和自动终止的集群既快速又简单——因此,一旦您的作业完成,您的集群将在指定的时间后关闭。这两个功能使该服务具有成本效益,因为您仅在集群运行时根据集群中节点 (VM) 的数量和大小付费。有关计费的更多信息,请参见第二部分。如果您想使用第三部分中提供的笔记本运行演示,您可以预配 14 天的免费试用期,您只需为基础 Azure VM 付费。在 Azure 中创建资源后,将为您预配一个 Databricks 工作区,它实质上存储您的所有 Databricks 资产,例如笔记本和库。即使没有任何正在运行的集群,您仍然可以访问工作区、查看笔记本、安排作业等,并且在启动集群之前不会向您收费。值得注意的是,在流式传输场景或需要用于 BI 工具的专用分析集群的场景中,您需要一个专用集群,尽管它可以根据工作负载设置为自动扩展。

结构化流媒体和 Databricks Delta Lake

与 Azure Databricks 一起,在我看来真正开启真正 ETL / 数据仓库用例的两个关键组件是 Spark Structured Streaming 和 Databricks Delta(现在称为 Delta Lake)。通过这些改进,普通人现在可以尝试在 Spark 中开发(批处理和流媒体)ETL 和分析管道。为什么这样?在结构化流之前,有使用 RDD 和 DStreams(基于 Java/Python 上相对低级操作)的 Spark Streaming,在 Databricks Delta 之前,并发读/写、DML 操作和性能优化是有限且复杂的。

Azure Databricks 上的结构化流式处理使用一组简单的高级 API 提供可靠的、一次性的、容错的流式处理平台。您可以将传入流视为无界表,它允许您处理迟到或无序的数据。更重要的是,通过称为水印的功能,您可以加入流数据(与静态或其他流数据)并指示引擎“等待”记录加入多长时间。这与我们基于 RDBMS 的 ELT 流程的运行方式有很大不同,它会运行多个映射,但一次仅在一个更改集(CDC 表)上运行。这样做是因为您不能依赖在同一(微批次)时间窗口中记录的主从事件。

有人可能会问,为什么 Spark 引擎不能简单地无限期地等待这些迟到的记录出现。嗯,它可以,但是这样做需要保持无界状态,换句话说,将密钥存储在内存中,没有任何可以清除它们的阈值,这最终会耗尽所有可用内存并导致内存不足 (OOM)失败。

因此,虽然处理迟到数据的能力在近实时 ETL 场景中可能非常有用,但它并非没有限制和后果,因此需要定义仔细评估的水印阈值以限制状态。至关重要的是,到达超过此阈值的数据将需要在批处理中进行处理。

更积极的一点是,使用 Spark 的结构化 API 的批处理和流之间的代码更改很少,因此一旦您在流模式下开发了 ETL 管道,批处理运行的语法将需要最少的重新编码。 (最终笔记本中提供了一个示例。)

现在将我们的注意力转向 Databricks Delta,有一些“独家”功能可以简化和优化流(和批处理)ETL 场景。本质上,它是一个优化的 Spark 表,具有许多类似 SQL 的功能:

ACID 事务——多个作者可以同时修改一个数据集并查看一致的视图。作者不影响读者。

DELETES/UPDATES/UPSERTS — 编写者可以修改数据集,而不会干扰读取数据集的作业。合并操作也得到很好的支持。

统计信息、数据跳过和 ZORDER 集群——当跟踪每个文件中的数据的统计信息时,读取​速度提高 10-100 倍,允许 Delta 避免读取不相关的信息。

使用 Delta,您可以将批处理和流数据写入同一个表,其他笔记本和集群可以从同一个表中读取并获得一致的最新视图。此外,Delta 可以通过将数据组织成可以高效读取的大文件来提高数据访问速度。这是通过将小文件合并为大文件来完成的。

所以本质上它是一个事务存储层,专门设计用于利用 Apache Spark 和 Databricks 文件系统 (DBFS) 的强大功能。底层数据作为 Parquet 文件存储在 DBFS 中,但 Delta 维护一个事务日志,可有效跟踪表的更改。您可以使用相同的熟悉的 Apache Spark SQL 批处理和流(结构化)API 读取和写入存储在 Databricks Delta 中的数据。

那么,Databricks Delta 可能是所有 ETL 挑战的完美解决方案吗?

虽然这两种技术之间的差距似乎正在缩小,但仍然需要了解 Delta 和 RDBMS 之间的根本区别。它是文件而不是记录,它是一种基于列和基于行的存储格式,因此某些点操作(如合并或更新)(尽管可能)不太可能运行得那么快,主要是由于缺乏真正的索引。在流场景中更是如此,因此如果您的工作负载依赖于以时间敏感的方式运行这些类型的操作,那么您可能需要考虑替代方案,例如 Cosmos DB。希望通过自己运行第三部分中的演示,您可以评估这些注意事项。

数据框、数据集、转换和惰性评估

如果您不熟悉 Spark 并且有 SQL 背景,我建议我们介绍一些您需要了解的“外国”概念:数据帧、转换、动作和惰性求值。我不会详细介绍任何细节,因此如果您喜欢睡前阅读,我推荐由 Matei Zaharia 撰写的“Spark:权威指南:大数据处理变得简单”,他于 2009 年创立了 Spark 并与他人共同创立Databricks 和 Bill Chambers,他是 Databricks 的 PM。

azure云平台介绍(妙手数评流式服务和Azure)(1)

数据帧和转换的序列

数据帧和数据集都是分布式的、不可变的类似表格的数据结构,可以跨越数千台计算机。因为它们是不可变的,所以我们需要对它们执行转换,但将结果存储在另一个数据帧中。转换可以是任何形式,从将列转换为不同类型、应用内置或自定义函数 (UDF) 来连接数据帧。因此,在实现您需要的输出之前,通常会“链接”多个数据帧,每个数据帧都有自己的转换。 Spark 将形成最佳执行计划,但仅在您指定“操作”时执行它们 - 例如在笔记本中显示计数,或写入输出接收器。因此就有了惰性求值的概念。

您可以将业务逻辑(在 R、Python、Scala 或 Java 中)表示为 SQL、DataFrames 或 Datasets 中的一组逻辑转换,Spark 将透明地将该逻辑编译为 RDD 并生成底层执行计划。

首先,您可能只使用 Dataframes 而不是 Datasets,但是如果您想知道两者之间的区别,那就是 Datasets 是强类型的,而 Dataframe 类型仅在运行时进行评估。

最后,Spark 支持 ANSI SQL 2003 标准的一个子集,因此您可以使用类似 SQL 的符号开发管道的许多部分,甚至可以根据需要在数据帧和 SQL 之间进行互操作。

演示场景

虽然下面的演示松散地基于我之前提到的 POC,但出于保密目的,它已被更改和更改,尤其是数据集。我决定使用的场景基于经常引用的 Ad Tech 用例,但为简单起见,我忽略了某些指标,例如点击次数和转化次数,这些指标已经很好地涵盖了。

假设该业务属于广告行业,并监控某些网站上显示的广告量(印象数)。他们只对展示特定品牌的广告感兴趣。该企业对通常的点击和转化不感兴趣。在这种情况下,广告和印象作为来自第 3 方的 json 消息(可能是文件)的连续流/提要到达。广告包含元数据,例如创建时间、唯一标识符、名称以及它所属的品牌。展示次数是广告在何时以及在哪个网站上展示,包括会话和用户信息。幸运的是,我们在两者中都有一个广告 ID 来链接这些,但是对于每次展示我们没有任何类型的唯一键。

有一些静态操作/参考源包含缓慢变化的数据:

包含正在监控的品牌列表的 SQL DB

运营团队用来维护受监控域列表的 Web 应用程序。

为方便起见,假设这两个数据集都已由第 3 方根据这些特定品牌和域进行了预过滤。广告是参考数据的重要来源,运营应用程序需要能够轻松访问各个广告。

我们需要生成的源表集和目标星型模式如下:

azure云平台介绍(妙手数评流式服务和Azure)(2)

从源模式到星型模式

请注意,为简洁起见,我排除了关键的时间维度,您还会注意到我包含了一个批处理 ETL 场景,以展示我们如何实现类型 II 缓慢变化的维度要求。

给定场景,我们将实现以下架构:

azure云平台介绍(妙手数评流式服务和Azure)(3)

流媒体源部分可能看起来过于复杂,因此值得稍微解释一下。为了生成数据流,我们将使用一个实用程序为广告和展示生成 json 数据。该实用程序支持定义您选择的架构,但遗憾的是仅支持将数据发送到 IoT 中心。我们需要使用流分析来使用流并根据文件内容将其路由到事件中心或 Cosmos DB。

现在查看商店部分,您可能想知道为什么或如何向 Cosmos DB 发送流。在上面的场景描述中,提到广告需要可由操作应用程序访问,需要从中快速访问单个广告。这非常适合 Cosmos DB,它提供可扩展性和个位数毫秒的延迟。流分析可以轻松地将流发送到 Cosmos DB,我们可以使用出色的更改提要功能将其作为流再次读回。流分析会将印象路由到事件中心,Databricks 将读取这两个流,运行 ETL 管道并将结果流式传输到 Azure SQL 数据仓库。还将批量运行各种管道,以提供更全面的场景集。

为了更真实的演示,我想展示我们如何获取两个流,加入它们,以及加入其他缓慢变化的参考数据。事实上,Databricks(更具体地说是 Spark)的优势之一是您可以相对轻松地从许多不同的数据源和数据格式中读取数据,执行连接和转换,然后将输出写入多个目标。这就是为什么它已成为数据湖分析的流行选择。

接下来,在本博客的第 2 部分,我们将构建上述架构,但如果您只想运行管道,则将在第 III 部分中描述一个选项,只需最少的设置即可实现这一点。

,