Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public class DeltaLakeMetadata
public static final String MERGE_OPERATION = "MERGE";
public static final String UPDATE_OPERATION = "UPDATE"; // used by old Trino versions and Spark
public static final String DELETE_OPERATION = "DELETE"; // used Trino for whole table/partition deletes as well as Spark
public static final String TRUNCATE_OPERATION = "TRUNCATE";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
public static final String SET_TBLPROPERTIES_OPERATION = "SET TBLPROPERTIES";
public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN";
Expand Down Expand Up @@ -3259,6 +3260,12 @@ public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession sessi
return WriterScalingOptions.ENABLED;
}

@Override
public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
executeDelete(session, checkValidTableHandle(tableHandle), TRUNCATE_OPERATION);
}

@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
{
Expand All @@ -3273,6 +3280,11 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, Conn

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
{
return executeDelete(session, handle, DELETE_OPERATION);
}

private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle, String operation)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle;
if (isAppendOnly(tableHandle.getMetadataEntry())) {
Expand All @@ -3294,7 +3306,7 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
throw new TransactionConflictException(format("Conflicting concurrent writes found. Expected transaction log version: %s, actual version: %s", tableHandle.getReadVersion(), currentVersion));
}
long commitVersion = currentVersion + 1;
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, writeTimestamp, DELETE_OPERATION, tableHandle.getReadVersion()));
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, writeTimestamp, operation, tableHandle.getReadVersion()));

long deletedRecords = 0L;
boolean allDeletedFilesStatsPresent = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_RENAME_SCHEMA,
SUPPORTS_TRUNCATE -> false;
SUPPORTS_RENAME_SCHEMA -> false;
default -> super.hasBehavior(connectorBehavior);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ private void testCorruptedTableLocation(String tableName, Path tableLocation, bo
assertQueryFails("UPDATE " + tableName + " SET foo = 'bar'", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("DELETE FROM " + tableName, "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("MERGE INTO " + tableName + " USING (SELECT 1 a) input ON true WHEN MATCHED THEN DELETE", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("TRUNCATE TABLE " + tableName, "This connector does not support truncating tables");
assertQueryFails("TRUNCATE TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("COMMENT ON TABLE " + tableName + " IS NULL", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("COMMENT ON COLUMN " + tableName + ".foo IS NULL", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("CALL system.vacuum(CURRENT_SCHEMA, '" + tableName + "', '7d')", "Metadata not found in transaction log for tpch." + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
SUPPORTS_RENAME_FIELD,
SUPPORTS_RENAME_SCHEMA,
SUPPORTS_SET_COLUMN_TYPE,
SUPPORTS_TOPN_PUSHDOWN,
SUPPORTS_TRUNCATE -> false;
SUPPORTS_TOPN_PUSHDOWN -> false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't use enhanced switch (so that single item modification is a single line change, not two)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i see this was recently changed and discussed here #18783 (review)

default -> super.hasBehavior(connectorBehavior);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,38 @@ public void testDeleteOnAppendOnlyTableFails()
// Whole table deletes should be disallowed as well
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM default." + tableName))
.hasMessageContaining("Cannot modify rows from a table with 'delta.appendOnly' set to true");
assertQueryFailure(() -> onTrino().executeQuery("TRUNCATE TABLE delta.default." + tableName))
.hasMessageContaining("Cannot modify rows from a table with 'delta.appendOnly' set to true");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 11), row(2, 12));
onTrino().executeQuery("DROP TABLE " + tableName);
}

// OSS Delta doesn't support TRUNCATE TABLE statement
@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTruncateTable()
{
String tableName = "test_truncate_table_" + randomNameSuffix();
onTrino().executeQuery("" +
"CREATE TABLE delta.default." + tableName +
"(a INT)" +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "')");
try {
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES 1, 2, 3");
onTrino().executeQuery("TRUNCATE TABLE delta.default." + tableName);
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).hasNoRows();

onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES 4, 5, 6");
onDelta().executeQuery("TRUNCATE TABLE default." + tableName);
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasNoRows();
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}

// Databricks 12.1 and OSS Delta 2.4.0 added support for deletion vectors
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
Expand Down