@@ -1409,6 +1409,20 @@ public Optional<ConnectorOutputMetadata> finishInsert(
transactionLogWriter.flush();
writeCommitted = true;
writeCheckpointIfNeeded(session, new SchemaTableName(handle.getSchemaName(), handle.getTableName()), checkpointInterval, commitVersion);

if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty() && !dataFileInfos.isEmpty()) {
// TODO (https://github.com/trinodb/trino/issues/16088) Add synchronization when version conflict for INSERT is resolved.
Optional<Instant> maxFileModificationTime = dataFileInfos.stream()

Member commented:
maxFileModificationTime probably doesn't need to be optional, since for an empty insert no stats should be collected.
@findinpath does an empty insert still create a transaction log entry?

Member (author) replied:

I checked, and an insert does create a transaction log entry in that case.
I will add a check for it, but I would leave Optional here to match the updateTableStatistics arguments, since in general maxFileModificationTime can be empty (for example, during ANALYZE, when it can't be retrieved from computed statistics).

Contributor replied:

> @findinpath does an empty insert still create a transaction log entry?

I remember doing something like this for Iceberg:

if (commitTasks.isEmpty()) {
transaction = null;
return Optional.empty();
}

I don't think we did this for Delta Lake.
We should probably file an issue to handle this aspect on the Delta Lake side as well.
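
Transposed to the Delta Lake finishInsert, that follow-up might look roughly like the sketch below (illustration only, reusing dataFileInfos from the diff above; the PR itself instead gates statistics collection on !dataFileInfos.isEmpty()):

// Hypothetical: skip the commit entirely for an empty INSERT, as the Iceberg
// connector does. Whether Delta Lake should keep writing a transaction log
// entry in this case is exactly the open question discussed above.
if (dataFileInfos.isEmpty()) {
    return Optional.empty();
}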

.map(DataFileInfo::getCreationTime)

@findinpath (Contributor) commented on Feb 14, 2023:
I guess this is not relevant for finishInsert, but for finishMerge (follow-up PR) do take into account that you'll need to filter to only data files: dataFile.getDataFileType() == DATA
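
Applied to the stream in this diff, that filter could look like the following sketch (DataFileType and getDataFileType() are the names referenced in the comment; this is not code from this PR):

Optional<Instant> maxFileModificationTime = dataFileInfos.stream()
        // Hypothetical finishMerge variant: only plain data files contribute
        // to the max modification time; change-data files are filtered out.
        .filter(dataFile -> dataFile.getDataFileType() == DataFileInfo.DataFileType.DATA)
        .map(DataFileInfo::getCreationTime)
        .max(Long::compare)
        .map(Instant::ofEpochMilli);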

.max(Long::compare)
.map(Instant::ofEpochMilli);
updateTableStatistics(
session,
Optional.empty(),
handle.getLocation(),
maxFileModificationTime,
computedStatistics);
}
}
catch (Exception e) {
if (!writeCommitted) {
@@ -2207,10 +2221,18 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector
.map(ColumnMetadata::getName)
.collect(toImmutableSet());

Optional<ExtendedStatistics> existingStatistics = Optional.empty();
Optional<Set<String>> analyzeColumnNames = Optional.empty();
String tableLocation = getLocation(tableMetadata.getProperties());
if (tableLocation != null) {
existingStatistics = statisticsAccess.readExtendedStatistics(session, tableLocation);
analyzeColumnNames = existingStatistics.flatMap(ExtendedStatistics::getAnalyzedColumns);
}

return getStatisticsCollectionMetadata(
Optional.empty(),
existingStatistics,
tableMetadata.getColumns(),
allColumnNames,
analyzeColumnNames.orElse(allColumnNames),
// File modified time does not need to be collected as a statistic because it is derived directly from the files being written
false);
}
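
The intended effect of reusing the analyzed column set can be sketched in the style of the tests below (table and column names are hypothetical; assumes ANALYZE with a columns property, matching AnalyzeHandle::getColumns above):

// Hypothetical: analyze only column 'a', so the extended statistics record
// {'a'} as the analyzed column set.
assertUpdate("CREATE TABLE stats_subset (a int, b int)");
assertUpdate("ANALYZE stats_subset WITH (columns = ARRAY['a'])");
// A later INSERT reads the existing extended statistics and collects
// write-time statistics only for 'a' instead of falling back to all columns.
assertUpdate("INSERT INTO stats_subset VALUES (1, 2)", 1);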
@@ -2327,7 +2349,13 @@ private void updateTableStatistics(
finalAlreadyAnalyzedModifiedTimeMax = Comparators.max(oldStatistics.get().getAlreadyAnalyzedModifiedTimeMax(), finalAlreadyAnalyzedModifiedTimeMax);
}

analyzeHandle.flatMap(AnalyzeHandle::getColumns).ifPresent(analyzeColumns -> {
Optional<Set<String>> analyzedColumns = analyzeHandle.flatMap(AnalyzeHandle::getColumns);
// If the update is invoked by a command other than ANALYZE, the statistics should preserve the previously analyzed column set.
if (analyzeHandle.isEmpty()) {
analyzedColumns = oldStatistics.flatMap(ExtendedStatistics::getAnalyzedColumns);
}

analyzedColumns.ifPresent(analyzeColumns -> {
if (!mergedColumnStatistics.keySet().equals(analyzeColumns)) {
// sanity validation
throw new IllegalStateException(format("Unexpected columns in mergedColumnStatistics %s; expected %s", mergedColumnStatistics.keySet(), analyzeColumns));
@@ -2337,7 +2365,7 @@ private void updateTableStatistics(
ExtendedStatistics mergedExtendedStatistics = new ExtendedStatistics(
finalAlreadyAnalyzedModifiedTimeMax,
mergedColumnStatistics,
analyzeHandle.flatMap(AnalyzeHandle::getColumns));
analyzedColumns);

statisticsAccess.updateExtendedStatistics(session, location, mergedExtendedStatistics);
}
@@ -945,9 +945,9 @@ public void testInsertIntoNonLowercaseColumnTable()
"SHOW STATS FOR insert_nonlowercase_columns",
"VALUES " +
// column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
"('lower_case_string', null, null, 0.5, null, null, null)," +
"('upper_case_string', null, null, 0.5, null, null, null)," +
"('mixed_case_string', null, null, 0.5, null, null, null)," +
"('lower_case_string', 10.0, 1.0, 0.5, null, null, null)," +
"('upper_case_string', 10.0, 1.0, 0.5, null, null, null)," +
"('mixed_case_string', 10.0, 1.0, 0.5, null, null, null)," +
"(null, null, null, null, 8.0, null, null)");
}

@@ -985,7 +985,7 @@ public void testInsertNestedNonLowercaseColumns()
"SHOW STATS FOR insert_nested_nonlowercase_columns",
"VALUES " +
// column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
"('an_int', null, null, 0.0, null, 1, 40)," +
"('an_int', null, 4.0, 0.0, null, 1, 40)," +
"('nested', null, null, null, null, null, null)," +
"(null, null, null, null, 8.0, null, null)");
}
@@ -1047,8 +1047,8 @@ public void testInsertIntoPartitionedNonLowercaseColumnTable()
"SHOW STATS FOR insert_nonlowercase_columns_partitioned",
"VALUES " +
// column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
"('lower_case_string', null, null, 0.5, null, null, null)," +
"('upper_case_string', null, null, 0.5, null, null, null)," +
"('lower_case_string', 10.0, 1.0, 0.5, null, null, null)," +
"('upper_case_string', 10.0, 1.0, 0.5, null, null, null)," +
"('mixed_case_string', null, 2.0, 0.5, null, null, null)," +
"(null, null, null, null, 8.0, null, null)");
}
@@ -1172,7 +1172,9 @@ public void testCheckpointWriteStatsAsStruct(String type, String sampleValue, St
tableName,
type,
getLocationForTable(bucketName, tableName)));
assertUpdate("INSERT INTO " + tableName + " SELECT " + sampleValue + " UNION ALL SELECT " + highValue, 2);
assertUpdate(
disableStatisticsCollectionOnWrite(getSession()),
"INSERT INTO " + tableName + " SELECT " + sampleValue + " UNION ALL SELECT " + highValue, 2);

// TODO: Open checkpoint parquet file and verify 'stats_parsed' field directly
assertThat(getTableFiles(tableName))
@@ -1226,8 +1228,8 @@ public void testCheckpointWriteStatsAsStructWithPartiallyUnsupportedColumnStats(
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('col', null, null, 0.0, null, 1, 1)," +
"('unsupported', null, null, 0.0, null, null, null)," +
"('col', null, 1.0, 0.0, null, 1, 1)," +
"('unsupported', null, 1.0, 0.0, null, null, null)," +
"(null, null, null, null, 1.0, null, null)");

assertUpdate("DROP TABLE " + tableName);
@@ -1362,14 +1364,13 @@ private void testDeltaLakeTableLocationChanged(boolean fewerEntries, boolean fir
public void testAnalyze()
{
String tableName = "test_analyze_" + randomNameSuffix();
Session sessionWithDisabledStatisticsOnWrite = Session.builder(getSession())

Contributor commented:
separate commit - unrelated to the current commit

.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), EXTENDED_STATISTICS_COLLECT_ON_WRITE, "false")
.build();
assertUpdate(sessionWithDisabledStatisticsOnWrite, "CREATE TABLE " + tableName
+ " WITH ("
+ "location = '" + getLocationForTable(bucketName, tableName) + "'"
+ ")"
+ " AS SELECT * FROM tpch.sf1.nation", 25);
assertUpdate(
disableStatisticsCollectionOnWrite(getSession()),

Contributor commented:
separate commit pls

"CREATE TABLE " + tableName
+ " WITH ("
+ "location = '" + getLocationForTable(bucketName, tableName) + "'"
+ ")"
+ " AS SELECT * FROM tpch.sf1.nation", 25);

assertQuery(
"SHOW STATS FOR " + tableName,
@@ -1950,4 +1951,11 @@ private String getTableLocation(String tableName)
}
throw new IllegalStateException("Location not found in SHOW CREATE TABLE result");
}

private static Session disableStatisticsCollectionOnWrite(Session session)
{
return Session.builder(session)
.setCatalogSessionProperty(session.getCatalog().orElseThrow(), EXTENDED_STATISTICS_COLLECT_ON_WRITE, "false")
.build();
}
}