Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ default void deleteRows(Block rowIds)
throw new UnsupportedOperationException("This connector does not support row-level delete");
}

/**
* Write updated rows to the PageSource.
* @param page Contains values for all updated columns, as well as the $row_id column. The order of these Blocks can be derived from columnValueAndRowIdChannels.
* @param columnValueAndRowIdChannels The index of this list matches the index columns have in updatedColumns parameter of {@link ConnectorMetadata#beginUpdate}
* The value at each index is the channel number in the given page. The last element of this list is always the channel number for the $row_id column.
*/
default void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
{
throw new UnsupportedOperationException("This connector does not support row update");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,10 @@ public synchronized void replaceTable(String databaseName, String tableName, Tab
throw new TrinoException(HIVE_METASTORE_ERROR, "Replacement table must have same name");
}

if (isIcebergTable(table) && !Objects.equals(table.getParameters().get("metadata_location"), newTable.getParameters().get("previous_metadata_location"))) {
throw new TrinoException(HIVE_METASTORE_ERROR, "Cannot update Iceberg table: supplied previous location does not match current location");
}

Path tableMetadataDirectory = getTableMetadataDirectory(table);
writeSchemaFile(TABLE, tableMetadataDirectory, tableCodec, new TableMetadata(currentVersion, newTable), true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,20 @@ public void testUpdateRowConcurrently()
.hasMessage("Hive update is only supported for ACID transactional tables");
}

@Override
public void testUpdateWithPredicates()
{
    // Non-transactional Hive tables reject row-level UPDATE; verify the exact error surfaced by the base test.
    assertThatThrownBy(() -> super.testUpdateWithPredicates())
            .hasMessage("Hive update is only supported for ACID transactional tables");
}

@Override
public void testUpdateAllValues()
{
    // Non-transactional Hive tables reject row-level UPDATE; verify the exact error surfaced by the base test.
    assertThatThrownBy(() -> super.testUpdateAllValues())
            .hasMessage("Hive update is only supported for ACID transactional tables");
}

@Override
public void testExplainAnalyzeWithDeleteWithSubquery()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
public class IcebergColumnHandle
implements ColumnHandle
{
// Iceberg reserved row ids begin at INTEGER.MAX_VALUE and count down. Starting with MIN_VALUE here to avoid conflicts.
public static final int TRINO_UPDATE_ROW_ID_COLUMN_ID = Integer.MIN_VALUE;
public static final String TRINO_UPDATE_ROW_ID_COLUMN_NAME = "$row_id";

private final ColumnIdentity baseColumnIdentity;
private final Type baseType;
// The list of field ids to indicate the projected part of the top-level column represented by baseColumnIdentity
Expand Down Expand Up @@ -149,6 +153,12 @@ public boolean isRowPositionColumn()
return id == ROW_POSITION.fieldId();
}

@JsonIgnore
public boolean isUpdateRowIdColumn()
{
    // True only for the synthetic $row_id column Trino injects for UPDATE operations.
    return TRINO_UPDATE_ROW_ID_COLUMN_ID == id;
}

/**
* Marker column used by the Iceberg DeleteFilter to indicate rows which are deleted by equality deletes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IsolationLevel;
Expand All @@ -105,13 +104,16 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
Expand Down Expand Up @@ -145,6 +147,8 @@
import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.hive.util.HiveUtil.isStructuralType;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_ID;
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_NAME;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
Expand All @@ -162,6 +166,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
Expand All @@ -177,7 +182,6 @@
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DELETE_ORPHAN_FILES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static java.lang.String.format;
Expand Down Expand Up @@ -269,14 +273,16 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
name.getTableType(),
snapshotId,
SchemaParser.toJson(table.schema()),
PartitionSpecParser.toJson(table.spec()),
table.operations().current().formatVersion(),
TupleDomain.all(),
TupleDomain.all(),
ImmutableSet.of(),
Optional.ofNullable(nameMappingJson),
table.location(),
table.properties(),
NO_RETRIES);
NO_RETRIES,
ImmutableList.of());
}

@Override
Expand Down Expand Up @@ -1279,7 +1285,60 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
{
finishWrite(session, (IcebergTableHandle) tableHandle, fragments, false);
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
    // Position deletes identify rows by their ordinal position within a data file,
    // so the delete row id is simply Iceberg's reserved _pos metadata column.
    return IcebergUtil.getColumnHandle(ROW_POSITION, typeManager);
}

/**
 * Begins a row-level UPDATE on an Iceberg table.
 * Opens a new Iceberg transaction (committed later in finishUpdate) and records the
 * updated columns on the returned table handle so the page source / sink can see them.
 *
 * @throws TrinoException NOT_SUPPORTED if the table is format version 1, which lacks
 *         the row-delta (position delete) support that updates are built on
 */
@Override
public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns, RetryMode retryMode)
{
    IcebergTableHandle table = (IcebergTableHandle) tableHandle;
    if (table.getFormatVersion() < 2) {
        throw new TrinoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2");
    }
    // Only one write transaction may be in flight per metadata instance.
    verify(transaction == null, "transaction already set");
    transaction = catalog.loadTable(session, table.getSchemaTableName()).newTransaction();
    return table.withRetryMode(retryMode)
            .withUpdatedColumns(updatedColumns.stream()
                    .map(IcebergColumnHandle.class::cast)
                    .collect(toImmutableList()));
}

