Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ public class DeltaLakeMetadata
private static final int WRITER_VERSION = 2;
// The highest writer version Trino supports writing to
private static final int MAX_WRITER_VERSION = 3;
// This constant should be used only for a new table
private static final ProtocolEntry DEFAULT_PROTOCOL = new ProtocolEntry(READER_VERSION, WRITER_VERSION);
// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty()));
Expand Down Expand Up @@ -718,7 +720,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
session,
nodeVersion,
nodeId,
tableMetadata.getComment());
tableMetadata.getComment(),
DEFAULT_PROTOCOL);

setRollback(() -> deleteRecursivelyIfExists(new HdfsContext(session), hdfsEnvironment, deltaLogDirectory));
transactionLogWriter.flush();
Expand Down Expand Up @@ -984,7 +987,8 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
session,
nodeVersion,
nodeId,
handle.getComment());
handle.getComment(),
DEFAULT_PROTOCOL);
appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true);
transactionLogWriter.flush();
PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow());
Expand Down Expand Up @@ -1063,7 +1067,8 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
session,
nodeVersion,
nodeId,
comment);
comment,
getProtocolEntry(session, handle.getSchemaTableName()));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1110,7 +1115,8 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
session,
nodeVersion,
nodeId,
Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()));
Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()),
getProtocolEntry(session, deltaLakeTableHandle.getSchemaTableName()));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1168,7 +1174,8 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
session,
nodeVersion,
nodeId,
Optional.ofNullable(handle.getMetadataEntry().getDescription()));
Optional.ofNullable(handle.getMetadataEntry().getDescription()),
getProtocolEntry(session, handle.getSchemaTableName()));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand All @@ -1190,7 +1197,8 @@ private static void appendTableEntries(
ConnectorSession session,
String nodeVersion,
String nodeId,
Optional<String> comment)
Optional<String> comment,
ProtocolEntry protocolEntry)
{
long createdTime = System.currentTimeMillis();
transactionLogWriter.appendCommitInfoEntry(
Expand All @@ -1208,7 +1216,7 @@ private static void appendTableEntries(
ISOLATION_LEVEL,
true));

transactionLogWriter.appendProtocolEntry(new ProtocolEntry(READER_VERSION, WRITER_VERSION));
transactionLogWriter.appendProtocolEntry(protocolEntry);

transactionLogWriter.appendMetadataEntry(
new MetadataEntry(
Expand Down Expand Up @@ -1848,14 +1856,19 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)

private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
{
int requiredWriterVersion = metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)).getMinWriterVersion();
int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion();
if (requiredWriterVersion > MAX_WRITER_VERSION) {
throw new TrinoException(
NOT_SUPPORTED,
format("Table %s requires Delta Lake writer version %d which is not supported", schemaTableName, requiredWriterVersion));
}
}

private ProtocolEntry getProtocolEntry(ConnectorSession session, SchemaTableName schemaTableName)
{
return metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session));
}

private List<DeltaLakeColumnHandle> getUnmodifiedColumns(DeltaLakeTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
Set<DeltaLakeColumnHandle> updatedColumnHandles = updatedColumns.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,36 @@ public void testTrinoAlterTablePreservesTableMetadata()
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
public void testTrinoPreservesReaderAndWriterVersions()
{
String tableName = "test_trino_preserves_versions_" + randomTableSuffix();
String tableDirectory = "databricks-compatibility-test-" + tableName;

onDelta().executeQuery(format("" +
"CREATE TABLE default.%s (col int) " +
"USING DELTA LOCATION 's3://%s/%s'" +
"TBLPROPERTIES ('delta.minReaderVersion'='1', 'delta.minWriterVersion'='1', 'delta.checkpointInterval' = 1)",
tableName,
bucketName,
tableDirectory));
try {
onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'");
onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test table comment'");
onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col INT");
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 1)");
onTrino().executeQuery("UPDATE delta.default." + tableName + " SET col = 2");
onTrino().executeQuery("DELETE FROM delta.default." + tableName);
onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + "ON (t.col = s.col) WHEN MATCHED THEN UPDATE SET new_col = 3");

List<?> minReaderVersion = getOnlyElement(onDelta().executeQuery("SHOW TBLPROPERTIES " + tableName + "(delta.minReaderVersion)").rows());
assertEquals((String) minReaderVersion.get(1), "1");
List<?> minWriterVersion = getOnlyElement(onDelta().executeQuery("SHOW TBLPROPERTIES " + tableName + "(delta.minWriterVersion)").rows());
assertEquals((String) minWriterVersion.get(1), "1");
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}
}