diff --git a/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java b/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java index a0e0fc3f53ef0..c37a54fca368d 100644 --- a/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java +++ b/presto-analyzer/src/test/java/com/facebook/presto/sql/analyzer/TestBuiltInQueryPreparer.java @@ -75,7 +75,7 @@ public void setup() procedures.add(new Procedure("system", "fun", arguments)); procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun", distributedArguments, - (session, transactionContext, procedureHandle, fragments) -> null, + (session, transactionContext, procedureHandle, fragments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())); procedureRegistry.addProcedures(new ConnectorId("test"), procedures); diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 0e474fe258dbf..5506b09dd30eb 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1277,23 +1277,32 @@ Argument Name required type Description ``filter`` string Predicate as a string used for filtering the files. Currently only rewrite of whole partitions is supported. Filter on partition - columns. The default value is `true`. + columns. The default value is ``true``. + +``sorted_by`` array of Specify an array of one or more columns to use for sorting. When + strings performing a rewrite, the specified sorting definition must be + compatible with the table's own sorting property, if one exists. ``options`` map Options to be used for data files rewrite. (to be expanded) ===================== ========== =============== ======================================================================= Examples: -* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files to larger ones:: +* Rewrite all the data files in table ``db.sample`` to the newest partition spec and combine small files to larger ones:: CALL iceberg.system.rewrite_data_files('db', 'sample'); CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample'); -* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec:: +* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec:: CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1'); CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1'); +* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec and a sorting definition:: + + CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1', ARRAY['join_date DESC NULLS FIRST', 'emp_id ASC NULLS LAST']); + CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1', sorted_by => ARRAY['join_date']); + Rewrite Manifests ^^^^^^^^^^^^^^^^^ diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java index c6f78f97992cb..28eec12680077 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java @@ -17,7 +17,6 @@ import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Map; @@ -43,6 +42,7 @@ public IcebergDistributedProcedureHandle( @JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec, @JsonProperty("storageProperties") Map storageProperties, @JsonProperty("tableLayoutHandle") IcebergTableLayoutHandle tableLayoutHandle, + @JsonProperty("sortOrder") List sortOrder, @JsonProperty("relevantData") Map relevantData) { super( @@ -55,7 +55,7 @@ public IcebergDistributedProcedureHandle( fileFormat, compressionCodec, storageProperties, - ImmutableList.of()); + sortOrder); this.tableLayoutHandle = requireNonNull(tableLayoutHandle, "tableLayoutHandle is null"); this.relevantData = requireNonNull(relevantData, "relevantData is null"); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java index 50dc0d92cff6e..b91eaaa25b06c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java @@ -24,8 +24,10 @@ import com.facebook.presto.iceberg.IcebergTableLayoutHandle; import com.facebook.presto.iceberg.PartitionData; import com.facebook.presto.iceberg.RuntimeStatsMetricsReporter; +import com.facebook.presto.iceberg.SortField; import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.facebook.presto.spi.procedure.DistributedProcedure; import com.facebook.presto.spi.procedure.DistributedProcedure.Argument; @@ -42,6 +44,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; @@ -57,17 +60,22 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Consumer; import static com.facebook.presto.common.Utils.checkArgument; import static com.facebook.presto.common.type.StandardTypes.VARCHAR; import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; +import static com.facebook.presto.iceberg.IcebergAbstractMetadata.getSupportedSortFields; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema; +import static com.facebook.presto.iceberg.SortFieldUtils.parseSortFields; +import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA; import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -98,8 +106,9 @@ public DistributedProcedure get() new Argument(SCHEMA, VARCHAR), new Argument(TABLE_NAME, VARCHAR), new Argument("filter", VARCHAR, false, "TRUE"), + new Argument("sorted_by", "array(varchar)", false, null), new Argument("options", "map(varchar, varchar)", false, null)), - (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments), + (session, procedureContext, tableLayoutHandle, arguments, sortOrderIndex) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments, sortOrderIndex), ((session, procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, tableHandle, fragments)), arguments -> { checkArgument(arguments.length == 2, "invalid arguments count: " + arguments.length); @@ -108,12 +117,40 @@ public DistributedProcedure get() }); } - private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments) + private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments, OptionalInt sortOrderIndex) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { Table icebergTable = procedureContext.getTable(); IcebergTableHandle tableHandle = layoutHandle.getTable(); + SortOrder sortOrder = icebergTable.sortOrder(); + List sortFieldStrings = ImmutableList.of(); + if (sortOrderIndex.isPresent()) { + Object value = arguments[sortOrderIndex.getAsInt()]; + if (value == null) { + sortFieldStrings = ImmutableList.of(); + } + else if (value instanceof List) { + sortFieldStrings = ((List) value).stream() + .map(String.class::cast) + .collect(toImmutableList()); + } + else { + throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "sorted_by must be an array(varchar)"); + } + } + if (sortFieldStrings != null && !sortFieldStrings.isEmpty()) { + SortOrder specifiedSortOrder = parseSortFields(icebergTable.schema(), sortFieldStrings); + if (specifiedSortOrder.satisfies(sortOrder)) { + // If the specified sort order satisfies the target table's internal sort order, use the specified sort order + sortOrder = specifiedSortOrder; + } + else { + throw new PrestoException(NOT_SUPPORTED, "Specified sort order is incompatible with the target table's internal sort order"); + } + } + + List sortFields = getSupportedSortFields(icebergTable.schema(), sortOrder); return new IcebergDistributedProcedureHandle( tableHandle.getSchemaName(), tableHandle.getIcebergTableName(), @@ -125,6 +162,7 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec getCompressionCodec(session), icebergTable.properties(), layoutHandle, + sortFields, ImmutableMap.of()); } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 53eb1a93cc467..919e55c6fa336 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -1684,6 +1684,216 @@ public void testWithoutSortOrder() } } + @Test + public void testRewriteDataFilesWithSortOrder() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "ASC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testRewriteDataFilesWithSortOrderOnPartitionedTables() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 3); + for (Object filePath : result.getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); + } + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testRewriteDataFilesWithDescSortOrder() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "DESC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testRewriteDataFilesWithDescSortOrderOnPartitionedTables() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'AAAA'), (3, 'CCCC'), (1, 'BBBB')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'AAAA')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'BBBB')", 2); + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 3); + for (Object filePath : result.getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); + } + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testRewriteDataFilesWithCompatibleSortOrderForSortedTable() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id DESC'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC")); + } + + assertUpdate(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC', 'emp_name ASC'])", schema, tableName), 7); + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + assertEquals(result.getOnlyColumnAsSet().size(), 1); + String filePath = String.valueOf(result.getOnlyValue()); + assertTrue(isFileSorted(filePath, "id", "DESC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testNotAllRewriteDataFilesWithIncompatibleSortOrderForSortedTable() + throws IOException + { + String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) with (sorted_by = ARRAY['id'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2); + for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) { + assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC")); + } + + assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['id DESC'])", schema, tableName), + "Specified sort order is incompatible with the target table's internal sort order"); + + assertQueryFails(format("CALL system.rewrite_data_files(schema => '%s', table_name => '%s', sorted_by => ARRAY['emp_name ASC', 'id ASC'])", schema, tableName), + "Specified sort order is incompatible with the target table's internal sort order"); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testRewriteDataFilesWithFilterAndSortOrder() + throws IOException + { + String tableName = "test_rewrite_data_with_filter_and_sort_order_" + randomTableSuffix(); + String schema = getSession().getSchema().get(); + try { + assertUpdate("CREATE TABLE " + tableName + " (id int, emp_name varchar) with (partitioning = ARRAY['emp_name'])"); + + // Create multiple data files with mixed id values so that only a subset is rewritten + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'AAAAA'), (2, 'BBBBB'), (4, 'AAAAA')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (4, 'BBBBB'), (0, 'BBBBB')", 2); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'AAAAA'), (3, 'BBBBB')", 2); + + // Rewrite only rows with `emp_name = 'AAAAA'` and sort the rewritten data files by `id desc` + assertUpdate(format( + "CALL system.rewrite_data_files(" + + "schema => '%s', " + + "table_name => '%s', " + + "filter => 'emp_name = ''AAAAA''', " + + "sorted_by => ARRAY['id desc'])", + schema, tableName), 3); + + // Rewrite only rows with `emp_name = 'BBBBB'` and sort the rewritten data files by `id asc` + assertUpdate(format( + "CALL system.rewrite_data_files(" + + "schema => '%s', " + + "table_name => '%s', " + + "filter => 'emp_name = ''BBBBB''', " + + "sorted_by => ARRAY['id asc'])", + schema, tableName), 4); + + // All data is still present + assertQuery( + "SELECT id, emp_name FROM " + tableName, + "VALUES " + + "(1, 'AAAAA'), " + + "(2, 'BBBBB'), " + + "(4, 'AAAAA'), " + + "(4, 'BBBBB'), " + + "(0, 'BBBBB'), " + + "(3, 'AAAAA'), " + + "(3, 'BBBBB')"); + + // There are 2 data files after the rewriting + MaterializedResult result = computeActual("SELECT file_path from \"" + tableName + "$files\""); + List paths = result.getOnlyColumn().map(String::valueOf).distinct().toList(); + assertEquals(paths.size(), 2); + + // The data file under partition `emp_name = 'AAAAA'` is sorted by `id DESC` + List dataFileA = paths.stream().filter(str -> str.contains("AAAAA")).toList(); + assertEquals(dataFileA.size(), 1); + assertTrue(isFileSorted(String.valueOf(dataFileA.get(0)), "id", "DESC")); + + // The data file under partition `emp_name = 'BBBBB'` is sorted by `id ASC` + List dataFileB = paths.stream().filter(str -> str.contains("BBBBB")).toList(); + assertEquals(dataFileB.size(), 1); + assertTrue(isFileSorted(String.valueOf(dataFileB.get(0)), "id", "ASC")); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + public boolean isFileSorted(String path, String sortColumnName, String sortOrder) throws IOException { diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java index 458c86f986203..af25f7ca02b49 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/AbstractAnalyzerTest.java @@ -196,7 +196,7 @@ public void setup() procedures.add(new Procedure("system", "procedure", arguments)); procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_procedure", distributedArguments, - (session, transactionContext, procedureHandle, fragments) -> null, + (session, transactionContext, procedureHandle, fragments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())); metadata.getProcedureRegistry().addProcedures(SECOND_CONNECTOR_ID, procedures); diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java index 46f2f031e858b..1ff85caca45a9 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java @@ -209,7 +209,7 @@ public Connector create(String catalogName, Map config, Connecto Set> procedures = new HashSet<>(); procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun", arguments, - (session, transactionContext, procedureHandle, fragments) -> null, + (session, transactionContext, procedureHandle, fragments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())); diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp index 6674b05f847b4..79fbae02dded4 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp @@ -1188,6 +1188,13 @@ void to_json(json& j, const IcebergDistributedProcedureHandle& p) { "IcebergDistributedProcedureHandle", "IcebergTableLayoutHandle", "tableLayoutHandle"); + to_json_key( + j, + "sortOrder", + p.sortOrder, + "IcebergDistributedProcedureHandle", + "List", + "sortOrder"); to_json_key( j, "relevantData", @@ -1269,6 +1276,13 @@ void from_json(const json& j, IcebergDistributedProcedureHandle& p) { "IcebergDistributedProcedureHandle", "IcebergTableLayoutHandle", "tableLayoutHandle"); + from_json_key( + j, + "sortOrder", + p.sortOrder, + "IcebergDistributedProcedureHandle", + "List", + "sortOrder"); from_json_key( j, "relevantData", diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h index 388d9247a828c..95ac608c985e3 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h @@ -243,6 +243,7 @@ struct IcebergDistributedProcedureHandle hive::HiveCompressionCodec compressionCodec = {}; Map storageProperties = {}; IcebergTableLayoutHandle tableLayoutHandle = {}; + List sortOrder = {}; Map relevantData = {}; IcebergDistributedProcedureHandle() noexcept; diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc index 3377b25a93492..c0a0882c49180 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc @@ -28,6 +28,7 @@ struct IcebergDistributedProcedureHandle hive::HiveCompressionCodec compressionCodec = {}; Map storageProperties = {}; IcebergTableLayoutHandle tableLayoutHandle = {}; + List sortOrder = {}; Map relevantData = {}; IcebergDistributedProcedureHandle() noexcept; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java b/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java index 1831db06eece0..f409a9ea5e763 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/procedure/TableDataRewriteDistributedProcedure.java @@ -35,6 +35,7 @@ public class TableDataRewriteDistributedProcedure public static final String SCHEMA = "schema"; public static final String TABLE_NAME = "table_name"; public static final String FILTER = "filter"; + public static final String SORT_ORDER = "sorted_by"; private final BeginCallDistributedProcedure beginCallDistributedProcedure; private final FinishCallDistributedProcedure finishCallDistributedProcedure; @@ -42,6 +43,7 @@ public class TableDataRewriteDistributedProcedure private int schemaIndex = -1; private int tableNameIndex = -1; private OptionalInt filterIndex = OptionalInt.empty(); + private OptionalInt sortOrderIndex = OptionalInt.empty(); public TableDataRewriteDistributedProcedure(String schema, String name, List arguments, @@ -67,6 +69,9 @@ else if (getArguments().get(i).getName().equals(TABLE_NAME)) { else if (getArguments().get(i).getName().equals(FILTER)) { filterIndex = OptionalInt.of(i); } + else if (getArguments().get(i).getName().equals(SORT_ORDER)) { + sortOrderIndex = OptionalInt.of(i); + } } checkArgument(schemaIndex >= 0 && tableNameIndex >= 0, format("A distributed procedure need at least 2 arguments: `%s` and `%s` for the target table", SCHEMA, TABLE_NAME)); @@ -75,7 +80,7 @@ else if (getArguments().get(i).getName().equals(FILTER)) { @Override public ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments) { - return this.beginCallDistributedProcedure.begin(session, procedureContext, tableLayoutHandle, arguments); + return this.beginCallDistributedProcedure.begin(session, procedureContext, tableLayoutHandle, arguments, getSortOrderIndex()); } @Override @@ -109,10 +114,15 @@ public String getFilter(Object[] parameters) } } + public OptionalInt getSortOrderIndex() + { + return sortOrderIndex; + } + @FunctionalInterface public interface BeginCallDistributedProcedure { - ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments); + ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments, OptionalInt sortOrderIndex); } @FunctionalInterface diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java index 70563239a1fe1..d0d0c5548188a 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestProcedureCreation.java @@ -126,7 +126,7 @@ public void showCreateDistributedProcedure() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", VARCHAR), new DistributedProcedure.Argument("schema", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())).isNotNull(); } @@ -141,7 +141,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", VARCHAR), new DistributedProcedure.Argument("name3", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class) @@ -154,7 +154,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("name2", VARCHAR), new DistributedProcedure.Argument("schema", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class) @@ -167,7 +167,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", VARCHAR), new DistributedProcedure.Argument("schema", INTEGER, false, 123)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class) @@ -180,7 +180,7 @@ public void shouldThrowExceptionForDistributedProcedureWithWrongArgument() new DistributedProcedure.Argument("name", VARCHAR), new DistributedProcedure.Argument("table_name", TIMESTAMP), new DistributedProcedure.Argument("schema", VARCHAR, false, null)), - (session, transactionContext, tableLayoutHandle, arguments) -> null, + (session, transactionContext, tableLayoutHandle, arguments, sortOrderIndex) -> null, (session, transactionContext, procedureHandle, fragments) -> {}, ignored -> new TestProcedureRegistry.TestProcedureContext())) .isInstanceOf(PrestoException.class)