Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,25 +216,25 @@ The procedure affects all snapshots that are older than the time period configur

ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d')

The value for ``retention_threshold`` must be higher than ``iceberg.expire_snapshots.min-retention`` in the catalog
The value for ``retention_threshold`` must be higher than or equal to ``iceberg.expire_snapshots.min-retention`` in the catalog;
otherwise, the procedure fails with a message similar to the following:
``Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d)``.
The default value for this property is ``7d``.

delete_orphan_files
remove_orphan_files
~~~~~~~~~~~~~~~~~~~

The ``delete_orphan_files`` command removes all files from table's data directory which are
The ``remove_orphan_files`` command removes all files from the table's data directory that are
not linked from metadata files and that are older than the value of the ``retention_threshold`` parameter.
Deleting orphan files from time to time is recommended to keep the size of the table's data directory under control.

``delete_orphan_files`` can be run as follows:
``remove_orphan_files`` can be run as follows:

.. code-block:: sql

ALTER TABLE test_table EXECUTE delete_orphan_files(retention_threshold => '7d')
ALTER TABLE test_table EXECUTE remove_orphan_files(retention_threshold => '7d')

The value for ``retention_threshold`` must be higher than ``iceberg.delete_orphan_files.min-retention`` in the catalog
The value for ``retention_threshold`` must be higher than or equal to ``iceberg.remove_orphan_files.min-retention`` in the catalog;
otherwise, the procedure fails with a message similar to the following:
``Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d)``.
The default value for this property is ``7d``.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class IcebergConfig
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "iceberg.expire_snapshots.min-retention";
public static final String DELETE_ORPHAN_FILES_MIN_RETENTION = "iceberg.delete_orphan_files.min-retention";
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "iceberg.remove_orphan_files.min-retention";

private IcebergFileFormat fileFormat = ORC;
private HiveCompressionCodec compressionCodec = ZSTD;
Expand All @@ -49,7 +49,7 @@ public class IcebergConfig
private Optional<String> hiveCatalogName = Optional.empty();
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
private Duration deleteOrphanFilesMinRetention = new Duration(7, DAYS);
private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -223,16 +223,16 @@ public IcebergConfig setExpireSnapshotsMinRetention(Duration expireSnapshotsMinR
}

@NotNull
public Duration getDeleteOrphanFilesMinRetention()
public Duration getRemoveOrphanFilesMinRetention()
{
return deleteOrphanFilesMinRetention;
return removeOrphanFilesMinRetention;
}

@Config(DELETE_ORPHAN_FILES_MIN_RETENTION)
@ConfigDescription("Minimal retention period for delete_orphan_files procedure")
public IcebergConfig setDeleteOrphanFilesMinRetention(Duration deleteOrphanFilesMinRetention)
@Config(REMOVE_ORPHAN_FILES_MIN_RETENTION)
@ConfigDescription("Minimal retention period for remove_orphan_files procedure")
public IcebergConfig setRemoveOrphanFilesMinRetention(Duration removeOrphanFilesMinRetention)
{
this.deleteOrphanFilesMinRetention = deleteOrphanFilesMinRetention;
this.removeOrphanFilesMinRetention = removeOrphanFilesMinRetention;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
import io.trino.plugin.hive.HiveWrittenPartitions;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.procedure.IcebergDeleteOrphanFilesHandle;
import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -156,8 +156,8 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getDeleteOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
Expand All @@ -179,9 +179,9 @@
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.DEPENDS_ON_TABLES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DELETE_ORPHAN_FILES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static java.lang.String.format;
Expand Down Expand Up @@ -760,8 +760,8 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
return getTableHandleForOptimize(session, tableHandle, executeProperties, retryMode);
case EXPIRE_SNAPSHOTS:
return getTableHandleForExpireSnapshots(session, tableHandle, executeProperties);
case DELETE_ORPHAN_FILES:
return getTableHandleForDeleteOrphanFiles(session, tableHandle, executeProperties);
case REMOVE_ORPHAN_FILES:
return getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties);
}

