From 6ff7bf16b431bd36c5c98799fd9975af9516931e Mon Sep 17 00:00:00 2001 From: wangd Date: Sun, 14 Dec 2025 23:54:09 +0800 Subject: [PATCH 1/4] feat: Support `sorted_by` for `data_rewrite_files` procedure --- .../analyzer/TestBuiltInQueryPreparer.java | 2 +- .../IcebergDistributedProcedureHandle.java | 4 +- .../procedure/RewriteDataFilesProcedure.java | 27 +++- .../iceberg/IcebergDistributedTestBase.java | 128 ++++++++++++++++++ .../sql/analyzer/AbstractAnalyzerTest.java | 2 +- .../sql/planner/TestLogicalPlanner.java | 2 +- .../iceberg/presto_protocol_iceberg.cpp | 14 ++ .../iceberg/presto_protocol_iceberg.h | 1 + .../IcebergDistributedProcedureHandle.hpp.inc | 1 + .../TableDataRewriteDistributedProcedure.java | 14 +- .../presto/tests/TestProcedureCreation.java | 10 +- 11 files changed, 191 insertions(+), 14 deletions(-) diff --git a/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java b/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java index a0e0fc3f53ef0..c37a54fca368d 100644 --- a/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java +++ b/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java @@ -75,7 +75,7 @@ public void setup() procedures.add(new Procedure("system", "fun", arguments)); procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun", distributedArguments, - (session, transactionContext, procedureHandle, fragments) -> null, + (session, transactionContext, procedureHandle, fragments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())); procedureRegistry.addProcedures(new ConnectorId("test"), procedures); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java index c6f78f97992cb..28eec12680077 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java @@ -17,7 +17,6 @@ import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Map; @@ -43,6 +42,7 @@ public IcebergDistributedProcedureHandle( @JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec, @JsonProperty("storageProperties") Map storageProperties, @JsonProperty("tableLayoutHandle") IcebergTableLayoutHandle tableLayoutHandle, + @JsonProperty("sortOrder") List sortOrder, @JsonProperty("relevantData") Map relevantData) { super( @@ -55,7 +55,7 @@ public IcebergDistributedProcedureHandle( fileFormat, compressionCodec, storageProperties, - ImmutableList.of()); + sortOrder); this.tableLayoutHandle = requireNonNull(tableLayoutHandle, "tableLayoutHandle is null"); this.relevantData = requireNonNull(relevantData, "relevantData is null"); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java index 50dc0d92cff6e..2c3392a787660 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java @@ -24,8 +24,10 @@ import com.facebook.presto.iceberg.IcebergTableLayoutHandle; import com.facebook.presto.iceberg.PartitionData; import com.facebook.presto.iceberg.RuntimeStatsMetricsReporter; +import com.facebook.presto.iceberg.SortField; import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.facebook.presto.spi.procedure.DistributedProcedure; import com.facebook.presto.spi.procedure.DistributedProcedure.Argument; @@ -42,6 +44,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; @@ -57,17 +60,21 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Consumer; import static com.facebook.presto.common.Utils.checkArgument; import static com.facebook.presto.common.type.StandardTypes.VARCHAR; import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; +import static com.facebook.presto.iceberg.IcebergAbstractMetadata.getSupportedSortFields; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema; +import static com.facebook.presto.iceberg.SortFieldUtils.parseSortFields; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA; import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -98,8 +105,9 @@ public DistributedProcedure get() new Argument(SCHEMA, VARCHAR), new Argument(TABLE_NAME, VARCHAR), new Argument("filter", VARCHAR, false, "TRUE"), + new Argument("sorted_by", "array(varchar)", false, null), new Argument("options", "map(varchar, varchar)", false, null)), - (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments), + (session, procedureContext, tableLayoutHandle, arguments, sortOrderIndex) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments, sortOrderIndex), ((session, procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, tableHandle, fragments)), arguments -> { checkArgument(arguments.length == 2, "invalid arguments count: " + arguments.length); @@ -108,12 +116,26 @@ public DistributedProcedure get() }); } - private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments) + private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments, OptionalInt sortOrderIndex) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { Table icebergTable = procedureContext.getTable(); IcebergTableHandle tableHandle = layoutHandle.getTable(); + SortOrder sortOrder = icebergTable.sortOrder(); + List sortFieldStrings = sortOrderIndex.isEmpty() ? ImmutableList.of() : (List) arguments[sortOrderIndex.getAsInt()]; + if (sortFieldStrings != null && !sortFieldStrings.isEmpty()) { + SortOrder specifiedSortOrder = parseSortFields(icebergTable.schema(), sortFieldStrings); + if (specifiedSortOrder.satisfies(sortOrder)) { + // If the specified sort order satisfies the target table's internal sort order, use the specified sort order + sortOrder = specifiedSortOrder; + } + else { + throw new PrestoException(NOT_SUPPORTED, "Specified sort order is incompatible with the target table's internal sort order"); + } + } + + List sortFields = getSupportedSortFields(icebergTable.schema(), sortOrder); return new IcebergDistributedProcedureHandle( tableHandle.getSchemaName(), tableHandle.getIcebergTableName(), @@ -125,6 +147,7 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec getCompressionCodec(session), icebergTable.properties(), layoutHandle, + sortFields, ImmutableMap.of()); } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 53eb1a93cc467..b78086ff2d84e 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -1684,6 +1684,134 @@ public void testWithoutSortOrder() } } + @Test + public void testRewriteDataFilesWithSortOrder() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "ASC")); + + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testRewriteDataFilesWithSortOrderOnPartitionedTables() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 3); + for (Object filePath : result.getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); + } + + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testRewriteDataFilesWithDescSortOrder() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "DESC")); + + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testRewriteDataFilesWithDescSortOrderOnPartitionedTables() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 3); + for (Object filePath : result.getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); + } + + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testRewriteDataFilesWithCompatibleSortOrderForSortedTable() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id DESC'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); + } + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC', 'emp_name ASC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "DESC")); + + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testNotAllRewriteDataFilesWithIncompatibleSortOrderForSortedTable() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); + } + + assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), + "Specified sort order is incompatible with the target table's internal sort order"); + + assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['emp_name ASC', 'id ASC'])", schema, tableName), + "Specified sort order is incompatible with the target table's internal sort order"); + + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + public boolean isFileSorted(String path, String sortColumnName, String sortOrder) throws IOException { diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java index 458c86f986203..af25f7ca02b49 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java @@ -196,7 +196,7 @@ public void setup() procedures.add(new Procedure("system", "procedure", arguments)); procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_procedure", distributedArguments, - (session, transactionContext, procedureHandle, fragments) -> null, + (session, transactionContext, procedureHandle, fragments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())); metadata.getProcedureRegistry().addProcedures(SECOND_CONNECTOR_ID, procedures); diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java index 46f2f031e858b..1ff85caca45a9 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java @@ -209,7 +209,7 @@ public Connector create(String catalogName, Map config, Connecto Set> procedures = new HashSet<>(); procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun", arguments, - (session, transactionContext, procedureHandle, fragments) -> null, + (session, transactionContext, procedureHandle, fragments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())); diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp index 6674b05f847b4..79fbae02dded4 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp @@ -1188,6 +1188,13 @@ void to_json(json& j, const IcebergDistributedProcedureHandle& p) { "IcebergDistributedProcedureHandle", "IcebergTableLayoutHandle", "tableLayoutHandle"); + to_json_key( + j, + "sortOrder", + p.sortOrder, + "IcebergDistributedProcedureHandle", + "List", + "sortOrder"); to_json_key( j, "relevantData", @@ -1269,6 +1276,13 @@ void from_json(const json& j, IcebergDistributedProcedureHandle& p) { "IcebergDistributedProcedureHandle", "IcebergTableLayoutHandle", "tableLayoutHandle"); + from_json_key( + j, + "sortOrder", + p.sortOrder, + "IcebergDistributedProcedureHandle", + "List", + "sortOrder"); from_json_key( j, "relevantData", diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h index 388d9247a828c..95ac608c985e3 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h @@ -243,6 +243,7 @@ struct IcebergDistributedProcedureHandle hive::HiveCompressionCodec compressionCodec = {}; Map storageProperties = {}; IcebergTableLayoutHandle tableLayoutHandle = {}; + List sortOrder = {}; Map relevantData = {}; IcebergDistributedProcedureHandle() noexcept; diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc index 3377b25a93492..c0a0882c49180 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc @@ -28,6 +28,7 @@ struct IcebergDistributedProcedureHandle hive::HiveCompressionCodec compressionCodec = {}; Map storageProperties = {}; IcebergTableLayoutHandle tableLayoutHandle = {}; + List sortOrder = {}; Map relevantData = {}; IcebergDistributedProcedureHandle() noexcept; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java b/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java index 1831db06eece0..f409a9ea5e763 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java @@ -35,6 +35,7 @@ public class TableDataRewriteDistributedProcedure public static final String SCHEMA = "schema"; public static final String TABLE_NAME = "table_name"; public static final String FILTER = "filter"; + public static final String SORT_ORDER = "sorted_by"; private final BeginCallDistributedProcedure beginCallDistributedProcedure; private final FinishCallDistributedProcedure finishCallDistributedProcedure; @@ -42,6 +43,7 @@ public class TableDataRewriteDistributedProcedure private int schemaIndex = -1; private int tableNameIndex = -1; private OptionalInt filterIndex = OptionalInt.empty(); + private OptionalInt sortOrderIndex = OptionalInt.empty(); public TableDataRewriteDistributedProcedure(String schema, String name, List arguments, @@ -67,6 +69,9 @@ else if (getArguments().get(i).getName().equals(TABLE_NAME)) { else if (getArguments().get(i).getName().equals(FILTER)) { filterIndex = OptionalInt.of(i); } + else if (getArguments().get(i).getName().equals(SORT_ORDER)) { + sortOrderIndex = OptionalInt.of(i); + } } checkArgument(schemaIndex >= 0 && tableNameIndex >= 0, format("A distributed procedure need at least 2 arguments: `%s` and `%s` for the target table", SCHEMA, TABLE_NAME)); @@ -75,7 +80,7 @@ else if (getArguments().get(i).getName().equals(FILTER)) { @Override public ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments) { - return this.beginCallDistributedProcedure.begin(session, procedureContext, tableLayoutHandle, arguments); + return this.beginCallDistributedProcedure.begin(session, procedureContext, tableLayoutHandle, arguments, getSortOrderIndex()); } @Override @@ -109,10 +114,15 @@ public String getFilter(Object[] parameters) } } + public OptionalInt getSortOrderIndex() + { + return sortOrderIndex; + } + @FunctionalInterface public interface BeginCallDistributedProcedure { - ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments); + ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments, OptionalInt sortOrderIndex); } @FunctionalInterface diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java index 70563239a1fe1..d0d0c5548188a 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java @@ -126,7 +126,7 @@ public void showCreateDistributedProcedure() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", VARCHAR), new DistributedProcedure.Argument("schema", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())).isNotNull(); } @@ -141,7 +141,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", VARCHAR), new DistributedProcedure.Argument("name3", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class) @@ -154,7 +154,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("name2", VARCHAR), new DistributedProcedure.Argument("schema", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class) @@ -167,7 +167,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", VARCHAR), new DistributedProcedure.Argument("schema", INTEGER, false, 123)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class) @@ -180,7 +180,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", TIMESTAMP), new DistributedProcedure.Argument("schema", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class) From d7ef5be1a0e616c2e6651dd5fc53e9067629760d Mon Sep 17 00:00:00 2001 From: wangd Date: Thu, 18 Dec 2025 16:01:32 +0800 Subject: [PATCH 2/4] Add documentation for `sorted_by` argument in `rewrite_data_files` --- presto-docs/src/main/sphinx/connector/iceberg.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 0e474fe258dbf..3132f2c6e5f24 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1279,6 +1279,10 @@ Argument Name required type Description only rewrite of whole partitions is supported. Filter on partition columns. The default value is `true`. +``sorted_by`` array of string Specify an array of one or more columns to use for sorting. When + performing a rewrite, the specified sorting definition must be + compatible with the table's own sorting property, if one exists. + ``options`` map Options to be used for data files rewrite. (to be expanded) ===================== ========== =============== ======================================================================= @@ -1294,6 +1298,11 @@ Examples: CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1'); CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1'); +* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec and a sorting definition:: + + CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1', ARRAY['join_date DESC NULLS FIRST', 'emp_id ASC NULLS LAST']); + CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1', sorted_by => ARRAY['join_date']); + Rewrite Manifests ^^^^^^^^^^^^^^^^^ From 0118353cef876d4db0e35a3e2c84b9d23d6f20b0 Mon Sep 17 00:00:00 2001 From: wangd Date: Fri, 9 Jan 2026 16:35:42 +0800 Subject: [PATCH 3/4] Address AI review comments --- .../procedure/RewriteDataFilesProcedure.java | 17 +- .../iceberg/IcebergDistributedTestBase.java | 226 ++++++++++++------ 2 files changed, 170 insertions(+), 73 deletions(-) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java index 2c3392a787660..b91eaaa25b06c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java @@ -74,6 +74,7 @@ import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema; import static com.facebook.presto.iceberg.SortFieldUtils.parseSortFields; +import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA; import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME; @@ -123,7 +124,21 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec IcebergTableHandle tableHandle = layoutHandle.getTable(); SortOrder sortOrder = icebergTable.sortOrder(); - List sortFieldStrings = sortOrderIndex.isEmpty() ? ImmutableList.of() : (List) arguments[sortOrderIndex.getAsInt()]; + List sortFieldStrings = ImmutableList.of(); + if (sortOrderIndex.isPresent()) { + Object value = arguments[sortOrderIndex.getAsInt()]; + if (value == null) { + sortFieldStrings = ImmutableList.of(); + } + else if (value instanceof List) { + sortFieldStrings = ((List) value).stream() + .map(String.class::cast) + .collect(toImmutableList()); + } + else { + throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "sorted_by must be an array(varchar)"); + } + } if (sortFieldStrings != null && !sortFieldStrings.isEmpty()) { SortOrder specifiedSortOrder = parseSortFields(icebergTable.schema(), sortFieldStrings); if (specifiedSortOrder.satisfies(sortOrder)) { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index b78086ff2d84e..919e55c6fa336 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -1690,18 +1690,21 @@ public void testRewriteDataFilesWithSortOrder() { String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); String schema = getSession().getSchema().get(); - assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); - assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); - assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); - - assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); - MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); - assertEquals(result.getOnlyColumnAsSet().size(), 1); - String filePath = String.valueOf(result.getOnlyValue()); - assertTrue(isFileSorted(filePath, "id", "ASC")); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); - assertUpdate("DROP TABLE IF EXISTS " + tableName); + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "ASC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } } @Test @@ -1710,19 +1713,22 @@ public void testRewriteDataFilesWithSortOrderOnPartitionedTables() { String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); String schema = getSession().getSchema().get(); - assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); - assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); - assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); - - assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); - MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); - assertEquals(result.getOnlyColumnAsSet().size(), 3); - for (Object filePath : result.getOnlyColumnAsSet()) { - assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 3); + for (Object filePath : result.getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); + } + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); } - - assertUpdate("DROP TABLE IF EXISTS " + tableName); } @Test @@ -1731,18 +1737,21 @@ public void testRewriteDataFilesWithDescSortOrder() { String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); String schema = getSession().getSchema().get(); - assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); - assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); - assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); - - assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); - MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); - assertEquals(result.getOnlyColumnAsSet().size(), 1); - String filePath = String.valueOf(result.getOnlyValue()); - assertTrue(isFileSorted(filePath, "id", "DESC")); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); - assertUpdate("DROP TABLE IF EXISTS " + tableName); + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "DESC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } } @Test @@ -1751,19 +1760,22 @@ public void testRewriteDataFilesWithDescSortOrderOnPartitionedTables() { String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); String schema = getSession().getSchema().get(); - assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); - assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); - assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); - - assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); - MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); - assertEquals(result.getOnlyColumnAsSet().size(), 3); - for (Object filePath : result.getOnlyColumnAsSet()) { - assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 3); + for (Object filePath : result.getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); + } + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); } - - assertUpdate("DROP TABLE IF EXISTS " + tableName); } @Test @@ -1772,21 +1784,24 @@ public void testRewriteDataFilesWithCompatibleSortOrderForSortedTable() { String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); String schema = getSession().getSchema().get(); - assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id DESC'])"); - assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); - assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); - for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { - assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); - } - - assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC', 'emp_name ASC'])", schema, tableName), 7); - MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); - assertEquals(result.getOnlyColumnAsSet().size(), 1); - String filePath = String.valueOf(result.getOnlyValue()); - assertTrue(isFileSorted(filePath, "id", "DESC")); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id DESC'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); + } - assertUpdate("DROP TABLE IF EXISTS " + tableName); + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC', 'emp_name ASC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "DESC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } } @Test @@ -1795,21 +1810,88 @@ public void testNotAllRewriteDataFilesWithIncompatibleSortOrderForSortedTable() { String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); String schema = getSession().getSchema().get(); - assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id'])"); - assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); - assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); - for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { - assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); - } + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); + } - assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), - "Specified sort order is incompatible with the target table's internal sort order"); + assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), + "Specified sort order is incompatible with the target table's internal sort order"); - assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['emp_name ASC', 'id ASC'])", schema, tableName), - "Specified sort order is incompatible with the target table's internal sort order"); + assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['emp_name ASC', 'id ASC'])", schema, tableName), + "Specified sort order is incompatible with the target table's internal sort order"); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } - assertUpdate("DROP TABLE IF EXISTS " + tableName); + @Test + public void testRewriteDataFilesWithFilterAndSortOrder() + throws IOException + { + String tableName = "test_rewrite_data_with_filter_and_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + " (id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + + // Create multiple data files with mixed id values so that only a subset is rewritten + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'AAAAA'), (2, 'BBBBB'), (4, 'AAAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (4, 'BBBBB'), (0, 'BBBBB')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'AAAAA'), (3, 'BBBBB')", 2); + + // Rewrite only rows with `emp_name = 'AAAAA'` and sort the rewritten data files by `id desc` + assertUpdate(format( + "CALL system.rewrite_data_files(" + + "schema => '%s', " + + "table_name => '%s', " + + "filter => 'emp_name = ''AAAAA''', " + + "sorted_by => ARRAY['id desc'])", + schema, tableName), 3); + + // Rewrite only rows with `emp_name = 'BBBBB'` and sort the rewritten data files by `id asc` + assertUpdate(format( + "CALL system.rewrite_data_files(" + + "schema => '%s', " + + "table_name => '%s', " + + "filter => 'emp_name = ''BBBBB''', " + + "sorted_by => ARRAY['id asc'])", + schema, tableName), 4); + + // All data is still present + assertQuery( + "SELECT id, emp_name FROM " + tableName, + "VALUES " + + "(1, 'AAAAA'), " + + "(2, 'BBBBB'), " + + "(4, 'AAAAA'), " + + "(4, 'BBBBB'), " + + "(0, 'BBBBB'), " + + "(3, 'AAAAA'), " + + "(3, 'BBBBB')"); + + // There are 2 data files after the rewriting + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + List paths = result.getOnlyColumn().map(String::valueOf).distinct().toList(); + assertEquals(paths.size(), 2); + + // The data file under partition `emp_name = 'AAAAA'` is sorted by `id DESC` + List dataFileA = paths.stream().filter(str -> str.contains("AAAAA")).toList(); + assertEquals(dataFileA.size(), 1); + assertTrue(isFileSorted(String.valueOf(dataFileA.get(0)), "id", "DESC")); + + // The data file under partition `emp_name = 'BBBBB'` is sorted by `id ASC` + List dataFileB = paths.stream().filter(str -> str.contains("BBBBB")).toList(); + assertEquals(dataFileB.size(), 1); + assertTrue(isFileSorted(String.valueOf(dataFileB.get(0)), "id", "ASC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } } public boolean isFileSorted(String path, String sortColumnName, String sortOrder) From 78dd55b159ef8c53432934fc02413c54308cde3b Mon Sep 17 00:00:00 2001 From: wangd Date: Fri, 9 Jan 2026 16:40:23 +0800 Subject: [PATCH 4/4] Address review comments: update the documentation --- presto-docs/src/main/sphinx/connector/iceberg.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 3132f2c6e5f24..5506b09dd30eb 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1277,10 +1277,10 @@ Argument Name required type Description ``filter`` string Predicate as a string used for filtering the files. Currently only rewrite of whole partitions is supported. Filter on partition - columns. The default value is `true`. + columns. The default value is ``true``. -``sorted_by`` array of string Specify an array of one or more columns to use for sorting. When - performing a rewrite, the specified sorting definition must be +``sorted_by`` array of Specify an array of one or more columns to use for sorting. When + strings performing a rewrite, the specified sorting definition must be compatible with the table's own sorting property, if one exists. ``options`` map Options to be used for data files rewrite. (to be expanded) @@ -1288,17 +1288,17 @@ Argument Name required type Description Examples: -* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files to larger ones:: +* Rewrite all the data files in table ``db.sample`` to the newest partition spec and combine small files to larger ones:: CALL iceberg.system.rewrite_data_files('db', 'sample'); CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample'); -* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec:: +* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec:: CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1'); CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1'); -* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec and a sorting definition:: +* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec and a sorting definition:: CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1', ARRAY['join_date DESC NULLS FIRST', 'emp_id ASC NULLS LAST']); CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1', sorted_by => ARRAY['join_date']);