Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class IcebergConnector
private final List<PropertyMetadata<?>> tableProperties;
private final Optional<ConnectorAccessControl> accessControl;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;

public IcebergConnector(
LifeCycleManager lifeCycleManager,
Expand All @@ -76,7 +78,8 @@ public IcebergConnector(
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
Optional<ConnectorAccessControl> accessControl,
Set<Procedure> procedures)
Set<Procedure> procedures,
Set<TableProcedureMetadata> tableProcedures)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
Expand All @@ -92,7 +95,8 @@ public IcebergConnector(
this.schemaProperties = ImmutableList.copyOf(requireNonNull(schemaProperties, "schemaProperties is null"));
this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.procedures = requireNonNull(procedures, "procedures is null");
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.tableProcedures = ImmutableSet.copyOf(requireNonNull(tableProcedures, "tableProcedures is null"));
}

@Override
Expand Down Expand Up @@ -150,6 +154,12 @@ public Set<Procedure> getProcedures()
return procedures;
}

/**
 * Returns the table procedures registered with this connector.
 * The returned set is an immutable copy made at construction time.
 */
@Override
public Set<TableProcedureMetadata> getTableProcedures()
{
return tableProcedures;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package io.trino.plugin.iceberg;

import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorHandleResolver;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;

Expand Down Expand Up @@ -56,6 +58,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
return IcebergWritableTableHandle.class;
}

/**
 * Returns the concrete handle class used for table execute operations,
 * so the engine can deserialize {@link ConnectorTableExecuteHandle} instances.
 */
@Override
public Class<? extends ConnectorTableExecuteHandle> getTableExecuteHandleClass()
{
return IcebergTableExecuteHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import com.google.common.collect.Iterables;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.hive.HiveApplyProjectionUtil;
import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
import io.trino.plugin.hive.HiveWrittenPartitions;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.BeginTableExecuteResult;
import io.trino.spi.connector.CatalogSchemaName;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnHandle;
Expand All @@ -39,6 +44,7 @@
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
Expand All @@ -63,11 +69,13 @@
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
Expand All @@ -83,6 +91,7 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -121,6 +130,7 @@
import static io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -545,6 +555,158 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
return new IcebergColumnHandle(primitiveColumnIdentity(0, "$row_id"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty());
}

/**
 * Resolves a table-procedure invocation (e.g. OPTIMIZE) to an execute handle.
 *
 * @param procedureName name of the procedure; must match an {@link IcebergTableProcedureId} constant
 * @param executeProperties procedure-specific properties supplied by the caller
 * @return the execute handle for the resolved procedure
 * @throws IllegalArgumentException if the procedure name is not recognized
 */
@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
        ConnectorSession session,
        ConnectorTableHandle connectorTableHandle,
        String procedureName,
        Map<String, Object> executeProperties)
{
    IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle;

    IcebergTableProcedureId procedureId;
    try {
        procedureId = IcebergTableProcedureId.valueOf(procedureName);
    }
    catch (IllegalArgumentException e) {
        // Preserve the original valueOf failure as the cause instead of dropping it
        throw new IllegalArgumentException("Unknown procedure '" + procedureName + "'", e);
    }

    switch (procedureId) {
        case OPTIMIZE:
            return getTableHandleForOptimize(session, tableHandle, executeProperties);
    }

    // Reached when a new enum constant is added without a matching dispatch case
    throw new IllegalArgumentException("Unknown procedure: " + procedureId);
}

/**
 * Builds the execute handle for the OPTIMIZE procedure, snapshotting the table's
 * current schema, partition spec, format and properties as JSON.
 */
private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
    // "file_size_threshold" limits which files are considered for rewriting
    DataSize fileSizeThreshold = (DataSize) executeProperties.get("file_size_threshold");
    Table table = catalog.loadTable(session, tableHandle.getSchemaTableName());

    IcebergOptimizeHandle optimizeHandle = new IcebergOptimizeHandle(
            SchemaParser.toJson(table.schema()),
            PartitionSpecParser.toJson(table.spec()),
            getColumns(table.schema(), typeManager),
            getFileFormat(table),
            table.properties(),
            fileSizeThreshold);

    return Optional.of(new IcebergTableExecuteHandle(
            tableHandle.getSchemaTableName(),
            OPTIMIZE,
            optimizeHandle,
            table.location()));
}

/**
 * Dispatches layout resolution for a table execute operation to the
 * procedure-specific implementation.
 *
 * @throws IllegalArgumentException if the procedure id is not recognized
 */
@Override
public Optional<ConnectorNewTableLayout> getLayoutForTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
    IcebergTableExecuteHandle handle = (IcebergTableExecuteHandle) tableExecuteHandle;
    if (handle.getProcedureId() == OPTIMIZE) {
        return getLayoutForOptimize(session, handle);
    }
    throw new IllegalArgumentException("Unknown procedure '" + handle.getProcedureId() + "'");
}

/**
 * Computes the write layout for OPTIMIZE from the table's current schema
 * and partition spec, so rewritten files follow the table's partitioning.
 */