throw new IllegalArgumentException("Unknown procedure: " + procedureId);
Expand Down Expand Up @@ -798,15 +798,15 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(C
icebergTable.location()));
}

private Optional<ConnectorTableExecuteHandle> getTableHandleForDeleteOrphanFiles(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
private Optional<ConnectorTableExecuteHandle> getTableHandleForRemoveOrphanFiles(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

return Optional.of(new IcebergTableExecuteHandle(
tableHandle.getSchemaTableName(),
DELETE_ORPHAN_FILES,
new IcebergDeleteOrphanFilesHandle(retentionThreshold),
REMOVE_ORPHAN_FILES,
new IcebergRemoveOrphanFilesHandle(retentionThreshold),
icebergTable.location()));
}

Expand All @@ -818,7 +818,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession
case OPTIMIZE:
return getLayoutForOptimize(session, executeHandle);
case EXPIRE_SNAPSHOTS:
case DELETE_ORPHAN_FILES:
case REMOVE_ORPHAN_FILES:
// handled via executeTableExecute
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
Expand All @@ -844,7 +844,7 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle
case OPTIMIZE:
return beginOptimize(session, executeHandle, table);
case EXPIRE_SNAPSHOTS:
case DELETE_ORPHAN_FILES:
case REMOVE_ORPHAN_FILES:
// handled via executeTableExecute
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
Expand Down Expand Up @@ -886,7 +886,7 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
return;
case EXPIRE_SNAPSHOTS:
case DELETE_ORPHAN_FILES:
case REMOVE_ORPHAN_FILES:
// handled via executeTableExecute
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
Expand Down Expand Up @@ -957,8 +957,8 @@ public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteH
case EXPIRE_SNAPSHOTS:
executeExpireSnapshots(session, executeHandle);
return;
case DELETE_ORPHAN_FILES:
executeDeleteOrphanFiles(session, executeHandle);
case REMOVE_ORPHAN_FILES:
executeRemoveOrphanFiles(session, executeHandle);
return;
default:
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
Expand Down Expand Up @@ -1059,27 +1059,27 @@ private Set<String> buildSetOfValidFiles(Table table)
.collect(toImmutableSet());
}

public void executeDeleteOrphanFiles(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
IcebergDeleteOrphanFilesHandle deleteOrphanFilesHandle = (IcebergDeleteOrphanFilesHandle) executeHandle.getProcedureHandle();
IcebergRemoveOrphanFilesHandle removeOrphanFilesHandle = (IcebergRemoveOrphanFilesHandle) executeHandle.getProcedureHandle();

Table table = catalog.loadTable(session, executeHandle.getSchemaTableName());
Duration retention = requireNonNull(deleteOrphanFilesHandle.getRetentionThreshold(), "retention is null");
Duration retention = requireNonNull(removeOrphanFilesHandle.getRetentionThreshold(), "retention is null");
validateTableExecuteParameters(
table,
executeHandle.getSchemaTableName(),
DELETE_ORPHAN_FILES.name(),
REMOVE_ORPHAN_FILES.name(),
retention,
getDeleteOrphanFilesMinRetention(session),
IcebergConfig.DELETE_ORPHAN_FILES_MIN_RETENTION,
IcebergSessionProperties.DELETE_ORPHAN_FILES_MIN_RETENTION);
getRemoveOrphanFilesMinRetention(session),
IcebergConfig.REMOVE_ORPHAN_FILES_MIN_RETENTION,
IcebergSessionProperties.REMOVE_ORPHAN_FILES_MIN_RETENTION);

long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
deleteOrphanFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
deleteOrphanMetadataFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
removeOrphanFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
removeOrphanMetadataFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
}

