feat: Support sorted_by for data_rewrite_files procedure#26804
feat: Support sorted_by for data_rewrite_files procedure#26804hantangwangd merged 4 commits intoprestodb:masterfrom
sorted_by for data_rewrite_files procedure#26804Conversation
Reviewer's GuideAdd support for an optional sorted_by argument to the Iceberg rewrite_data_files distributed procedure, propagating sort order through the SPI and native execution handle so rewrites can physically sort data files while enforcing compatibility with the table’s existing sort order, and add tests for behavior and procedure wiring updates. Sequence diagram for rewrite_data_files with optional sorted_by propagationsequenceDiagram
actor User
participant PrestoCoordinator
participant AnalyzerPlanner
participant TableDataRewriteDistributedProcedure
participant RewriteDataFilesProcedure
participant IcebergDistributedProcedureHandle
participant NativeWorker
User->>PrestoCoordinator: CALL iceberg.system.rewrite_data_files(..., sorted_by => ARRAY['join_date DESC'])
PrestoCoordinator->>AnalyzerPlanner: Parse and prepare procedure call
AnalyzerPlanner->>TableDataRewriteDistributedProcedure: Resolve arguments(schema, table_name, filter, sorted_by, options)
TableDataRewriteDistributedProcedure->>TableDataRewriteDistributedProcedure: Determine sortOrderIndex
TableDataRewriteDistributedProcedure->>RewriteDataFilesProcedure: begin(session, context, layoutHandle, arguments, sortOrderIndex)
RewriteDataFilesProcedure->>RewriteDataFilesProcedure: Read Iceberg table and layout
RewriteDataFilesProcedure->>RewriteDataFilesProcedure: sortFieldStrings = arguments[sortOrderIndex]
alt sorted_by provided
RewriteDataFilesProcedure->>RewriteDataFilesProcedure: specifiedSortOrder = parseSortFields(schema, sortFieldStrings)
alt specifiedSortOrder satisfies table.sortOrder
RewriteDataFilesProcedure->>RewriteDataFilesProcedure: sortOrder = specifiedSortOrder
else incompatible sort order
RewriteDataFilesProcedure-->>PrestoCoordinator: Throw PrestoException(NOT_SUPPORTED)
PrestoCoordinator-->>User: Error: Specified sort order is incompatible
RewriteDataFilesProcedure-->>RewriteDataFilesProcedure: return
end
else no sorted_by
RewriteDataFilesProcedure->>RewriteDataFilesProcedure: sortOrder = table.sortOrder()
end
RewriteDataFilesProcedure->>RewriteDataFilesProcedure: sortFields = getSupportedSortFields(schema, sortOrder)
RewriteDataFilesProcedure->>IcebergDistributedProcedureHandle: new IcebergDistributedProcedureHandle(..., tableLayoutHandle, sortFields, relevantData)
IcebergDistributedProcedureHandle-->>PrestoCoordinator: ConnectorDistributedProcedureHandle
PrestoCoordinator->>NativeWorker: Send handle as JSON (includes sortOrder)
NativeWorker->>NativeWorker: Deserialize IcebergDistributedProcedureHandle.sortOrder
NativeWorker->>NativeWorker: Plan physical rewrite using sortOrder
NativeWorker-->>PrestoCoordinator: Rewrite result
PrestoCoordinator-->>User: Procedure completed successfully
Updated class diagram for Iceberg rewrite_data_files distributed procedureclassDiagram
class TableDataRewriteDistributedProcedure {
<<abstract>>
+static String SCHEMA
+static String TABLE_NAME
+static String FILTER
+static String SORT_ORDER
-BeginCallDistributedProcedure beginCallDistributedProcedure
-FinishCallDistributedProcedure finishCallDistributedProcedure
-int schemaIndex
-int tableNameIndex
-OptionalInt filterIndex
-OptionalInt sortOrderIndex
+TableDataRewriteDistributedProcedure(String schema, String name, List~Argument~ arguments, BeginCallDistributedProcedure begin, FinishCallDistributedProcedure finish)
+ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments)
+String getSchema(Object[] parameters)
+String getTableName(Object[] parameters)
+String getFilter(Object[] parameters)
+OptionalInt getSortOrderIndex()
}
class BeginCallDistributedProcedure {
<<interface>>
+ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments, OptionalInt sortOrderIndex)
}
class FinishCallDistributedProcedure {
<<interface>>
+Void finish(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableHandle tableHandle, List~Object~ fragments)
}
class RewriteDataFilesProcedure {
+DistributedProcedure get()
-ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments, OptionalInt sortOrderIndex)
-SortOrder parseSortFields(Schema schema, List~String~ sortFieldStrings)
-List~SortField~ getSupportedSortFields(Schema schema, SortOrder sortOrder)
}
class IcebergDistributedProcedureHandle {
+SchemaTableName schemaTableName
+IcebergFileFormat fileFormat
+HiveCompressionCodec compressionCodec
+Map~String,String~ storageProperties
+IcebergTableLayoutHandle tableLayoutHandle
+List~SortField~ sortOrder
+Map~String,String~ relevantData
+IcebergDistributedProcedureHandle(SchemaTableName schemaTableName, String icebergTableName, IcebergFileFormat fileFormat, HiveCompressionCodec compressionCodec, Map~String,String~ storageProperties, IcebergTableLayoutHandle tableLayoutHandle, List~SortField~ sortOrder, Map~String,String~ relevantData)
}
class IcebergDistributedProcedureHandle_cpp {
<<struct>>
+SchemaTableName schemaTableName
+String icebergTableName
+IcebergFileFormat fileFormat
+HiveCompressionCodec compressionCodec
+Map~String,String~ storageProperties
+IcebergTableLayoutHandle tableLayoutHandle
+List~SortField~ sortOrder
+Map~String,String~ relevantData
+IcebergDistributedProcedureHandle()
}
TableDataRewriteDistributedProcedure o--> BeginCallDistributedProcedure
TableDataRewriteDistributedProcedure o--> FinishCallDistributedProcedure
RewriteDataFilesProcedure ..> TableDataRewriteDistributedProcedure : uses
RewriteDataFilesProcedure ..> IcebergDistributedProcedureHandle : constructs
IcebergDistributedProcedureHandle_cpp <.. IcebergDistributedProcedureHandle : JSON serialization
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
cf6071b to
abbc90a
Compare
abbc90a to
4ff8a33
Compare
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- The new Iceberg rewrite tests repeat the same table creation/insert/drop patterns several times; consider extracting a small helper to set up and tear down these tables to reduce duplication and make intent clearer.
- In the new tests that create temporary tables, a try/finally pattern around assertions can ensure the DROP TABLE always runs even when an assertion fails, preventing leftover tables from impacting other tests.
- When throwing NOT_SUPPORTED for an incompatible sort order, consider including the table's existing sort order and the requested sort order in the exception message to make debugging misconfigurations easier.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new Iceberg rewrite tests repeat the same table creation/insert/drop patterns several times; consider extracting a small helper to set up and tear down these tables to reduce duplication and make intent clearer.
- In the new tests that create temporary tables, a try/finally pattern around assertions can ensure the DROP TABLE always runs even when an assertion fails, preventing leftover tables from impacting other tests.
- When throwing NOT_SUPPORTED for an incompatible sort order, consider including the table's existing sort order and the requested sort order in the exception message to make debugging misconfigurations easier.
## Individual Comments
### Comment 1
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java:119-126` </location>
<code_context>
+ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments, OptionalInt sortOrderIndex)
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Tighten handling of the `sorted_by` argument to avoid unchecked cast and potential surprises with nulls.
This currently uses an unchecked `(List<String>)` cast and permits `sortFieldStrings` to be `null`. Consider validating the argument type and normalizing nulls up front, e.g.:
```java
List<String> sortFieldStrings = ImmutableList.of();
if (sortOrderIndex.isPresent()) {
Object value = arguments[sortOrderIndex.getAsInt()];
if (value == null) {
sortFieldStrings = ImmutableList.of();
}
else if (value instanceof List<?>) {
sortFieldStrings = ((List<?>) value).stream()
.map(String.class::cast)
.collect(toImmutableList());
}
else {
throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "sorted_by must be an array(varchar)");
}
}
```
This avoids a possible `ClassCastException`, gives users a clearer error, and removes the need for special null handling later.
Suggested implementation:
```java
Table icebergTable = procedureContext.getTable();
IcebergTableHandle tableHandle = layoutHandle.getTable();
SortOrder sortOrder = icebergTable.sortOrder();
List<String> sortFieldStrings = ImmutableList.of();
if (sortOrderIndex.isPresent()) {
Object value = arguments[sortOrderIndex.getAsInt()];
if (value == null) {
sortFieldStrings = ImmutableList.of();
}
else if (value instanceof List<?>) {
sortFieldStrings = ((List<?>) value).stream()
.map(String.class::cast)
.collect(toImmutableList());
}
else {
throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "sorted_by must be an array(varchar)");
}
}
if (!sortFieldStrings.isEmpty()) {
```
To compile successfully, ensure the following imports exist in `RewriteDataFilesProcedure.java`:
1. `import com.facebook.presto.spi.PrestoException;`
2. `import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;`
3. `import static com.google.common.collect.ImmutableList.toImmutableList;`
If any of these are missing, add them alongside the existing imports.
</issue_to_address>
### Comment 2
<location> `presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp:1191-1197` </location>
<code_context>
"IcebergDistributedProcedureHandle",
"IcebergTableLayoutHandle",
"tableLayoutHandle");
+ to_json_key(
+ j,
+ "sortOrder",
+ p.sortOrder,
+ "IcebergDistributedProcedureHandle",
+ "List<SortField>",
+ "sortOrder");
to_json_key(
j,
</code_context>
<issue_to_address>
**issue (bug_risk):** Double-check mixed-version compatibility for the new `sortOrder` protocol field.
Please confirm that:
- `from_json_key` treats a missing `sortOrder` as optional and leaves `p.sortOrder` at its default, so older writers → newer readers still work.
- The Java side can deserialize payloads that omit `sortOrder` (e.g., via a non-required `@JsonProperty` or a suitable default), so newer writers ↔ older components don’t fail.
If missing fields aren’t tolerated on either side, mixed-version deployments may see runtime errors.
</issue_to_address>
### Comment 3
<location> `presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java:1687-1696` </location>
<code_context>
+ @Test
</code_context>
<issue_to_address>
**suggestion (testing):** Add coverage for combining `filter` and `sorted_by` in `rewrite_data_files`
Given the PR description emphasizes using `rewrite_data_files` with both `filter` and `sorted_by`, please add a test invoking `CALL system.rewrite_data_files(schema => ..., table_name => ..., filter => 'id > 3', sorted_by => ARRAY['id'])`. The test should assert that only the filtered rows are rewritten and that the resulting files are ordered by `id`, so we have explicit coverage of their interaction and can detect regressions in filter handling when `sorted_by` is used.
Suggested implementation:
```java
}
}
@Test
public void testRewriteDataFilesWithFilterAndSortOrder()
throws IOException
{
String tableName = "test_rewrite_data_with_filter_and_sort_order_" + randomTableSuffix();
String schema = getSession().getSchema().get();
assertUpdate("CREATE TABLE " + tableName + " (id int, emp_name varchar)");
// Create multiple data files with mixed id values so that only a subset is rewritten
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'AAAA'), (2, 'BBBB'), (3, 'CCCC')", 3);
assertUpdate("INSERT INTO " + tableName + " VALUES (4, 'DDDD'), (5, 'EEEE')", 2);
assertUpdate("INSERT INTO " + tableName + " VALUES (6, 'FFFF'), (7, 'GGGG')", 2);
// Rewrite only rows with id > 3 and sort the rewritten data files by id
assertUpdate(format(
"CALL system.rewrite_data_files(" +
"schema_name => '%s', " +
"table_name => '%s', " +
"filter => 'id > 3', " +
"sorted_by => ARRAY['id'])",
schema,
tableName));
// All data is still present
assertQuery(
"SELECT id, emp_name FROM " + tableName,
"VALUES " +
"(1, 'AAAA'), " +
"(2, 'BBBB'), " +
"(3, 'CCCC'), " +
"(4, 'DDDD'), " +
"(5, 'EEEE'), " +
"(6, 'FFFF'), " +
"(7, 'GGGG')");
// Filtered rows (id > 3) should appear ordered by id after rewrite_data_files with sorted_by
assertQuery(
"SELECT id, emp_name FROM " + tableName + " WHERE id > 3",
"VALUES " +
"(4, 'DDDD'), " +
"(5, 'EEEE'), " +
"(6, 'FFFF'), " +
"(7, 'GGGG')");
}
@Test
public void testRewriteDataFilesWithSortOrder()
throws IOException
{
String tableName = "test_rewrite_data_with_sort_order_" + randomTableSuffix();
String schema = getSession().getSchema().get();
assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)");
assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA')", 3);
assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'BBBB'), (4,'DDDD')", 2);
assertUpdate("INSERT INTO " + tableName + " VALUES (9, 'CCCC'), (11,'FFFF')", 2);
```
1. If this class uses a helper for building `CALL system.rewrite_data_files(...)` statements or for asserting row ordering, you may want to adjust the new test to reuse those helpers instead of hardcoding the SQL.
2. If the existing tests explicitly inspect Iceberg metadata (e.g., `$files` or `$snapshots`) to verify which data files were rewritten, you can extend the new test similarly to assert that only files containing rows with `id > 3` changed. The basic interaction and coverage for `filter` + `sorted_by` is implemented above, but you might refine the metadata assertions to match the style of the rest of the suite.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
...o-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java
Outdated
Show resolved
Hide resolved
...to-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp
Show resolved
Hide resolved
presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
Outdated
Show resolved
Hide resolved
steveburnett
left a comment
There was a problem hiding this comment.
Thank you for the doc! A couple of formatting and phrasing nits only.
4ff8a33 to
78dd55b
Compare
|
@steveburnett thanks for the review and suggestion. I've addressed the comments. Please take a look when you have a chance. |
steveburnett
left a comment
There was a problem hiding this comment.
LGTM! (docs)
Pull updated branch, new local doc build. Looks good, thanks!
tdcmeehan
left a comment
There was a problem hiding this comment.
Looks great, thanks @hantangwangd !
Description
This PR support specifying
sorted_byargument when callingrewrite_ data_filesprocedure. It use the same syntax that is used to specify the sorting property when creating a table:It's especially useful when the table itself is not defined as a sorted table, or its sorting property is somewhat insufficient. This significantly optimizes scanning performance by allowing more irrelevant data files to be filtered out.
Motivation and Context
See issue #26824
Impact
Users can specify
sort_byargument when callingrewrite_data_filesprocedure.Test Plan
Newly added test cases to show rewriting data files with specified sort orders.
Contributor checklist
Release Notes