Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public final class IcebergUtil
public static final String METADATA_FOLDER_NAME = "metadata";
public static final String METADATA_FILE_EXTENSION = ".metadata.json";
public static final String TRINO_QUERY_ID_NAME = "trino_query_id";
public static final String TRINO_USER_NAME = "trino_user";
// For backward compatibility only. DO NOT USE.
private static final String BROKEN_ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";
// For backward compatibility only. DO NOT USE.
Expand Down Expand Up @@ -1056,6 +1057,7 @@ public static String fileName(String path)
public static void commit(SnapshotUpdate<?> update, ConnectorSession session)
{
update.set(TRINO_QUERY_ID_NAME, session.getQueryId());
update.set(TRINO_USER_NAME, session.getUser());
update.commit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups;
import static io.trino.plugin.iceberg.IcebergUtil.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.iceberg.IcebergUtil.TRINO_USER_NAME;
import static io.trino.plugin.iceberg.IcebergUtil.getLatestMetadataLocation;
import static io.trino.spi.predicate.Domain.multipleValues;
import static io.trino.spi.predicate.Domain.singleValue;
Expand Down Expand Up @@ -7378,17 +7379,17 @@ public void testSnapshotSummariesHaveTrinoQueryIdFormatV1()
String tableName = "test_snapshot_query_ids_v1" + randomNameSuffix();

// Create empty table
assertQueryIdStored(tableName, executeWithQueryId(format("CREATE TABLE %s (a bigint, b bigint) WITH (format_version = 1, partitioning = ARRAY['a'])", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("CREATE TABLE %s (a bigint, b bigint) WITH (format_version = 1, partitioning = ARRAY['a'])", tableName)));

// Insert some records, creating 3 partitions
assertQueryIdStored(tableName, executeWithQueryId(format("INSERT INTO %s VALUES (1, 100), (2, 300), (2, 350), (3, 250)", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("INSERT INTO %s VALUES (1, 100), (2, 300), (2, 350), (3, 250)", tableName)));

// Delete whole partition
assertQueryIdStored(tableName, executeWithQueryId(format("DELETE FROM %s WHERE a = 2", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("DELETE FROM %s WHERE a = 2", tableName)));

// Insert some more and then optimize
assertQueryIdStored(tableName, executeWithQueryId(format("INSERT INTO %s VALUES (1, 400)", tableName)));
assertQueryIdStored(tableName, executeWithQueryId(format("ALTER TABLE %s EXECUTE OPTIMIZE", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("INSERT INTO %s VALUES (1, 400)", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("ALTER TABLE %s EXECUTE OPTIMIZE", tableName)));
}

@Test
Expand All @@ -7400,23 +7401,23 @@ public void testSnapshotSummariesHaveTrinoQueryIdFormatV2()
assertUpdate(format("INSERT INTO %s VALUES (1, 1), (1, 4), (1, 20), (2, 2)", sourceTableName), 4);

// Create table with CTAS
assertQueryIdStored(tableName, executeWithQueryId(format("CREATE TABLE %s WITH (format_version = 2, partitioning = ARRAY['a']) " +
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("CREATE TABLE %s WITH (format_version = 2, partitioning = ARRAY['a']) " +
"AS SELECT * FROM %s", tableName, sourceTableName)));

// Insert records
assertQueryIdStored(tableName, executeWithQueryId(format("INSERT INTO %s VALUES (1, 100), (2, 300), (3, 250)", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("INSERT INTO %s VALUES (1, 100), (2, 300), (3, 250)", tableName)));

// Delete a whole partition
assertQueryIdStored(tableName, executeWithQueryId(format("DELETE FROM %s WHERE a = 2", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("DELETE FROM %s WHERE a = 2", tableName)));

// Delete an individual row
assertQueryIdStored(tableName, executeWithQueryId(format("DELETE FROM %s WHERE a = 1 AND b = 4", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("DELETE FROM %s WHERE a = 1 AND b = 4", tableName)));

// Update an individual row
assertQueryIdStored(tableName, executeWithQueryId(format("UPDATE %s SET b = 900 WHERE a = 1 AND b = 1", tableName)));
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("UPDATE %s SET b = 900 WHERE a = 1 AND b = 1", tableName)));

// Merge
assertQueryIdStored(tableName, executeWithQueryId(format("MERGE INTO %s t USING %s s ON t.a = s.a AND t.b = s.b " +
assertQueryIdAndUserStored(tableName, executeWithQueryId(format("MERGE INTO %s t USING %s s ON t.a = s.a AND t.b = s.b " +
"WHEN MATCHED THEN UPDATE SET b = t.b * 50", tableName, sourceTableName)));
}

Expand Down Expand Up @@ -8780,9 +8781,11 @@ private QueryId executeWithQueryId(String sql)
.queryId();
}

private void assertQueryIdStored(String tableName, QueryId queryId)
private void assertQueryIdAndUserStored(String tableName, QueryId queryId)
{
assertThat(getFieldFromLatestSnapshotSummary(tableName, TRINO_QUERY_ID_NAME))
.isEqualTo(queryId.toString());
assertThat(getFieldFromLatestSnapshotSummary(tableName, TRINO_USER_NAME))
.isEqualTo("user");
}
}