原文链接:https://select.dev/posts/snowflake-range-join-optimization

原文作者:https://twitter.com/IanWhitestone

在大多数数据库中,范围联接和其他类型的非等值联接速度是出了名的慢。虽然 Snowflake 对于大多数查询来说速度非常快,但在处理这些类型的连接时,它的性能也很差。在这篇文章中,我们将介绍一种优化技术,从业者可以用来将涉及范围连接的查询速度提高多达 300 倍 【参见:说明1】。

在深入研究优化技术之前,我们将介绍一些关于不同类型的连接的背景知识,以及是什么让 Snowflake 中的范围连接如此缓慢。如果您已经熟悉,请随时跳过。

等值连接与非等值连接

1、等值连接是涉及相等条件的连接,大多数用户通常会编写涉及一个或多个等值联接条件的查询。

select ... from orders join customers on orders.customer_id=customers.id -- 等值条件

2、非等值连接是涉及不等式条件的连接。例如,查找购买了相同产品的客户列表:

select distinct o1.customer_id, o2.customer_id, o1.product_id from orders_items as o1 join orders_items as o2 on o1.product_id=o2.product_id -- 等值条件 and o1.customer_id<>o2.customer_id -- 非等值条件

3、或者查找每个客户在特定日期之后的所有订单:

select ... from orders inner join customers on orders.customer_id=customers.id and orders.created_at > customers.one_year_anniversary_date

​ 什么是范围连接

范围联接是一种特定类型的非等联接。当联接条件的值是否落在某个值范围内(“区间中的点联接”)或查找两个重叠的周期(“区间重叠联接”)时的情况。

区间范围中的点连接

区间范围连接中的点连接的示例:计算每秒运行的查询数

select seconds.timestamp, count(queries.query_id) as num_queries from seconds left join queries on seconds.timestamp between date_trunc('second', queries.start_time) and date_trunc('second', queries.end_time) group by 1

此联接也可以基于派生时间戳。例如,查找用户查看首页后 24 小时内发生的所有购买事件:

select ... from page_views inner join events on events.event_type='purchase' -- 过滤条件 and page_views.pathname = '/' -- 过滤条件 and page_views.user_id=events.user_id -- 等值条件 and page_views.viewed_at < events.event_at -- 范围条件 and dateadd('hour', 24, page_views.viewed_at) >= events.event_at -- 范围条件

区间重叠范围连接

区间重叠范围联接是指查询尝试匹配重叠周期。想象一下,对于登录网站上的每个浏览会话,如果需要查找应用程序中同时发生的所有其他会话:

select s1.session_id, array_agg(s2.session_id) as concurrent_sessions from landing_page_sessions as s1 inner join app_sessions as s2 on s1.end_time > s2.start_time and s1.start_time < s2.end_time group by 1

​ 为什么Snowflake中的范围连接很慢

范围连接在 Snowflake 中很慢,因为它们作为带有后过滤条件的笛卡尔连接执行。笛卡尔联接(也称为交叉联接)返回要联接的两个数据集之间记录的笛卡尔积。如果两个表都有 10,000 条记录,则笛卡尔连接的输出将是 1 亿条记录!从业者常将此称为联接爆炸。当 Snowflake 必须处理这些非常大的中间数据集时,查询执行速度会大大减慢。

让我们使用上面的“每秒运行的查询数”示例来更详细地探讨这一点。

select seconds.timestamp, count(queries.query_id) as num_queries from seconds left join queries on seconds.timestamp between date_trunc('second', queries.start_time) and date_trunc('second', queries.end_time) group by 1

SECONDS表每秒包含 1 行,QUERIES表每个查询包含 1 行。此查询的目标是查找每秒正在运行的查询,然后进行聚合和计数。

snowflake处理性能(如何将Snowflake中的范围连接速度提高300倍)(1)

原始表:SECONDS、QUERIES

执行连接时,Snowflake 首先创建一个中间数据集,该数据集是要连接的两个输入数据集的笛卡尔乘积。在此示例中,SECONDS表为 7 行,QUERIES表为 4 行,因此中间数据集分解为 28 行。执行“间隔点”检查的范围连接条件在创建此中间数据集后作为连接后筛选器发生。可以在下图中看到此过程的可视化。

snowflake处理性能(如何将Snowflake中的范围连接速度提高300倍)(2)

未优化的范围连接

对包含 267000 个查询的 30 天数据样本运行此查询需要 12 分 30 秒。如查询配置文件所示,联接是此查询中的明显瓶颈。你可以看到标示为“Additional Join Condition”的范围连接条件:

