推荐超级课程:
引言
分布式系统行业正迅速向表格式(TableFormats)和湖仓架构(Lakehouse Architecture)靠拢,以实现数据湖(Data Lakes)的现代化。
本文探讨了本实验室的基准测试练习,比较了最突出的两种表格式:Hudi和Iceberg的性能。它分析了各种基准测试配置文件观察到的高层结果,提供了数据特征如何影响表格式最佳选择的见解。
尽管本实验室选择了Iceberg作为其首选的表格式,但本文试图比较不同配置文件下Hudi和Iceberg的性能数据,以评估这两种格式在性能方面的相互比较。
我们的基准测试练习包括了行业标准基准测试如TPC-DS和自定义测试如Upserts。这种全面的方法使读者能够针对特定的工作负载和需求,做出关于最合适的表格式的明智决策。
什么是湖仓(Lakehouse)?
在大数据技术中,“湖仓”一词指的是一种数据架构,旨在结合数据湖和数据仓库的优点。
- 数据湖:存储大量原始数据的存储库。
- 数据仓库:为快速高效查询而设计的存储库,用于存储结构化数据。
- 湖仓:通过以下方式结合数据湖和数据仓库的优点的一种数据架构:
- 提供ACID保证
- 支持索引以加快查询性能
- 拥有高效的元数据层
- 提供变更数据流,适用于小批量更新和增量更新。
什么是表格式(TableFormat)?
表格式:是常规数据文件上的元数据。
理解湖仓与表格式之间的关系:
文件范围
本次基准测试调查了表格式对数据湖仓性能的影响。通过精心选择表格式,我们可以因为额外的元数据而提高同一数据集的读取性能。虽然写入性能可能会有轻微的下降,但包括读取和计算成本在内的整体影响是显著积极的。我们利用TPC-DS基准测试来评估不同的表格式,并识别数据湖仓环境中各种场景下的最合适选项。
什么是TPC-DS?
TPC-DS是由事务处理性能委员会(TPC)定义的数据仓库基准。TPC是一个非营利组织,由数据库社区于1980年代末成立,旨在开发可以客观地用于测试数据库系统性能的基准,通过模拟现实世界的场景。TPC对数据库行业产生了重大影响。
TPC-DS中的“DS”代表“决策支持”。TPCDS基准套件共有99个查询,范围从简单的聚合到高级模式分析。
基准测试是如何进行的?
以下是进行基准测试练习的步骤:
- 选择各种候选配置文件。
- 确定现成的开源工具以模拟广泛知名的基准测试,如TPC-DS。注意:为了保持基准测试过程的一致性和高效性,我们选择使用GitHub上托管的Delta开源基准测试。GitHub。
- 设置我们的基础集群。
- 运行基准测试。
基础设施堆栈信息:
此外,堆栈还包括以下自定义项:
- Iceberg自定义:
Iceberg的自定义是通过以下pull request实现的。用于Iceberg的配置可以在这里找到#b899。 - Hudi自定义:
同样,Hudi的自定义是通过以下pull request实现的。这一实现的灵感来自于Onehouse发布的pull request,以实现Hudi的最佳性能。我们基准测试过程中使用的Hudi配置可以在这里找到#d6d6。 - 使用Spark作为执行引擎:
我们在自管理的Hadoop集群中使用一些特定的配置来编排和执行所有基准测试。这帮助我们有效地管理和执行测试。 - 数据存储在GCS桶中:
基准测试数据存储在Google Cloud Storage (GCS)桶中,以便在测试过程中高效访问和扩展。
基准测试与分析:
我们对以下配置文件进行了基准测试:1 GB、1 TB 和 100 TB。我们注意到在加载配置文件中,Iceberg始终比Hudi表现更好。对于查询配置文件,直到1 TB时Iceberg表现更好,而在100 TB配置文件中,其性能略有下降。
我们分类结果的快照:
加载:在我们的基准测试中,Iceberg表现更好
对于加载测试,我们运行了TPCDS加载查询并汇总了结果。对于加载配置文件,我们取了总加载创建时间和总加载删除时间。尽管在100TB时,图表可能显示Iceberg较慢,但删除查询通常非常快,所以图表中显示的3秒差异看起来很大,但在总体加载聚合结果(创建+删除的结果之和)中,Iceberg比Hudi要好得多。
我们还检查了在Iceberg中比Hudi表现更好的加载查询数量。在90%的情况下,Iceberg的加载查询表现比Hudi好。
查询:在我们的基准测试中,Hudi的表现略微优于Iceberg。
对于查询配置文件,我们为TPCDS套件中的所有查询各运行了3次迭代,并采取了以下指标:每个查询的平均查询时间。我们将所有查询的平均数相加以得到总平均查询时间。
总体结果:查询 + 加载:在我们的基准测试中,Hudi的表现略微优于Iceberg。
对于查询配置文件,我们为所有查询各运行了3次迭代,并采取了以下指标:每个查询的平均查询时间。我们将所有查询的平均数相加以得到总平均查询时间,并加上加载时间以计算总体时间。在1 TB时,Iceberg比Hudi快5.3%,而在100 TB时,总体结果显示Iceberg比Hudi慢6.7%。
查询带结果:在我们的基准测试中,Iceberg和Hudi表现相似
对于查询配置文件,我们为所有查询各运行了3次迭代,并采取了以下指标:
每个查询的平均查询时间
每个查询的第一次迭代查询时间
这种分析专门用于了解在每种表格式中,有多少查询比另一种表现更好,以及好多少百分比。这帮助我们理解,即使Iceberg总体上较慢,但在查询方面,Iceberg表现更好的查询与Hudi表现更好的查询之间的划分是怎样的?
我们定义了一个指标:
性能比(PR):Iceberg运行查询的挂钟时间 / Hudi运行相同查询的挂钟时间。
之后,我们将结果分为以下几个区间:
- PR < 0.5 ⇒ Iceberg比Hudi好很多
- 0.5 <= PR < 0.66 ⇒ Iceberg比Hudi好得多
- 0.66 <= PR < 0.9 ⇒ Iceberg比Hudi好
- 0.9 <= PR <= 1 ⇒ Iceberg比Hudi略微好
- 1 < PR <= 1.1 ⇒ Hudi比Iceberg略微好
- 1.1 < PR <= 1.5 ⇒ Hudi比Iceberg好
- 1.5 < PR <= 2 ⇒ Hudi比Iceberg好得多
- PR > 2 ⇒ Hudi比Iceberg好很多
因此,这表明即使Iceberg比Hudi慢13.5%,如果我们比较Iceberg表现更好的查询与Hudi表现更好的查询的划分,它们几乎相似。但是正如我们之前的基准测试练习所讨论的,如果我们只考虑查询性能,那么Hudi比Iceberg好13.5%。我们进一步深入研究了在Iceberg中表现最差的查询。
100TB查询结果(不包括q9、q16和q64):在我们的基准测试中,Hudi的性能略优于Iceberg。
我们深入研究了q9、q16和q64这三个查询,因为它们在Iceberg和Hudi之间的性能差异最大(Iceberg落后较多),但最终由于以下观察结果,我们决定从结果中排除这些查询:
Q9(Iceberg慢60%):
- 问题:Iceberg不会重用子查询,而Hudi会。
- 分析 → 有一个替代方案,用户可以使用WITH子句并提供他们希望重用的子查询作为WITH子句的一部分,Iceberg和Hudi的性能在这种情况下相似。
Q16(Iceberg慢95%):
- 问题:Iceberg中的exists子句不是最优的。
- 分析 → 我们没有深入分析这里的exists模式,因为我们认为这个问题与Iceberg的Spark扩展jar有关,应该可以通过在Iceberg方面投入更多时间来修复。
Q64(Iceberg慢77%):
- 问题:Iceberg生成的子查询数量较少,这阻止了Iceberg在洗牌之前过滤行,而Hudi则不会。
- 分析 → 我们没有深入分析这个模式,因为我们认为这个问题与Iceberg的Spark扩展jar有关,应该可以通过在Iceberg方面投入更多时间来修复。我们还观察到,在1TB的数据配置文件上,相同的查询在Iceberg和Hudi上的表现相似,但是在100TB的数据配置文件上性能下降,所以影响范围也是部分的,并且只影响更高的数据配置文件。
在排除了这三个查询之后,Iceberg的查询结果慢了9.4%,而包括它们时,Iceberg慢了13.5%,如上所述。
100TB计算/成本结果(不包括q9、q16和q64):在我们的基准测试中,Hudi的性能略优于Iceberg。
TPCDS套件默认只提供基于挂钟时间的分析,我们进一步分析了配置文件使用的计算资源,而不仅仅是测量完成查询的时间。由于上述原因,我们从结果中排除了q9、q16和q64查询。
更新基准测试
TPCDS基准测试包括帮助我们评估加载性能的数据加载场景,类似于分布式系统中的仅插入用例。本博客的前几部分主要关注于查询和插入性能的基准测试。在分布式系统中,更新操作也被大量使用。为了评估更新性能,我们使用模拟数据对小型数据大小进行了额外的基准测试。
模拟数据详情:
- 数据大小:24 GB
- 数据集中的总列数:110(包含嵌套列)
- 数据集中的总行数:46268189
这确保了我们能够衡量如果将来用于小型批量或近流式用例,表格式将如何表现。
这个基准测试默认不包括在TPC-DS基准测试套件中。
我们是如何进行这个测试的?
- 使用Spark SQL在所需表格式中创建一个带有嵌套架构的表。
- 在表中插入模拟数据行。
- 使用Spark SQL中的MERGE语法对表格式进行更新,插入与步骤2中插入的大小相同的模拟新数据批次,包含部分更新和部分插入。
更新基准测试结果:在我们的基准测试中,Iceberg的性能优于Hudi
结论
Iceberg识别的差距
- Iceberg确实调用了getBlockLocation来在数据所在的节点上定位计算任务,以实现更好的数据局部性。这导致了更高的初始启动时间。这个问题在对象存储如GCS上没有观察到。
- Iceberg没有将过滤器推送到Parquet层,在许多查询中表现不佳:https://github.com/apache/iceberg/pull/9479* 在某些情况下,与Hudi相比,Iceberg的子查询数量较少,从而导致数据过滤效果差和更大的数据洗牌。观察到,使用1TB数据时,Iceberg中创建了更多的子查询,但当数据超过1TB时,子查询数量减少,Iceberg的表现变差:TPCDS套件中的q64查询也遇到了同样的问题。
- 使用exists子句的查询在Hudi中表现更好:TPCDS套件中的q16查询也遇到了同样的问题。
- Iceberg没有有效地重用子查询:TPCDS套件中的q9查询也遇到了同样的问题。
基准测试结论:Iceberg比Hudi略微慢
- 如果考虑挂钟时间(对于100TB配置文件,不包括q9、q16和q64),Iceberg的查询性能比Hudi慢9.4%。
- 如果考虑计算GBHrs(对于100TB配置文件,不包括q9、q16和q64),Iceberg的查询性能比Hudi差13%。
- TPCDS基准测试套件中的3个查询(q9、q16和q64)被跳过,因为:
- 可以通过它们的替代方案避免这些模式,在Iceberg和Hudi表现相似的情况下,或者
- 影响或爆炸半径有限,或者
- 通过对Iceberg Spark层进行一些更改可以修复它们。
附录:
- Iceberg基准测试配置:
== Iceberg配置 ==
Iceberg数据加载的表配置:
🔗’write.spark.fanout.enabled’=’true’
🔗’format-version’=2
🔗’write.parquet.compression-codec’=’snappy’
== Iceberg配置覆盖 ==
🔗”spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions”
🔗”spark.sql.catalog.hiveXXX_cat=org.apache.iceberg.spark.SparkCatalog”
🔗”spark.sql.catalog.hiveXXX_cat.type=hive”
🔗”spark.driver.memory=5120m”
🔗”spark.executor.memory=10240m”
- Hudi基准测试配置:
== Hudi配置 ==
Hudi数据加载的表配置:
🔗option(“hoodie.datasource.write.precombine.field”, “”)
🔗配置(“hoodie.datasource.write.recordkey.field”, primaryKeys.mkString(“,”)) // 将主键字段转换为逗号分隔的字符串🔗配置(“hoodie.datasource.write.partitionpath.field”, partitionFields) // 指定分区字段
🔗配置(“hoodie.datasource.write.keygenerator.class”, keygenClass) // 指定键生成器类
🔗配置(“hoodie.table.name”, tableName) // 指定表名
🔗配置(“hoodie.datasource.write.table.name”, tableName) // 指定数据源写入的表名
🔗配置(“hoodie.datasource.write.hive_style_partitioning”, “true”) // 启用Hive风格的分区
🔗配置(“hoodie.datasource.write.operation”, “bulk_insert”) // 指定写入操作为批量插入
🔗配置(“hoodie.combine.before.insert”, “false”) // 在插入前不进行合并
🔗配置(“hoodie.bulkinsert.sort.mode”, “NONE”) // 批量插入不进行排序
🔗配置(“hoodie.parquet.compression.codec”, “snappy”) // 指定Parquet文件的压缩编解码器为snappy
🔗配置(“hoodie.parquet.writelegacyformat.enabled”, “false”) // 不启用Parquet的旧格式写入
🔗配置(“hoodie.metadata.enable”, “false”) // 不启用元数据
🔗配置(“hoodie.populate.meta.fields”, “false”) // 不填充元数据字段
🔗配置(“hoodie.parquet.max.file.size”, “141557760”) // 135Mb // 指定Parquet文件的最大大小
🔗配置(“hoodie.parquet.block.size”, “141557760”) // 135Mb // 指定Parquet文件的块大小
== Hudi配置覆盖 ==
🔗"spark.serializer=org.apache.spark.serializer.KryoSerializer", // 指定Spark序列化器为KryoSerializer
🔗"spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension", // 添加Hudi的Spark会话扩展
🔗"spark.driver.memory=5120m", // 设置Spark驱动程序的内存为5120m
🔗"spark.executor.memory=10240m" // 设置Spark执行器的内存为10240m
- Spark配置:
== Spark配置 ==
🔗spark.cleaner.ttl: 86400 // 设置Spark清理器的生存时间为86400秒
🔗spark.delta.logStore.gs.impl: io.delta.storage.GCSLogStore // 设置Delta日志存储的GCS实现
🔗spark.driver.cores: 1 // 设置Spark驱动程序的核心数为1
🔗spark.driver.extraJavaOptions: -Denv=prod -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=0 -XX:MaxDirectMemorySize=800M -XX:MaxMetaspaceSize=256M -XX:CompressedClassSpaceSize=100M -XX:+UnlockDiagnosticVMOptions -Djob.numOfRePartitions=30 // 设置Spark驱动程序的额外Java选项
🔗spark.driver.memory: 5120m // 设置Spark驱动程序的内存为5120m
🔗spark.driver.memoryOverhead: 4096 // 设置Spark驱动程序的内存开销为4096
🔗spark.dynamicAllocation.enabled: true // 启用Spark的动态分配
🔗spark.dynamicAllocation.executorIdleTimeout: 60s // 设置Spark执行器空闲超时时间为60秒
🔗spark.dynamicAllocation.maxExecutors: 200 // 设置Spark最大执行器数量为200
🔗spark.eventLog.enabled: true // 启用Spark事件日志
🔗spark.executor.cores: 1 // 设置Spark执行器的核心数为1
🔗spark.executor.extraJavaOptions: -Denv=prod -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=0 -XX:MaxDirectMemorySize=800M -XX:MaxMetaspaceSize=256M -XX:CompressedClassSpaceSize=100M -XX:+UnlockDiagnosticVMOptions -Djob.numOfRePartitions=30 // 设置Spark执行器的额外Java选项
🔗spark.executor.id: driver // 设置Spark执行器的ID为driver
🔗spark.executor.memory: 10240m // 设置Spark执行器的内存为10240m
🔗spark.executor.memoryOverhead: 4096 // 设置Spark执行器的内存开销为4096
🔗spark.hadoop.fs.s3.useRequesterPaysHeader: true // 在S3上使用请求者付费头部
🔗spark.hadoop.yarn.timeline-service.enabled: false // 禁用Yarn时间线服务
🔗spark.history.fs.cleaner.interval: 1d // 设置Spark历史文件系统清理器间隔为1天
🔗spark.history.fs.cleaner.maxAge: 60d // 设置Spark历史文件系统清理器最大年龄为60天
🔗spark.history.provider: org.apache.spark.deploy.history.FsHistoryProvider // 设置Spark历史提供者为FsHistoryProvider
🔗spark.master: yarn // 设置Spark主节点为Yarn
🔗spark.shuffle.service.enabled: true // 启用Spark洗牌服务
🔗spark.shuffle.useOldFetchProtocol: true // 使用旧的Spark洗牌获取协议
🔗spark.sql.catalog.hive_cat: org.apache.iceberg.spark.SparkCatalog // 设置Hive的Spark目录为Iceberg的Spark目录
🔗spark.sql.catalog.hive_cat.type: hive // 设置Hive目录类型为Hive
🔗spark.sql.catalogImplementation: hive // 设置Spark SQL目录实现为Hive
🔗spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions // 添加Iceberg的Spark会话扩展
🔗spark.streaming.concurrentJobs: 4 // 设置Spark流处理的并发作业数为4
🔗spark.submit.deployMode: client // 设置Spark提交部署模式为客户端
🔗spark.yarn.report.interval: 60s // 设置Spark在Yarn上的报告间隔为60秒