Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
if (fragments.isEmpty()) {
// Commit the transaction if the table is being created without data
transaction.newFastAppend().commit();
transaction.commitTransaction();
Comment on lines 570 to 571
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why these two lines?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see now, please add a comment

transaction = null;
return Optional.empty();
}

return finishInsert(session, (IcebergWritableTableHandle) tableHandle, fragments, computedStatistics);
}

Expand Down Expand Up @@ -626,13 +634,17 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle;
Table icebergTable = transaction.table();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

if (commitTasks.isEmpty()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could change this to

// avoid empty snapshot for existing tables
if (commitTasks.isEmpty() && (table.currentSnapshot() != null)) {

Then we don't need the special case in finishCreateTable().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this could be done indeed as you mentioned.
This has been actually the initial version of the code in the PR.

However, the code would be a bit more difficult to understand with the handling for this special case in finishInsert so we've decided to go further with adding the handling for empty CTAS into finishCreateTable.

Related discussion:

#12412 (comment)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this check happen before instantiating Type[] partitionColumnTypes ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the check at the beginning of the method. Thanks for bringing this up.

transaction = null;
return Optional.empty();
}

IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle;
Table icebergTable = transaction.table();
Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
Expand Down Expand Up @@ -1345,6 +1357,12 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

if (commitTasks.isEmpty()) {
// Avoid recording "empty" write operation
transaction = null;
return;
}

Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());

RowDelta rowDelta = transaction.newRowDelta();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3482,6 +3482,78 @@ public void testUpdatingInvalidTableProperty()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testEmptyCreateTableAsSelect()
{
String tableName = "test_empty_ctas_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation WHERE false", 0);
List<Long> initialTableSnapshots = getSnapshotIds(tableName);
assertThat(initialTableSnapshots.size())
.withFailMessage("CTAS operations must create Iceberg snapshot independently whether the selection is empty or not")
.isEqualTo(1);
assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testEmptyInsert()
{
String tableName = "test_empty_insert_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", "SELECT count(*) FROM nation");
List<Long> initialTableSnapshots = getSnapshotIds(tableName);

assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE false", 0);
List<Long> updatedTableSnapshots = getSnapshotIds(tableName);

assertThat(initialTableSnapshots)
.withFailMessage("INSERT operations that are not changing the state of the table must not cause the creation of a new Iceberg snapshot")
.hasSize(1)
.isEqualTo(updatedTableSnapshots);

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testEmptyUpdate()
{
String tableName = "test_empty_update_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", "SELECT count(*) FROM nation");
List<Long> initialTableSnapshots = getSnapshotIds(tableName);

assertUpdate("UPDATE " + tableName + " SET comment = 'new comment' WHERE nationkey IS NULL", 0);
List<Long> updatedTableSnapshots = getSnapshotIds(tableName);

assertThat(initialTableSnapshots)
.withFailMessage("UPDATE operations that are not changing the state of the table must not cause the creation of a new Iceberg snapshot")
.hasSize(1)
.isEqualTo(updatedTableSnapshots);

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testEmptyDelete()
{
String tableName = "test_empty_delete_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " WITH (format = '" + format.name() + "') AS SELECT * FROM nation", "SELECT count(*) FROM nation");
List<Long> initialTableSnapshots = getSnapshotIds(tableName);

assertUpdate("DELETE FROM " + tableName + " WHERE nationkey IS NULL", 0);
List<Long> updatedTableSnapshots = getSnapshotIds(tableName);

assertThat(initialTableSnapshots)
.withFailMessage("DELETE operations that are not changing the state of the table must not cause the creation of a new Iceberg snapshot")
.hasSize(1)
.isEqualTo(updatedTableSnapshots);

assertUpdate("DROP TABLE " + tableName);
}

private Session prepareCleanUpSession()
{
return Session.builder(getSession())
Expand Down