在建设准实时数据仓库时,由于列式存储具有较高的查询性能,因此,通常都采用ORC、Parquet数据格式,但是这种格式不能追加数据。而hdfs的数据块大小一般都是128MB或者256MB,如果等文件凑够一个HDFS Block大小再写入时,就会导致数据延迟增大。因此,难免产生一个非常常见但是很麻烦的问题,即HDFS小文件问题。过多的小文件会增加NameNode的压力,并且影响查询性能,所以我们在使用流式数据入库的之后,一般会对小文件进行合并处理。
1.1 产生小文件的现象即使是使用Iceberg这种数据湖解决方案,也难免产生小文件。因此,Iceberg本身也想到了解决小文件的方案。
如何每次就写入几条数据,Iceberg的每个分区在写文件的时候都会产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。下面是我们在前面的演示环境中写了3次数据:
-- 创建Iceberg表
CREATE TABLE sensordata(
sensor_id STRING,
ts BIGINT,
temperature DOUBLE,
dt STRING
) USING iceberg
PARTITIONED BY(dt);
-- Append写入1条数据
INSERT INTO sensordata VALUES('sensor_01',1635743301,-12.1,'2021-12-01');
-- OverWrite写入一条数据
INSERT OVERWRITE sensordata VALUES('sensor_02',1635743301,23.6,'2021-12-01');
-- Append写入1条数据
INSERT INTO sensordata VALUES('sensor_02',1638421701,-22.2,'2021-12-02');
虽然每次仅仅写入1条数据,但是却产生了很多小文件。
[bigdata@bigdata185 iceberg]$ tree
.
└── sensordata
├── data
│ ├── dt=2021-12-01
│ │ ├── 00000-0-275a936f-4d21-4a82-9346-bceac4381e6c-00001.parquet
│ │ └── 00000-2-1189ac19-b488-4956-8de8-8dd96cd5920a-00001.parquet
│ └── dt=2021-12-02
│ └── 00000-1-cc4f552a-28eb-4ff3-a4fa-6c28ce6e5f79-00001.parquet
└── metadata
├── 0dafa6f3-2dbd-4728-ba9b-af31a3416700-m0.avro
├── 2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m0.avro
├── 2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m1.avro
├── ad4cd65e-7351-4ad3-baaf-5e5bd99dc257-m0.avro
├── snap-232980156660427676-1-0dafa6f3-2dbd-4728-ba9b-af31a3416700.avro
├── snap-4599216928086762873-1-ad4cd65e-7351-4ad3-baaf-5e5bd99dc257.avro
├── snap-5874199297651146296-1-2b1fbd5a-6241-4f7d-a4a6-3121019b9afb.avro
├── v1.metadata.json
├── v2.metadata.json
├── v3.metadata.json
├── v4.metadata.json
└── version-hint.text
5 directories, 16 files
[bigdata@bigdata185 iceberg]$
当我们执行这个INSERT语句时,会发生以下过程:
- 首先创建一个Parquet格式数据文件 - sensordata/data/dt=2021-12-01/00000-5-cbf2920c-3823-41a1-a612-04679b50a999-00001.parquet
- 创建一个指向这个数据文件的清单文件 - sensordata/metadata/cd1171e3-d178-42d0-8f0d-634804f97a01-m0.avro
- 创建指向该清单文件的清单列表文件 - sensordata/metadata/snap-2251043931717096659-1-cd1171e3-d178-42d0-8f0d-634804f97a01.avro
- 基于当前的元数据文件(v1.metadata.json)创建新的元数据文件,并通过新快照s1跟踪先前的快照s0,指向此清单列表文件 - sensordata/metadata/v2.metadata.json
- 最后,当前元数据指针的值version-hint.text在目录中自动更新,现在指向这个新的元数据文件(v2.metadata.json)。
当一个事务commit完成之后,会生成metadata.json和Manifest文件。
优势就是数据以事务原子性的方式写入Iceberg表,但是不足恰好是,每次提交数据都要产生一次快照,这难免就产生的小文件。
1.3 快照保留周期过长Iceberg使用v[number].metadata.json文件跟踪表元数据。对表的每次更改操作都会生成一个新的元数据文件以提供原子性。
默认情况下,旧的元数据文件会保留历史记录。频繁提交的表,特别是在流作业写数据时,需要定期清理元数据文件。
- 每张表的write.metadata.delete-after-commit.enabled默认值为false,如果不设置为true,历史版本元数据就不会被删除。
- 每张表最大的快照保留数write.metadata.previous-versions-max,默认为100(可以看到100以内的每次Snapshot)。
Iceberg每一次操作都会产生多个数据文件(metadata、data、snapshot),需要自行合并清理。
2.1.1 建Iceberg新增with设置通过org.apache.iceberg.actions.RewriteDataFiles来实现小文件合并时,如果仅仅对Iceberg表的数据进行小文件合并,但是不开启write.metadata.delete-after-commit.enabled为true,历史不会被删除,开启后就会实现合并后清除历史文件。因此,建议设置合理的快照保存策略,write.metadata.previous-versions-max(历史文件保留最大值为2,metadata的里面文件数则始终保持为3个)。
# 启用提交后写入元数据删除
write.metadata.delete-after-commit.enabled=true
# 配置保留历史数量(比如配置为2,则元数据和数据都保留2份历史数据和1份最新数据)
write.metadata.previous-versions-max=2
CREATE TABLE sensordata_01(
sensor_id STRING,
ts BIGINT,
temperature DOUBLE,
dt STRING
) USING iceberg
PARTITIONED BY(dt)
TBLPROPERTIES ('write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='2');
-- 必须在spark sql下执行,FlinkSQL不支持
ALTER TABLE sensordata_01 SET TBLPROPERTIES ('write.metadata.delete-after-commit.enabled'='true');
ALTER TABLE sensordata_01 SET TBLPROPERTIES ('write.metadata.previous-versions-max'='2');
Iceberg跟踪表中的每个数据文件。更多的数据文件会导致更多的元数据存储在清单文件中,而小数据文件会导致不必要的元数据量和文件打开成本,从而降低查询数据的效率。
2.2.1 SparkIceberg可以使用带有操作的Spark并行压缩数据文件rewriteDataFiles。这会将小文件组合成更大的文件,以减少元数据开销和运行时文件打开成本。
Actions.forTable(table).rewriteDataFiles()
.targetSizeInBytes(128 * 1024 * 1024) // 128 MB
.execute()
Iceberg可以使用带有操作的Flink并行压缩数据文件rewriteDataFiles。这会将小文件组合成更大的文件,以减少元数据开销和运行时文件打开成本。
Actions.forTable(table)
.rewriteDataFiles()
.maxParallelism(1)
.targetSizeInBytes(128 * 1024 * 1024) // 128MB
.execute();
Iceberg在其清单列表和清单文件中使用元数据来加快查询计划并修剪不必要的数据文件。元数据树用作表数据的索引。
元数据树中的清单会按照它们添加的顺序自动压缩,当写入模式与读取过滤器对齐时,查询会更快。例如,在数据到达时写入每小时分区的数据与时间范围查询过滤器保持一致。
2.3.1 Spark当表的写入模式与查询模式不一致时,可以重写元数据以使用rewriteManifests操作将数据文件重新分组到清单中, Spark支持并行重写操作。
table.rewriteManifests()
.rewriteIf((file) -> file.length() < 32 * 1024 * 1024) // 32MB
.clusterBy((file) -> file.partition().get(0, String.class))
.commit();
table.rewriteManifests
.rewriteIf((File) => file.length < 32 * 1024 * 1024) // 32 MB
.clusterBy((file) => file.partition.get(0, classOf[String]))
.commit
在每次向Iceberg表写数据时,都会创建一个新的快照,快照可以用于基于时间旅行查询,或者将表回滚到历史上的某一有效快照上。建议定期删除过期快照,以删除不再需要的数据文件,使表元数据的最小且可用。
2.3.1 spark目前我们的应用场景只需要查询当前数据就可以了,不需要查询历史数据,所以我只保留了最新的快照。在每次压缩程序之后,做了处理,使当前快照时间以前的快照过期。程序会自动删除以前的过期数据文件。过期旧快照会将它们从元数据中删除,因此它们不再可用于时间旅行查询。而数据文件只有在不能被基于时间旅行查询之后,才会被删除。
val snapshot = table.currentSnapshot
if (snapshot != null) {
table.expireSnapshots.expireOlderThan(snapshot.timestampMillis).commit
}
过期的旧快照会将它们从元数据中删除,因此它们不再可用于时间旅行查询。
// 6 清除5分钟前的历史快照
Snapshot snapshot = table.currentSnapshot();
long oldSnapshot = snapshot.timestampMillis() - TimeUnit.MINUTES.toMillis(5);
if (snapshot != null) {
table.expireSnapshots().expireOlderThan(oldSnapshot).commit();
}
数据文件在不再被可用于时间旅行或回滚的快照引用之前不会被删除。定期过期的快照会删除未使用的数据文件。
三 源代码3.1 Spark(1)使用Spark编写的合并Iceberg表小文件,是快照过期,删除不用的数据文件。
package com.yunclass.iceberg.streaming
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.Table
import org.apache.iceberg.actions.Actions
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object CombineTableFiles {
def main(args: Array[String]): Unit = {
// 1 设置执行账号
System.setProperty("HADOOP_USER_NAME", "bigdata")
// 2 配置SparkSession
val sparkConf = new SparkConf()
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set(s"spark.sql.catalog.hadoop_catalog",classOf[SparkCatalog].getName)
.set(s"spark.sql.catalog.hadoop_catalog.type","hadoop")
.set(s"spark.sql.catalog.hadoop_catalog.warehouse","hdfs://bigdata185:9000/dw/iceberg")
.setMaster("local[*]").setAppName("CombineTableFiles")
sparkConf.set("spark.executor.processTreeMetrics.enabled","true")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 3 获取TableLoader
val conf = new util.HashMap[String, String]
conf.put("type", "iceberg")
conf.put("catalog-type", "hadoop")
conf.put("warehouse", "hdfs://bigdata185:9000/dw/iceberg")
val hadoopCatalog = new HadoopCatalog(new Configuration())
hadoopCatalog.initialize("hadoop_catalog", conf)
val identifier = TableIdentifier.of(Namespace.of("db"), "sensordata_01")
val table = hadoopCatalog.loadTable(identifier)
// 调用合并小文件方法
// combineFiles(sparkSession, table)
deleteSnapshot(table)
}
// 合并小文件
def combineFiles(sparkSession: SparkSession, table: Table): Unit = {
Actions.forTable(sparkSession, table).rewriteDataFiles()
.filter(Expressions.equal("dt", "2022-01-20"))
.targetSizeInBytes(128 * 1024 * 1024)
.execute()
// 重新manifest文件
table.rewriteManifests()
.rewriteIf((file) => file.length() < 28 * 1024 * 1024)
.clusterBy((file) => file.partition().get(0, classOf[String]))
.commit()
}
// 删除快照信息
def deleteSnapshot(table: Table): Unit = {
val snapshot = table.currentSnapshot()
val oldSnapshot = snapshot.timestampMillis()
if (snapshot != null) {
table.expireSnapshots().expireOlderThan(oldSnapshot).commit()
}
}
}
(2)合并小文件之前的数据状态
(3)合并之后的状态,将小于128MB的文件,合并成大数据文件。
3.2 Flink
(1)使用Flink编写的合并Iceberg表小文件,是快照过期,删除不用的数据文件。
package com.yunclass.iceberg.streaming;
import com.sun.javafx.fxml.expression.Expression;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Expressions;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;
import java.util.HashMap;
import java.util.Map;
public class CombineTableFileDemo {
public static void main(String[] args) {
// 1 设置执行用户
System.setProperty("HADOOP_USER_NAME", "bigdata");
// 2 获取Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 3 使用Hadoop Catalog模式加载Iceberg表
Map<String, String> icebergMap = new HashMap<>();
icebergMap.put("type", "iceberg");
icebergMap.put("catalog-type", "hadoop");
icebergMap.put("property-version", "1");
icebergMap.put("warehouse", "hdfs://bigdata185:9000/dw/iceberg");
// 获取catalogLoader
CatalogLoader hadoopCatalog = CatalogLoader.hadoop("hadoop_catalog", new Configuration(), icebergMap);
Catalog catalog = hadoopCatalog.loadCatalog();
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db"), "sensordata_01");
Table table = catalog.loadTable(tableIdentifier);
// 调用方法
// combineFiles(env, table);
deleteOldSnapshot(table);
}
// 合并小文件
private static void combineFiles(StreamExecutionEnvironment env, Table table) {
Actions.forTable(env, table)
.rewriteDataFiles()
.maxParallelism(1)
.targetSizeInBytes(128 * 1024 * 1024)
.execute();
// 重写Manifest文件
table.rewriteManifests()
.rewriteIf((file) -> file.length() < 32 * 1024 * 1024)
.clusterBy((file) -> file.partition().get(0, String.class))
.commit();
}
// 删除过期快照
private static void deleteOldSnapshot(Table table) {
Snapshot snapshot = table.currentSnapshot();
long oldSnapshot = snapshot.timestampMillis();
if (snapshot != null) {
table.expireSnapshots().expireOlderThan(oldSnapshot).commit();
}
}
}
(2)合并小文件之前的数据状态
(3)合并之后的状态,将小于32MB的文件,合并成大Manifest文件。
四 总结,