【分布式】冰山(Iceberg)与哈迪(Hudi)对比的基准测试

发布于:2025-03-23 ⋅ 阅读:(18) ⋅ 点赞:(0)

推荐超级课程:

引言

分布式系统行业正迅速向表格式(TableFormats)和湖仓架构(Lakehouse Architecture)靠拢,以实现数据湖(Data Lakes)的现代化。

本文探讨了本实验室的基准测试练习,比较了最突出的两种表格式:Hudi和Iceberg的性能。它分析了各种基准测试配置文件观察到的高层结果,提供了数据特征如何影响表格式最佳选择的见解。

尽管本实验室选择了Iceberg作为其首选的表格式,但本文试图比较不同配置文件下Hudi和Iceberg的性能数据,以评估这两种格式在性能方面的相互比较。

我们的基准测试练习包括了行业标准基准测试如TPC-DS和自定义测试如Upserts。这种全面的方法使读者能够针对特定的工作负载和需求,做出关于最合适的表格式的明智决策。

什么是湖仓(Lakehouse)?

在大数据技术中,“湖仓”一词指的是一种数据架构,旨在结合数据湖和数据仓库的优点。

  1. 数据湖:存储大量原始数据的存储库。
  2. 数据仓库:为快速高效查询而设计的存储库,用于存储结构化数据。
  3. 湖仓:通过以下方式结合数据湖和数据仓库的优点的一种数据架构:
    • 提供ACID保证
    • 支持索引以加快查询性能
    • 拥有高效的元数据层
    • 提供变更数据流,适用于小批量更新和增量更新。

什么是表格式(TableFormat)?

表格式:是常规数据文件上的元数据。

在这里插入图片描述

理解湖仓与表格式之间的关系:

在这里插入图片描述

文件范围

本次基准测试调查了表格式对数据湖仓性能的影响。通过精心选择表格式,我们可以因为额外的元数据而提高同一数据集的读取性能。虽然写入性能可能会有轻微的下降,但包括读取和计算成本在内的整体影响是显著积极的。我们利用TPC-DS基准测试来评估不同的表格式,并识别数据湖仓环境中各种场景下的最合适选项。

什么是TPC-DS?

TPC-DS是由事务处理性能委员会(TPC)定义的数据仓库基准。TPC是一个非营利组织,由数据库社区于1980年代末成立,旨在开发可以客观地用于测试数据库系统性能的基准,通过模拟现实世界的场景。TPC对数据库行业产生了重大影响。

TPC-DS中的“DS”代表“决策支持”。TPCDS基准套件共有99个查询,范围从简单的聚合到高级模式分析。

基准测试是如何进行的?

以下是进行基准测试练习的步骤:

  1. 选择各种候选配置文件。
  2. 确定现成的开源工具以模拟广泛知名的基准测试,如TPC-DS。注意:为了保持基准测试过程的一致性和高效性,我们选择使用GitHub上托管的Delta开源基准测试。GitHub
  3. 设置我们的基础集群。
  4. 运行基准测试。

基础设施堆栈信息:

在这里插入图片描述

此外,堆栈还包括以下自定义项:

  • Iceberg自定义:
    Iceberg的自定义是通过以下pull request实现的。用于Iceberg的配置可以在这里找到#b899
  • Hudi自定义:
    同样,Hudi的自定义是通过以下pull request实现的。这一实现的灵感来自于Onehouse发布的pull request,以实现Hudi的最佳性能。我们基准测试过程中使用的Hudi配置可以在这里找到#d6d6
  • 使用Spark作为执行引擎:
    我们在自管理的Hadoop集群中使用一些特定的配置来编排和执行所有基准测试。这帮助我们有效地管理和执行测试。
  • 数据存储在GCS桶中:
    基准测试数据存储在Google Cloud Storage (GCS)桶中,以便在测试过程中高效访问和扩展。

基准测试与分析:

我们对以下配置文件进行了基准测试:1 GB、1 TB 和 100 TB。我们注意到在加载配置文件中,Iceberg始终比Hudi表现更好。对于查询配置文件,直到1 TB时Iceberg表现更好,而在100 TB配置文件中,其性能略有下降。

我们分类结果的快照:
在这里插入图片描述

加载:在我们的基准测试中,Iceberg表现更好

