Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private DeltaLakeColumnHandle buildColumnHandle(
{
Type type = switch (entryType) {
case TRANSACTION -> schemaManager.getTxnEntryType();
case ADD -> schemaManager.getAddEntryType(metadataEntry, protocolEntry, addStatsMinMaxColumnFilter.orElseThrow(), true, true, true);
case ADD -> schemaManager.getAddEntryType(metadataEntry, protocolEntry, addStatsMinMaxColumnFilter.orElseThrow(), true, true);
case REMOVE -> schemaManager.getRemoveEntryType();
case METADATA -> schemaManager.getMetadataEntryType();
case PROTOCOL -> schemaManager.getProtocolEntryType(true, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.plugin.deltalake.DeltaHiveTypeTranslator;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
Expand Down Expand Up @@ -120,8 +121,7 @@ public RowType getAddEntryType(
ProtocolEntry protocolEntry,
Predicate<String> addStatsMinMaxColumnFilter,
boolean requireWriteStatsAsJson,
boolean requireWriteStatsAsStruct,
boolean usePartitionValuesParsed)
boolean requireWriteStatsAsStruct)
{
List<DeltaLakeColumnMetadata> allColumns = extractSchema(metadataEntry, protocolEntry, typeManager);
List<DeltaLakeColumnMetadata> minMaxColumns = columnsWithStats(metadataEntry, protocolEntry, typeManager);
Expand Down Expand Up @@ -168,16 +168,14 @@ public RowType getAddEntryType(
if (requireWriteStatsAsJson) {
addFields.add(RowType.field("stats", VARCHAR));
}
if (usePartitionValuesParsed) {
if (requireWriteStatsAsStruct) {
List<DeltaLakeColumnHandle> partitionColumns = extractPartitionColumns(metadataEntry, protocolEntry, typeManager);
if (!partitionColumns.isEmpty()) {
List<RowType.Field> partitionValuesParsed = partitionColumns.stream()
.map(column -> RowType.field(column.getColumnName(), column.getType()))
.map(column -> RowType.field(column.getColumnName(), typeManager.getType(DeltaHiveTypeTranslator.toHiveType(column.getType()).getTypeSignature())))
.collect(toImmutableList());
addFields.add(RowType.field("partitionValues_parsed", RowType.from(partitionValuesParsed)));
}
}
if (requireWriteStatsAsStruct) {
addFields.add(RowType.field("stats_parsed", RowType.from(statsColumns.build())));
}
addFields.add(RowType.field("tags", stringMap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.parquet.writer.ParquetSchemaConverter;
import io.trino.parquet.writer.ParquetWriter;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
Expand Down Expand Up @@ -59,10 +60,12 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.toJsonValues;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.toNullCounts;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.lang.Math.multiplyExact;
Expand Down Expand Up @@ -112,14 +115,12 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile)
RowType metadataEntryType = checkpointSchemaManager.getMetadataEntryType();
RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.getReaderFeatures().isPresent(), protocolEntry.getWriterFeatures().isPresent());
RowType txnEntryType = checkpointSchemaManager.getTxnEntryType();
// TODO https://github.com/trinodb/trino/issues/19586 Add support for writing 'partitionValues_parsed' field
RowType addEntryType = checkpointSchemaManager.getAddEntryType(
entries.getMetadataEntry(),
entries.getProtocolEntry(),
alwaysTrue(),
writeStatsAsJson,
writeStatsAsStruct,
false);
writeStatsAsStruct);
RowType removeEntryType = checkpointSchemaManager.getRemoveEntryType();

List<String> columnNames = ImmutableList.of(
Expand Down Expand Up @@ -154,8 +155,12 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile)
for (TransactionEntry transactionEntry : entries.getTransactionEntries()) {
writeTransactionEntry(pageBuilder, txnEntryType, transactionEntry);
}
List<DeltaLakeColumnHandle> partitionColumns = extractPartitionColumns(entries.getMetadataEntry(), entries.getProtocolEntry(), typeManager);
List<RowType.Field> partitionValuesParsedFieldTypes = partitionColumns.stream()
.map(column -> RowType.field(column.getColumnName(), column.getType()))
.collect(toImmutableList());
for (AddFileEntry addFileEntry : entries.getAddFileEntries()) {
writeAddFileEntry(pageBuilder, addEntryType, addFileEntry, entries.getMetadataEntry(), entries.getProtocolEntry(), writeStatsAsJson, writeStatsAsStruct);
writeAddFileEntry(pageBuilder, addEntryType, addFileEntry, entries.getMetadataEntry(), entries.getProtocolEntry(), partitionColumns, partitionValuesParsedFieldTypes, writeStatsAsJson, writeStatsAsStruct);
}
for (RemoveFileEntry removeFileEntry : entries.getRemoveFileEntries()) {
writeRemoveFileEntry(pageBuilder, removeEntryType, removeFileEntry);
Expand Down Expand Up @@ -228,7 +233,16 @@ private void writeTransactionEntry(PageBuilder pageBuilder, RowType entryType, T
appendNullOtherBlocks(pageBuilder, TXN_BLOCK_CHANNEL);
}

private void writeAddFileEntry(PageBuilder pageBuilder, RowType entryType, AddFileEntry addFileEntry, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean writeStatsAsJson, boolean writeStatsAsStruct)
private void writeAddFileEntry(
PageBuilder pageBuilder,
RowType entryType,
AddFileEntry addFileEntry,
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
List<DeltaLakeColumnHandle> partitionColumns,
List<RowType.Field> partitionValuesParsedFieldTypes,
boolean writeStatsAsJson,
boolean writeStatsAsStruct)
{
pageBuilder.declarePosition();
RowBlockBuilder blockBuilder = (RowBlockBuilder) pageBuilder.getBlockBuilder(ADD_BLOCK_CHANNEL);
Expand All @@ -255,6 +269,11 @@ private void writeAddFileEntry(PageBuilder pageBuilder, RowType entryType, AddFi
}

if (writeStatsAsStruct) {
if (!addFileEntry.getPartitionValues().isEmpty()) {
writeParsedPartitionValues(fieldBuilders.get(fieldId), entryType, addFileEntry, partitionColumns, partitionValuesParsedFieldTypes, fieldId);
fieldId++;
}

writeParsedStats(fieldBuilders.get(fieldId), entryType, addFileEntry, fieldId);
fieldId++;
}
Expand Down Expand Up @@ -303,6 +322,32 @@ private Optional<String> getStatsString(DeltaLakeJsonFileStatistics parsedStats)
}
}

private void writeParsedPartitionValues(
BlockBuilder entryBlockBuilder,
RowType entryType,
AddFileEntry addFileEntry,
List<DeltaLakeColumnHandle> partitionColumns,
List<RowType.Field> partitionValuesParsedFieldTypes,
int fieldId)
{
RowType partitionValuesParsedType = getInternalRowType(entryType, fieldId, "partitionValues_parsed");
((RowBlockBuilder) entryBlockBuilder).buildEntry(fieldBuilders -> {
for (int i = 0; i < partitionValuesParsedFieldTypes.size(); i++) {
RowType.Field partitionValueField = partitionValuesParsedFieldTypes.get(i);
String partitionColumnName = partitionValueField.getName().orElseThrow();
String partitionValue = addFileEntry.getPartitionValues().get(partitionColumnName);
Comment thread
findinpath marked this conversation as resolved.
Outdated
validateAndGetField(partitionValuesParsedType, i, partitionColumnName);
if (partitionValue == null) {
fieldBuilders.get(i).appendNull();
continue;
}
DeltaLakeColumnHandle partitionColumn = partitionColumns.get(i);
Object deserializedPartitionValue = deserializePartitionValue(partitionColumn, Optional.of(partitionValue));
writeNativeValue(partitionValueField.getType(), fieldBuilders.get(i), deserializedPartitionValue);
}
});
}

private void writeParsedStats(BlockBuilder entryBlockBuilder, RowType entryType, AddFileEntry addFileEntry, int fieldId)
{
RowType statsType = getInternalRowType(entryType, fieldId, "stats_parsed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,102 @@ public void testDeltaLakeWithPartitionValuesParsedAllTypes()
assertPartitionValuesParsedCondition(tableName, 3, "part_timestamp_ntz IS NULL");
}

/**
 * Verifies that Trino-written checkpoints contain a readable {@code partitionValues_parsed}
 * struct covering all supported partition column types, and that checkpoint filtering can
 * prune on those parsed values.
 *
 * @see deltalake.partition_values_parsed_all_types
 */
@Test
public void testDeltaLakeWritePartitionValuesParsedAllTypesInCheckpoint()
throws Exception
{
String tableName = "test_write_partition_values_parsed_checkpoint_" + randomNameSuffix();
// NOTE(review): createTempFile (not createTempDirectory) used as the copy target — presumably
// copyDirectoryContents tolerates/replaces the pre-existing file; TODO confirm.
Path tableLocation = Files.createTempFile(tableName, null);
// Seed the table location from a pre-built resource containing an existing checkpoint
copyDirectoryContents(new File(Resources.getResource("deltalake/partition_values_parsed_all_types").toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));

// Baseline: rows readable from the pre-built table before any Trino writes
assertThat(query("SELECT * FROM " + tableName))
.skippingTypesCheck()
.matches("""
VALUES
(1, true, TINYINT '1', SMALLINT '10', 100, BIGINT '1000', CAST('123.12' AS DECIMAL(5,2)), CAST('123456789012345678.123' AS DECIMAL(21,3)), DOUBLE '1.2', REAL '3.4', 'a', DATE '2020-08-21', TIMESTAMP '2020-10-21 01:00:00.123 UTC', TIMESTAMP '2023-01-02 01:02:03.456'),
(2, false, TINYINT '2', SMALLINT '20', 200, BIGINT '2000', CAST('223.12' AS DECIMAL (5,2)), CAST('223456789012345678.123' AS DECIMAL(21,3)), DOUBLE '10.2', REAL '30.4', 'b', DATE '2020-08-22', TIMESTAMP '2020-10-22 01:00:00.123 UTC', TIMESTAMP '2023-01-03 01:02:03.456'),
(3, null, null, null, null, null, null, null, null, null, null, null, null, null)""");

// Create a new checkpoint
assertUpdate("INSERT INTO " + tableName + " VALUES (4, false, TINYINT '4', SMALLINT '40', 400, BIGINT '4000', CAST('444.44' AS DECIMAL(5,2)), CAST('4444444.444' AS DECIMAL(21,3)), DOUBLE '4.4', REAL '4.4', 'd', DATE '2020-08-24', TIMESTAMP '2020-10-24 01:00:00.123 UTC', TIMESTAMP '2023-01-04 01:02:03.456')", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (5, false, TINYINT '5', SMALLINT '50', 500, BIGINT '5000', CAST('555.5' AS DECIMAL(5,2)), CAST('55555.55' AS DECIMAL(21,3)), DOUBLE '5.55', REAL '5.5555', 'd', DATE '2020-08-25', TIMESTAMP '2020-10-25 01:00:00.123 UTC', TIMESTAMP '2023-01-05 01:02:03.456')", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (6, null, null, null, null, null, null, null, null, null, null, null, null, null)", 1);

// Both pre-existing and newly inserted rows must survive the checkpoint rewrite
assertThat(query("SELECT * FROM " + tableName))
.skippingTypesCheck()
.matches("""
VALUES
(1, true, TINYINT '1', SMALLINT '10', 100, BIGINT '1000', CAST('123.12' AS DECIMAL(5,2)), CAST('123456789012345678.123' AS DECIMAL(21,3)), DOUBLE '1.2', REAL '3.4', 'a', DATE '2020-08-21', TIMESTAMP '2020-10-21 01:00:00.123 UTC', TIMESTAMP '2023-01-02 01:02:03.456'),
(2, false, TINYINT '2', SMALLINT '20', 200, BIGINT '2000', CAST('223.12' AS DECIMAL (5,2)), CAST('223456789012345678.123' AS DECIMAL(21,3)), DOUBLE '10.2', REAL '30.4', 'b', DATE '2020-08-22', TIMESTAMP '2020-10-22 01:00:00.123 UTC', TIMESTAMP '2023-01-03 01:02:03.456'),
(3, null, null, null, null, null, null, null, null, null, null, null, null, null),
(4, false, TINYINT '4', SMALLINT '40', 400, BIGINT '4000', CAST('444.44' AS DECIMAL(5,2)), CAST('4444444.444' AS DECIMAL(21,3)), DOUBLE '4.4', REAL '4.4', 'd', DATE '2020-08-24', TIMESTAMP '2020-10-24 01:00:00.123 UTC', TIMESTAMP '2023-01-04 01:02:03.456'),
(5, false, TINYINT '5', SMALLINT '50', 500, BIGINT '5000', CAST('555.5' AS DECIMAL(5,2)), CAST('55555.55' AS DECIMAL(21,3)), DOUBLE '5.55', REAL '5.5555', 'd', DATE '2020-08-25', TIMESTAMP '2020-10-25 01:00:00.123 UTC', TIMESTAMP '2023-01-05 01:02:03.456'),
(6, null, null, null, null, null, null, null, null, null, null, null, null, null)""");

// Enable checkpoint filtering so the query below exercises the parsed partition values
Session session = Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty("delta", "checkpoint_filtering_enabled", "true")
.build();
// Filter on every partition column at once; must match exactly the row with id 1
assertThat(query(session, """
SELECT id
FROM %s
WHERE
part_boolean = true AND
part_tinyint = TINYINT '1' AND
part_smallint= SMALLINT '10' AND
part_int = 100 AND
part_bigint = BIGINT '1000' AND
part_short_decimal = CAST('123.12' AS DECIMAL(5,2)) AND
part_long_decimal = CAST('123456789012345678.123' AS DECIMAL(21,3)) AND
part_double = DOUBLE '1.2' AND
part_float = REAL '3.4' AND
part_varchar = 'a' AND
part_date = DATE '2020-08-21' AND
part_timestamp = TIMESTAMP '2020-10-21 01:00:00.123 UTC' AND
part_timestamp_ntz =TIMESTAMP '2023-01-02 01:02:03.456'""".formatted(tableName)))
.matches("VALUES 1");
}

/**
 * Verifies that Trino-written checkpoints preserve the original (mixed) case of partition
 * column names in {@code partitionValues_parsed}, and that checkpoint filtering matches
 * case-insensitively against those columns.
 *
 * @see databricks133.partition_values_parsed_case_sensitive
 */
@Test
public void testDeltaLakeWritePartitionValuesParsedCaseSensitiveInCheckpoint()
throws Exception
{
String tableName = "test_write_partition_values_parsed_checkpoint_" + randomNameSuffix();
// NOTE(review): createTempFile (not createTempDirectory) used as the copy target — presumably
// copyDirectoryContents tolerates/replaces the pre-existing file; TODO confirm.
Path tableLocation = Files.createTempFile(tableName, null);
// Seed from a Databricks 13.3-generated table whose partition columns use mixed-case names
copyDirectoryContents(new File(Resources.getResource("databricks133/partition_values_parsed_case_sensitive").toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));

// Baseline: rows readable before any Trino writes
assertThat(query("SELECT * FROM " + tableName))
.skippingTypesCheck()
.matches("""
VALUES
(100, 1, 'ala'),
(200, 2,'kota'),
(300, 3, 'osla')""");

// Create a new checkpoint
assertUpdate("INSERT INTO " + tableName + " VALUES (400, 4, 'kon')", 1);
assertThat(query("SELECT * FROM " + tableName))
.skippingTypesCheck()
.matches("""
VALUES
(100, 1, 'ala'),
(200, 2,'kota'),
(300, 3, 'osla'),
(400, 4, 'kon')""");
// Query with mixed-case column references through the filtering-enabled checkpoint reader
Session session = Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty("delta", "checkpoint_filtering_enabled", "true")
.build();
assertThat(query(session, "SELECT id FROM " + tableName + " WHERE part_NuMbEr = 1 AND part_StRiNg = 'ala'"))
.matches("VALUES 100");
}

private void assertPartitionValuesParsedCondition(String tableName, int id, @Language("SQL") String condition)
{
Session session = Session.builder(getQueryRunner().getDefaultSession())
Expand Down
Loading