From 0e405609b6a2931204bbfc3e98358fa85686b0f8 Mon Sep 17 00:00:00 2001
From: Jack Ye
Date: Wed, 14 Jul 2021 22:57:42 -0700
Subject: [PATCH] Iceberg: support row-level delete and update

---
 .../java/io/trino/spi/block/RowBlock.java     |   2 +-
 .../trino/plugin/iceberg/CommitTaskData.java  |  13 +-
 .../plugin/iceberg/IcebergColumnHandle.java   |  11 ++
 .../trino/plugin/iceberg/IcebergMetadata.java | 123 +++++++++++-
 .../trino/plugin/iceberg/IcebergPageSink.java |   7 +-
 .../iceberg/IcebergPageSinkProvider.java      |   2 +
 .../iceberg/IcebergPageSourceProvider.java    |  77 +++++++-
 .../iceberg/IcebergPageSourceUpdate.java      | 178 ++++++++++++++++++
 .../plugin/iceberg/IcebergTableHandle.java    |  70 +++++++
 9 files changed, 466 insertions(+), 17 deletions(-)
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceUpdate.java

diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java
index de45b5c5f375..9dca9498aeab 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java
@@ -112,7 +112,7 @@ private RowBlock(int startOffset, int positionCount, @Nullable boolean[] rowIsNull
     }
 
     @Override
-    protected Block[] getRawFieldBlocks()
+    public Block[] getRawFieldBlocks()
     {
         return fieldBlocks;
     }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
index 94eac52f82a1..f455d5f91e9d 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
@@ -18,6 +18,7 @@
 
 import java.util.Optional;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 public class CommitTaskData
@@ -25,16 +26,20 @@ public class CommitTaskData
     private final String path;
     private final MetricsWrapper metrics;
     private final Optional<String> partitionDataJson;
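+    // Content type of the file written by this task, carried as org.apache.iceberg.FileContent.id()
+    // (0 = DATA, 1 = POSITION_DELETES, 2 = EQUALITY_DELETES) so the task stays JSON-serializable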
+    private final int content;
 
     @JsonCreator
     public CommitTaskData(
             @JsonProperty("path") String path,
             @JsonProperty("metrics") MetricsWrapper metrics,
-            @JsonProperty("partitionDataJson") Optional<String> partitionDataJson)
+            @JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
+            @JsonProperty("content") int content)
     {
         this.path = requireNonNull(path, "path is null");
         this.metrics = requireNonNull(metrics, "metrics is null");
         this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
+        checkArgument(content >= 0, "content must be non-negative");
+        this.content = content;
     }
 
     @JsonProperty
@@ -54,4 +59,10 @@ public Optional<String> getPartitionDataJson()
     {
         return partitionDataJson;
     }
+
+    @JsonProperty
+    public int getContent()
+    {
+        return content;
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
index 4b0fd392ad77..34949aabaaa1 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
@@ -19,6 +19,8 @@
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.types.Types;
 
 import java.util.Objects;
@@ -28,10 +30,14 @@
 import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
 import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class IcebergColumnHandle
         implements ColumnHandle
 {
+    public static final int ROW_ID_COLUMN_INDEX = Integer.MIN_VALUE;
+    public static final String ROW_ID_COLUMN_NAME = "$row_id";
+
     private final ColumnIdentity columnIdentity;
     private final Type type;
     private final Optional<String> comment;
@@ -116,4 +122,9 @@ public static IcebergColumnHandle create(Types.NestedField column, TypeManager typeManager)
                 toTrinoType(column.type(), typeManager),
                 Optional.ofNullable(column.doc()));
     }
+
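+    // $row_id is a synthetic row struct following Iceberg's position-delete schema from
+    // DeleteSchemaUtil.posDeleteSchema: (file_path: string, pos: long, row: struct<all table columns>).
+    // Integer.MIN_VALUE keeps the synthetic field id clear of every real column id, which is positive.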
+    public static IcebergColumnHandle createUpdateRowIdColumnHandle(Schema tableSchema, TypeManager typeManager)
+    {
+        return create(required(ROW_ID_COLUMN_INDEX, ROW_ID_COLUMN_NAME, DeleteSchemaUtil.posDeleteSchema(tableSchema).asStruct()), typeManager);
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 903ddeaf17fe..592ad7193f45 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -73,10 +73,12 @@
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
@@ -85,6 +87,7 @@
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -125,7 +128,7 @@
 import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
 import static io.trino.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation;
 import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
-import static io.trino.plugin.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.createUpdateRowIdColumnHandle;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
 import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData;
 import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData;
@@ -154,7 +157,6 @@
 import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
-import static io.trino.spi.type.BigintType.BIGINT;
 import static java.util.Collections.singletonList;
 import static java.util.Objects.requireNonNull;
 import static java.util.function.Function.identity;
@@ -166,6 +168,7 @@
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.Transactions.createTableTransaction;
+import static org.apache.iceberg.util.SerializationUtil.serializeToBytes;
 
 public class IcebergMetadata
         implements ConnectorMetadata
@@ -255,6 +258,11 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
                 name.getTableName(),
                 name.getTableType(),
                 snapshotId,
+                getFileFormat(table),
+                getDataPath(table.location()),
+                serializeToBytes(table.schema()),
+                serializeToBytes(table.spec()),
+                new ArrayList<>(),
                 TupleDomain.all(),
                 TupleDomain.all());
     }
@@ -640,12 +648,6 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
                 .collect(toImmutableList())));
     }
 
-    @Override
-    public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
-    {
-        return primitiveIcebergColumnHandle(0, "$row_id", BIGINT, Optional.empty());
-    }
-
     @Override
     public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
     {
@@ -754,7 +756,90 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
     @Override
     public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
-        throw new TrinoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
+        // DELETE is modeled as an UPDATE with no updated columns: only position deletes are written
+        return beginUpdate(session, tableHandle, new ArrayList<>());
+    }
+
+    @Override
+    public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
+    {
+        IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
+        return new IcebergTableHandle(
+                handle.getSchemaName(),
+                handle.getTableName(),
+                handle.getTableType(),
+                handle.getSnapshotId(),
+                handle.getFileFormat(),
+                handle.getOutputPath(),
+                handle.getSerializedSchema(),
+                handle.getSerializedPartitionSpec(),
+                updatedColumns.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()),
+                handle.getUnenforcedPredicate(),
+                handle.getEnforcedPredicate());
+    }
+
+    @Override
+    public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
+    {
+        finishUpdate(session, tableHandle, fragments);
+    }
+
+    @Override
+    public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
+    {
+        // TODO: refactor logic with finishInsert
+        org.apache.iceberg.Table icebergTable = transaction.table();
+
+        List<CommitTaskData> commitTasks = fragments.stream()
+                .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
+                .collect(toImmutableList());
+
+        Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
+                .map(field -> field.transform().getResultType(
+                        icebergTable.schema().findType(field.sourceId())))
+                .toArray(Type[]::new);
+
+        FileIO io = new HdfsFileIo(hdfsEnvironment, new HdfsContext(session));
+
+        RowDelta rowDelta = transaction.newRowDelta();
+        for (CommitTaskData task : commitTasks) {
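+            // dispatch on the FileContent id carried by each task:
+            // 0 = DATA (rewritten rows), 1 = POSITION_DELETES (positions of deleted rows)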
+            switch (task.getContent()) {
+                case 0:
+                    DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
+                            .withInputFile(io.newInputFile(task.getPath()))
+                            .withFormat(getFileFormat(icebergTable))
+                            .withMetrics(task.getMetrics().metrics());
+
+                    if (!icebergTable.spec().fields().isEmpty()) {
+                        String partitionDataJson = task.getPartitionDataJson()
+                                .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+                        builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
+                    }
+
+                    rowDelta.addRows(builder.build());
+                    break;
+                case 1:
+                    FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(icebergTable.spec())
+                            .withInputFile(io.newInputFile(task.getPath()))
+                            .withFormat(getFileFormat(icebergTable))
+                            .ofPositionDeletes()
+                            .withMetrics(task.getMetrics().metrics());
+
+                    if (!icebergTable.spec().fields().isEmpty()) {
+                        String partitionDataJson = task.getPartitionDataJson()
+                                .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+                        deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
+                    }
+
+                    rowDelta.addDeletes(deleteBuilder.build());
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown content type " + task.getContent());
+            }
+        }
+
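+        // A RowDelta commits the new data files and position-delete files atomically in one snapshot;
+        // validateDeletedFiles() asks Iceberg to verify that the data files referenced by the deletes exist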
+        rowDelta.validateDeletedFiles();
+        rowDelta.commit();
+        transaction.commitTransaction();
     }
 
     @Override
@@ -772,6 +857,21 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
         return OptionalLong.empty();
     }
 
+    @Override
+    public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
+    {
+        return getUpdateRowIdColumnHandle(session, tableHandle, null);
+    }
+
+    @Override
+    public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
+    {
+        // all table columns are needed in the position-delete schema, so the updatedColumns value is not used
+        IcebergTableHandle table = (IcebergTableHandle) tableHandle;
+        org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
+        return createUpdateRowIdColumnHandle(icebergTable.schema(), typeManager);
+    }
+
     @Override
     public boolean usesLegacyTableLayouts()
     {
@@ -814,6 +914,11 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
                         table.getTableName(),
                         table.getTableType(),
                         table.getSnapshotId(),
+                        table.getFileFormat(),
+                        table.getOutputPath(),
+                        table.getSerializedSchema(),
+                        table.getSerializedPartitionSpec(),
+                        new ArrayList<>(),
                         newUnenforcedConstraint,
                         newEnforcedConstraint),
                 newUnenforcedConstraint.transformKeys(ColumnHandle.class::cast),
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index baf4a270f171..56d7b2e729f1 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -40,6 +40,7 @@
 import io.trino.spi.type.VarcharType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -92,6 +93,7 @@ public class IcebergPageSink
     private final JsonCodec<CommitTaskData> jsonCodec;
     private final ConnectorSession session;
     private final FileFormat fileFormat;
+    private final FileContent fileContent;
     private final PagePartitioner pagePartitioner;
 
     private final List<WriteContext> writers = new ArrayList<>();
@@ -112,6 +114,7 @@ public IcebergPageSink(
             JsonCodec<CommitTaskData> jsonCodec,
             ConnectorSession session,
             FileFormat fileFormat,
+            FileContent fileContent,
             int maxOpenWriters)
     {
         requireNonNull(inputColumns, "inputColumns is null");
@@ -124,6 +127,7 @@ public IcebergPageSink(
         this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(outputPath)));
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.session = requireNonNull(session, "session is null");
+        this.fileContent = requireNonNull(fileContent, "fileContent is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
         this.maxOpenWriters = maxOpenWriters;
         this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
@@ -166,7 +170,8 @@ public CompletableFuture<Collection<Slice>> finish()
             CommitTaskData task = new CommitTaskData(
                     context.getPath().toString(),
                     new MetricsWrapper(context.writer.getMetrics()),
-                    context.getPartitionData().map(PartitionData::toJson));
+                    context.getPartitionData().map(PartitionData::toJson),
+                    fileContent.id());
 
             commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
         }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
index 94051d1f2083..19301766db7e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
@@ -23,6 +23,7 @@
 import io.trino.spi.connector.ConnectorPageSinkProvider;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
@@ -86,6 +87,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle)
                 jsonCodec,
                 session,
                 tableHandle.getFileFormat(),
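+                // sinks created here serve INSERT and CTAS, which only produce data files;
+                // position-delete and update sinks are constructed in IcebergPageSourceProvider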
+                FileContent.DATA,
                 maxOpenPartitions);
     }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index 928e3cf9f353..b539157d6661 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -16,6 +16,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import io.airlift.json.JsonCodec;
 import io.trino.memory.context.AggregatedMemoryContext;
 import io.trino.orc.NameBasedFieldMapper;
 import io.trino.orc.OrcColumn;
@@ -46,8 +47,10 @@
 import io.trino.plugin.hive.parquet.HdfsParquetDataSource;
 import io.trino.plugin.hive.parquet.ParquetPageSource;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
+import io.trino.spi.PageIndexerFactory;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorPageSink;
 import io.trino.spi.connector.ConnectorPageSource;
 import io.trino.spi.connector.ConnectorPageSourceProvider;
 import io.trino.spi.connector.ConnectorSession;
@@ -59,13 +62,18 @@
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.StandardTypes;
 import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -96,6 +104,7 @@
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
 import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
 import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.ROW_ID_COLUMN_NAME;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
@@ -111,6 +120,7 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
+import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
 import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.lang.String.format;
@@ -127,18 +137,35 @@ public class IcebergPageSourceProvider
     private final FileFormatDataSourceStats fileFormatDataSourceStats;
     private final OrcReaderOptions orcReaderOptions;
     private final ParquetReaderOptions parquetReaderOptions;
+    private final JsonCodec<CommitTaskData> jsonCodec;
+    private final IcebergFileWriterFactory fileWriterFactory;
+    private final PageIndexerFactory pageIndexerFactory;
+    private final TypeManager typeManager;
+    private final int maxOpenPartitions;
 
     @Inject
     public IcebergPageSourceProvider(
             HdfsEnvironment hdfsEnvironment,
             FileFormatDataSourceStats fileFormatDataSourceStats,
             OrcReaderConfig orcReaderConfig,
-            ParquetReaderConfig parquetReaderConfig)
+            ParquetReaderConfig parquetReaderConfig,
+            JsonCodec<CommitTaskData> jsonCodec,
+            IcebergFileWriterFactory fileWriterFactory,
+            PageIndexerFactory pageIndexerFactory,
+            TypeManager typeManager,
+            IcebergConfig config)
     {
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
         this.orcReaderOptions = requireNonNull(orcReaderConfig, "orcReaderConfig is null").toOrcReaderOptions();
         this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
+        // TODO: refactor with IcebergPageSinkProvider
+        this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
+        this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
+        this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+        requireNonNull(config, "config is null");
+        this.maxOpenPartitions = config.getMaxPartitionsPerWriter();
     }
 
     @Override
@@ -153,14 +180,16 @@ public ConnectorPageSource createPageSource(
         IcebergSplit split = (IcebergSplit) connectorSplit;
         IcebergTableHandle table = (IcebergTableHandle) connectorTable;
 
-        List<IcebergColumnHandle> icebergColumns = columns.stream()
+        List<IcebergColumnHandle> queriedColumns = columns.stream()
                 .map(IcebergColumnHandle.class::cast)
                 .collect(toImmutableList());
 
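+        // when $row_id is requested (DELETE/UPDATE), read every table column rather than only the queried
+        // ones: the full row is needed to fill the row struct nested inside $row_id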
+        boolean hasDeleteRowId = queriedColumns.stream().anyMatch(c -> ROW_ID_COLUMN_NAME.equals(c.getName()));
+        List<IcebergColumnHandle> icebergColumns = hasDeleteRowId ? getColumns(table.getSchema(), typeManager) : queriedColumns;
+
         Map<Integer, String> partitionKeys = split.getPartitionKeys();
 
-        List<IcebergColumnHandle> regularColumns = columns.stream()
-                .map(IcebergColumnHandle.class::cast)
+        List<IcebergColumnHandle> regularColumns = icebergColumns.stream()
                 .filter(column -> !partitionKeys.containsKey(column.getId()))
                 .collect(toImmutableList());
 
@@ -176,7 +205,45 @@ public ConnectorPageSource createPageSource(
                 regularColumns,
                 table.getUnenforcedPredicate());
 
-        return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
+        ConnectorPageSource icebergPageSource = new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
+
+        if (!hasDeleteRowId) {
+            return icebergPageSource;
+        }
+        else {
+            Schema posDeleteSchema = DeleteSchemaUtil.posDeleteSchema(table.getSchema());
+            ConnectorPageSink posDeleteSink = new IcebergPageSink(
+                    posDeleteSchema,
+                    PartitionSpec.unpartitioned(),
+                    table.getOutputPath() + "/delete/position",
+                    fileWriterFactory,
+                    pageIndexerFactory,
+                    hdfsEnvironment,
+                    hdfsContext,
+                    icebergColumns,
+                    jsonCodec,
+                    session,
+                    table.getFileFormat(),
+                    FileContent.POSITION_DELETES,
+                    maxOpenPartitions);
+
+            ConnectorPageSink updateRowSink = new IcebergPageSink(
+                    table.getSchema(),
+                    table.getPartitionSpec(),
+                    table.getOutputPath() + "/update",
+                    fileWriterFactory,
+                    pageIndexerFactory,
+                    hdfsEnvironment,
+                    hdfsContext,
+                    icebergColumns,
+                    jsonCodec,
+                    session,
+                    table.getFileFormat(),
+                    FileContent.DATA,
+                    maxOpenPartitions);
+
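+            // two sinks back one updatable page source: position deletes go to an unpartitioned sink under
+            // "<table>/delete/position", updated rows are rewritten as regular data files under "<table>/update"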
+            return new IcebergPageSourceUpdate(split.getPath(), queriedColumns, icebergColumns, table.getUpdateColumns(), icebergPageSource, posDeleteSink, updateRowSink);
+        }
     }
 
     private ConnectorPageSource createDataPageSource(
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceUpdate.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceUpdate.java
new file mode 100644
index 000000000000..5ede8d3d0052
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceUpdate.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RowBlock;
+import io.trino.spi.block.RunLengthEncodedBlock;
+import io.trino.spi.connector.ConnectorPageSink;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.connector.UpdatablePageSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static io.trino.plugin.iceberg.IcebergColumnHandle.ROW_ID_COLUMN_NAME;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Objects.requireNonNull;
+
+public class IcebergPageSourceUpdate
+        implements UpdatablePageSource
+{
+    private final Slice filePathSlice;
+    private final List<IcebergColumnHandle> queriedColumns;
+    private final List<IcebergColumnHandle> allTableColumns;
+    private final List<IcebergColumnHandle> updateColumns;
+    private final ConnectorPageSource source;
+    private final ConnectorPageSink posDeleteSink;
+    private final ConnectorPageSink updateRowSink;
+
+    public IcebergPageSourceUpdate(
+            String filePath,
+            List<IcebergColumnHandle> queriedColumns,
+            List<IcebergColumnHandle> allTableColumns,
+            List<IcebergColumnHandle> updateColumns,
+            ConnectorPageSource source,
+            ConnectorPageSink posDeleteSink,
+            ConnectorPageSink updateRowSink)
+    {
+        this.filePathSlice = Slices.utf8Slice(requireNonNull(filePath, "filePath is null"));
+        this.queriedColumns = requireNonNull(queriedColumns, "queriedColumns is null");
+        this.allTableColumns = requireNonNull(allTableColumns, "allTableColumns is null");
+        this.updateColumns = requireNonNull(updateColumns, "updateColumns is null");
+        this.source = requireNonNull(source, "source is null");
+        this.posDeleteSink = requireNonNull(posDeleteSink, "posDeleteSink is null");
+        this.updateRowSink = requireNonNull(updateRowSink, "updateRowSink is null");
+    }
+
+    @Override
+    public void deleteRows(Block rowIds)
+    {
+        RowBlock rows = (RowBlock) rowIds;
+        // TODO: need a proper way to get the blocks inside the row block instead of getRawFieldBlocks(); this is for POC purposes only.
+        // The field blocks are (file_path, pos, row), which is exactly the position-delete schema the sink expects.
+        posDeleteSink.appendPage(new Page(rows.getPositionCount(), rows.getRawFieldBlocks()));
+    }
+
+    @Override
+    public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
+    {
+        // the row ID channel is last; deleting it marks the old row positions as removed
+        RowBlock rowIdBlock = (RowBlock) page.getBlock(columnValueAndRowIdChannels.get(columnValueAndRowIdChannels.size() - 1));
+        deleteRows(rowIdBlock);
+        Map<Integer, Block> updatedColumnBlocks = new HashMap<>();
+        for (int i = 0; i < columnValueAndRowIdChannels.size() - 1; i++) {
+            IcebergColumnHandle updatedColumn = updateColumns.get(i);
+            Block updatedValues = page.getBlock(columnValueAndRowIdChannels.get(i));
+            updatedColumnBlocks.put(updatedColumn.getId(), updatedValues);
+        }
+
+        // rebuild the full row: new values for updated columns, old values (taken from the row struct
+        // nested inside $row_id) for everything else
+        Block[] updatedRows = new Block[allTableColumns.size()];
+        Block[] oldRows = ((RowBlock) rowIdBlock.getRawFieldBlocks()[2]).getRawFieldBlocks();
+        for (int i = 0; i < allTableColumns.size(); i++) {
+            int columnId = allTableColumns.get(i).getId();
+            if (updatedColumnBlocks.containsKey(columnId)) {
+                updatedRows[i] = updatedColumnBlocks.get(columnId);
+            }
+            else {
+                updatedRows[i] = oldRows[i];
+            }
+        }
+
+        updateRowSink.appendPage(new Page(page.getPositionCount(), updatedRows));
+    }
+
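+    // finish() drains both sinks; the resulting fragments (position deletes with content=1, rewritten rows
+    // with content=0) are committed together as a single RowDelta in IcebergMetadata.finishUpdate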
+    @Override
+    public CompletableFuture<Collection<Slice>> finish()
+    {
+        try {
+            // copy defensively: the collection returned by the sink is not guaranteed to be mutable
+            Collection<Slice> slices = new ArrayList<>(posDeleteSink.finish().get());
+            slices.addAll(updateRowSink.finish().get());
+            return CompletableFuture.completedFuture(slices);
+        }
+        catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void abort()
+    {
+        // UpdatablePageSource requires abort(); roll back both sinks (assumed implementation, added so the POC compiles)
+        posDeleteSink.abort();
+        updateRowSink.abort();
+    }
+
+    @Override
+    public Page getNextPage()
+    {
+        Page sourcePage = source.getNextPage();
+        if (sourcePage == null) {
+            return null;
+        }
+        Block[] rowIdComponentBlocks = new Block[3];
+        int channelCount = sourcePage.getChannelCount();
+        int pageSize = sourcePage.getPositionCount();
+        // file_path: constant for all rows of this split
+        rowIdComponentBlocks[0] = RunLengthEncodedBlock.create(VARCHAR, filePathSlice, pageSize);
+        // pos: assumed to be the row-position channel that the reader appends second-to-last
+        rowIdComponentBlocks[1] = sourcePage.getBlock(channelCount - 2);
+        // row: struct of all table columns, kept so updates can reconstruct unchanged values
+        Block[] fieldBlocks = new Block[allTableColumns.size()];
+        for (int i = 0; i < allTableColumns.size(); i++) {
+            fieldBlocks[i] = sourcePage.getBlock(i);
+        }
+        rowIdComponentBlocks[2] = RowBlock.fromFieldBlocks(pageSize, Optional.empty(), fieldBlocks);
+
+        Block[] resultBlocks = new Block[queriedColumns.size()];
+        for (int i = 0; i < queriedColumns.size(); i++) {
+            IcebergColumnHandle columnHandle = queriedColumns.get(i);
+            if (ROW_ID_COLUMN_NAME.equals(columnHandle.getName())) {
+                resultBlocks[i] = RowBlock.fromFieldBlocks(pageSize, Optional.empty(), rowIdComponentBlocks);
+            }
+            else {
+                resultBlocks[i] = sourcePage.getBlock(allTableColumns.indexOf(columnHandle));
+            }
+        }
+        return new Page(sourcePage.getPositionCount(), resultBlocks);
+    }
+
+    @Override
+    public long getSystemMemoryUsage()
+    {
+        return source.getSystemMemoryUsage();
+    }
+
+    @Override
+    public void close()
+            throws IOException
+    {
+        source.close();
+    }
+
+    @Override
+    public long getCompletedBytes()
+    {
+        return source.getCompletedBytes();
+    }
+
+    @Override
+    public long getReadTimeNanos()
+    {
+        return source.getReadTimeNanos();
+    }
+
+    @Override
+    public boolean isFinished()
+    {
+        return source.isFinished();
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
index b0a7d0af830d..19e1baf63a5e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
@@ -18,12 +18,17 @@
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.predicate.TupleDomain;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 
+import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.util.SerializationUtil.deserializeFromBytes;
 
 public class IcebergTableHandle
         implements ConnectorTableHandle
@@ -32,6 +37,11 @@ public class IcebergTableHandle
     private final String tableName;
     private final TableType tableType;
     private final Optional<Long> snapshotId;
+    private final FileFormat fileFormat;
+    private final String outputPath;
+    private final byte[] serializedSchema;
+    private final byte[] serializedPartitionSpec;
+    private final List<IcebergColumnHandle> updateColumns;
 
     // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector
     private final TupleDomain<IcebergColumnHandle> unenforcedPredicate;
@@ -39,12 +49,21 @@ public class IcebergTableHandle
     // Filter guaranteed to be enforced by Iceberg connector
     private final TupleDomain<IcebergColumnHandle> enforcedPredicate;
 
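+    // Schema and PartitionSpec are not JSON-serializable, so the handle carries them as bytes
+    // (org.apache.iceberg.util.SerializationUtil) and deserializes them lazily into these cached fields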
+    private volatile Schema schema;
+    private volatile PartitionSpec partitionSpec;
+
     @JsonCreator
     public IcebergTableHandle(
             @JsonProperty("schemaName") String schemaName,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("tableType") TableType tableType,
             @JsonProperty("snapshotId") Optional<Long> snapshotId,
+            // TODO: refactor with IcebergWritableTableHandle
+            @JsonProperty("fileFormat") FileFormat fileFormat,
+            @JsonProperty("outputPath") String outputPath,
+            @JsonProperty("serializedSchema") byte[] serializedSchema,
+            @JsonProperty("serializedPartitionSpec") byte[] serializedPartitionSpec,
+            @JsonProperty("updateColumns") List<IcebergColumnHandle> updateColumns,
             @JsonProperty("unenforcedPredicate") TupleDomain<IcebergColumnHandle> unenforcedPredicate,
             @JsonProperty("enforcedPredicate") TupleDomain<IcebergColumnHandle> enforcedPredicate)
     {
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
         this.tableType = requireNonNull(tableType, "tableType is null");
         this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
+        this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+        this.outputPath = requireNonNull(outputPath, "outputPath is null");
+        this.serializedSchema = requireNonNull(serializedSchema, "serializedSchema is null");
+        this.serializedPartitionSpec = requireNonNull(serializedPartitionSpec, "serializedPartitionSpec is null");
+        this.updateColumns = requireNonNull(updateColumns, "updateColumns is null");
         this.unenforcedPredicate = requireNonNull(unenforcedPredicate, "unenforcedPredicate is null");
         this.enforcedPredicate = requireNonNull(enforcedPredicate, "enforcedPredicate is null");
     }
@@ -80,6 +104,52 @@ public Optional<Long> getSnapshotId()
         return snapshotId;
     }
 
+    @JsonProperty
+    public FileFormat getFileFormat()
+    {
+        return fileFormat;
+    }
+
+    @JsonProperty
+    public String getOutputPath()
+    {
+        return outputPath;
+    }
+
+    @JsonProperty
+    public byte[] getSerializedSchema()
+    {
+        return serializedSchema;
+    }
+
+    public Schema getSchema()
+    {
+        if (schema == null) {
+            schema = deserializeFromBytes(serializedSchema);
+        }
+        return schema;
+    }
+
+    @JsonProperty
+    public byte[] getSerializedPartitionSpec()
+    {
+        return serializedPartitionSpec;
+    }
+
+    public PartitionSpec getPartitionSpec()
+    {
+        if (partitionSpec == null) {
+            partitionSpec = deserializeFromBytes(serializedPartitionSpec);
+        }
+        return partitionSpec;
+    }
+
+    @JsonProperty
+    public List<IcebergColumnHandle> getUpdateColumns()
+    {
+        return updateColumns;
+    }
+
     @JsonProperty
     public TupleDomain<IcebergColumnHandle> getUnenforcedPredicate()
     {
         return unenforcedPredicate;
     }