对于加载测试,我们运行了TPCDS加载查询并汇总了结果。对于加载配置文件,我们取了总加载创建时间和总加载删除时间。尽管在100TB时,图表可能显示Iceberg较慢,但删除查询通常非常快,所以图表中显示的3秒差异看起来很大,但在总体加载聚合结果(创建+删除的结果之和)中,Iceberg比Hudi要好得多。

我们还检查了在Iceberg中比Hudi表现更好的加载查询数量。在90%的情况下,Iceberg的加载查询表现比Hudi好。
在这里插入图片描述

查询:在我们的基准测试中,Hudi的表现略微优于Iceberg。

对于查询配置文件,我们为TPCDS套件中的所有查询各运行了3次迭代,并采取了以下指标:每个查询的平均查询时间。我们将所有查询的平均数相加以得到总平均查询时间。

在这里插入图片描述

总体结果:查询 + 加载:在我们的基准测试中,Hudi的表现略微优于Iceberg。

对于查询配置文件,我们为所有查询各运行了3次迭代,并采取了以下指标:每个查询的平均查询时间。我们将所有查询的平均数相加以得到总平均查询时间,并加上加载时间以计算总体时间。在1 TB时,Iceberg比Hudi快5.3%,而在100 TB时,总体结果显示Iceberg比Hudi慢6.7%。
在这里插入图片描述

查询带结果:在我们的基准测试中,Iceberg和Hudi表现相似

对于查询配置文件,我们为所有查询各运行了3次迭代,并采取了以下指标:

  1. 每个查询的平均查询时间

  2. 每个查询的第一次迭代查询时间

这种分析专门用于了解在每种表格式中,有多少查询比另一种表现更好,以及好多少百分比。这帮助我们理解,即使Iceberg总体上较慢,但在查询方面,Iceberg表现更好的查询与Hudi表现更好的查询之间的划分是怎样的?

我们定义了一个指标:

性能比(PR):Iceberg运行查询的挂钟时间 / Hudi运行相同查询的挂钟时间。

之后,我们将结果分为以下几个区间:

  • PR < 0.5 ⇒ Iceberg比Hudi好很多
  • 0.5 <= PR < 0.66 ⇒ Iceberg比Hudi好得多
  • 0.66 <= PR < 0.9 ⇒ Iceberg比Hudi好
  • 0.9 <= PR <= 1 ⇒ Iceberg比Hudi略微好
  • 1 < PR <= 1.1 ⇒ Hudi比Iceberg略微好
  • 1.1 < PR <= 1.5 ⇒ Hudi比Iceberg好
  • 1.5 < PR <= 2 ⇒ Hudi比Iceberg好得多
  • PR > 2 ⇒ Hudi比Iceberg好很多

因此,这表明即使Iceberg比Hudi慢13.5%,如果我们比较Iceberg表现更好的查询与Hudi表现更好的查询的划分,它们几乎相似。但是正如我们之前的基准测试练习所讨论的,如果我们只考虑查询性能,那么Hudi比Iceberg好13.5%。我们进一步深入研究了在Iceberg中表现最差的查询。

100TB查询结果(不包括q9、q16和q64):在我们的基准测试中,Hudi的性能略优于Iceberg。

我们深入研究了q9、q16和q64这三个查询,因为它们在Iceberg和Hudi之间的性能差异最大(Iceberg落后较多),但最终由于以下观察结果,我们决定从结果中排除这些查询:

Q9(Iceberg慢60%):

  • 问题:Iceberg不会重用子查询,而Hudi会。
  • 分析 → 有一个替代方案,用户可以使用WITH子句并提供他们希望重用的子查询作为WITH子句的一部分,Iceberg和Hudi的性能在这种情况下相似。

Q16(Iceberg慢95%):

  • 问题:Iceberg中的exists子句不是最优的。
  • 分析 → 我们没有深入分析这里的exists模式,因为我们认为这个问题与Iceberg的Spark扩展jar有关,应该可以通过在Iceberg方面投入更多时间来修复。

Q64(Iceberg慢77%):

  • 问题:Iceberg生成的子查询数量较少,这阻止了Iceberg在洗牌之前过滤行,而Hudi则不会。
  • 分析 → 我们没有深入分析这个模式,因为我们认为这个问题与Iceberg的Spark扩展jar有关,应该可以通过在Iceberg方面投入更多时间来修复。我们还观察到,在1TB的数据配置文件上,相同的查询在Iceberg和Hudi上的表现相似,但是在100TB的数据配置文件上性能下降,所以影响范围也是部分的,并且只影响更高的数据配置文件。

