io/trino/plugin/iceberg/IcebergMaterializedViewDefinition.java
@@ -13,8 +13,6 @@
  */
 package io.trino.plugin.iceberg;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import io.airlift.json.JsonCodec;
 import io.airlift.json.JsonCodecFactory;
@@ -39,22 +37,21 @@
 /*
  * Serializable version of ConnectorMaterializedViewDefinition stored by iceberg connector
  */
-public class IcebergMaterializedViewDefinition
+public record IcebergMaterializedViewDefinition(
+        String originalSql,
+        Optional<String> catalog,
+        Optional<String> schema,
+        List<Column> columns,
+        Optional<Duration> gracePeriod,
+        Optional<String> comment,
+        List<CatalogSchemaName> path)
 {
     private static final String MATERIALIZED_VIEW_PREFIX = "/* Presto Materialized View: ";
     private static final String MATERIALIZED_VIEW_SUFFIX = " */";
 
     private static final JsonCodec<IcebergMaterializedViewDefinition> materializedViewCodec =
             new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(IcebergMaterializedViewDefinition.class);
 
-    private final String originalSql;
-    private final Optional<String> catalog;
-    private final Optional<String> schema;
-    private final List<Column> columns;
-    private final Optional<Duration> gracePeriod;
-    private final Optional<String> comment;
-    private final List<CatalogSchemaName> path;
-
     public static String encodeMaterializedViewData(IcebergMaterializedViewDefinition definition)
     {
         byte[] bytes = materializedViewCodec.toJsonBytes(definition);
@@ -86,24 +83,15 @@ public static IcebergMaterializedViewDefinition fromConnectorMaterializedViewDef
                 definition.getPath());
     }
 
-    @JsonCreator
-    public IcebergMaterializedViewDefinition(
-            @JsonProperty("originalSql") String originalSql,
-            @JsonProperty("catalog") Optional<String> catalog,
-            @JsonProperty("schema") Optional<String> schema,
-            @JsonProperty("columns") List<Column> columns,
-            @JsonProperty("gracePeriod") Optional<Duration> gracePeriod,
-            @JsonProperty("comment") Optional<String> comment,
-            @JsonProperty("path") List<CatalogSchemaName> path)
+    public IcebergMaterializedViewDefinition
     {
-        this.originalSql = requireNonNull(originalSql, "originalSql is null");
-        this.catalog = requireNonNull(catalog, "catalog is null");
-        this.schema = requireNonNull(schema, "schema is null");
-        this.columns = List.copyOf(requireNonNull(columns, "columns is null"));
+        requireNonNull(originalSql, "originalSql is null");
+        requireNonNull(catalog, "catalog is null");
+        requireNonNull(schema, "schema is null");
+        columns = List.copyOf(requireNonNull(columns, "columns is null"));
         checkArgument(gracePeriod.isEmpty() || !gracePeriod.get().isNegative(), "gracePeriod cannot be negative: %s", gracePeriod);
-        this.gracePeriod = gracePeriod;
-        this.comment = requireNonNull(comment, "comment is null");
-        this.path = path == null ? ImmutableList.of() : ImmutableList.copyOf(path);
+        requireNonNull(comment, "comment is null");
+        path = path == null ? ImmutableList.of() : ImmutableList.copyOf(path);
 
         if (catalog.isEmpty() && schema.isPresent()) {
             throw new IllegalArgumentException("catalog must be present if schema is present");
@@ -113,48 +101,6 @@ public IcebergMaterializedViewDefinition(
         }
     }
 
-    @JsonProperty
-    public String getOriginalSql()
-    {
-        return originalSql;
-    }
-
-    @JsonProperty
-    public Optional<String> getCatalog()
-    {
-        return catalog;
-    }
-
-    @JsonProperty
-    public Optional<String> getSchema()
-    {
-        return schema;
-    }
-
-    @JsonProperty
-    public List<Column> getColumns()
-    {
-        return columns;
-    }
-
-    @JsonProperty
-    public Optional<Duration> getGracePeriod()
-    {
-        return gracePeriod;
-    }
-
-    @JsonProperty
-    public Optional<String> getComment()
-    {
-        return comment;
-    }
-
-    @JsonProperty
-    public List<CatalogSchemaName> getPath()
-    {
-        return path;
-    }
-
     @Override
     public String toString()
     {
@@ -169,39 +115,13 @@ public String toString()
         return getClass().getSimpleName() + joiner;
     }
 
-    public static final class Column
+    public record Column(String name, TypeId type, Optional<String> comment)
     {
-        private final String name;
-        private final TypeId type;
-        private final Optional<String> comment;
-
-        @JsonCreator
-        public Column(
-                @JsonProperty("name") String name,
-                @JsonProperty("type") TypeId type,
-                @JsonProperty("comment") Optional<String> comment)
-        {
-            this.name = requireNonNull(name, "name is null");
-            this.type = requireNonNull(type, "type is null");
-            this.comment = requireNonNull(comment, "comment is null");
-        }
-
-        @JsonProperty
-        public String getName()
-        {
-            return name;
-        }
-
-        @JsonProperty
-        public TypeId getType()
-        {
-            return type;
-        }
-
-        @JsonProperty
-        public Optional<String> getComment()
+        public Column
         {
-            return comment;
+            requireNonNull(name, "name is null");
+            requireNonNull(type, "type is null");
+            requireNonNull(comment, "comment is null");
        }
 
         @Override
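Note on the pattern: a record's canonical constructor and accessors are generated from its components, and accessor names carry the component name verbatim (originalSql() rather than getOriginalSql()), which is what drives the call-site renames in the files below. The compact constructor body runs before the implicit field assignments, so reassigning a parameter (as done for columns and path above) is the idiomatic way to validate and defensively copy. A minimal self-contained sketch of the same pattern, with hypothetical names rather than code from this PR:

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

// Hypothetical record, not code from this PR: it only mirrors the pattern above.
public record ViewDefinition(String sql, List<String> columns, Optional<String> comment)
{
    // Compact constructor: runs before the implicit field assignments,
    // so reassigning a parameter changes what actually gets stored.
    public ViewDefinition
    {
        requireNonNull(sql, "sql is null");
        columns = List.copyOf(requireNonNull(columns, "columns is null")); // defensive copy
        requireNonNull(comment, "comment is null");
    }

    public static void main(String[] args)
    {
        ViewDefinition view = new ViewDefinition("SELECT 1", List.of("x"), Optional.empty());
        // Accessors carry the component name verbatim: sql(), not getSql().
        System.out.println(view.sql() + " -> " + view.columns());
    }
}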
io/trino/plugin/iceberg/IcebergMetadata.java
@@ -1135,7 +1135,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
             DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                     .withPath(task.path())
                     .withFileSizeInBytes(task.fileSizeInBytes())
-                    .withFormat(table.getFileFormat().toIceberg())
+                    .withFormat(table.fileFormat().toIceberg())
                     .withMetrics(task.metrics().metrics());
 
             if (!icebergTable.spec().fields().isEmpty()) {
@@ -1149,7 +1149,7 @@ }
         }
 
         // try to leave as little garbage as possible behind
-        if (table.getRetryMode() != NO_RETRIES) {
+        if (table.retryMode() != NO_RETRIES) {
            cleanExtraOutputFiles(session, writtenFiles.build());
         }
 
@@ -1165,7 +1165,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
 
         if (!computedStatistics.isEmpty()) {
             try {
-                beginTransaction(catalog.loadTable(session, table.getName()));
+                beginTransaction(catalog.loadTable(session, table.name()));
                 Table reloadedTable = transaction.table();
                 CollectedStatistics collectedStatistics = processComputedTableStatistics(reloadedTable, computedStatistics);
                 StatisticsFile statisticsFile = tableStatisticsWriter.writeStatisticsFile(
@@ -2353,7 +2353,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
     {
         IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle;
         IcebergTableHandle handle = mergeHandle.getTableHandle();
-        RetryMode retryMode = mergeHandle.getInsertTableHandle().getRetryMode();
+        RetryMode retryMode = mergeHandle.getInsertTableHandle().retryMode();
        finishWrite(session, handle, fragments, retryMode);
     }
 
@@ -2934,7 +2934,7 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
             DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                     .withPath(task.path())
                     .withFileSizeInBytes(task.fileSizeInBytes())
-                    .withFormat(table.getFileFormat().toIceberg())
+                    .withFormat(table.fileFormat().toIceberg())
                     .withMetrics(task.metrics().metrics());
 
             if (!icebergTable.spec().fields().isEmpty()) {
@@ -2963,7 +2963,7 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
                 .collect(joining(","));
 
         // try to leave as little garbage as possible behind
-        if (table.getRetryMode() != NO_RETRIES) {
+        if (table.retryMode() != NO_RETRIES) {
            cleanExtraOutputFiles(session, writtenFiles.build());
         }
 
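The changes in this file (and in the call sites below) are purely mechanical renames: once a handle class becomes a record, its generated accessor drops the get prefix. A small sketch with a hypothetical handle record, not the actual IcebergWritableTableHandle:

// Hypothetical handle record, not the actual Trino handle class.
record WriteHandle(String tableName, boolean retriesEnabled)
{
    static void finish(WriteHandle handle)
    {
        // Before the record conversion this would have used a getter-style name.
        System.out.println("finishing insert into " + handle.tableName());
        if (handle.retriesEnabled()) {
            System.out.println("cleaning extra output files");
        }
    }

    public static void main(String[] args)
    {
        finish(new WriteHandle("orders", true));
    }
}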
io/trino/plugin/iceberg/IcebergNodePartitioningProvider.java
@@ -53,11 +53,11 @@ public BucketFunction getBucketFunction(
         }
 
         IcebergPartitioningHandle handle = (IcebergPartitioningHandle) partitioningHandle;
-        Schema schema = schemaFromHandles(handle.getPartitioningColumns());
+        Schema schema = schemaFromHandles(handle.partitioningColumns());
         return new IcebergBucketFunction(
                 typeOperators,
-                parsePartitionFields(schema, handle.getPartitioning()),
-                handle.getPartitioningColumns(),
+                parsePartitionFields(schema, handle.partitioning()),
+                handle.partitioningColumns(),
                 bucketCount);
     }
 }
io/trino/plugin/iceberg/IcebergPageSink.java
@@ -180,12 +180,12 @@ public IcebergPageSink(
         ImmutableList.Builder<Integer> sortColumnIndexes = ImmutableList.builder();
         ImmutableList.Builder<SortOrder> sortOrders = ImmutableList.builder();
         for (TrinoSortField sortField : sortOrder) {
-            Types.NestedField column = outputSchema.findField(sortField.getSourceColumnId());
+            Types.NestedField column = outputSchema.findField(sortField.sourceColumnId());
             if (column == null) {
                 throw new TrinoException(ICEBERG_INVALID_METADATA, "Unable to find sort field source column in the table schema: " + sortField);
             }
             sortColumnIndexes.add(outputSchema.columns().indexOf(column));
-            sortOrders.add(sortField.getSortOrder());
+            sortOrders.add(sortField.sortOrder());
         }
         this.sortColumnIndexes = sortColumnIndexes.build();
         this.sortOrders = sortOrders.build();
io/trino/plugin/iceberg/IcebergPageSinkProvider.java
@@ -93,24 +93,24 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
 
     private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle)
     {
-        Schema schema = SchemaParser.fromJson(tableHandle.getSchemaAsJson());
-        String partitionSpecJson = tableHandle.getPartitionsSpecsAsJson().get(tableHandle.getPartitionSpecId());
+        Schema schema = SchemaParser.fromJson(tableHandle.schemaAsJson());
+        String partitionSpecJson = tableHandle.partitionsSpecsAsJson().get(tableHandle.partitionSpecId());
         PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, partitionSpecJson);
-        LocationProvider locationProvider = getLocationProvider(tableHandle.getName(), tableHandle.getOutputPath(), tableHandle.getStorageProperties());
+        LocationProvider locationProvider = getLocationProvider(tableHandle.name(), tableHandle.outputPath(), tableHandle.storageProperties());
         return new IcebergPageSink(
                 schema,
                 partitionSpec,
                 locationProvider,
                 fileWriterFactory,
                 pageIndexerFactory,
-                fileSystemFactory.create(session.getIdentity(), tableHandle.getFileIoProperties()),
-                tableHandle.getInputColumns(),
+                fileSystemFactory.create(session.getIdentity(), tableHandle.fileIoProperties()),
+                tableHandle.inputColumns(),
                 jsonCodec,
                 session,
-                tableHandle.getFileFormat(),
-                tableHandle.getStorageProperties(),
+                tableHandle.fileFormat(),
+                tableHandle.storageProperties(),
                 maxOpenPartitions,
-                tableHandle.getSortOrder(),
+                tableHandle.sortOrder(),
                 sortingFileWriterBufferSize,
                 sortingFileWriterMaxOpenFiles,
                 typeManager,
@@ -159,22 +159,22 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
     {
         IcebergMergeTableHandle merge = (IcebergMergeTableHandle) mergeHandle;
         IcebergWritableTableHandle tableHandle = merge.getInsertTableHandle();
-        LocationProvider locationProvider = getLocationProvider(tableHandle.getName(), tableHandle.getOutputPath(), tableHandle.getStorageProperties());
-        Schema schema = SchemaParser.fromJson(tableHandle.getSchemaAsJson());
-        Map<Integer, PartitionSpec> partitionsSpecs = transformValues(tableHandle.getPartitionsSpecsAsJson(), json -> PartitionSpecParser.fromJson(schema, json));
+        LocationProvider locationProvider = getLocationProvider(tableHandle.name(), tableHandle.outputPath(), tableHandle.storageProperties());
+        Schema schema = SchemaParser.fromJson(tableHandle.schemaAsJson());
+        Map<Integer, PartitionSpec> partitionsSpecs = transformValues(tableHandle.partitionsSpecsAsJson(), json -> PartitionSpecParser.fromJson(schema, json));
         ConnectorPageSink pageSink = createPageSink(session, tableHandle);
 
         return new IcebergMergeSink(
                 locationProvider,
                 fileWriterFactory,
-                fileSystemFactory.create(session.getIdentity(), tableHandle.getFileIoProperties()),
+                fileSystemFactory.create(session.getIdentity(), tableHandle.fileIoProperties()),
                 jsonCodec,
                 session,
-                tableHandle.getFileFormat(),
-                tableHandle.getStorageProperties(),
+                tableHandle.fileFormat(),
+                tableHandle.storageProperties(),
                 schema,
                 partitionsSpecs,
                 pageSink,
-                tableHandle.getInputColumns().size());
+                tableHandle.inputColumns().size());
     }
 }
io/trino/plugin/iceberg/IcebergPartitioningHandle.java
@@ -13,8 +13,6 @@
  */
 package io.trino.plugin.iceberg;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import io.trino.spi.connector.ConnectorPartitioningHandle;
 
@@ -23,31 +21,13 @@
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
-public class IcebergPartitioningHandle
+public record IcebergPartitioningHandle(List<String> partitioning, List<IcebergColumnHandle> partitioningColumns)
         implements ConnectorPartitioningHandle
 {
-    private final List<String> partitioning;
-    private final List<IcebergColumnHandle> partitioningColumns;
-
-    @JsonCreator
-    public IcebergPartitioningHandle(
-            @JsonProperty("partitioning") List<String> partitioning,
-            @JsonProperty("partitioningColumns") List<IcebergColumnHandle> partitioningColumns)
-    {
-        this.partitioning = ImmutableList.copyOf(requireNonNull(partitioning, "partitioning is null"));
-        this.partitioningColumns = ImmutableList.copyOf(requireNonNull(partitioningColumns, "partitioningColumns is null"));
-    }
-
-    @JsonProperty
-    public List<String> getPartitioning()
-    {
-        return partitioning;
-    }
-
-    @JsonProperty
-    public List<IcebergColumnHandle> getPartitioningColumns()
+    public IcebergPartitioningHandle
     {
-        return partitioningColumns;
+        partitioning = ImmutableList.copyOf(requireNonNull(partitioning, "partitioning is null"));
+        partitioningColumns = ImmutableList.copyOf(requireNonNull(partitioningColumns, "partitioningColumns is null"));
     }
 
     @Override
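Dropping @JsonCreator and @JsonProperty is safe here because Jackson (2.12+) binds records through their canonical constructor and serializes them under their component names, so the airlift JsonCodec wiring shown in IcebergMaterializedViewDefinition above keeps producing the same JSON shape. A minimal round-trip sketch, using a hypothetical record rather than the real handle:

import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.airlift.json.ObjectMapperProvider;

import java.util.List;

public final class RecordCodecExample
{
    // Hypothetical record, not part of this PR: no Jackson annotations needed.
    public record Handle(List<String> partitioning) {}

    public static void main(String[] args)
    {
        JsonCodec<Handle> codec = new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(Handle.class);
        String json = codec.toJson(new Handle(List.of("bucket(id, 16)")));
        System.out.println(json); // {"partitioning":["bucket(id, 16)"]}

        Handle roundTripped = codec.fromJson(json);
        System.out.println(roundTripped.partitioning()); // [bucket(id, 16)]
    }
}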
io/trino/plugin/iceberg/IcebergSplit.java
@@ -228,7 +228,7 @@ public long getRetainedSizeInBytes()
                 + estimatedSizeOf(path)
                 + estimatedSizeOf(partitionSpecJson)
                 + estimatedSizeOf(partitionDataJson)
-                + estimatedSizeOf(deletes, DeleteFile::getRetainedSizeInBytes)
+                + estimatedSizeOf(deletes, DeleteFile::retainedSizeInBytes)
                 + splitWeight.getRetainedSizeInBytes()
                 + fileStatisticsDomain.getRetainedSizeInBytes(IcebergColumnHandle::getRetainedSizeInBytes)
                 + estimatedSizeOf(fileIoProperties, SizeOf::estimatedSizeOf, SizeOf::estimatedSizeOf)
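The rename also propagates to method references: DeleteFile presumably became a record elsewhere in this change, so DeleteFile::getRetainedSizeInBytes becomes DeleteFile::retainedSizeInBytes. A tiny sketch with a hypothetical record:

import java.util.List;

// Hypothetical record, not PR code: a method reference to a record accessor
// uses the component name, exactly as in the DeleteFile change above.
record Widget(long retainedSizeInBytes)
{
    public static void main(String[] args)
    {
        List<Widget> widgets = List.of(new Widget(64), new Widget(128));
        long total = widgets.stream().mapToLong(Widget::retainedSizeInBytes).sum();
        System.out.println(total); // 192
    }
}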