@Override
public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
{
    // Delegate to the shared commit path; 'true' requests the extra update-specific
    // conflict validations (a concurrently deleted row must fail the update).
    IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
    finishWrite(session, icebergTableHandle, fragments, true);
}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
    IcebergTableHandle icebergTable = (IcebergTableHandle) tableHandle;

    // Field ids of the columns being rewritten by this UPDATE.
    Set<Integer> updatedFieldIds = updatedColumns.stream()
            .map(IcebergColumnHandle.class::cast)
            .map(IcebergColumnHandle::getId)
            .collect(toImmutableSet());

    // The synthetic $row_id struct carries the row position plus every column that is
    // NOT being updated, so the writer can reconstruct the complete row when producing
    // the replacement data file.
    List<Types.NestedField> rowIdFields = new ArrayList<>();
    rowIdFields.add(ROW_POSITION);
    SchemaParser.fromJson(icebergTable.getTableSchemaJson()).columns().stream()
            .filter(column -> !updatedFieldIds.contains(column.fieldId()))
            .forEach(rowIdFields::add);

    Types.NestedField rowIdField = Types.NestedField.required(TRINO_UPDATE_ROW_ID_COLUMN_ID, TRINO_UPDATE_ROW_ID_COLUMN_NAME, Types.StructType.of(rowIdFields));
    return getColumnHandle(rowIdField, typeManager);
}

private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations)
{
Table icebergTable = transaction.table();

List<CommitTaskData> commitTasks = fragments.stream()
Expand All @@ -1299,32 +1358,56 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
rowDelta.validateNoConflictingDataFiles();
}

if (runUpdateValidations) {
// Ensure a row that is updated by this commit was not deleted by a separate commit
rowDelta.validateDeletedFiles();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only for update? maybe add a comment.
or can this be unconditional?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a comment. Deleting a row from two commits concurrently shouldn't cause a validation to fail, but deleting a row and updating it concurrently should fail since the update might undo the delete

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly, the method names validateDeletedFiles and validateNoConflictingDeleteFiles do not suggest to me that they should be called for updates and shouldn't be called for deletes.
I see that this is what Spark Iceberg does https://github.com/apache/iceberg/blob/f6e11148d31b408a7aea57a0efcb4428134f6a99/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L191-L194

keep as is

rowDelta.validateNoConflictingDeleteFiles();
}

ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
if (task.getContent() != FileContent.POSITION_DELETES) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Iceberg finishDelete called with commit task that was not a position delete file");
}
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());
switch (task.getContent()) {
case POSITION_DELETES:
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
break;
case DATA:
DataFiles.Builder builder = DataFiles.builder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
rowDelta.addRows(builder.build());
writtenFiles.add(task.getPath());
break;
default:
throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
}

// try to leave as little garbage as possible behind
Expand All @@ -1333,17 +1416,16 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
}

rowDelta.validateDataFilesExist(referencedDataFiles.build());
rowDelta.commit();
transaction.commitTransaction();
try {
rowDelta.commit();
transaction.commitTransaction();
}
catch (ValidationException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e);
}
transaction = null;
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return IcebergUtil.getColumnHandle(ROW_POSITION, typeManager);
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
{
Expand Down Expand Up @@ -1452,14 +1534,16 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
table.getTableType(),
table.getSnapshotId(),
table.getTableSchemaJson(),
table.getPartitionSpecJson(),
table.getFormatVersion(),
newUnenforcedConstraint,
newEnforcedConstraint,
table.getProjectedColumns(),
table.getNameMappingJson(),
table.getTableLocation(),
table.getStorageProperties(),
table.getRetryMode()),
table.getRetryMode(),
table.getUpdatedColumns()),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
false));
}
Expand Down
Loading