在排除了这三个查询之后,Iceberg的查询结果慢了9.4%,而包括它们时,Iceberg慢了13.5%,如上所述。

100TB计算/成本结果(不包括q9、q16和q64):在我们的基准测试中,Hudi的性能略优于Iceberg。

TPCDS套件默认只提供基于挂钟时间的分析,我们进一步分析了配置文件使用的计算资源,而不仅仅是测量完成查询的时间。由于上述原因,我们从结果中排除了q9、q16和q64查询。
在这里插入图片描述

更新基准测试

TPCDS基准测试包括帮助我们评估加载性能的数据加载场景,类似于分布式系统中的仅插入用例。本博客的前几部分主要关注于查询和插入性能的基准测试。在分布式系统中,更新操作也被大量使用。为了评估更新性能,我们使用模拟数据对小型数据大小进行了额外的基准测试。

模拟数据详情:

  • 数据大小:24 GB
  • 数据集中的总列数:110(包含嵌套列)
  • 数据集中的总行数:46268189

这确保了我们能够衡量如果将来用于小型批量或近流式用例,表格式将如何表现。

这个基准测试默认不包括在TPC-DS基准测试套件中。

我们是如何进行这个测试的?

  1. 使用Spark SQL在所需表格式中创建一个带有嵌套架构的表。
  2. 在表中插入模拟数据行。
  3. 使用Spark SQL中的MERGE语法对表格式进行更新,插入与步骤2中插入的大小相同的模拟新数据批次,包含部分更新和部分插入。

更新基准测试结果:在我们的基准测试中,Iceberg的性能优于Hudi

在这里插入图片描述

结论

Iceberg识别的差距

基准测试结论:Iceberg比Hudi略微慢

  1. 如果考虑挂钟时间(对于100TB配置文件,不包括q9、q16和q64),Iceberg的查询性能比Hudi9.4%
  2. 如果考虑计算GBHrs(对于100TB配置文件,不包括q9、q16和q64),Iceberg的查询性能比Hudi13%
  3. TPCDS基准测试套件中的3个查询(q9、q16和q64)被跳过,因为:
  • 可以通过它们的替代方案避免这些模式,在Iceberg和Hudi表现相似的情况下,或者
  • 影响或爆炸半径有限,或者
  • 通过对Iceberg Spark层进行一些更改可以修复它们。

附录:

  • Iceberg基准测试配置:

== Iceberg配置 ==

Iceberg数据加载的表配置:

🔗’write.spark.fanout.enabled’=’true’

🔗’format-version’=2

🔗’write.parquet.compression-codec’=’snappy’

== Iceberg配置覆盖 ==

🔗”spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions”

🔗”spark.sql.catalog.hiveXXX_cat=org.apache.iceberg.spark.SparkCatalog”

🔗”spark.sql.catalog.hiveXXX_cat.type=hive”

🔗”spark.driver.memory=5120m”

🔗”spark.executor.memory=10240m”

  • Hudi基准测试配置:

== Hudi配置 ==

Hudi数据加载的表配置:

🔗option(“hoodie.datasource.write.precombine.field”, “”)
🔗配置(“hoodie.datasource.write.recordkey.field”, primaryKeys.mkString(“,”)) // 将主键字段转换为逗号分隔的字符串

🔗配置(“hoodie.datasource.write.partitionpath.field”, partitionFields) // 指定分区字段

🔗配置(“hoodie.datasource.write.keygenerator.class”, keygenClass) // 指定键生成器类

🔗配置(“hoodie.table.name”, tableName) // 指定表名

🔗配置(“hoodie.datasource.write.table.name”, tableName) // 指定数据源写入的表名

🔗配置(“hoodie.datasource.write.hive_style_partitioning”, “true”) // 启用Hive风格的分区

🔗配置(“hoodie.datasource.write.operation”, “bulk_insert”) // 指定写入操作为批量插入

🔗配置(“hoodie.combine.before.insert”, “false”) // 在插入前不进行合并

🔗配置(“hoodie.bulkinsert.sort.mode”, “NONE”) // 批量插入不进行排序

🔗配置(“hoodie.parquet.compression.codec”, “snappy”) // 指定Parquet文件的压缩编解码器为snappy

🔗配置(“hoodie.parquet.writelegacyformat.enabled”, “false”) // 不启用Parquet的旧格式写入

