diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java index b797c098a59d..ec4fabe530a4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java @@ -31,6 +31,7 @@ import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.procedure.Procedure; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; @@ -62,6 +63,7 @@ public class IcebergConnector private final List> tableProperties; private final Optional accessControl; private final Set procedures; + private final Set tableProcedures; public IcebergConnector( LifeCycleManager lifeCycleManager, @@ -76,7 +78,8 @@ public IcebergConnector( List> schemaProperties, List> tableProperties, Optional accessControl, - Set procedures) + Set procedures, + Set tableProcedures) { this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); @@ -92,7 +95,8 @@ public IcebergConnector( this.schemaProperties = ImmutableList.copyOf(requireNonNull(schemaProperties, "schemaProperties is null")); this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null")); this.accessControl = requireNonNull(accessControl, "accessControl is null"); - this.procedures = requireNonNull(procedures, "procedures is null"); + this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null")); + this.tableProcedures = ImmutableSet.copyOf(requireNonNull(tableProcedures, "tableProcedures is null")); } @Override @@ -150,6 +154,12 @@ public Set 
getProcedures() return procedures; } + @Override + public Set getTableProcedures() + { + return tableProcedures; + } + @Override public List> getSessionProperties() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergHandleResolver.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergHandleResolver.java index 939b8a8540c9..8519c9dd49b6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergHandleResolver.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergHandleResolver.java @@ -14,12 +14,14 @@ package io.trino.plugin.iceberg; import io.trino.plugin.hive.HiveTransactionHandle; +import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorHandleResolver; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; @@ -56,6 +58,12 @@ public Class getInsertTableHandleClass() return IcebergWritableTableHandle.class; } + @Override + public Class getTableExecuteHandleClass() + { + return IcebergTableExecuteHandle.class; + } + @Override public Class getTransactionHandleClass() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index fb4cae303f23..ffc1766e838e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -22,12 +22,17 @@ import com.google.common.collect.Iterables; import io.airlift.json.JsonCodec; import 
io.airlift.slice.Slice; +import io.airlift.units.DataSize; import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable; import io.trino.plugin.hive.HiveApplyProjectionUtil; import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation; import io.trino.plugin.hive.HiveWrittenPartitions; +import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle; +import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; +import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId; import io.trino.spi.TrinoException; import io.trino.spi.connector.Assignment; +import io.trino.spi.connector.BeginTableExecuteResult; import io.trino.spi.connector.CatalogSchemaName; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnHandle; @@ -39,6 +44,7 @@ import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTableProperties; @@ -63,11 +69,13 @@ import io.trino.spi.statistics.TableStatistics; import io.trino.spi.type.TypeManager; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; @@ -83,6 +91,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -121,6 +130,7 @@ import static 
io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES; import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.BigintType.BIGINT; import static java.util.Collections.singletonList; @@ -545,6 +555,158 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect return new IcebergColumnHandle(primitiveColumnIdentity(0, "$row_id"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty()); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle connectorTableHandle, + String procedureName, + Map executeProperties) + { + IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle; + + IcebergTableProcedureId procedureId; + try { + procedureId = IcebergTableProcedureId.valueOf(procedureName); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown procedure '" + procedureName + "'"); + } + + switch (procedureId) { + case OPTIMIZE: + return getTableHandleForOptimize(session, tableHandle, executeProperties); + } + + throw new IllegalArgumentException("Unknown procedure: " + procedureId); + } + + private Optional getTableHandleForOptimize(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) + { + DataSize maxScannedFileSize = (DataSize) executeProperties.get("file_size_threshold"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + OPTIMIZE, + new IcebergOptimizeHandle( + SchemaParser.toJson(icebergTable.schema()), + PartitionSpecParser.toJson(icebergTable.spec()), + getColumns(icebergTable.schema(), typeManager), + getFileFormat(icebergTable), + 
icebergTable.properties(), + maxScannedFileSize), + icebergTable.location())); + } + + @Override + public Optional getLayoutForTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle; + switch (executeHandle.getProcedureId()) { + case OPTIMIZE: + return getLayoutForOptimize(session, executeHandle); + } + throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'"); + } + + private Optional getLayoutForOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle) + { + Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName()); + return getWriteLayout(icebergTable.schema(), icebergTable.spec()); + } + + @Override + public BeginTableExecuteResult beginTableExecute( + ConnectorSession session, + ConnectorTableExecuteHandle tableExecuteHandle, + ConnectorTableHandle updatedSourceTableHandle) + { + IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle; + IcebergTableHandle table = (IcebergTableHandle) updatedSourceTableHandle; + switch (executeHandle.getProcedureId()) { + case OPTIMIZE: + return beginOptimize(session, executeHandle, table); + } + throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'"); + } + + private BeginTableExecuteResult beginOptimize( + ConnectorSession session, + IcebergTableExecuteHandle executeHandle, + IcebergTableHandle table) + { + IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle(); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + + verify(transaction == null, "transaction already set"); + transaction = icebergTable.newTransaction(); + + return new BeginTableExecuteResult<>( + executeHandle, + table.forOptimize(true, optimizeHandle.getMaxScannedFileSize())); + } + + @Override + public void 
finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List splitSourceInfo) + { + IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle; + switch (executeHandle.getProcedureId()) { + case OPTIMIZE: + finishOptimize(executeHandle, fragments, splitSourceInfo); + return; + } + throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'"); + } + + private void finishOptimize(IcebergTableExecuteHandle executeHandle, Collection fragments, List splitSourceInfo) + { + IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle(); + Table icebergTable = transaction.table(); + + // paths to be deleted + Set scannedFiles = splitSourceInfo.stream() + .map(DataFile.class::cast) + .collect(toImmutableSet()); + + List commitTasks = fragments.stream() + .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) + .collect(toImmutableList()); + + Type[] partitionColumnTypes = icebergTable.spec().fields().stream() + .map(field -> field.transform().getResultType( + icebergTable.schema().findType(field.sourceId()))) + .toArray(Type[]::new); + + Set newFiles = new HashSet<>(); + for (CommitTaskData task : commitTasks) { + DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) + .withPath(task.getPath()) + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withFormat(optimizeHandle.getFileFormat()) + .withMetrics(task.getMetrics().metrics()); + + if (!icebergTable.spec().fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + } + + newFiles.add(builder.build()); + } + + if (scannedFiles.isEmpty() && newFiles.isEmpty()) { + // Table scan turned out to be empty, nothing to commit + transaction = null; + return; + } + 
+ RewriteFiles rewriteFiles = transaction.newRewrite(); + rewriteFiles.rewriteFiles(scannedFiles, newFiles); + rewriteFiles.commit(); + transaction.commitTransaction(); + transaction = null; + } + @Override public Optional getInfo(ConnectorTableHandle tableHandle) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index c526c962912e..fd55affe7c4c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -25,10 +25,12 @@ import io.trino.plugin.hive.orc.OrcWriterConfig; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; +import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.procedure.Procedure; import static com.google.inject.multibindings.Multibinder.newSetBinder; @@ -76,5 +78,8 @@ public void configure(Binder binder) Multibinder procedures = newSetBinder(binder, Procedure.class); procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON); + + Multibinder tableProcedures = newSetBinder(binder, TableProcedureMetadata.class); + tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java index 21ea0017f2cf..35c5c6839cbb 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java @@ -16,12 +16,15 @@ import io.airlift.json.JsonCodec; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; +import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle; +import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; import io.trino.spi.PageIndexerFactory; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorPageSink; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SchemaTableName; import org.apache.iceberg.PartitionSpec; @@ -93,4 +96,34 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab tableHandle.getFileFormat(), maxOpenPartitions); } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle; + switch (executeHandle.getProcedureId()) { + case OPTIMIZE: + HdfsContext hdfsContext = new HdfsContext(session); + IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle(); + Schema schema = SchemaParser.fromJson(optimizeHandle.getSchemaAsJson()); + PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, optimizeHandle.getPartitionSpecAsJson()); + LocationProvider locationProvider = getLocationProvider(executeHandle.getSchemaTableName(), + executeHandle.getTableLocation(), optimizeHandle.getTableStorageProperties()); + return 
new IcebergPageSink( + schema, + partitionSpec, + locationProvider, + fileWriterFactory, + pageIndexerFactory, + hdfsEnvironment, + hdfsContext, + optimizeHandle.getTableColumns(), + jsonCodec, + session, + optimizeHandle.getFileFormat(), + maxOpenPartitions); + } + + throw new IllegalArgumentException("Unknown procedure: " + executeHandle.getProcedureId()); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java index 4fdd9c6b0c74..dbb549b63065 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java @@ -66,6 +66,9 @@ public ConnectorSplitSource getSplits( IcebergTableHandle table = (IcebergTableHandle) handle; if (table.getSnapshotId().isEmpty()) { + if (table.isRecordScannedFiles()) { + return new FixedSplitSource(ImmutableList.of(), ImmutableList.of()); + } return new FixedSplitSource(ImmutableList.of()); } @@ -85,9 +88,11 @@ public ConnectorSplitSource getSplits( table, identityPartitionColumns, tableScan, + table.getMaxScannedFileSize(), dynamicFilter, dynamicFilteringWaitTimeout, - constraint); + constraint, + table.isRecordScannedFiles()); return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index c39e6f18a118..c2b0b56a2296 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -16,8 +16,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import 
com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import com.google.common.collect.Streams; +import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; @@ -32,6 +34,7 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.predicate.ValueSet; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; @@ -47,11 +50,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Suppliers.memoize; import static com.google.common.base.Verify.verify; import static com.google.common.collect.Sets.intersection; @@ -77,6 +83,7 @@ public class IcebergSplitSource private final IcebergTableHandle tableHandle; private final Set identityPartitionColumns; private final TableScan tableScan; + private final Optional maxScannedFileSizeInBytes; private final Map fieldIdToType; private final DynamicFilter dynamicFilter; private final long dynamicFilteringWaitTimeoutMillis; @@ -87,22 +94,29 @@ public class IcebergSplitSource private Iterator fileScanIterator; private TupleDomain pushedDownDynamicFilterPredicate; + private final boolean recordScannedFiles; + private final ImmutableSet.Builder scannedFiles = ImmutableSet.builder(); + public IcebergSplitSource( IcebergTableHandle tableHandle, Set identityPartitionColumns, TableScan tableScan, + Optional maxScannedFileSize, DynamicFilter dynamicFilter, Duration dynamicFilteringWaitTimeout, - Constraint constraint) 
+ Constraint constraint, + boolean recordScannedFiles) { this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.identityPartitionColumns = requireNonNull(identityPartitionColumns, "identityPartitionColumns is null"); this.tableScan = requireNonNull(tableScan, "tableScan is null"); + this.maxScannedFileSizeInBytes = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null").map(DataSize::toBytes); this.fieldIdToType = primitiveFieldTypes(tableScan.schema()); this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis(); this.dynamicFilterWaitStopwatch = Stopwatch.createStarted(); this.constraint = requireNonNull(constraint, "constraint is null"); + this.recordScannedFiles = recordScannedFiles; } @Override @@ -161,6 +175,10 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan throw new TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported: " + tableHandle.getSchemaTableName()); } + if (maxScannedFileSizeInBytes.isPresent() && scanTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) { + continue; + } + IcebergSplit icebergSplit = toIcebergSplit(scanTask); Supplier> partitionValues = memoize(() -> { @@ -195,6 +213,9 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan if (!partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint)) { continue; } + if (recordScannedFiles) { + scannedFiles.add(scanTask.file()); + } splits.add(icebergSplit); } return completedFuture(new ConnectorSplitBatch(splits.build(), isFinished())); @@ -213,6 +234,16 @@ public boolean isFinished() return fileScanIterator != null && !fileScanIterator.hasNext(); } + @Override + public Optional> getTableExecuteSplitsInfo() + { + checkState(isFinished(), "Split source must be finished before TableExecuteSplitsInfo is read"); + if 
(!recordScannedFiles) { + return Optional.empty(); + } + return Optional.of(ImmutableList.copyOf(scannedFiles.build())); + } + @Override public void close() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index 11aae4c14d79..38db3521237f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -14,8 +14,10 @@ package io.trino.plugin.iceberg; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; @@ -44,6 +46,10 @@ public class IcebergTableHandle private final Set projectedColumns; private final Optional nameMappingJson; + // OPTIMIZE only. 
Coordinator-only + private final boolean recordScannedFiles; + private final Optional maxScannedFileSize; + @JsonCreator public IcebergTableHandle( @JsonProperty("schemaName") String schemaName, @@ -54,6 +60,31 @@ public IcebergTableHandle( @JsonProperty("enforcedPredicate") TupleDomain enforcedPredicate, @JsonProperty("projectedColumns") Set projectedColumns, @JsonProperty("nameMappingJson") Optional nameMappingJson) + { + this( + schemaName, + tableName, + tableType, + snapshotId, + unenforcedPredicate, + enforcedPredicate, + projectedColumns, + nameMappingJson, + false, + Optional.empty()); + } + + public IcebergTableHandle( + String schemaName, + String tableName, + TableType tableType, + Optional snapshotId, + TupleDomain unenforcedPredicate, + TupleDomain enforcedPredicate, + Set projectedColumns, + Optional nameMappingJson, + boolean recordScannedFiles, + Optional maxScannedFileSize) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -63,6 +94,8 @@ public IcebergTableHandle( this.enforcedPredicate = requireNonNull(enforcedPredicate, "enforcedPredicate is null"); this.projectedColumns = ImmutableSet.copyOf(requireNonNull(projectedColumns, "projectedColumns is null")); this.nameMappingJson = requireNonNull(nameMappingJson, "nameMappingJson is null"); + this.recordScannedFiles = recordScannedFiles; + this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); } @JsonProperty @@ -113,6 +146,18 @@ public Optional getNameMappingJson() return nameMappingJson; } + @JsonIgnore + public boolean isRecordScannedFiles() + { + return recordScannedFiles; + } + + @JsonIgnore + public Optional getMaxScannedFileSize() + { + return maxScannedFileSize; + } + public SchemaTableName getSchemaTableName() { return new SchemaTableName(schemaName, tableName); @@ -133,7 +178,24 @@ public IcebergTableHandle withProjectedColumns(Set projecte unenforcedPredicate, 
enforcedPredicate, projectedColumns, - nameMappingJson); + nameMappingJson, + recordScannedFiles, + maxScannedFileSize); + } + + public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize) + { + return new IcebergTableHandle( + schemaName, + tableName, + tableType, + snapshotId, + unenforcedPredicate, + enforcedPredicate, + projectedColumns, + nameMappingJson, + recordScannedFiles, + Optional.of(maxScannedFileSize)); } @Override @@ -147,20 +209,22 @@ public boolean equals(Object o) } IcebergTableHandle that = (IcebergTableHandle) o; - return Objects.equals(schemaName, that.schemaName) && + return recordScannedFiles == that.recordScannedFiles && + Objects.equals(schemaName, that.schemaName) && Objects.equals(tableName, that.tableName) && tableType == that.tableType && Objects.equals(snapshotId, that.snapshotId) && Objects.equals(unenforcedPredicate, that.unenforcedPredicate) && Objects.equals(enforcedPredicate, that.enforcedPredicate) && Objects.equals(projectedColumns, that.projectedColumns) && - Objects.equals(nameMappingJson, that.nameMappingJson); + Objects.equals(nameMappingJson, that.nameMappingJson) && + Objects.equals(maxScannedFileSize, that.maxScannedFileSize); } @Override public int hashCode() { - return Objects.hash(schemaName, tableName, tableType, snapshotId, unenforcedPredicate, enforcedPredicate, projectedColumns, nameMappingJson); + return Objects.hash(schemaName, tableName, tableType, snapshotId, unenforcedPredicate, enforcedPredicate, projectedColumns, nameMappingJson, recordScannedFiles, maxScannedFileSize); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java index d79bafeb537d..586069972d49 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java @@ -47,6 +47,7 @@ import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.procedure.Procedure; import io.trino.spi.type.TypeManager; import org.weakref.jmx.guice.MBeanModule; @@ -112,6 +113,7 @@ public static Connector createConnector( Set sessionPropertiesProviders = injector.getInstance(Key.get(new TypeLiteral>() {})); IcebergTableProperties icebergTableProperties = injector.getInstance(IcebergTableProperties.class); Set procedures = injector.getInstance(Key.get(new TypeLiteral>() {})); + Set tableProcedures = injector.getInstance(Key.get(new TypeLiteral>() {})); Optional accessControl = injector.getInstance(Key.get(new TypeLiteral>() {})); return new IcebergConnector( @@ -127,7 +129,8 @@ public static Connector createConnector( IcebergSchemaProperties.SCHEMA_PROPERTIES, icebergTableProperties.getTableProperties(), accessControl, - procedures); + procedures, + tableProcedures); } } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java new file mode 100644 index 000000000000..c141342671bd --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; +import io.trino.plugin.iceberg.IcebergColumnHandle; +import org.apache.iceberg.FileFormat; + +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class IcebergOptimizeHandle + extends IcebergProcedureHandle +{ + private final String schemaAsJson; + private final String partitionSpecAsJson; + private final List tableColumns; + private final FileFormat fileFormat; + private final Map tableStorageProperties; + private final DataSize maxScannedFileSize; + + @JsonCreator + public IcebergOptimizeHandle( + String schemaAsJson, + String partitionSpecAsJson, + List tableColumns, + FileFormat fileFormat, + Map tableStorageProperties, + DataSize maxScannedFileSize) + { + this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null"); + this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null"); + this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null")); + this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); + this.tableStorageProperties = ImmutableMap.copyOf(requireNonNull(tableStorageProperties, "tableStorageProperties is null")); + this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is 
null"); + } + + @JsonProperty + public String getSchemaAsJson() + { + return schemaAsJson; + } + + @JsonProperty + public String getPartitionSpecAsJson() + { + return partitionSpecAsJson; + } + + @JsonProperty + public List getTableColumns() + { + return tableColumns; + } + + @JsonProperty + public FileFormat getFileFormat() + { + return fileFormat; + } + + @JsonProperty + public Map getTableStorageProperties() + { + return tableStorageProperties; + } + + @JsonProperty + public DataSize getMaxScannedFileSize() + { + return maxScannedFileSize; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java new file mode 100644 index 000000000000..7562c37e7f1c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "@type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = IcebergOptimizeHandle.class, name = "optimize"), +}) +public abstract class IcebergProcedureHandle {} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableExecuteHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableExecuteHandle.java new file mode 100644 index 000000000000..d48cdf5ddcf6 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableExecuteHandle.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.connector.ConnectorTableExecuteHandle; +import io.trino.spi.connector.SchemaTableName; + +import static java.util.Objects.requireNonNull; + +public class IcebergTableExecuteHandle + implements ConnectorTableExecuteHandle +{ + private final SchemaTableName schemaTableName; + private final IcebergTableProcedureId procedureId; + private final IcebergProcedureHandle procedureHandle; + private final String tableLocation; + + @JsonCreator + public IcebergTableExecuteHandle( + SchemaTableName schemaTableName, + IcebergTableProcedureId procedureId, + IcebergProcedureHandle procedureHandle, + String tableLocation) + { + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.procedureId = requireNonNull(procedureId, "procedureId is null"); + this.procedureHandle = requireNonNull(procedureHandle, "procedureHandle is null"); + this.tableLocation = requireNonNull(tableLocation, "tableLocation is null"); + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @JsonProperty + public IcebergTableProcedureId getProcedureId() + { + return procedureId; + } + + @JsonProperty + public IcebergProcedureHandle getProcedureHandle() + { + return procedureHandle; + } + + @JsonProperty + public String getTableLocation() + { + return tableLocation; + } + + public IcebergTableExecuteHandle withProcedureHandle(IcebergProcedureHandle procedureHandle) + { + return new IcebergTableExecuteHandle( + schemaTableName, + procedureId, + procedureHandle, + tableLocation); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java new file mode 100644 index 000000000000..f9b195b4d57e --- 
/dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +public enum IcebergTableProcedureId +{ + OPTIMIZE, +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/OptimizeTableProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/OptimizeTableProcedure.java new file mode 100644 index 000000000000..4550f01e3302 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/OptimizeTableProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; +import io.trino.spi.connector.TableProcedureMetadata; + +import javax.inject.Provider; + +import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; +import static io.trino.spi.connector.TableProcedureExecutionMode.distributedWithFilteringAndRepartitioning; + +public class OptimizeTableProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + OPTIMIZE.name(), + distributedWithFilteringAndRepartitioning(), + ImmutableList.of( + dataSizeProperty( + "file_size_threshold", + "Only compact files smaller than given threshold in bytes", + DataSize.of(100, DataSize.Unit.MEGABYTE), + false))); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 6456c8e06aba..77434cb85303 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -33,7 +33,6 @@ import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.statistics.ColumnStatistics; import io.trino.spi.statistics.TableStatistics; import io.trino.testing.BaseConnectorTest; import io.trino.testing.DataProviders; @@ -51,7 +50,6 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.iceberg.FileFormat; import org.intellij.lang.annotations.Language; import org.testng.SkipException; @@ -59,8 +57,10 @@ import 
org.testng.annotations.Test; import java.io.File; +import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import java.util.Locale; @@ -80,6 +80,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; import static com.google.common.collect.MoreCollectors.toOptional; @@ -114,7 +115,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; public abstract class BaseIcebergConnectorTest @@ -2239,22 +2239,6 @@ private static IcebergColumnHandle getColumnHandleFromStatistics(TableStatistics throw new IllegalArgumentException("TableStatistics did not contain column named " + columnName); } - private ColumnStatistics checkColumnStatistics(ColumnStatistics statistics) - { - assertNotNull(statistics, "statistics is null"); - // Sadly, statistics.getDataSize().isUnknown() for columns in ORC files. See the TODO - // in IcebergOrcFileWriter. 
- if (format == ORC) { - assertTrue(statistics.getDataSize().isUnknown()); - } - else { - assertFalse(statistics.getDataSize().isUnknown()); - } - assertFalse(statistics.getNullsFraction().isUnknown(), "statistics nulls fraction is unknown"); - assertFalse(statistics.getRange().isEmpty(), "statistics range is not present"); - return statistics; - } - private TableStatistics getTableStatistics(String tableName, Constraint constraint) { Metadata metadata = getDistributedQueryRunner().getCoordinator().getMetadata(); @@ -2492,7 +2476,7 @@ public void testIncorrectIcebergFileSizes() // Replace the file through HDFS client. This is required for correct checksums. HdfsEnvironment.HdfsContext context = new HdfsContext(getSession().toConnectorSession()); - Path manifestFilePath = new Path(manifestFile); + org.apache.hadoop.fs.Path manifestFilePath = new org.apache.hadoop.fs.Path(manifestFile); FileSystem fs = HDFS_ENVIRONMENT.getFileSystem(context, manifestFilePath); // Write altered metadata @@ -2970,8 +2954,8 @@ public void testSplitPruningFromDataFileStatistics(DataMappingTestSetup testSetu String tableName = table.getName(); String values = Stream.concat( - nCopies(100, testSetup.getSampleValueLiteral()).stream(), - nCopies(100, testSetup.getHighValueLiteral()).stream()) + nCopies(100, testSetup.getSampleValueLiteral()).stream(), + nCopies(100, testSetup.getHighValueLiteral()).stream()) .map(value -> "(" + value + ", rand())") .collect(Collectors.joining(", ")); assertUpdate(withSmallRowGroups(getSession()), "INSERT INTO " + tableName + " VALUES " + values, 200); @@ -3257,15 +3241,85 @@ public void testProjectionPushdownOnPartitionedTables() } } - private OperatorStats getScanOperatorStats(QueryId queryId) + @Test + public void testOptimize() + throws Exception + { + String tableName = "test_optimize_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar)"); + + // DistributedQueryRunner sets 
node-scheduler.include-coordinator by default, so include coordinator + int workerCount = getQueryRunner().getNodeCount(); + + // optimize an empty table + assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(getActiveFiles(tableName)).isEmpty(); + + assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'eleven')", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (12, 'zwölf')", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (13, 'trzynaście')", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (14, 'quatorze')", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (15, 'пʼятнадцять')", 1); + + List initialFiles = getActiveFiles(tableName); + assertThat(initialFiles) + .hasSize(5) + // Verify we have sufficiently many test rows with respect to worker count. + .hasSizeGreaterThan(workerCount); + + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) + .matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')"); + List updatedFiles = getActiveFiles(tableName); + assertThat(updatedFiles) + .hasSizeBetween(1, workerCount) + .doesNotContainAnyElementsOf(initialFiles); + // No files should be removed (this is VACUUM's job, when it exists) + assertThat(getAllDataFilesFromTableDirectory(tableName)) + .containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles)); + + // optimize with low file size threshold, nothing should change + computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')"); + assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) + .matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')"); + assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles); + assertThat(getAllDataFilesFromTableDirectory(tableName)) + 
.containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles)); + + assertUpdate("DROP TABLE " + tableName); + } + + private List getActiveFiles(String tableName) { - return getDistributedQueryRunner().getCoordinator() - .getQueryManager() - .getFullQueryInfo(queryId) - .getQueryStats() - .getOperatorSummaries() - .stream() - .filter(summary -> summary.getOperatorType().contains("Scan")) - .collect(onlyElement()); + return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn() + .map(String.class::cast) + .collect(toImmutableList()); + } + + private List getAllDataFilesFromTableDirectory(String tableName) + throws IOException + { + String schema = getSession().getSchema().orElseThrow(); + Path tableDataDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve("data"); + try (Stream list = Files.list(tableDataDir)) { + return list + .filter(path -> !path.getFileName().toString().matches("\\..*\\.crc")) + .map(Path::toString) + .collect(toImmutableList()); + } + } + + @Test + public void testOptimizeParameterValidation() + { + assertQueryFails( + "ALTER TABLE no_such_table_exists EXECUTE OPTIMIZE", + "\\Qline 1:1: Table 'iceberg.tpch.no_such_table_exists' does not exist"); + assertQueryFails( + "ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33')", + "\\QUnable to set procedure property 'file_size_threshold' to ['33']: size is not a valid data size string: 33"); + assertQueryFails( + "ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33s')", + "\\QUnable to set procedure property 'file_size_threshold' to ['33s']: Unknown unit: s"); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index b724e784ee71..98b6ccd64580 100644 --- 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -118,6 +118,7 @@ public void testIncompleteDynamicFilterTimeout() tableHandle, ImmutableSet.of(), nationTable.newScan(), + Optional.empty(), new DynamicFilter() { @Override @@ -158,7 +159,8 @@ public TupleDomain getCurrentPredicate() } }, new Duration(2, SECONDS), - alwaysTrue()); + alwaysTrue(), + false); ImmutableList.Builder splits = ImmutableList.builder(); while (!splitSource.isFinished()) {