Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,8 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (IcebergColumnHandle columnHandle : getColumns(icebergTable.schema(), typeManager)) {
for (IcebergColumnHandle columnHandle : getColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good change.

However, it looks like we call SchemaParser.fromJson(tableHandle.getTableSchemaJson()) multiple times on one table handle. Am I right?

SchemaParser.fromJson does cache internally (on a static field).
This isn't ideal, and we could do better by caching within the table handle object. Not sure it matters, though — it depends on how frequently this is called.

Copy link
Copy Markdown
Contributor Author

@findinpath findinpath Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we switch to SchemaParser.fromJson(JsonNode)

fromJson(JsonUtil.mapper().readValue(jsonKey, JsonNode.class))

?

columnHandles.put(columnHandle.getName(), columnHandle);
}
columnHandles.put(FILE_PATH.getColumnName(), pathColumnHandle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
partitionSpec.specId(),
split.getPartitionDataJson(),
split.getFileFormat(),
split.getSchemaAsJson().map(SchemaParser::fromJson),
SchemaParser.fromJson(table.getTableSchemaJson()),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to internal caching within the method
org.apache.iceberg.ManifestGroup.planFiles
the returned file scan tasks may contain an invalid split
schema string.

is it testable?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is testable through io.trino.plugin.iceberg.TestIcebergAvroConnectorTest.

I was reluctant to squash the two commits of this PR because they address different issues.

The test io.trino.plugin.iceberg.TestIcebergAvroConnectorTest covers both of the issues.

requiredColumns,
effectivePredicate,
table.getNameMappingJson().map(NameMappingParser::fromJson),
Expand Down Expand Up @@ -495,7 +495,7 @@ private ConnectorPageSource openDeletes(
0,
"",
IcebergFileFormat.fromIceberg(delete.format()),
Optional.of(schemaFromHandles(columns)),
schemaFromHandles(columns),
columns,
tupleDomain,
Optional.empty(),
Expand All @@ -513,7 +513,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
int partitionSpecId,
String partitionData,
IcebergFileFormat fileFormat,
Optional<Schema> fileSchema,
Schema fileSchema,
List<IcebergColumnHandle> dataColumns,
TupleDomain<IcebergColumnHandle> predicate,
Optional<NameMapping> nameMapping,
Expand Down Expand Up @@ -564,7 +564,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
length,
partitionSpecId,
partitionData,
fileSchema.orElseThrow(),
fileSchema,
nameMapping,
dataColumns);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.openjdk.jol.info.ClassLayout;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
Expand All @@ -44,7 +43,6 @@ public class IcebergSplit
private final List<HostAddress> addresses;
private final String partitionSpecJson;
private final String partitionDataJson;
private final Optional<String> schemaAsJson;
private final List<DeleteFile> deletes;
private final SplitWeight splitWeight;

Expand All @@ -58,7 +56,6 @@ public IcebergSplit(
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") String partitionDataJson,
@JsonProperty("schemaAsJson") Optional<String> schemaAsJson,
@JsonProperty("deletes") List<DeleteFile> deletes,
@JsonProperty("splitWeight") SplitWeight splitWeight)
{
Expand All @@ -70,7 +67,6 @@ public IcebergSplit(
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
}
Expand Down Expand Up @@ -124,12 +120,6 @@ public String getPartitionSpecJson()
return partitionSpecJson;
}

@JsonProperty
public Optional<String> getSchemaAsJson()
{
return schemaAsJson;
}

@JsonProperty
public String getPartitionDataJson()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -76,7 +75,6 @@
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
Expand Down Expand Up @@ -424,7 +422,6 @@ static boolean partitionMatchesPredicate(

private IcebergSplit toIcebergSplit(FileScanTask task)
{
IcebergFileFormat fileFormat = IcebergFileFormat.fromIceberg(task.file().format());
return new IcebergSplit(
task.file().path().toString(),
task.start(),
Expand All @@ -434,7 +431,6 @@ private IcebergSplit toIcebergSplit(FileScanTask task)
ImmutableList.of(),
PartitionSpecParser.toJson(task.spec()),
PartitionData.toJson(task.file().partition()),
fileFormat != AVRO ? Optional.empty() : Optional.of(SchemaParser.toJson(task.spec().schema())),
task.deletes().stream()
.map(DeleteFile::fromIceberg)
.collect(toImmutableList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import static io.trino.spi.predicate.Domain.multipleValues;
import static io.trino.spi.predicate.Domain.singleValue;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
Expand Down Expand Up @@ -5427,6 +5428,55 @@ public void testReadFromVersionedTableWithSchemaEvolution()
.matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')");
}

@Test
public void testReadFromVersionedTableWithSchemaEvolutionDropColumn()
{
String tableName = "test_versioned_table_schema_evolution_drop_column_" + randomTableSuffix();

assertQuerySucceeds("CREATE TABLE " + tableName + "(col1 varchar, col2 integer, col3 boolean)");
long v1SnapshotId = getCurrentSnapshotId(tableName);
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
.returnsEmptyResult();

assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 1 , true)", 1);
long v2SnapshotId = getCurrentSnapshotId(tableName);
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
.matches("VALUES (VARCHAR 'a', 1, true)");

assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN col3");
assertUpdate("INSERT INTO " + tableName + " VALUES ('b', 2)", 1);
long v3SnapshotId = getCurrentSnapshotId(tableName);
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER))
.matches("VALUES (VARCHAR 'a', 1), (VARCHAR 'b', 2)");
assertThat(query("SELECT * FROM " + tableName))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER))
.matches("VALUES (VARCHAR 'a', 1), (VARCHAR 'b', 2)");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
.matches("VALUES (VARCHAR 'a', 1, true)");

assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN col2");
assertUpdate("INSERT INTO " + tableName + " VALUES ('c')", 1);
long v4SnapshotId = getCurrentSnapshotId(tableName);
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v4SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR))
.matches("VALUES (VARCHAR 'a'), (VARCHAR 'b'), (VARCHAR 'c')");
assertThat(query("SELECT * FROM " + tableName))
.hasOutputTypes(ImmutableList.of(VARCHAR))
.matches("VALUES (VARCHAR 'a'), (VARCHAR 'b'), (VARCHAR 'c')");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER))
.matches("VALUES (VARCHAR 'a', 1), (VARCHAR 'b', 2)");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId))
.hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN))
.matches("VALUES (VARCHAR 'a', 1, true)");

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testReadFromVersionedTableWithPartitionSpecEvolution()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
ImmutableList.of(),
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
PartitionData.toJson(new PartitionData(new Object[] {})),
Optional.empty(),
ImmutableList.of(),
SplitWeight.standard());

Expand Down