Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void setup()
procedures.add(new Procedure("system", "fun", arguments));
procedures.add(new TableDataRewriteDistributedProcedure("system", "distributed_fun",
distributedArguments,
(session, transactionContext, procedureHandle, fragments) -> null,
(session, transactionContext, procedureHandle, fragments, sortOrderIndex) -> null,
(session, transactionContext, procedureHandle, fragments) -> {},
ignored -> new TestProcedureRegistry.TestProcedureContext()));
procedureRegistry.addProcedures(new ConnectorId("test"), procedures);
Expand Down
15 changes: 12 additions & 3 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1277,23 +1277,32 @@ Argument Name required type Description

``filter`` string Predicate as a string used for filtering the files. Currently
only rewrite of whole partitions is supported. Filter on partition
columns. The default value is `true`.
columns. The default value is ``true``.

``sorted_by`` array of Specify an array of one or more columns to use for sorting. When
strings performing a rewrite, the specified sorting definition must be
compatible with the table's own sorting property, if one exists.

``options``           map        Options to be used for the data file rewrite. (to be expanded)
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files to larger ones::
* Rewrite all the data files in table ``db.sample`` to the newest partition spec and combine small files to larger ones::

CALL iceberg.system.rewrite_data_files('db', 'sample');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec::
* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec::

CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');

* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec, applying the specified sort order::

CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1', ARRAY['join_date DESC NULLS FIRST', 'emp_id ASC NULLS LAST']);
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1', sorted_by => ARRAY['join_date DESC NULLS FIRST', 'emp_id ASC NULLS LAST']);

Rewrite Manifests
^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Map;
Expand All @@ -43,6 +42,7 @@ public IcebergDistributedProcedureHandle(
@JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec,
@JsonProperty("storageProperties") Map<String, String> storageProperties,
@JsonProperty("tableLayoutHandle") IcebergTableLayoutHandle tableLayoutHandle,
@JsonProperty("sortOrder") List<SortField> sortOrder,
@JsonProperty("relevantData") Map<String, String> relevantData)
{
super(
Expand All @@ -55,7 +55,7 @@ public IcebergDistributedProcedureHandle(
fileFormat,
compressionCodec,
storageProperties,
ImmutableList.of());
sortOrder);
this.tableLayoutHandle = requireNonNull(tableLayoutHandle, "tableLayoutHandle is null");
this.relevantData = requireNonNull(relevantData, "relevantData is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import com.facebook.presto.iceberg.IcebergTableLayoutHandle;
import com.facebook.presto.iceberg.PartitionData;
import com.facebook.presto.iceberg.RuntimeStatsMetricsReporter;
import com.facebook.presto.iceberg.SortField;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.DistributedProcedure.Argument;
Expand All @@ -42,6 +44,7 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
Expand All @@ -57,17 +60,22 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.Utils.checkArgument;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergAbstractMetadata.getSupportedSortFields;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.iceberg.SortFieldUtils.parseSortFields;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -98,8 +106,9 @@ public DistributedProcedure get()
new Argument(SCHEMA, VARCHAR),
new Argument(TABLE_NAME, VARCHAR),
new Argument("filter", VARCHAR, false, "TRUE"),
new Argument("sorted_by", "array(varchar)", false, null),
new Argument("options", "map(varchar, varchar)", false, null)),
(session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
(session, procedureContext, tableLayoutHandle, arguments, sortOrderIndex) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments, sortOrderIndex),
((session, procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, tableHandle, fragments)),
arguments -> {
checkArgument(arguments.length == 2, "invalid arguments count: " + arguments.length);
Expand All @@ -108,12 +117,40 @@ public DistributedProcedure get()
});
}

private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments, OptionalInt sortOrderIndex)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
Table icebergTable = procedureContext.getTable();
IcebergTableHandle tableHandle = layoutHandle.getTable();

SortOrder sortOrder = icebergTable.sortOrder();
List<String> sortFieldStrings = ImmutableList.of();
if (sortOrderIndex.isPresent()) {
Object value = arguments[sortOrderIndex.getAsInt()];
if (value == null) {
sortFieldStrings = ImmutableList.of();
}
else if (value instanceof List<?>) {
sortFieldStrings = ((List<?>) value).stream()
.map(String.class::cast)
.collect(toImmutableList());
}
else {
throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "sorted_by must be an array(varchar)");
}
}
if (sortFieldStrings != null && !sortFieldStrings.isEmpty()) {
SortOrder specifiedSortOrder = parseSortFields(icebergTable.schema(), sortFieldStrings);
if (specifiedSortOrder.satisfies(sortOrder)) {
// If the specified sort order satisfies the target table's internal sort order, use the specified sort order
sortOrder = specifiedSortOrder;
}
else {
throw new PrestoException(NOT_SUPPORTED, "Specified sort order is incompatible with the target table's internal sort order");
}
}

List<SortField> sortFields = getSupportedSortFields(icebergTable.schema(), sortOrder);
return new IcebergDistributedProcedureHandle(
tableHandle.getSchemaName(),
tableHandle.getIcebergTableName(),
Expand All @@ -125,6 +162,7 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec
getCompressionCodec(session),
icebergTable.properties(),
layoutHandle,
sortFields,
ImmutableMap.of());
}
}
Expand Down
Loading
Loading