1. 简介

hudi采用的是mvcc设计,提供了清理工具cleaner来把旧版本的文件分片删除,默认开启了清理功能,可以防止文件系统的存储空间和文件数量的无限增长。

1.1 环境

1.2 清理保留策略

清理旧文件需要考虑数据查询的情况,有些长查询会占用着旧版本的文件,需要设置合适的清理策略来保留一定数量的commit或者文件版本,以提高系统的容错性

1.3 清理触发策略

目前仅支持一种触发清理的策略:CleaningTriggerStrategy#NUM_COMMITS,即根据提交的次数,默认为1,可以通过设置参数hoodie.clean.max.commits​进行修改,在flink job的每次checkpoint时都会进行触发策略的条件判断,所以在两次chekpoint之间发生过1次或n次提交,都会触发清理动作。

2. 清理流程分析

2.1 清理器(cleaner)初始化

清理逻辑是被包装成一个flink sink,在HoodieTableSink#getSinkRuntimeProvider中进行初始化

if (StreamerUtil.needsAsyncCompaction(conf)) { return Pipelines.compact(conf, pipeline); } else { return Pipelines.clean(conf, pipeline); }

2.2 清理启动入口

  1. compact成功后同步清理

需要满足条件:1)mor表,2)启用异步合并compaction.async.enabled,3)禁用异步清理clean.async.enabled。入代码在CompactionCommitSink#doCommit中:

if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { this.writeClient.clean(); }

  1. chenpoint时异步清理

需要满足条件:1)非mor表或启用异步合并compaction.async.enabled,2)启用异步清理clean.async.enabled。入口代码在CleanFunction#snapshotState中:

if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) { this.writeClient.startAsyncCleaning(); this.isCleaning = true; }

AsyncCleanerService#startService会启动一个线程放到线程池中执行

2.3 清理逻辑执行

清理逻辑的流程控制在基类方法BaseHoodieWriteClient#clean中,主要包含有三个步骤:生成清理计划、刷新ActiveTimeline、执行清理计划

//生成清理计划 scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN); //刷新ActiveTimeline缓存 table.getMetaClient().reloadActiveTimeline(); //执行清理计划 metadata = table.clean(context, cleanInstantTime, skipLocking);

2.3.1 生成清理计划

scheduleTableServiceInternal是通用的方法,根据不同的tableServiceType(clean,compact,archive,cluster)调用对应的ActionExecutor去生成plan,CleanPlanActionExecutor#execute:

if (!needsCleaning(config.getCleaningTriggerStrategy())) { return Option.empty(); } return requestClean(instantTime);

先判断是否满足1.3清理触发策略,不满足表示无需清理,否则在CleanPlanActionExecutor#requestClean中生成清理计划:

final HoodieCleanerPlan cleanerPlan = requestClean(context); table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));

  1. 扫描.hoodie目录结合1.2清理保留策略计算需要保留的最小的HoodieInstant(earliestInstant),进而找到所有需要清理的分区和分区下的文件(HoodieCleanFileInfo)
  2. 将清理计划cleanerPlan序列化成arvo文件,并在.hoodie目录保存成xxx.clean.requested文件

2.3.2 刷新ActiveTimeline缓存

因为如果清理计划生成成功,表元数据目录.hoodie下会增加instant[action=clean,state=requested]文件,由于ActiveTimeline频繁被读取,为了避免每次从文件系统加载,需要实时保持内存与文件系统的元数据同步。

2.3.3 执行清理计划

也是调用对应的ActiveExecutor去执行清理,实现在CleanActionExecutor#execute:

List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline().filterInflightsAndRequested().getInstants().collect(Collectors.toList()); pendingCleanInstants.forEach(hoodieInstant -> { cleanMetadataList.add(runPendingClean(table, hoodieInstant)); table.getMetaClient().reloadActiveTimeline(); });

从ActiveTimeline中过滤状态为requested和inflight的instant,这两个状态都是需要执行清理的

HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) { HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant); return runClean(table, cleanInstant, cleanerPlan); }

将instant文件(requested,inflight)反序列化为清理计划,然后进入runClean

private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) { final HoodieInstant inflightInstant; if (cleanInstant.isRequested()) { inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); } else { inflightInstant = cleanInstant; } List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan); table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,TimelineMetadataUtils.serializeCleanMetadata(metadata)); }

  1. 如果处理的instant状态为requested需要先转换为inflight状态(生成xxx.clean.inflight文件),表示开始清理。
  2. 执行清理clean(context, cleanerPlan),根据清理计划的数据进行文件删除即可,首先删除每个分区下需要清理的文件,然后删除需清理的分区目录,最后收集统计数据返回。
  3. 清理成功后将infight状态转换为completed状态,表示清理完成。

3. 整体流程图

如何用clean清理磁盘(hudi系列-旧文件清理)(1)

,