diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 14ad724c1cfc..9e610c453958 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.RewriteTablePathUtil;
 import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
 import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
@@ -273,11 +274,6 @@ private String rebuildMetadata() {
     TableMetadata endMetadata =
         ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();
 
-    Preconditions.checkArgument(
-        endMetadata.partitionStatisticsFiles() == null
-            || endMetadata.partitionStatisticsFiles().isEmpty(),
-        "Partition statistics files are not supported yet.");
-
     // rebuild version files
     RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
     Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
@@ -385,6 +381,9 @@ private Set<Pair<String, String>> rewriteVersionFile(
     // include statistics files in copy plan
     result.addAll(
         statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles()));
+    result.addAll(
+        partitionStatsFileCopyPlan(
+            metadata.partitionStatisticsFiles(), newTableMetadata.partitionStatisticsFiles()));
     return result;
   }
 
@@ -409,6 +408,27 @@ private Set<Pair<String, String>> statsFileCopyPlan(
     return result;
   }
 
+  private Set<Pair<String, String>> partitionStatsFileCopyPlan(
+      List<PartitionStatisticsFile> beforeStats, List<PartitionStatisticsFile> afterStats) {
+    Set<Pair<String, String>> result = Sets.newHashSet();
+    if (beforeStats.isEmpty()) {
+      return result;
+    }
+
+    Preconditions.checkArgument(
+        beforeStats.size() == afterStats.size(),
+        "Before and after path rewrite, partition statistic files count should be same");
+    for (int i = 0; i < beforeStats.size(); i++) {
+      PartitionStatisticsFile before = beforeStats.get(i);
+      PartitionStatisticsFile after = afterStats.get(i);
+      Preconditions.checkArgument(
+          before.fileSizeInBytes() == after.fileSizeInBytes(),
+          "Before and after path rewrite, partition statistic file size should be same");
+      result.add(Pair.of(before.path(), after.path()));
+    }
+    return result;
+  }
+
   /**
    * Rewrite a manifest list representing a snapshot.
    *
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 770f6c5c7aa6..ff68f7e4781d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -38,14 +38,12 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.ActionsProvider;
@@ -902,36 +900,35 @@ public void testInvalidArgs() {
   }
 
   @Test
-  public void testPartitionStatisticFile() throws IOException {
+  public void testTableWithManyPartitionStatisticFile() throws IOException {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
     properties.put("format-version", "2");
     String tableName = "v2tblwithPartStats";
     Table sourceTable =
-        createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);
+        createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0, "c1");
 
-    TableMetadata metadata = currentMetadata(sourceTable);
-    TableMetadata withPartStatistics =
-        TableMetadata.buildFrom(metadata)
-            .setPartitionStatistics(
-                ImmutableGenericPartitionStatisticsFile.builder()
-                    .snapshotId(11L)
-                    .path("/some/partition/stats/file.parquet")
-                    .fileSizeInBytes(42L)
-                    .build())
-            .build();
+    int iterations = 10;
+    for (int i = 0; i < iterations; i++) {
+      sql("insert into hive.default.%s values (%s, 'AAAAAAAAAA', 'AAAA')", tableName, i);
+      sourceTable.refresh();
+      actions().computePartitionStats(sourceTable).execute();
+    }
 
-    OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
-    TableMetadataParser.overwrite(withPartStatistics, file);
+    sourceTable.refresh();
+    assertThat(sourceTable.partitionStatisticsFiles()).hasSize(iterations);
 
-    assertThatThrownBy(
-            () ->
-                actions()
-                    .rewriteTablePath(sourceTable)
-                    .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
-                    .execute())
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageContaining("Partition statistics files are not supported yet");
+    String targetTableLocation = targetTableLocation();
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation)
+            .execute();
+    checkFileNum(
+        iterations * 2 + 1, iterations, iterations, 0, iterations, iterations * 6 + 1, result);
+
+    findAndAssertFileInFileList(
+        result, "partition-stats", sourceTableLocation, targetTableLocation);
   }
 
   @Test
@@ -988,29 +985,7 @@ public void testStatisticsFileSourcePath() throws IOException {
 
     checkFileNum(3, 1, 1, 1, 7, result);
 
-    // Read the file list to verify statistics file paths
-    List<Tuple2<String, String>> filesToMove = readPathPairList(result.fileListLocation());
-
-    // Find the statistics file entry in the file list using stream
-    Tuple2<String, String> statsFilePathPair =
-        filesToMove.stream()
-            .filter(pathPair -> pathPair._1().endsWith(".stats"))
-            .findFirst()
-            .orElse(null);
-
-    assertThat(statsFilePathPair).as("Should find statistics file in file list").isNotNull();
-
-    // Verify the source path points to the actual source location, not staging
-    assertThat(statsFilePathPair._1())
-        .as("Statistics file source should point to source table location and NOT staging")
-        .startsWith(sourceTableLocation)
-        .contains("/metadata/")
-        .doesNotContain("staging");
-
-    // Verify the target path is correctly rewritten
-    assertThat(statsFilePathPair._2())
-        .as("Statistics file target should point to target table location")
-        .startsWith(targetTableLocation);
+    findAndAssertFileInFileList(result, ".stats", sourceTableLocation, targetTableLocation);
   }
 
   @Test
@@ -1265,7 +1240,24 @@ protected void checkFileNum(
       int manifestFileCount,
       int totalCount,
       RewriteTablePath.Result result) {
-    checkFileNum(versionFileCount, manifestListCount, manifestFileCount, 0, totalCount, result);
+    checkFileNum(versionFileCount, manifestListCount, manifestFileCount, 0, 0, totalCount, result);
+  }
+
+  protected void checkFileNum(
+      int versionFileCount,
+      int manifestListCount,
+      int manifestFileCount,
+      int statisticsFileCount,
+      int totalCount,
+      RewriteTablePath.Result result) {
+    checkFileNum(
+        versionFileCount,
+        manifestListCount,
+        manifestFileCount,
+        statisticsFileCount,
+        0,
+        totalCount,
+        result);
   }
 
   protected void checkFileNum(
@@ -1273,6 +1265,7 @@
       int manifestListCount,
       int manifestFileCount,
       int statisticsFileCount,
+      int partitionFileCount,
      int totalCount,
       RewriteTablePath.Result result) {
     List<String> filesToMove =
@@ -1302,6 +1295,9 @@ protected void checkFileNum(
     assertThat(filesToMove.stream().filter(f -> f.endsWith(".stats")))
         .as("Wrong rebuilt Statistic file count")
         .hasSize(statisticsFileCount);
+    assertThat(filesToMove.stream().filter(f -> f.contains("partition-stats")))
+        .as("Wrong rebuilt Partition Statistic file count")
+        .hasSize(partitionFileCount);
     assertThat(filesToMove).as("Wrong total file count").hasSize(totalCount);
   }
 
@@ -1355,41 +1351,79 @@ private Table createMetastoreTable(
       String namespace,
       String tableName,
       int snapshotNumber) {
+    return createMetastoreTable(location, properties, namespace, tableName, snapshotNumber, null);
+  }
+
+  private Table createMetastoreTable(
+      String location,
+      Map<String, String> properties,
+      String namespace,
+      String tableName,
+      int snapshotNumber,
+      String partitionColumn) {
     spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
     spark.conf().set("spark.sql.catalog.hive.type", "hive");
     spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
     spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false");
 
+    // Generate and execute CREATE TABLE SQL
+    String createTableSQL =
+        generateCreateTableSQL(location, properties, namespace, tableName, partitionColumn);
+    sql(createTableSQL);
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", namespace, tableName, i);
+    }
+    return catalog.loadTable(TableIdentifier.of(namespace, tableName));
+  }
+
+  /**
+   * Generates a SQL statement for creating an Iceberg table.
+   *
+   * @param location the storage location path for the table; can be empty
+   * @param properties key-value pairs of table properties for setting table metadata
+   * @param namespace the namespace (database name)
+   * @param tableName the name of the table to be created
+   * @param partitionColumn the partition column name, must be one of c1, c2, or c3; can be null or
+   *     an empty string for a non-partitioned table
+   * @return CREATE TABLE SQL statement string
+   */
+  private String generateCreateTableSQL(
+      String location,
+      Map<String, String> properties,
+      String namespace,
+      String tableName,
+      String partitionColumn) {
     StringBuilder propertiesStr = new StringBuilder();
     properties.forEach((k, v) -> propertiesStr.append("'" + k + "'='" + v + "',"));
     String tblProperties =
         propertiesStr.substring(0, propertiesStr.length() > 0 ? propertiesStr.length() - 1 : 0);
 
     sql("DROP TABLE IF EXISTS hive.%s.%s", namespace, tableName);
-    if (tblProperties.isEmpty()) {
-      String sqlStr =
-          String.format(
-              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName);
-      if (!location.isEmpty()) {
-        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location);
-      }
-      sql(sqlStr);
+
+    StringBuilder createTableSql = new StringBuilder();
+    createTableSql
+        .append("CREATE TABLE hive.")
+        .append(namespace)
+        .append(".")
+        .append(tableName)
+        .append(" (c1 bigint, c2 string, c3 string)");
+
+    if (partitionColumn != null && !partitionColumn.isEmpty()) {
+      createTableSql.append(" USING iceberg PARTITIONED BY (").append(partitionColumn).append(")");
     } else {
-      String sqlStr =
-          String.format(
-              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName);
-      if (!location.isEmpty()) {
-        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location);
-      }
-
-      sqlStr = String.format("%s TBLPROPERTIES (%s)", sqlStr, tblProperties);
-      sql(sqlStr);
+      createTableSql.append(" USING iceberg");
     }
 
-    for (int i = 0; i < snapshotNumber; i++) {
-      sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", namespace, tableName, i);
+    if (!location.isEmpty()) {
+      createTableSql.append(" LOCATION '").append(location).append("'");
     }
-    return catalog.loadTable(TableIdentifier.of(namespace, tableName));
+
+    if (!tblProperties.isEmpty()) {
+      createTableSql.append(" TBLPROPERTIES (").append(tblProperties).append(")");
+    }
+
+    return createTableSql.toString();
   }
 
   private static String fileName(String path) {
@@ -1434,4 +1468,36 @@ private void removeBroadcastValuesFromLocalBlockManager(long id) {
     blockInfoManager.removeBlock(blockId);
     blockManager.memoryStore().remove(blockId);
   }
+
+  private void findAndAssertFileInFileList(
+      RewriteTablePath.Result result,
+      String fileIdentifier,
+      String sourceTableLocation,
+      String targetTableLocation) {
+
+    List<Tuple2<String, String>> filesToMove = readPathPairList(result.fileListLocation());
+
+    // Find the file path pair that contains the specified file identifier
+    Tuple2<String, String> filePathPair =
+        filesToMove.stream()
+            .filter(pair -> pair._1().contains(fileIdentifier))
+            .findFirst()
+            .orElse(null);
+
+    // Assert that the file was found in the list
+    assertThat(filePathPair).as("Should find " + fileIdentifier + " file in file list").isNotNull();
+
+    // Validate source path: should point to source table location, contain metadata, and not
+    // staging
+    assertThat(filePathPair._1())
+        .as(fileIdentifier + " source should point to source table location and NOT staging")
+        .startsWith(sourceTableLocation)
+        .contains("/metadata/")
+        .doesNotContain("staging");
+
+    // Validate target path: should point to target table location
+    assertThat(filePathPair._2())
+        .as(fileIdentifier + " target should point to target table location")
+        .startsWith(targetTableLocation);
+  }
 }
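
Note on the expected counts asserted by the new test above: assuming each INSERT commits one data file, one manifest, one manifest list, and one new metadata.json, and each computePartitionStats() run writes one partition-stats file plus one more metadata.json, the arithmetic behind checkFileNum(iterations * 2 + 1, iterations, iterations, 0, iterations, iterations * 6 + 1, result) works out as in this sketch:

  // Sketch of the file accounting behind the checkFileNum call above, under the
  // assumptions stated in the note (one data file/manifest/manifest list per
  // insert; one partition-stats file and one extra metadata.json per stats run).
  static int[] expectedCounts(int n) {
    int versionFiles = 2 * n + 1; // CREATE TABLE metadata.json + 2 commits per iteration
    int manifestLists = n;        // one per insert snapshot
    int manifests = n;            // one per insert
    int tableStats = 0;           // this test writes no table-level .stats files
    int partitionStats = n;       // one per computePartitionStats() run
    int dataFiles = n;            // one per insert
    int total = versionFiles + manifestLists + manifests + tableStats + partitionStats + dataFiles;
    // n = 10 yields checkFileNum(21, 10, 10, 0, 10, 61, result), i.e. 6n + 1 in total
    return new int[] {versionFiles, manifestLists, manifests, tableStats, partitionStats, total};
  }

The spark/v3.5 changes below are a verbatim copy of the spark/v3.4 changes, following the per-Spark-version source layout.
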
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 14ad724c1cfc..9e610c453958 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.RewriteTablePathUtil;
 import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
 import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
@@ -273,11 +274,6 @@ private String rebuildMetadata() {
     TableMetadata endMetadata =
         ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();
 
-    Preconditions.checkArgument(
-        endMetadata.partitionStatisticsFiles() == null
-            || endMetadata.partitionStatisticsFiles().isEmpty(),
-        "Partition statistics files are not supported yet.");
-
     // rebuild version files
     RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
     Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
@@ -385,6 +381,9 @@ private Set<Pair<String, String>> rewriteVersionFile(
     // include statistics files in copy plan
     result.addAll(
         statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles()));
+    result.addAll(
+        partitionStatsFileCopyPlan(
+            metadata.partitionStatisticsFiles(), newTableMetadata.partitionStatisticsFiles()));
     return result;
   }
 
@@ -409,6 +408,27 @@ private Set<Pair<String, String>> statsFileCopyPlan(
     return result;
   }
 
+  private Set<Pair<String, String>> partitionStatsFileCopyPlan(
+      List<PartitionStatisticsFile> beforeStats, List<PartitionStatisticsFile> afterStats) {
+    Set<Pair<String, String>> result = Sets.newHashSet();
+    if (beforeStats.isEmpty()) {
+      return result;
+    }
+
+    Preconditions.checkArgument(
+        beforeStats.size() == afterStats.size(),
+        "Before and after path rewrite, partition statistic files count should be same");
+    for (int i = 0; i < beforeStats.size(); i++) {
+      PartitionStatisticsFile before = beforeStats.get(i);
+      PartitionStatisticsFile after = afterStats.get(i);
+      Preconditions.checkArgument(
+          before.fileSizeInBytes() == after.fileSizeInBytes(),
+          "Before and after path rewrite, partition statistic file size should be same");
+      result.add(Pair.of(before.path(), after.path()));
+    }
+    return result;
+  }
+
   /**
    * Rewrite a manifest list representing a snapshot.
    *
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 770f6c5c7aa6..ff68f7e4781d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -38,14 +38,12 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.ActionsProvider;
@@ -902,36 +900,35 @@ public void testInvalidArgs() {
   }
 
   @Test
-  public void testPartitionStatisticFile() throws IOException {
+  public void testTableWithManyPartitionStatisticFile() throws IOException {
     String sourceTableLocation = newTableLocation();
     Map<String, String> properties = Maps.newHashMap();
     properties.put("format-version", "2");
     String tableName = "v2tblwithPartStats";
     Table sourceTable =
-        createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);
+        createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0, "c1");
 
-    TableMetadata metadata = currentMetadata(sourceTable);
-    TableMetadata withPartStatistics =
-        TableMetadata.buildFrom(metadata)
-            .setPartitionStatistics(
-                ImmutableGenericPartitionStatisticsFile.builder()
-                    .snapshotId(11L)
-                    .path("/some/partition/stats/file.parquet")
-                    .fileSizeInBytes(42L)
-                    .build())
-            .build();
+    int iterations = 10;
+    for (int i = 0; i < iterations; i++) {
+      sql("insert into hive.default.%s values (%s, 'AAAAAAAAAA', 'AAAA')", tableName, i);
+      sourceTable.refresh();
+      actions().computePartitionStats(sourceTable).execute();
+    }
 
-    OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
-    TableMetadataParser.overwrite(withPartStatistics, file);
+    sourceTable.refresh();
+    assertThat(sourceTable.partitionStatisticsFiles()).hasSize(iterations);
 
-    assertThatThrownBy(
-            () ->
-                actions()
-                    .rewriteTablePath(sourceTable)
-                    .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
-                    .execute())
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageContaining("Partition statistics files are not supported yet");
+    String targetTableLocation = targetTableLocation();
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation)
+            .execute();
+    checkFileNum(
+        iterations * 2 + 1, iterations, iterations, 0, iterations, iterations * 6 + 1, result);
+
+    findAndAssertFileInFileList(
+        result, "partition-stats", sourceTableLocation, targetTableLocation);
   }
 
   @Test
@@ -988,29 +985,7 @@ public void testStatisticsFileSourcePath() throws IOException {
 
     checkFileNum(3, 1, 1, 1, 7, result);
 
-    // Read the file list to verify statistics file paths
-    List<Tuple2<String, String>> filesToMove = readPathPairList(result.fileListLocation());
-
-    // Find the statistics file entry in the file list using stream
-    Tuple2<String, String> statsFilePathPair =
-        filesToMove.stream()
-            .filter(pathPair -> pathPair._1().endsWith(".stats"))
-            .findFirst()
-            .orElse(null);
-
-    assertThat(statsFilePathPair).as("Should find statistics file in file list").isNotNull();
-
-    // Verify the source path points to the actual source location, not staging
-    assertThat(statsFilePathPair._1())
-        .as("Statistics file source should point to source table location and NOT staging")
-        .startsWith(sourceTableLocation)
-        .contains("/metadata/")
-        .doesNotContain("staging");
-
-    // Verify the target path is correctly rewritten
-    assertThat(statsFilePathPair._2())
-        .as("Statistics file target should point to target table location")
-        .startsWith(targetTableLocation);
+    findAndAssertFileInFileList(result, ".stats", sourceTableLocation, targetTableLocation);
   }
 
   @Test
@@ -1265,7 +1240,24 @@ protected void checkFileNum(
       int manifestFileCount,
       int totalCount,
       RewriteTablePath.Result result) {
-    checkFileNum(versionFileCount, manifestListCount, manifestFileCount, 0, totalCount, result);
+    checkFileNum(versionFileCount, manifestListCount, manifestFileCount, 0, 0, totalCount, result);
+  }
+
+  protected void checkFileNum(
+      int versionFileCount,
+      int manifestListCount,
+      int manifestFileCount,
+      int statisticsFileCount,
+      int totalCount,
+      RewriteTablePath.Result result) {
+    checkFileNum(
+        versionFileCount,
+        manifestListCount,
+        manifestFileCount,
+        statisticsFileCount,
+        0,
+        totalCount,
+        result);
   }
 
   protected void checkFileNum(
@@ -1273,6 +1265,7 @@
       int manifestListCount,
       int manifestFileCount,
       int statisticsFileCount,
+      int partitionFileCount,
       int totalCount,
       RewriteTablePath.Result result) {
     List<String> filesToMove =
@@ -1302,6 +1295,9 @@ protected void checkFileNum(
     assertThat(filesToMove.stream().filter(f -> f.endsWith(".stats")))
         .as("Wrong rebuilt Statistic file count")
         .hasSize(statisticsFileCount);
+    assertThat(filesToMove.stream().filter(f -> f.contains("partition-stats")))
+        .as("Wrong rebuilt Partition Statistic file count")
+        .hasSize(partitionFileCount);
     assertThat(filesToMove).as("Wrong total file count").hasSize(totalCount);
   }
 
@@ -1355,41 +1351,79 @@ private Table createMetastoreTable(
       String namespace,
       String tableName,
       int snapshotNumber) {
+    return createMetastoreTable(location, properties, namespace, tableName, snapshotNumber, null);
+  }
+
+  private Table createMetastoreTable(
+      String location,
+      Map<String, String> properties,
+      String namespace,
+      String tableName,
+      int snapshotNumber,
+      String partitionColumn) {
     spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
     spark.conf().set("spark.sql.catalog.hive.type", "hive");
     spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
     spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false");
 
+    // Generate and execute CREATE TABLE SQL
+    String createTableSQL =
+        generateCreateTableSQL(location, properties, namespace, tableName, partitionColumn);
+    sql(createTableSQL);
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", namespace, tableName, i);
+    }
+    return catalog.loadTable(TableIdentifier.of(namespace, tableName));
+  }
+
+  /**
+   * Generates a SQL statement for creating an Iceberg table.
+   *
+   * @param location the storage location path for the table; can be empty
+   * @param properties key-value pairs of table properties for setting table metadata
+   * @param namespace the namespace (database name)
+   * @param tableName the name of the table to be created
+   * @param partitionColumn the partition column name, must be one of c1, c2, or c3; can be null or
+   *     an empty string for a non-partitioned table
+   * @return CREATE TABLE SQL statement string
+   */
+  private String generateCreateTableSQL(
+      String location,
+      Map<String, String> properties,
+      String namespace,
+      String tableName,
+      String partitionColumn) {
     StringBuilder propertiesStr = new StringBuilder();
     properties.forEach((k, v) -> propertiesStr.append("'" + k + "'='" + v + "',"));
     String tblProperties =
         propertiesStr.substring(0, propertiesStr.length() > 0 ? propertiesStr.length() - 1 : 0);
 
     sql("DROP TABLE IF EXISTS hive.%s.%s", namespace, tableName);
-    if (tblProperties.isEmpty()) {
-      String sqlStr =
-          String.format(
-              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName);
-      if (!location.isEmpty()) {
-        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location);
-      }
-      sql(sqlStr);
+
+    StringBuilder createTableSql = new StringBuilder();
+    createTableSql
+        .append("CREATE TABLE hive.")
+        .append(namespace)
+        .append(".")
+        .append(tableName)
+        .append(" (c1 bigint, c2 string, c3 string)");
+
+    if (partitionColumn != null && !partitionColumn.isEmpty()) {
+      createTableSql.append(" USING iceberg PARTITIONED BY (").append(partitionColumn).append(")");
    } else {
-      String sqlStr =
-          String.format(
-              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName);
-      if (!location.isEmpty()) {
-        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location);
-      }
-
-      sqlStr = String.format("%s TBLPROPERTIES (%s)", sqlStr, tblProperties);
-      sql(sqlStr);
+      createTableSql.append(" USING iceberg");
     }
 
-    for (int i = 0; i < snapshotNumber; i++) {
-      sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", namespace, tableName, i);
+    if (!location.isEmpty()) {
+      createTableSql.append(" LOCATION '").append(location).append("'");
    }
-    return catalog.loadTable(TableIdentifier.of(namespace, tableName));
+
+    if (!tblProperties.isEmpty()) {
+      createTableSql.append(" TBLPROPERTIES (").append(tblProperties).append(")");
+    }
+
+    return createTableSql.toString();
  }
 
   private static String fileName(String path) {
@@ -1434,4 +1468,36 @@ private void removeBroadcastValuesFromLocalBlockManager(long id) {
     blockInfoManager.removeBlock(blockId);
     blockManager.memoryStore().remove(blockId);
   }
+
+  private void findAndAssertFileInFileList(
+      RewriteTablePath.Result result,
+      String fileIdentifier,
+      String sourceTableLocation,
+      String targetTableLocation) {
+
+    List<Tuple2<String, String>> filesToMove = readPathPairList(result.fileListLocation());
+
+    // Find the file path pair that contains the specified file identifier
+    Tuple2<String, String> filePathPair =
+        filesToMove.stream()
+            .filter(pair -> pair._1().contains(fileIdentifier))
+            .findFirst()
+            .orElse(null);
+
+    // Assert that the file was found in the list
+    assertThat(filePathPair).as("Should find " + fileIdentifier + " file in file list").isNotNull();
+
+    // Validate source path: should point to source table location, contain metadata, and not
+    // staging
+    assertThat(filePathPair._1())
+        .as(fileIdentifier + " source should point to source table location and NOT staging")
+        .startsWith(sourceTableLocation)
+        .contains("/metadata/")
+        .doesNotContain("staging");
+
+    // Validate target path: should point to target table location
+    assertThat(filePathPair._2())
+        .as(fileIdentifier + " target should point to target table location")
+        .startsWith(targetTableLocation);
+  }
 }
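
For reference, a minimal end-to-end sketch of driving the action once this change lands, using only calls exercised by the tests above (the class name and location prefixes are hypothetical). Tables with partition statistics files previously failed with "Partition statistics files are not supported yet."; they are now planned for copy alongside table-level statistics:

  import org.apache.iceberg.Table;
  import org.apache.iceberg.actions.RewriteTablePath;
  import org.apache.iceberg.spark.actions.SparkActions;

  public class RewritePathWithPartitionStatsSketch {
    // Returns the location of the file list produced by the action; the list
    // contains source/target path pairs, now including partition-stats files
    // validated and paired by partitionStatsFileCopyPlan.
    static String planRewrite(Table table) {
      RewriteTablePath.Result result =
          SparkActions.get()
              .rewriteTablePath(table)
              .rewriteLocationPrefix(
                  "s3://old-bucket/warehouse/db/tbl/", "s3://new-bucket/warehouse/db/tbl/")
              .execute();
      return result.fileListLocation();
    }
  }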