diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 18f74440b87c..dc97f035a1cd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -508,6 +508,7 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa managed, tableLocation, metadataEntry, + protocolEntry, TupleDomain.all(), TupleDomain.all(), Optional.empty(), @@ -1258,7 +1259,7 @@ private static boolean isCreatedBy(Table table, String queryId) public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional comment) { DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); - checkSupportedWriterVersion(session, handle); + checkSupportedWriterVersion(handle); ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) { throw new TrinoException(NOT_SUPPORTED, "Setting a table comment with column mapping %s is not supported".formatted(columnMappingMode)); @@ -1280,7 +1281,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table SET_TBLPROPERTIES_OPERATION, session, comment, - getProtocolEntry(session, handle)); + handle.getProtocolEntry()); transactionLogWriter.flush(); } catch (Exception e) { @@ -1294,7 +1295,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) tableHandle; DeltaLakeColumnHandle deltaLakeColumnHandle = (DeltaLakeColumnHandle) column; verify(deltaLakeColumnHandle.isBaseColumn(), "Unexpected dereference: %s", column); - checkSupportedWriterVersion(session, deltaLakeTableHandle); + checkSupportedWriterVersion(deltaLakeTableHandle); ColumnMappingMode columnMappingMode = getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry()); if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) { throw new TrinoException(NOT_SUPPORTED, "Setting a column comment with column mapping %s is not supported".formatted(columnMappingMode)); @@ -1324,7 +1325,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl CHANGE_COLUMN_OPERATION, session, Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()), - getProtocolEntry(session, deltaLakeTableHandle)); + deltaLakeTableHandle.getProtocolEntry()); transactionLogWriter.flush(); } catch (Exception e) { @@ -1348,7 +1349,7 @@ public void setViewColumnComment(ConnectorSession session, SchemaTableName viewN public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata) { DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); - checkSupportedWriterVersion(session, handle); + checkSupportedWriterVersion(handle); ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) { throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName())); @@ -1410,7 +1411,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle ADD_COLUMN_OPERATION, session, Optional.ofNullable(handle.getMetadataEntry().getDescription()), - getProtocolEntry(session, handle)); + handle.getProtocolEntry()); transactionLogWriter.flush(); } catch (Exception e) { @@ -1427,7 +1428,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl String dropColumnName = deltaLakeColumn.getBaseColumnName(); MetadataEntry metadataEntry = table.getMetadataEntry(); - checkSupportedWriterVersion(session, table); + checkSupportedWriterVersion(table); ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry); if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) { throw new TrinoException(NOT_SUPPORTED, "Cannot drop column from table using column mapping mode " + columnMappingMode); @@ -1476,7 +1477,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl DROP_COLUMN_OPERATION, session, Optional.ofNullable(metadataEntry.getDescription()), - getProtocolEntry(session, table)); + table.getProtocolEntry()); transactionLogWriter.flush(); } catch (Exception e) { @@ -1508,7 +1509,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn); String sourceColumnName = deltaLakeColumn.getBaseColumnName(); - checkSupportedWriterVersion(session, table); + checkSupportedWriterVersion(table); if (changeDataFeedEnabled(table.getMetadataEntry())) { throw new TrinoException(NOT_SUPPORTED, "Cannot rename column when change data feed is enabled"); } @@ -1558,7 +1559,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan RENAME_COLUMN_OPERATION, session, Optional.ofNullable(metadataEntry.getDescription()), - getProtocolEntry(session, table)); + table.getProtocolEntry()); transactionLogWriter.flush(); // Don't update extended statistics because it uses physical column names internally } @@ -1671,7 +1672,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto { DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle; checkWriteAllowed(session, table); - checkWriteSupported(session, table); + checkWriteSupported(table); List inputColumns = columns.stream() .map(handle -> (DeltaLakeColumnHandle) handle) @@ -1845,7 +1846,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true"); } checkWriteAllowed(session, handle); - checkWriteSupported(session, handle); + checkWriteSupported(handle); List inputColumns = getColumns(handle.getMetadataEntry()).stream() .filter(column -> column.getColumnType() != SYNTHESIZED) @@ -2054,7 +2055,7 @@ private BeginTableExecuteResult( executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())), @@ -2158,9 +2159,9 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH } } - private void checkWriteSupported(ConnectorSession session, DeltaLakeTableHandle handle) + private void checkWriteSupported(DeltaLakeTableHandle handle) { - checkSupportedWriterVersion(session, handle); + checkSupportedWriterVersion(handle); checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME || columnMappingMode == ColumnMappingMode.ID)) { @@ -2180,9 +2181,9 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry) } } - private void checkSupportedWriterVersion(ConnectorSession session, DeltaLakeTableHandle handle) + private void checkSupportedWriterVersion(DeltaLakeTableHandle handle) { - int requiredWriterVersion = getProtocolEntry(session, handle).getMinWriterVersion(); + int requiredWriterVersion = handle.getProtocolEntry().getMinWriterVersion(); if (requiredWriterVersion > MAX_WRITER_VERSION) { throw new TrinoException( NOT_SUPPORTED, @@ -2190,11 +2191,6 @@ private void checkSupportedWriterVersion(ConnectorSession session, DeltaLakeTabl } } - private ProtocolEntry getProtocolEntry(ConnectorSession session, DeltaLakeTableHandle handle) - { - return transactionLogAccess.getProtocolEntry(session, getSnapshot(handle.getSchemaTableName(), handle.getLocation(), session)); - } - private TableSnapshot getSnapshot(SchemaTableName schemaTableName, String tableLocation, ConnectorSession session) { try { @@ -2338,7 +2334,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties)); } - ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle); + ProtocolEntry currentProtocolEntry = handle.getProtocolEntry(); long createdTime = Instant.now().toEpochMilli(); @@ -2582,6 +2578,7 @@ public Optional> applyFilter(C tableHandle.isManaged(), tableHandle.getLocation(), tableHandle.getMetadataEntry(), + tableHandle.getProtocolEntry(), // Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is. // The unenforced constraint will still be checked by the engine. tableHandle.getEnforcedPartitionConstraint() @@ -2884,6 +2881,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession handle.isManaged(), handle.getLocation(), metadata, + handle.getProtocolEntry(), TupleDomain.all(), TupleDomain.all(), Optional.empty(), @@ -3326,7 +3324,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true"); } checkWriteAllowed(session, tableHandle); - checkWriteSupported(session, tableHandle); + checkWriteSupported(tableHandle); String tableLocation = tableHandle.location(); List activeFiles = getAddFileEntriesMatchingEnforcedPartitionConstraint(session, tableHandle); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java index 5a69dd105107..d4a02aaae718 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; @@ -46,6 +47,7 @@ public enum WriteType private final boolean managed; private final String location; private final MetadataEntry metadataEntry; + private final ProtocolEntry protocolEntry; private final TupleDomain enforcedPartitionConstraint; private final TupleDomain nonPartitionConstraint; private final Optional writeType; @@ -73,6 +75,7 @@ public DeltaLakeTableHandle( @JsonProperty("managed") boolean managed, @JsonProperty("location") String location, @JsonProperty("metadataEntry") MetadataEntry metadataEntry, + @JsonProperty("protocolEntry") ProtocolEntry protocolEntry, @JsonProperty("enforcedPartitionConstraint") TupleDomain enforcedPartitionConstraint, @JsonProperty("nonPartitionConstraint") TupleDomain nonPartitionConstraint, @JsonProperty("writeType") Optional writeType, @@ -88,6 +91,7 @@ public DeltaLakeTableHandle( managed, location, metadataEntry, + protocolEntry, enforcedPartitionConstraint, nonPartitionConstraint, ImmutableSet.of(), @@ -107,6 +111,7 @@ public DeltaLakeTableHandle( boolean managed, String location, MetadataEntry metadataEntry, + ProtocolEntry protocolEntry, TupleDomain enforcedPartitionConstraint, TupleDomain nonPartitionConstraint, Set constraintColumns, @@ -124,6 +129,7 @@ public DeltaLakeTableHandle( this.managed = managed; this.location = requireNonNull(location, "location is null"); this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null"); + this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null"); this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null"); this.nonPartitionConstraint = requireNonNull(nonPartitionConstraint, "nonPartitionConstraint is null"); this.writeType = requireNonNull(writeType, "writeType is null"); @@ -147,6 +153,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set proj managed, location, metadataEntry, + protocolEntry, enforcedPartitionConstraint, nonPartitionConstraint, constraintColumns, @@ -168,6 +175,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max managed, location, metadataEntry, + protocolEntry, enforcedPartitionConstraint, nonPartitionConstraint, constraintColumns, @@ -234,6 +242,12 @@ public MetadataEntry getMetadataEntry() return metadataEntry; } + @JsonProperty + public ProtocolEntry getProtocolEntry() + { + return protocolEntry; + } + @JsonProperty public TupleDomain getEnforcedPartitionConstraint() { @@ -324,6 +338,7 @@ public boolean equals(Object o) managed == that.managed && Objects.equals(location, that.location) && Objects.equals(metadataEntry, that.metadataEntry) && + Objects.equals(protocolEntry, that.protocolEntry) && Objects.equals(enforcedPartitionConstraint, that.enforcedPartitionConstraint) && Objects.equals(nonPartitionConstraint, that.nonPartitionConstraint) && Objects.equals(writeType, that.writeType) && @@ -344,6 +359,7 @@ public int hashCode() managed, location, metadataEntry, + protocolEntry, enforcedPartitionConstraint, nonPartitionConstraint, writeType, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 33f249fbc8cd..0c4030768195 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -252,14 +252,14 @@ public void testDeleteWholePartition() assertFileSystemAccesses( "DELETE FROM test_delete_part_key WHERE key = 'p1'", ImmutableMultiset.builder() - .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 6) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_EXISTS), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_EXISTS), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_EXISTS), 1) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 4) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1) .build()); @@ -280,14 +280,14 @@ public void testDeleteWholeTable() assertFileSystemAccesses( "DELETE FROM test_delete_whole_table WHERE true", ImmutableMultiset.builder() - .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 6) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_EXISTS), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_EXISTS), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_EXISTS), 1) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 4) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1) .build()); @@ -306,7 +306,7 @@ public void testDeleteWithNonPartitionFilter() assertFileSystemAccesses( "DELETE FROM test_delete_with_non_partition_filter WHERE page_url ='url1'", ImmutableMultiset.builder() - .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 7) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 6) // TODO (https://github.com/trinodb/trino/issues/16782) should be checked once per query .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_EXISTS), 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1) @@ -315,7 +315,7 @@ public void testDeleteWithNonPartitionFilter() .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_EXISTS), 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_EXISTS), 2) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 5) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 4) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others? .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(DATA, "key=domain1/", INPUT_FILE_NEW_STREAM), 2) .addCopies(new FileOperation(DATA, "key=domain1/", INPUT_FILE_GET_LENGTH), 2) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java index 2c00208004c5..4a1512fe9c00 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java @@ -31,6 +31,7 @@ import io.trino.plugin.deltalake.metastore.DeltaLakeMetastoreModule; import io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; @@ -529,6 +530,7 @@ private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set