diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index 178b89ad213a..714c0b3bfe67 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
@@ -127,8 +128,8 @@ public static TableMetadata replacePaths(
         metadata.snapshotLog(),
         metadataLogEntries,
         metadata.refs(),
-        // TODO: update statistic file paths
-        metadata.statisticsFiles(),
+        updatePathInStatisticsFiles(metadata.statisticsFiles(), sourcePrefix, targetPrefix),
+        // TODO: update partition statistics file paths
         metadata.partitionStatisticsFiles(),
         metadata.changes(),
         metadata.rowLineageEnabled(),
@@ -160,6 +161,20 @@ private static void updatePathInProperty(
     }
   }
 
+  private static List<StatisticsFile> updatePathInStatisticsFiles(
+      List<StatisticsFile> statisticsFiles, String sourcePrefix, String targetPrefix) {
+    return statisticsFiles.stream()
+        .map(
+            existing ->
+                new GenericStatisticsFile(
+                    existing.snapshotId(),
+                    newPath(existing.path(), sourcePrefix, targetPrefix),
+                    existing.fileSizeInBytes(),
+                    existing.fileFooterSizeInBytes(),
+                    existing.blobMetadata()))
+        .collect(Collectors.toList());
+  }
+
   private static List<MetadataLogEntry> updatePathInMetadataLogs(
       TableMetadata metadata, String sourcePrefix, String targetPrefix) {
     List<MetadataLogEntry> metadataLogEntries =
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 33873de72f9d..c9382fafd8ae 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -37,6 +37,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -272,8 +273,9 @@ private String rebuildMetadata() {
         ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();
 
     Preconditions.checkArgument(
-        endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(),
-        "Statistic files are not supported yet.");
+        endMetadata.partitionStatisticsFiles() == null
+            || endMetadata.partitionStatisticsFiles().isEmpty(),
+        "Partition statistics files are not supported yet.");
 
     // rebuild version files
     RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
@@ -341,7 +343,7 @@ private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Long>
   private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
     RewriteResult<Snapshot> result = new RewriteResult<>();
     result.toRewrite().addAll(endMetadata.snapshots());
-    result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+    result.copyPlan().addAll(rewriteVersionFile(endMetadata, endVersionName));
 
     List<String> versions = endMetadata.previousFiles();
     for (int i = versions.size() - 1; i >= 0; i--) {
@@ -357,19 +359,50 @@ private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
           new StaticTableOperations(versionFilePath, table.io()).current();
 
       result.toRewrite().addAll(tableMetadata.snapshots());
-      result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath));
+      result.copyPlan().addAll(rewriteVersionFile(tableMetadata, versionFilePath));
     }
     return result;
   }
 
-  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
+  private Set<Pair<String, String>> rewriteVersionFile(
+      TableMetadata metadata, String versionFilePath) {
+    Set<Pair<String, String>> result = Sets.newHashSet();
     String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
     TableMetadata newTableMetadata =
         RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
     TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
-    return Pair.of(
-        stagingPath, RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix));
+    result.add(
+        Pair.of(
+            stagingPath,
+            RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix)));
+
+    // include statistics files in copy plan
+    result.addAll(
+        statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles()));
+    return result;
+  }
+
+  private Set<Pair<String, String>> statsFileCopyPlan(
+      List<StatisticsFile> beforeStats, List<StatisticsFile> afterStats) {
+    Set<Pair<String, String>> result = Sets.newHashSet();
+    if (beforeStats.isEmpty()) {
+      return result;
+    }
+
+    Preconditions.checkArgument(
+        beforeStats.size() == afterStats.size(),
+        "Before and after path rewrite, statistic files count should be same");
+    for (int i = 0; i < beforeStats.size(); i++) {
+      StatisticsFile before = beforeStats.get(i);
+      StatisticsFile after = afterStats.get(i);
+      Preconditions.checkArgument(
+          before.fileSizeInBytes() == after.fileSizeInBytes(),
+          "Before and after path rewrite, statistic file size should be same");
+      result.add(
+          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), after.path()));
+    }
+    return result;
   }
 
   /**
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index c69a270d8f17..f028d18c56c6 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -37,8 +37,8 @@
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -59,7 +59,6 @@
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -868,25 +867,27 @@ public void testInvalidArgs() {
   }
 
   @Test
-  public void testStatisticFile() throws IOException {
+  public void testPartitionStatisticFile() throws IOException {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
     properties.put("format-version", "2");
-    String tableName = "v2tblwithstats";
+    String tableName = "v2tblwithPartStats";
     Table sourceTable =
         createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);
 
     TableMetadata metadata = currentMetadata(sourceTable);
-    TableMetadata withStatistics =
+    TableMetadata withPartStatistics =
         TableMetadata.buildFrom(metadata)
-            .setStatistics(
-                43,
-                new GenericStatisticsFile(
-                    43, "/some/path/to/stats/file", 128, 27, ImmutableList.of()))
+            .setPartitionStatistics(
+                ImmutableGenericPartitionStatisticsFile.builder()
+                    .snapshotId(11L)
+                    .path("/some/partition/stats/file.parquet")
+                    .fileSizeInBytes(42L)
+                    .build())
             .build();
 
     OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
-    TableMetadataParser.overwrite(withStatistics, file);
+    TableMetadataParser.overwrite(withPartStatistics, file);
 
     assertThatThrownBy(
         () ->
@@ -895,7 +896,36 @@ public void testStatisticFile() throws IOException {
             .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
             .execute())
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageContaining("Statistic files are not supported yet");
+        .hasMessageContaining("Partition statistics files are not supported yet");
+  }
+
+  @Test
+  public void testTableWithManyStatisticFiles() throws IOException {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("format-version", "2");
+    String tableName = "v2tblwithmanystats";
+    Table sourceTable =
+        createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);
+
+    int iterations = 10;
+    for (int i = 0; i < iterations; i++) {
+      sql("insert into hive.default.%s values (%s, 'AAAAAAAAAA', 'AAAA')", tableName, i);
+      sourceTable.refresh();
+      actions().computeTableStats(sourceTable).execute();
+    }
+
+    sourceTable.refresh();
+    assertThat(sourceTable.statisticsFiles().size()).isEqualTo(iterations);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    checkFileNum(
+        iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 + 1, result);
   }
 
   @Test
@@ -1063,6 +1093,16 @@ protected void checkFileNum(
       int manifestFileCount,
       int totalCount,
       RewriteTablePath.Result result) {
+    checkFileNum(versionFileCount, manifestListCount, manifestFileCount, 0, totalCount, result);
+  }
+
+  protected void checkFileNum(
+      int versionFileCount,
+      int manifestListCount,
+      int manifestFileCount,
+      int statisticsFileCount,
+      int totalCount,
+      RewriteTablePath.Result result) {
     List<String> filesToMove =
         spark
             .read()
@@ -1083,6 +1123,9 @@ protected void checkFileNum(
     assertThat(filesToMove.stream().filter(isManifest).count())
         .as("Wrong rebuilt Manifest file file count")
         .isEqualTo(manifestFileCount);
+    assertThat(filesToMove.stream().filter(f -> f.endsWith(".stats")).count())
+        .withFailMessage("Wrong rebuilt Statistic file count")
+        .isEqualTo(statisticsFileCount);
     assertThat(filesToMove.size()).as("Wrong total file count").isEqualTo(totalCount);
   }