🔗配置(“hoodie.metadata.enable”, “false”) // 不启用元数据

🔗配置(“hoodie.populate.meta.fields”, “false”) // 不填充元数据字段

🔗配置(“hoodie.parquet.max.file.size”, “141557760”) // 135Mb // 指定Parquet文件的最大大小

🔗配置(“hoodie.parquet.block.size”, “141557760”) // 135Mb // 指定Parquet文件的块大小

== Hudi配置覆盖 ==

🔗"spark.serializer=org.apache.spark.serializer.KryoSerializer", // 指定Spark序列化器为KryoSerializer

🔗"spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension", // 添加Hudi的Spark会话扩展

🔗"spark.driver.memory=5120m", // 设置Spark驱动程序的内存为5120m

🔗"spark.executor.memory=10240m" // 设置Spark执行器的内存为10240m

  • Spark配置:

== Spark配置 ==

🔗spark.cleaner.ttl: 86400 // 设置Spark清理器的生存时间为86400秒

🔗spark.delta.logStore.gs.impl: io.delta.storage.GCSLogStore // 设置Delta日志存储的GCS实现

🔗spark.driver.cores: 1 // 设置Spark驱动程序的核心数为1

🔗spark.driver.extraJavaOptions: -Denv=prod -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=0 -XX:MaxDirectMemorySize=800M -XX:MaxMetaspaceSize=256M -XX:CompressedClassSpaceSize=100M -XX:+UnlockDiagnosticVMOptions -Djob.numOfRePartitions=30 // 设置Spark驱动程序的额外Java选项

🔗spark.driver.memory: 5120m // 设置Spark驱动程序的内存为5120m

🔗spark.driver.memoryOverhead: 4096 // 设置Spark驱动程序的内存开销为4096

🔗spark.dynamicAllocation.enabled: true // 启用Spark的动态分配

🔗spark.dynamicAllocation.executorIdleTimeout: 60s // 设置Spark执行器空闲超时时间为60秒

🔗spark.dynamicAllocation.maxExecutors: 200 // 设置Spark最大执行器数量为200

🔗spark.eventLog.enabled: true // 启用Spark事件日志

🔗spark.executor.cores: 1 // 设置Spark执行器的核心数为1

🔗spark.executor.extraJavaOptions: -Denv=prod -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=0 -XX:MaxDirectMemorySize=800M -XX:MaxMetaspaceSize=256M -XX:CompressedClassSpaceSize=100M -XX:+UnlockDiagnosticVMOptions -Djob.numOfRePartitions=30 // 设置Spark执行器的额外Java选项

🔗spark.executor.id: driver // 设置Spark执行器的ID为driver

🔗spark.executor.memory: 10240m // 设置Spark执行器的内存为10240m

🔗spark.executor.memoryOverhead: 4096 // 设置Spark执行器的内存开销为4096

🔗spark.hadoop.fs.s3.useRequesterPaysHeader: true // 在S3上使用请求者付费头部

🔗spark.hadoop.yarn.timeline-service.enabled: false // 禁用Yarn时间线服务

🔗spark.history.fs.cleaner.interval: 1d // 设置Spark历史文件系统清理器间隔为1天

🔗spark.history.fs.cleaner.maxAge: 60d // 设置Spark历史文件系统清理器最大年龄为60天

🔗spark.history.provider: org.apache.spark.deploy.history.FsHistoryProvider // 设置Spark历史提供者为FsHistoryProvider

🔗spark.master: yarn // 设置Spark主节点为Yarn

🔗spark.shuffle.service.enabled: true // 启用Spark洗牌服务

🔗spark.shuffle.useOldFetchProtocol: true // 使用旧的Spark洗牌获取协议

🔗spark.sql.catalog.hive_cat: org.apache.iceberg.spark.SparkCatalog // 设置Hive的Spark目录为Iceberg的Spark目录

🔗spark.sql.catalog.hive_cat.type: hive // 设置Hive目录类型为Hive

🔗spark.sql.catalogImplementation: hive // 设置Spark SQL目录实现为Hive

🔗spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions // 添加Iceberg的Spark会话扩展

🔗spark.streaming.concurrentJobs: 4 // 设置Spark流处理的并发作业数为4

🔗spark.submit.deployMode: client // 设置Spark提交部署模式为客户端

🔗spark.yarn.report.interval: 60s // 设置Spark在Yarn上的报告间隔为60秒