Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private RowBlock(int startOffset, int positionCount, @Nullable boolean[] rowIsNu
}

@Override
protected Block[] getRawFieldBlocks()
public Block[] getRawFieldBlocks()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder why this is needed, and whether this is actually used correctly.

{
return fieldBlocks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,28 @@

import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class CommitTaskData
{
private final String path;
private final MetricsWrapper metrics;
private final Optional<String> partitionDataJson;
private final int content;

@JsonCreator
public CommitTaskData(
@JsonProperty("path") String path,
@JsonProperty("metrics") MetricsWrapper metrics,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson)
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("content") int content)
{
this.path = requireNonNull(path, "path is null");
this.metrics = requireNonNull(metrics, "metrics is null");
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.content = content;
checkArgument(content >= 0, "content id must be positive");
}

@JsonProperty
Expand All @@ -54,4 +59,10 @@ public Optional<String> getPartitionDataJson()
{
return partitionDataJson;
}

    // Iceberg FileContent id of the committed file — 0 for a data file,
    // 1 for position deletes (see FileContent.id(), set by IcebergPageSink).
    @JsonProperty
    public int getContent()
    {
        return content;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.types.Types;

import java.util.Objects;
Expand All @@ -28,10 +30,14 @@
import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.types.Types.NestedField.required;

public class IcebergColumnHandle
implements ColumnHandle
{
public static final int ROW_ID_COLUMN_INDEX = Integer.MIN_VALUE;
public static final String ROW_ID_COLUMN_NAME = "$row_id";

private final ColumnIdentity columnIdentity;
private final Type type;
private final Optional<String> comment;
Expand Down Expand Up @@ -116,4 +122,9 @@ public static IcebergColumnHandle create(Types.NestedField column, TypeManager t
toTrinoType(column.type(), typeManager),
Optional.ofNullable(column.doc()));
}

public static IcebergColumnHandle createUpdateRowIdColumnHandle(Schema tableSchema, TypeManager typeManager)
{
return create(required(ROW_ID_COLUMN_INDEX, ROW_ID_COLUMN_NAME, DeleteSchemaUtil.posDeleteSchema(tableSchema).asStruct()), typeManager);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it used for deletes only, or for updates as well?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
Expand All @@ -85,6 +87,7 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -125,7 +128,7 @@
import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
import static io.trino.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.createUpdateRowIdColumnHandle;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData;
Expand Down Expand Up @@ -154,7 +157,6 @@
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
Expand All @@ -166,6 +168,7 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.Transactions.createTableTransaction;
import static org.apache.iceberg.util.SerializationUtil.serializeToBytes;

public class IcebergMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -255,6 +258,11 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
name.getTableName(),
name.getTableType(),
snapshotId,
getFileFormat(table),
getDataPath(table.location()),
serializeToBytes(table.schema()),
serializeToBytes(table.spec()),
Comment on lines +263 to +264
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there already was an idea to add schema to IcebergTableHandle and it was rejected (?) for some reason.

@phd3 do you remember?

new ArrayList<>(),
TupleDomain.all(),
TupleDomain.all());
}
Expand Down Expand Up @@ -640,12 +648,6 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
.collect(toImmutableList())));
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return primitiveIcebergColumnHandle(0, "$row_id", BIGINT, Optional.empty());
}

@Override
public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
{
Expand Down Expand Up @@ -754,7 +756,90 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, Conn
@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
return beginUpdate(session, tableHandle, new ArrayList<>());
}

@Override
public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
return new IcebergTableHandle(
handle.getSchemaName(),
handle.getTableName(),
handle.getTableType(),
handle.getSnapshotId(),
handle.getFileFormat(),
handle.getOutputPath(),
handle.getSerializedSchema(),
handle.getSerializedPartitionSpec(),
updatedColumns.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()),
handle.getUnenforcedPredicate(),
handle.getEnforcedPredicate());
}

    // DELETE shares the commit path with UPDATE: both emit data-file and
    // position-delete-file fragments that are committed as a single RowDelta.
    @Override
    public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
    {
        finishUpdate(session, tableHandle, fragments);
    }

@Override
public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
{
// TODO: refactor logic with finishInsert
org.apache.iceberg.Table icebergTable = transaction.table();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);

FileIO io = new HdfsFileIo(hdfsEnvironment, new HdfsContext(session));

RowDelta rowDelta = transaction.newRowDelta();
for (CommitTaskData task : commitTasks) {
switch (task.getContent()) {
case 0:
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withInputFile(io.newInputFile(task.getPath()))
.withFormat(getFileFormat(icebergTable))
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

rowDelta.addRows(builder.build());
continue;
case 1:
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(icebergTable.spec())
.withInputFile(io.newInputFile(task.getPath()))
.withFormat(getFileFormat(icebergTable))
.ofPositionDeletes()
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

rowDelta.addDeletes(deleteBuilder.build());
continue;
default:
throw new IllegalStateException("unknown content type " + task.getContent());
}
}

rowDelta.validateDeletedFiles();
rowDelta.commit();
transaction.commitTransaction();
}

@Override
Expand All @@ -772,6 +857,21 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
return OptionalLong.empty();
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return getUpdateRowIdColumnHandle(session, tableHandle, null);
}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
// we need all columns of the table in position delete schema, so updateColumns value is not used
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
return createUpdateRowIdColumnHandle(icebergTable.schema(), typeManager);
}

@Override
public boolean usesLegacyTableLayouts()
{
Expand Down Expand Up @@ -814,6 +914,11 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
table.getTableName(),
table.getTableType(),
table.getSnapshotId(),
table.getFileFormat(),
table.getOutputPath(),
table.getSerializedSchema(),
table.getSerializedPartitionSpec(),
new ArrayList<>(),
newUnenforcedConstraint,
newEnforcedConstraint),
newUnenforcedConstraint.transformKeys(ColumnHandle.class::cast),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.trino.spi.type.VarcharType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class IcebergPageSink
private final JsonCodec<CommitTaskData> jsonCodec;
private final ConnectorSession session;
private final FileFormat fileFormat;
private final FileContent fileContent;
private final PagePartitioner pagePartitioner;

private final List<WriteContext> writers = new ArrayList<>();
Expand All @@ -112,6 +114,7 @@ public IcebergPageSink(
JsonCodec<CommitTaskData> jsonCodec,
ConnectorSession session,
FileFormat fileFormat,
FileContent fileContent,
int maxOpenWriters)
{
requireNonNull(inputColumns, "inputColumns is null");
Expand All @@ -124,6 +127,7 @@ public IcebergPageSink(
this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(outputPath)));
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
this.fileContent = requireNonNull(fileContent, "fileContent is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.maxOpenWriters = maxOpenWriters;
this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
Expand Down Expand Up @@ -166,7 +170,8 @@ public CompletableFuture<Collection<Slice>> finish()
CommitTaskData task = new CommitTaskData(
context.getPath().toString(),
new MetricsWrapper(context.writer.getMetrics()),
context.getPartitionData().map(PartitionData::toJson));
context.getPartitionData().map(PartitionData::toJson),
fileContent.id());

commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -86,6 +87,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
jsonCodec,
session,
tableHandle.getFileFormat(),
FileContent.DATA,
maxOpenPartitions);
}
}
Loading