snowflake处理性能(如何将Snowflake中的范围连接速度提高300倍)(3)

优化前:查询配置(一种可视化的消耗占比分析图)

如何在Snowflake中优化范围连接

执行范围连接时,Snowflake 的瓶颈成为将范围连接条件应用为连接后筛选器之前在中间数据集中生成的数据量。为了加速这些查询,我们需要找到一种方法来最小化中间数据集的大小。这可以通过添加等值连接条件来实现,Snowflake 可以使用哈希连接非常快速地处理该条件【参见:说明2】。

最小化行爆炸

虽然这背后的原理是直观的,让我们的数据集更小,但在实践中是很棘手的。如何在应用范围连接后连接过滤器之前约束中间数据集?继续上面的每秒查询示例,很容易在每个时间戳添加一个小时级等值连接条件:

select seconds.timestamp, count(queries.query_id) as num_queries from seconds left join queries on date_trunc('hour', seconds.timestamp)=date_trunc('hour', queries.start_time) -- 小时级等值连接条件 and seconds.timestamp -- 秒级范围连接条件 between date_trunc('second', queries.start_time) and date_trunc('second', queries.end_time) group by 1

看上去有戏,但是当间隔(查询总运行时间)大于 1 小时时,该方法就会崩溃。由于等值连接是在查询开始的小时,因此不会计算任何后续小时中的所有记录。这可以通过创建一个中间数据集 query_hours 来解决,该数据集每小时每个查询包含 1 行。

with query_hours as ( select queries.*, hours_list.timestamp as query_hour from queries inner join hours_list -- 该数据集每小时每个查询包含 1 行,相同查询存在跨小时重复出现 on hours_list.timestamp between date_trunc('hour', queries.start_time) and date_trunc('hour', queries.end_time) )

然后,按小时加入变得安全,因为查询运行的每个小时将有 1 行。不会无意中丢弃任何记录。

with query_hours as ( select queries.*, hours_list.timestamp as query_hour from queries inner join hours_list -- 该数据集每小时每个查询包含 1 行,相同查询存在跨小时重复出现 on hours_list.timestamp between date_trunc('hour', queries.start_time) and date_trunc('hour', queries.end_time) ) select seconds.timestamp, count(queries.query_id) as num_queries from seconds left join query_hours as queries on date_trunc('hour', seconds.timestamp)=queries.query_hour -- 等值连接条件 and seconds.timestamp -- 范围连接条件 between date_trunc('second', queries.start_time) and date_trunc('second', queries.end_time) group by 1

您可能已经注意到,query_hours CTE(common table expression)本身涉及范围联接 - 这会不会很慢?当应用于正确的查询时,在输入数据集准备上花费的额外时间将导致整体查询速度更快 【参见:说明3】.另一个问题可能是query_hours数据集变得比原始查询数据集大得多,因为它每小时扇出到每个查询 1 行。由于大多数查询在 1 小时内完成,因此query_hours数据集的大小将与原始查询数据集相似。

在小时数上添加新的等联接条件有助于通过限制中间数据集的大小来加速此范围联接查询。但是,由于一些原因,这种方法并不理想。也许小时不是最好的选择,应该使用其他东西作为约束。此外,如何扩展此方法以支持涉及其他数值数据类型(如整数和浮点数)的范围联接?

分箱范围连接

我们可以通过使用“箱子(bins)”【参见:说明4】将上述想法扩展到更通用的方法。

通过告诉 Snowflake 仅对较小的数据子集应用范围连接条件,连接操作要快得多。对于每个时间戳,Snowflake 现在只联接在同一小时内运行的查询,而不是所有时间的每个查询。

与其将自己限制在预定义的范围(如“小时”、“分钟”或“天”),我们可以改用任意大小的箱。例如,如果大多数查询在 2 秒内运行,我们可以将查询分到每个箱中,每个箱跨越 2 秒。

该算法看起来像这样:

1、 生成箱并向每个数据集添加箱编号

2、 使用 bin_num 将等连接条件约束添加到范围连接,类似于上面对小时所做的操作。

3、 创建的中间数据集现在要小得多。

4、 像往常一样,Snowflake 将范围联接应用为联接后筛选器。这一次,要快得多。

您可以在下图中看到此过程的可视化。

snowflake处理性能(如何将Snowflake中的范围连接速度提高300倍)(4)

优化范围连接

示例:分箱区域联接查询

箱数只是表示数据范围的整数。创建它们的一种方法是将数字除以所需的箱大小。使用时间戳,我们可以先将时间戳转换为 unix 时间,这是一个整数,然后再用除法:

-- 60秒大小的箱子 select timestamp, floor(date_part(epoch_second, timestamp) / 60) as bin_num

我们将它保存在函数 get_bin_number 【参见:说明5】 中,以避免每次都重复它。

按照上述步骤,我们首先需要生成适用箱的列表。这是通过使用生成器创建整数列表,然后将该列表过滤到所需的开始和结束箱编号【 参见:说明6】 来实现的。

set bin_size_s = 60; with metadata as ( select -- 这将是针对所需时间范围的查询 min(timestamp) as start_time, max(timestamp) as end_time, get_bin_number(start_time, $bin_size_s) as bin_num_start, get_bin_number(end_time, $bin_size_s) as bin_num_end from seconds ), -- 需要一个 bin_num_start 和 bin_num_end 之间有 1 行的 CTE(common table expression) -- 必须首先生成一个庞大的列表,然后过滤掉,因为你不能传入计算值 -- 当bins_base为 1 万亿时需要 5 秒才能过滤掉。106 毫秒,用于 100 万 bins_base as ( select seq4() as row_num from table(generator(rowcount => 1e9)) ), bins as ( select bins_base.row_num as bin_num from bins_base inner join metadata on bins_base.row_num between metadata.bin_num_start and metadata.bin_num_end ),

现在我们可以将箱号添加到每个数据集中。对于查询数据集,我们将输出一个数据集,其中每个查询每个运行查询的箱 1 行。对于秒数据集,每个时间戳将映射到单个箱子。

queries_w_bin_number as ( select start_time, end_time, warehouse_id, cluster_number, bins.bin_num from queries inner join bins on bins.bin_num between get_bin_number(queries.start_time, $bin_size_s) and get_bin_number(queries.end_time, $bin_size_s) ), seconds_w_bin_number as ( select timestamp, get_bin_number(timestamp, $bin_size_s) as bin_num from seconds )

并应用最终连接条件,并在bin_num上添加等连接条件:

select s.timestamp, count(q.warehouse_id) as num_queries from seconds_w_bin_number as s left join queries_w_bin_number as q on s.bin_num=q.bin_num and s.timestamp between date_trunc('second', q.start_time) and date_trunc('second', q.end_time) group by 1

使用与上述相同的数据集,此查询 【参见:说明7】 在 2.2 秒内执行,而之前未优化的版本需要 750 秒。这比改进了 300 倍以上。查询配置文件如下所示。请注意连接条件现在如何显示两个部分:一个用于bin_num上的等值连接条件,另一个用于范围连接条件。

snowflake处理性能(如何将Snowflake中的范围连接速度提高300倍)(5)

优化后:查询配置(一种可视化的消耗占比分析图)

选择合适的箱子大小

使此策略起作用的关键部分涉及选择正确的箱大小。您希望每个箱子包含较小的值范围,以便在应用范围联接筛选器之前最大程度地减少中间数据集中的行爆炸。但是,如果您选择的箱大小太小,则当您将其扇出到每个箱 1 行时“右表”(QUERIES)的大小将显着增加。