private void deleteOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp)
private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp)
{
Set<String> validDataFilePaths = stream(table.snapshots())
.map(Snapshot::snapshotId)
Expand All @@ -1095,7 +1095,7 @@ private void deleteOrphanFiles(Table table, ConnectorSession session, SchemaTabl
scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, union(validDataFilePaths, validDeleteFilePaths), "/data");
}

private void deleteOrphanMetadataFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp)
private void removeOrphanMetadataFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp)
{
ImmutableSet<String> manifests = stream(table.snapshots())
.flatMap(snapshot -> snapshot.allManifests().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.plugin.iceberg.procedure.DeleteOrphanFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
Expand Down Expand Up @@ -83,6 +83,6 @@ public void configure(Binder binder)
Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(DeleteOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
optimizeHandle.getTableStorageProperties(),
maxOpenPartitions);
case EXPIRE_SNAPSHOTS:
case DELETE_ORPHAN_FILES:
case REMOVE_ORPHAN_FILES:
// handled via ConnectorMetadata.executeTableExecute
}
throw new IllegalArgumentException("Unknown procedure: " + executeHandle.getProcedureId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public final class IcebergSessionProperties
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "expire_snapshots_min_retention";
public static final String DELETE_ORPHAN_FILES_MIN_RETENTION = "delete_orphan_files_min_retention";
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -237,9 +237,9 @@ public IcebergSessionProperties(
icebergConfig.getExpireSnapshotsMinRetention(),
false))
.add(durationProperty(
DELETE_ORPHAN_FILES_MIN_RETENTION,
"Minimal retention period for delete_orphan_files procedure",
icebergConfig.getDeleteOrphanFilesMinRetention(),
REMOVE_ORPHAN_FILES_MIN_RETENTION,
"Minimal retention period for remove_orphan_files procedure",
icebergConfig.getRemoveOrphanFilesMinRetention(),
false))
.build();
}
Expand Down Expand Up @@ -392,8 +392,8 @@ public static Duration getExpireSnapshotMinRetention(ConnectorSession session)
return session.getProperty(EXPIRE_SNAPSHOTS_MIN_RETENTION, Duration.class);
}

public static Duration getDeleteOrphanFilesMinRetention(ConnectorSession session)
public static Duration getRemoveOrphanFilesMinRetention(ConnectorSession session)
{
return session.getProperty(DELETE_ORPHAN_FILES_MIN_RETENTION, Duration.class);
return session.getProperty(REMOVE_ORPHAN_FILES_MIN_RETENTION, Duration.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = IcebergOptimizeHandle.class, name = "optimize"),
@JsonSubTypes.Type(value = IcebergExpireSnapshotsHandle.class, name = "expire_snapshots"),
@JsonSubTypes.Type(value = IcebergDeleteOrphanFilesHandle.class, name = "delete_orphan_files"),
@JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"),
})
public abstract class IcebergProcedureHandle {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class IcebergDeleteOrphanFilesHandle
public class IcebergRemoveOrphanFilesHandle
extends IcebergProcedureHandle
{
private final Duration retentionThreshold;

@JsonCreator
public IcebergDeleteOrphanFilesHandle(Duration retentionThreshold)
public IcebergRemoveOrphanFilesHandle(Duration retentionThreshold)
{
this.retentionThreshold = requireNonNull(retentionThreshold, "retentionThreshold is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ public enum IcebergTableProcedureId
{
OPTIMIZE,
EXPIRE_SNAPSHOTS,
DELETE_ORPHAN_FILES,
REMOVE_ORPHAN_FILES,
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
import javax.inject.Provider;

import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DELETE_ORPHAN_FILES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;

public class DeleteOrphanFilesTableProcedure
public class RemoveOrphanFilesTableProcedure
implements Provider<TableProcedureMetadata>
{
@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
DELETE_ORPHAN_FILES.name(),
REMOVE_ORPHAN_FILES.name(),
coordinatorOnly(),
ImmutableList.of(
durationProperty(
Expand Down
Loading