private Optional<ConnectorNewTableLayout> getLayoutForOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
    Table table = catalog.loadTable(session, executeHandle.getSchemaTableName());
    return getWriteLayout(table.schema(), table.spec());
}

/**
 * Begins a table execute operation, dispatching to the procedure-specific
 * begin method.
 *
 * @throws IllegalArgumentException if the procedure id is not recognized
 */
@Override
public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> beginTableExecute(
        ConnectorSession session,
        ConnectorTableExecuteHandle tableExecuteHandle,
        ConnectorTableHandle updatedSourceTableHandle)
{
    IcebergTableExecuteHandle handle = (IcebergTableExecuteHandle) tableExecuteHandle;
    IcebergTableHandle sourceHandle = (IcebergTableHandle) updatedSourceTableHandle;
    if (handle.getProcedureId() == OPTIMIZE) {
        return beginOptimize(session, handle, sourceHandle);
    }
    throw new IllegalArgumentException("Unknown procedure '" + handle.getProcedureId() + "'");
}

private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> beginOptimize(
ConnectorSession session,
IcebergTableExecuteHandle executeHandle,
IcebergTableHandle table)
{
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

verify(transaction == null, "transaction already set");
transaction = icebergTable.newTransaction();

return new BeginTableExecuteResult<>(
executeHandle,
table.forOptimize(true, optimizeHandle.getMaxScannedFileSize()));
Comment thread
findepi marked this conversation as resolved.
Outdated
}

/**
 * Completes a table execute operation, dispatching to the procedure-specific
 * finish method.
 *
 * @param fragments serialized commit tasks produced by the page sinks
 * @param splitSourceInfo opaque per-split state reported by the split source
 * @throws IllegalArgumentException if the procedure id is not recognized
 */
@Override
public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
{
    IcebergTableExecuteHandle handle = (IcebergTableExecuteHandle) tableExecuteHandle;
    if (handle.getProcedureId() == OPTIMIZE) {
        finishOptimize(handle, fragments, splitSourceInfo);
        return;
    }
    throw new IllegalArgumentException("Unknown procedure '" + handle.getProcedureId() + "'");
}

private void finishOptimize(IcebergTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
{
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = transaction.table();

// paths to be deleted
Set<DataFile> scannedFiles = splitSourceInfo.stream()
.map(DataFile.class::cast)
.collect(toImmutableSet());

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);

Set<DataFile> newFiles = new HashSet<>();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(optimizeHandle.getFileFormat())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

newFiles.add(builder.build());
}

if (scannedFiles.isEmpty() && newFiles.isEmpty()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert we should not ever get one empty and other not? Feels like a bug situation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scanned file list may be non empty, but resulting data may be empty, if input files were empty.

// Table scan turned out to be empty, nothing to commit
transaction = null;
return;
}

RewriteFiles rewriteFiles = transaction.newRewrite();
rewriteFiles.rewriteFiles(scannedFiles, newFiles);
rewriteFiles.commit();
transaction.commitTransaction();
transaction = null;
}

@Override
public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.procedure.Procedure;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
Expand Down Expand Up @@ -76,5 +78,8 @@ public void configure(Binder binder)

Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);

Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -93,4 +96,34 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
tableHandle.getFileFormat(),
maxOpenPartitions);
}

/**
 * Creates the page sink for a table execute operation. For OPTIMIZE, the
 * schema and partition spec are reconstructed from the JSON carried in the
 * procedure handle and an {@code IcebergPageSink} writes the rewritten files.
 *
 * @throws IllegalArgumentException if the procedure id is not recognized
 */
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
    IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle;
    switch (executeHandle.getProcedureId()) {
        case OPTIMIZE:
            IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
            // Schema is parsed first: the partition spec JSON refers to its field ids
            Schema icebergSchema = SchemaParser.fromJson(optimizeHandle.getSchemaAsJson());
            PartitionSpec spec = PartitionSpecParser.fromJson(icebergSchema, optimizeHandle.getPartitionSpecAsJson());
            LocationProvider locations = getLocationProvider(
                    executeHandle.getSchemaTableName(),
                    executeHandle.getTableLocation(),
                    optimizeHandle.getTableStorageProperties());
            return new IcebergPageSink(
                    icebergSchema,
                    spec,
                    locations,
                    fileWriterFactory,
                    pageIndexerFactory,
                    hdfsEnvironment,
                    new HdfsContext(session),
                    optimizeHandle.getTableColumns(),
                    jsonCodec,
                    session,
                    optimizeHandle.getFileFormat(),
                    maxOpenPartitions);
    }

    throw new IllegalArgumentException("Unknown procedure: " + executeHandle.getProcedureId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public ConnectorSplitSource getSplits(
IcebergTableHandle table = (IcebergTableHandle) handle;

if (table.getSnapshotId().isEmpty()) {
if (table.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
}

Expand All @@ -85,9 +88,11 @@ public ConnectorSplitSource getSplits(
table,
identityPartitionColumns,
tableScan,
table.getMaxScannedFileSize(),
dynamicFilter,
dynamicFilteringWaitTimeout,
constraint);
constraint,
table.isRecordScannedFiles());

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}
Expand Down
Loading