- 从 Hadoop 文件系统或 Hive 表读取数据时,从事大数据应用程序的开发人员会遇到挑战。
- 合并作业是一种用于将较小文件合并到较大文件的技术,可以帮助提高读取 Hadoop 数据的性能。
- 通过合并,文件数量显着减少,读取数据的查询时间将更快。
- 当您通过 map-reduce 作业读取 Hive 表数据时,Hive 调整参数也有助于提高性能。
Hive表是依赖结构化数据的大数据表之一。默认情况下,它将数据存储在 Hive 仓库中。要将其存储在特定位置,开发人员可以在表创建期间使用位置标签设置位置。Hive 遵循相同的 SQL 概念,如行、列和架构。
从事大数据应用程序的开发人员在读取 Hadoop 文件系统数据或 Hive 表数据时会遇到一个普遍的问题。使用Spark 流、Nifi 流作业或任何流或摄取应用程序将数据写入 Hadoop 集群。大量的小数据文件通过摄取作业写入到 Hadoop Cluster 中。这些文件也称为零件文件。
这些部分文件是跨不同的数据节点写入的,当目录中的文件数量增加时,如果其他应用程序或用户尝试读取这些数据,就会变得乏味和性能瓶颈。原因之一是数据分布在节点之间。想想您的数据驻留在多个分布式节点中。它越分散,该作业大约需要“N *(文件数)”时间来读取数据,其中 N 是每个名称节点上的节点数。例如,如果有 100 万个文件,当我们运行 MapReduce 作业时,映射器必须跨数据节点运行 100 万个文件,这将导致整个集群利用率导致性能问题。
对于初学者来说,Hadoop集群自带了几个Name Nodes,每个Name Node都会有多个Data Nodes。摄取/流式传输作业跨多个数据节点写入数据,并且在读取这些数据时面临性能挑战。读取数据的工作将花费开发人员相当长的时间来找出与查询响应时间相关的问题。这个问题主要发生在每天数据量在数以百万计的客户身上。对于较小的数据集,这种性能技术可能不是必需的,但从长远来看,做一些额外的调整总是好的。
在本文中,我将讨论如何解决这些问题以及性能调优技术以改进 Hive 表的数据访问。Hive 与 Cassandra 和 Spark 等其他大数据技术类似,是一个非常强大的解决方案,但需要数据开发人员和运营团队进行调整,以便从针对 Hive 数据执行的查询中获得最佳性能。
我们先来看一些 Hive 数据使用的用例。
用例Hive 数据主要用于以下应用:
- 大数据分析,运行有关交易行为、活动、数量等的分析报告
- 跟踪欺诈活动并生成有关此活动的报告
- 根据数据创建仪表板
- 审计目的和历史数据存储
- 为机器学习提供数据并围绕它构建智能
有几种方法可以将数据摄取到 Hive 表中。摄取可以通过 Apache Spark 流式作业、Nifi 或任何流式技术或应用程序来完成。摄取的数据是原始数据,在摄取过程开始之前考虑所有调整因素非常重要。
组织 Hadoop 数据第一步是组织 Hadoop 数据。我们从摄取/流式传输作业开始。首先,需要对数据进行分区。对数据进行分区的最基本方法是按天或按小时。甚至可以有两个分区——天和小时。在某些情况下,您可以在一天内按某个国家、地区或适合您的数据和用例的东西进行分区。例如,考虑一个图书馆书架,其中的书籍是根据类型排列的,并且每种类型都设置在儿童或成人部分。
图 1:组织的数据
因此,我们以这个例子为例,我们在 Hadoop 目录中写入数据,如下所示:
hdfs://cluster-uri/app-path/category=children/genre=fairytale OR
hdfs://cluster-uri/app-path/category=adult/genre=thrillers
通过这种方式,您的数据更有条理。
在最常见的情况下,在没有特定用例的情况下,数据按天或小时进行分区
hdfs ://cluster-uri/app-path/day=20191212/hr=12
或者只是一天分区,具体取决于要求。
hdfs://cluster-uri/app-path/day=20191212
图 2:进入 Partition 文件夹的摄取流程
Hadoop 数据格式创建 Hive 表时,最好提供像 Zlib 这样的表压缩属性和像 orc 这样的格式。在摄取时,这些数据将以这些格式写入。如果您的应用程序使用普通的 Hadoop 文件系统编写,建议提供格式。大多数摄取框架(如 Spark 或 Nifi)都有指定格式的方法。指定数据格式有助于使数据以压缩格式更有条理,从而节省集群中的空间。
合并作业Consolidation 作业在提高 Hadoop 数据的整体读取性能方面起着至关重要的作用。有几个部分与合并技术相关。默认情况下,hdfs 目录中写入的文件是小零件文件,当零件文件过多时,读取数据时会出现性能问题。合并不是 Hive 的任何特定功能——它是一种用于将较小文件合并为较大文件的技术。在线任何地方都没有涵盖合并技术,因此这种特殊技术非常重要,尤其是在任何批处理应用程序读取数据时。
什么是整合工作?默认情况下,写入 Hive 的摄取/流式作业、目录写入小部分文件,对于大容量应用程序,一天内这些文件将超过 100,000 ,具体取决于数量。真正的问题出现在我们尝试读取数据时,需要很长时间,有时甚至几个小时才能最终返回结果,否则作业可能会失败。例如,假设您有一个 day 分区目录,您需要处理大约 100 万个小文件。例如,如果运行计数:
#Before:
hdfs dfs -count -v /cluster-uri/app-path/day=20191212/*
Output = 1Million
现在,运行 Consolidation 作业后,文件数量将显着减少。它将所有小部分文件合并为大文件。
#After:
hdfs dfs -count -v /cluster-uri/app-path/day=20191212/*
Output = 1000
注意:cluster-uri 因组织而异,它是一个 Hadoop 集群 uri,用于连接到您的特定集群。
整合工作如何提供帮助文件的整合不仅是为了性能,也是为了集群的健康。根据 Hadoop 平台指南,节点中不应该有这么多文件。拥有太多文件会导致太多节点读取并归因于高延迟。请记住,何时读取 Hive 数据,它会扫描所有数据节点。如果您有太多文件,则阅读时间会相应地分散。因此,必须将所有这些小文件合并为更大的文件。此外,如果在某些天后不需要数据,则必须有清除例程。
整合如何运作有几种方法可以合并文件。这主要取决于您在哪里写入数据。下面我将讨论不同的常见用例。
- 使用 Spark 或 Nifi 将数据写入日常分区文件夹中的 Hive 表
- 使用 Spark 或 Nifi 将数据写入 Hadoop 文件系统 (HDFS)
在这里,在这种情况下,大文件将被写入日常文件夹中。开发人员需要遵循以下任何选项。
图 3:整合逻辑
- 编写一个脚本来执行合并。该脚本采用 day 等参数,从同一分区数据中执行 Hive 选择数据,并在同一分区中插入覆盖。在这里,当 Hive 在同一分区中重新写入数据时,它会运行 map-reduce 作业并减少文件数量。
- 有时,如果命令失败,在同一命令中覆盖相同的数据可能会给我们带来意外的数据丢失。在这种情况下,从日常分区中选择数据并将其写入临时分区。如果成功,则使用 load 命令将临时分区数据移动到实际分区。此步骤如图 3 所示。
在这两个选项之间,选项 B 更好,它适合所有用例并且效率最高。选项 B 是有效的,因为如果任何步骤失败都不会丢失数据。开发人员可以编写一个控件 m 并安排它在第二天午夜左右没有活动用户读取数据时运行。
有一个用例,开发人员不需要编写 Hive 查询。而是提交一个spark作业,选择同一个分区,覆盖数据,但是建议只在分区文件夹中文件数量不大,spark仍然可以读取数据而不过度指定资源的情况下使用。此选项适用于低容量用例,并且此额外步骤可以提高读取数据的性能。
整个流程如何工作?让我们以一个示例用例来了解所有部分。
假设您拥有一个电子商务应用程序,您可以按不同的购买类别跟踪每日客户量。您的应用程序容量很大,您需要根据客户购买习惯和历史设置智能数据分析。
从表示层到中间层,您希望使用Kafka或 IBM MQ发布这些消息。下一部分是拥有一个使用 Kafka/MQ 并摄取到 Hadoop Hive 表中的流式应用程序。通过 Nifi 或 Spark,可以实现这一点。在此之前,需要设计和创建 Hive 表。在 Hive 表创建期间,您需要确定分区列的外观以及是否需要任何排序或是否需要应用Snappy 或Zlib等任何压缩算法。
Hive 表设计是确定整体性能的关键方面。您必须根据必须如何应用该设计来考虑如何查询数据。如果要查询每天有多少客户购买了玩具、家具等特定类别的商品,建议最多有两个分区,如一天分区,一个作为类别分区。然后,流应用程序应相应地摄取数据。
事先掌握所有可用性方面的内容,可以让您更好地设计表格以满足您的需求。因此,一旦将数据摄取到此表中,就应将数据组织为上述示例的日期和类别分区。
只有摄取的数据将是 Hive 位置中的小文件,因此如上所述,整合这些文件变得至关重要。
作为流程的下一部分,您可以设置调度程序或使用控件 M 每天每晚运行合并作业,例如凌晨 1 点左右,这将调用合并脚本。这些脚本将为您整合数据。最后,在这些 Hive 位置,您应该会看到文件数量减少了。
当真正的智能数据分析在前一天运行时,将很容易查询并具有更好的性能。
Hive 参数设置当您通过 map-reduce 作业读取 Hive 表数据时,某些调整参数会很方便。该技术已经讨论了这些调整参数。单击链接以阅读有关Hive 调整参数的更多信息。
Set hive.exec.parallel = true;
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
set mapred.compress.map.output = true;
set mapred.output.compress= true;
Set hive.execution.engine=tez;
要了解有关每个属性的更多信息,您可以参考现有教程。
技术实施现在,让我们以一个用例为例,一步一步地展示它。在这里,我正在考虑将客户事件数据提取到 Hive 表中。我的下游系统或团队将进一步使用这些数据进行分析(例如,在一天内,客户购买了哪些商品以及从哪个城市购买?)。这些数据将用于分析我的产品用户的人口统计数据,这将使我能够排除故障或扩展业务用例。这些数据可以进一步让我们了解我的活跃客户来自哪里,以及我可以如何做更多的事情来增加我的业务。
步骤 1:创建示例 Hive 表。这是代码片段:第 2 步:设置流式作业以摄取到 Hive 表中。
这个流式传输作业可以从 Kafka 的实时数据中触发流式传输,然后将其转换并摄取到 Hive 表中。
图 4:Hive 数据流
因此,当摄取实时数据时,数据将写入日分区。假设今天的日期是 20200101。
hdfs dfs -ls /data/customevents/day=20200101/
/data/customevents/day=20200101/part00000-djwhu28391
/data/customevents/day=20200101/part00001-gjwhu28e92
/data/customevents/day=20200101/part00002-hjwhu28342
/data/customevents/day=20200101/part00003-dewhu28392
/data/customevents/day=20200101/part00004-dfdhu24342
/data/customevents/day=20200101/part00005-djwhu28fdf
/data/customevents/day=20200101/part00006-djwffd8392
/data/customevents/day=20200101/part00007-ddfdggg292
到一天结束时,根据应用程序的流量,这个数字可能在 10K 到 1M 之间。对于大型公司来说,数量会很高。假设文件总数为 141K。
步骤 3:运行合并作业在 2020 年 1 月 2 日,即第二天凌晨 1 点左右,我们应该运行合并作业。示例代码在 git 中上传。文件名为consolidation.sh。
以下是在您的边缘节点/盒子中运行的命令
./consolidate.sh 20200101
该脚本现在将合并前一天的数据。完成后,您可以重新运行计数。
hdfs dfs -count -v /data/customevents/day=20200101/*
count = 800
所以,在它之前是 141K,在合并之后,计数是 800。所以,这将为您带来显着的性能优势。
合并逻辑代码的链接。
统计数据在不应用任何调优技术的情况下,读取 Hive 表数据的查询时间将需要 5 分钟到几个小时,具体取决于数量。
图 5:统计数据
合并后,查询时间显着减少,我们更快地得到结果。文件数量将显着减少,读取数据的查询时间将减少。如果不进行整合,查询会在分布在名称节点上的大量小文件上运行,从而导致响应时间增加。
,