根据Databricks(https://docs.databricks.com/optimizations/range-join.html#range-join-optimization)介绍,一个好的经验法则是选择间隔长度的第 90 个百分位数。您可以使用approx_percentile函数进行计算。我已经展示了我在整篇文章中一直在使用的查询示例数据集的值。

select approx_percentile(datediff('second', start_time, end_time), 0.5) as p50, -- 2s approx_percentile(datediff('second', start_time, end_time), 0.90) as p90, -- 30s approx_percentile(datediff('second', start_time, end_time), 0.95) as p95, -- 120s approx_percentile(datediff('second', start_time, end_time), 0.99) as p99, -- 600s approx_percentile(datediff('second', start_time, end_time), 0.999) as p999, -- 900s count(*) -- 267K from queries

经验法则并不完美。如果可能,请使用几种不同的数据箱大小测试查询,看看哪种大小效果最佳。下面是使用不同箱大小的上述查询的性能曲线。在这种情况下,选择 99.9 个百分位数与第 90 个百分位数并没有太大区别。正如预期的那样,一旦箱大小变得非常小时,查询时间就会开始变得更糟。

snowflake处理性能(如何将Snowflake中的范围连接速度提高300倍)(6)

箱子大小性能分析

如何扩展到固定间隔的联接?

• 规划如何将其扩展到区间联接中的固定间隔点

• 箱大小将设置为固定间隔大小

如果间隔范围内的点具有固定的间隔大小,则连接,如前面分享的查询所示:

select ... from page_views inner join events on events.event_type='purchase' -- 过滤条件 and page_views.pathname = '/' -- 过滤条件 and page_views.user_id=events.user_id -- 等值连接条件 and page_views.viewed_at < events.event_at -- 范围连接条件 and dateadd('hour', 24, page_views.viewed_at) >= events.event_at -- 范围连接条件

然后将箱大小设置为间隔的大小:24 小时。

如何扩展到区间重叠范围连接?

如果要处理区间重叠范围连接,如下所示:

select s1.session_id, array_agg(s2.session_id) as concurrent_sessions from landing_page_sessions as s1 inner join app_sessions as s2 on s1.end_time > s2.start_time and s1.start_time < s2.end_time group by 1

在扇出landing_page_sessions和app_sessions后,可以应用相同的分箱范围联接技术,以在会话所属的每个箱中每个会话包含 1 行(如上述queries所做的那样)。

何时应使用此优化?

第一步,通过使用 Snowflake 查询配置文件(一种可视化的消耗占比分析图)验证范围联接是否是查询执行中最昂贵的节点之一,确保范围联接实际上是一个瓶颈。添加分箱范围联接优化确实会使查询更难理解和维护。

分箱范围连接优化技术仅适用于涉及数值类型的区间中的点和区间重叠范围联接。它不适用于其他类型的非等连接,尽管您可以应用相同的原则,即尽可能尝试添加等值连接约束以减少行爆炸。

如果“右侧”的数据集(包含开始和结束时间)包含相对平坦的区间大小分布,则此技术将不会有效。

​ 说明

【1】 此统计信息来自单个查询,因此请用少量场景来验证。您的业务场景会因许多因素而异。

【2】这种方法的灵感来自 Simeon Pilgrim 在 2016 年的帖子(https://simeonpilgrim.com/blog/2016/08/02/snowflake-data-warehouse-date-range-based-joins-45x-speedup/)早在 Snowflake 还 snowflake.net 的时候!在实现更通用的分箱方法之前,我非常成功地使用了它。

【3】 范围连接到小时表将比范围连接到秒表快得多,因为中间表将小 ~3600 倍。

【4】 这种方法的灵感来自Databricks。他们没有详细介绍他们的算法是如何实现的,但我认为它以类似的方式工作。

【5】(可选)创建一个get_bin_number函数以避免在整个查询过程中复制相同的计算:

create or replace function get_bin_number(timestamp timestamp_tz, bin_size_s integer) returns integer as $$ floor(date_part(epoch_second, timestamp) / bin_size_s) $$

【6】 Snowflake 不允许您将计算值传递给生成器,因此这必须分两步完成。在不久的将来,我们将开源一些dbt宏来抽象这个过程。

【7 】基于Snowflake完整的分箱范围联接优化查询示例:

create or replace function get_bin_number(timestamp timestamp_tz, bin_size_s integer) returns integer as $$ floor(date_part(epoch_second, timestamp) / bin_size_s) $$ ; set bin_size_s = 60; with metadata as ( select -- Get the time range your query will span min(timestamp) as start_time, max(timestamp) as end_time, get_bin_number(start_time, $bin_size_s) as bin_num_start, get_bin_number(end_time, $bin_size_s) as bin_num_end from seconds ), bins_base as ( select seq4() as row_num from table(generator(rowcount => 1e9)) ), bins as ( select bins_base.row_num as bin_num from bins_base inner join metadata on bins_base.row_num between metadata.bin_num_start and metadata.bin_num_end ), queries_w_bin_number as ( select start_time, end_time, warehouse_id, cluster_number, bins.bin_num from queries inner join bins on bins.bin_num between get_bin_number(queries.start_time, $bin_size_s) and get_bin_number(queries.end_time, $bin_size_s) ), seconds_w_bin_number as ( select timestamp, get_bin_number(timestamp, $bin_size_s) as bin_num from seconds ) select s.timestamp, count(q.warehouse_id) as num_queries from seconds_w_bin_number as s left join queries_w_bin_number as q on s.bin_num=q.bin_num and s.timestamp between date_trunc('second', q.start_time) and date_trunc('second', q.end_time) group by 1 ;

参考知识

[1] Spark优化之高性能Range Join; https://cloud.tencent.com/developer/article/1934016

[2] Range join optimization; https://docs.databricks.com/optimizations/range-join.html#range-join-optimization

[3] Snowflake data warehouse date range based joins 45x speedup; https://simeonpilgrim.com/blog/2016/08/02/snowflake-data-warehouse-date-range-based-joins-45x-speedup/

,