Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
Expand All @@ -63,7 +62,6 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageSize;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.canonicalizeColumnName;
import static io.trino.plugin.hive.util.HiveUtil.escapePathName;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand Down Expand Up @@ -159,8 +157,8 @@ public AbstractDeltaLakePageSink(
case REGULAR:
dataColumnHandles.add(column);
dataColumnsInputIndex.add(inputIndex);
dataColumnNames.add(column.getName());
dataColumnTypes.add(column.getType());
dataColumnNames.add(column.getPhysicalName());
dataColumnTypes.add(column.getSupportedType());
break;
case SYNTHESIZED:
processSynthesizedColumn(column);
Expand Down Expand Up @@ -457,28 +455,18 @@ private FileWriter createParquetFileWriter(String path)
try {
Closeable rollbackAction = () -> fileSystem.deleteFile(path);

List<Type> parquetTypes = dataColumnTypes.stream()
.map(type -> {
if (type instanceof TimestampWithTimeZoneType) {
verify(((TimestampWithTimeZoneType) type).getPrecision() == 3, "Unsupported type: %s", type);
return TIMESTAMP_MILLIS;
}
return type;
})
.collect(toImmutableList());

// we use identity column mapping; input page already contains only data columns per
// DeltaLakePageSink.getDataPage()
int[] identityMapping = new int[dataColumnTypes.size()];
for (int i = 0; i < identityMapping.length; ++i) {
identityMapping[i] = i;
}

ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(parquetTypes, dataColumnNames, false, false);
ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(dataColumnTypes, dataColumnNames, false, false);
return new ParquetFileWriter(
fileSystem.newOutputFile(path),
rollbackAction,
parquetTypes,
dataColumnTypes,
dataColumnNames,
schemaConverter.getMessageType(),
schemaConverter.getPrimitiveTypes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import org.openjdk.jol.info.ClassLayout;

import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;

import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.RowType.field;
import static io.trino.spi.type.RowType.rowType;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -184,4 +187,14 @@ public static DeltaLakeColumnHandle fileModifiedTimeColumnHandle()
{
return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED);
}

public Type getSupportedType()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs an @JsonIgnore annotation

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also call this getPhysicalType

{
Type supportedType = getPhysicalType();
if (supportedType instanceof TimestampWithTimeZoneType timestamp) {
verify(timestamp.getPrecision() == 3, "Unsupported type: %s", supportedType);
supportedType = TIMESTAMP_MILLIS;
}
return supportedType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
Expand All @@ -61,7 +60,6 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.json.JsonCodec.listJsonCodec;
import static io.airlift.slice.Slices.utf8Slice;
Expand All @@ -76,7 +74,6 @@
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -355,20 +352,9 @@ private FileWriter createParquetFileWriter(String path, List<DeltaLakeColumnHand
try {
Closeable rollbackAction = () -> fileSystem.deleteFile(path);

List<Type> parquetTypes = dataColumns.stream()
.map(column -> {
Type type = column.getType();
if (type instanceof TimestampWithTimeZoneType timestamp) {
verify(timestamp.getPrecision() == 3, "Unsupported type: %s", type);
return TIMESTAMP_MILLIS;
}
return type;
})
.collect(toImmutableList());

List<String> dataColumnNames = dataColumns.stream()
.map(DeltaLakeColumnHandle::getName)
.collect(toImmutableList());
List<String> dataColumnNames = dataColumns.stream().map(DeltaLakeColumnHandle::getPhysicalName).collect(toImmutableList());
List<Type> parquetTypes = dataColumns.stream().map(DeltaLakeColumnHandle::getSupportedType).collect(toImmutableList());
Comment on lines 355 to 356
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try to avoid iterating over the column list twice

Suggested change
List<String> dataColumnNames = dataColumns.stream().map(DeltaLakeColumnHandle::getPhysicalName).collect(toImmutableList());
List<Type> parquetTypes = dataColumns.stream().map(DeltaLakeColumnHandle::getSupportedType).collect(toImmutableList());
ImmutableList.Builder<String> dataColumnNames = ImmutableList.builder();
ImmutableList.Builder<Type> parquetTypes = ImmutableList.builder();
for (DeltaLakeColumnHandle column : dataColumns) {
dataColumnNames.add(..);
parquetTypes.add(...);
}


ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(
parquetTypes,
dataColumnNames,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import java.util.Map;
import java.util.TreeMap;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.MoreCollectors.toOptional;
import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
Expand Down Expand Up @@ -277,6 +279,7 @@ public class DeltaLakeMetadata
private static final int WRITER_VERSION = 2;
// The highest writer version Trino supports writing to
private static final int MAX_WRITER_VERSION = 4;
private static final int MAX_DML_WRITER_VERSION = 5;
// This constant should be used only for a new table
private static final ProtocolEntry DEFAULT_PROTOCOL = new ProtocolEntry(READER_VERSION, WRITER_VERSION);
// Matches the dummy column Databricks stores in the metastore
Expand Down Expand Up @@ -1297,7 +1300,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants");
}
checkUnsupportedGeneratedColumns(table.getMetadataEntry());
checkSupportedWriterVersion(session, table.getSchemaTableName());
checkUnsupportedColumnMapping(table.getMetadataEntry());
checkSupportedDmlWriterVersion(session, table);

List<DeltaLakeColumnHandle> inputColumns = columns.stream()
.map(handle -> (DeltaLakeColumnHandle) handle)
Expand Down Expand Up @@ -1391,8 +1395,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(
ISOLATION_LEVEL,
true));

// Note: during writes we want to preserve original case of partition columns
List<String> partitionColumns = handle.getMetadataEntry().getOriginalPartitionColumns();
List<String> partitionColumns = getWritePartitionColumnNames(handle.getMetadataEntry().getOriginalPartitionColumns(), handle.getInputColumns());
appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, true);

transactionLogWriter.flush();
Expand All @@ -1410,6 +1413,22 @@ public Optional<ConnectorOutputMetadata> finishInsert(
return Optional.empty();
}

private static List<String> getWritePartitionColumnNames(List<String> originalPartitionColumns, List<DeltaLakeColumnHandle> dataColumns)
{
return originalPartitionColumns.stream()
.map(columnName -> {
DeltaLakeColumnHandle dataColumn = dataColumns.stream()
.filter(column -> columnName.equalsIgnoreCase(column.getName()))
.collect(onlyElement());
Comment on lines 1420 to 1422
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is O(n^2) on the column list size, not terrible but not awesome. Can we do better by pre-generating a Map lookup for the list traversal you're doing here?

// Note: during writes we want to preserve original case of partition columns, if the column's name is not differ of column's physical name
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: there is no need for the "Note:" prefix in the statement. The comment implies that this is a developer note.

If we add the column mapping as a parameter for the method, we may not need this comment anymore.
If the column mapping is NONE perform the mapping of the column names as in the original code, otherwise for NAME use physical column name and for ID throw illegal argument exception.

if (dataColumn.getPhysicalName().equalsIgnoreCase(columnName)) {
Copy link
Copy Markdown
Contributor

@findinpath findinpath Feb 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this apply only for column mapping NONE ?
If yes, please specify the column mapping as a parameter for the method and use it in the if statement.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, if the behavior is very different for name mapping I'd rewrite this method to only do the name mapping and call it getPartitionColumnsForNameMapping

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of a none mapping you're just returning the input handle.getMetadataEntry().getOriginalPartitionColumns() right?

return columnName;
}
return dataColumn.getPhysicalName();
})
.collect(toImmutableList());
}

@Override
public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down Expand Up @@ -1449,7 +1468,8 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
checkSupportedWriterVersion(session, handle.getSchemaTableName());
checkUnsupportedColumnMapping(handle.getMetadataEntry());
checkSupportedDmlWriterVersion(session, handle);

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);

Expand Down Expand Up @@ -1536,7 +1556,9 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(file, writeTimestamp, true));
}

List<String> partitionColumns = handle.getMetadataEntry().getOriginalPartitionColumns();
List<String> partitionColumns = getWritePartitionColumnNames(
handle.getMetadataEntry().getOriginalPartitionColumns(),
((DeltaLakeMergeTableHandle) tableHandle).getInsertTableHandle().getInputColumns());
appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, true);

transactionLogWriter.flush();
Expand Down Expand Up @@ -1787,6 +1809,25 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
}
}

// Writes are supported only for the 'none' and 'name' column mapping modes;
// any other mode (e.g. 'id') is rejected.
private void checkUnsupportedColumnMapping(MetadataEntry metadataEntry)
{
    ColumnMappingMode mappingMode = getColumnMappingMode(metadataEntry);
    boolean writable = mappingMode == ColumnMappingMode.NONE || mappingMode == ColumnMappingMode.NAME;
    if (!writable) {
        throw new TrinoException(NOT_SUPPORTED, "Writing with column mapping id is not supported");
    }
}

// DML is allowed when the table requires exactly MAX_DML_WRITER_VERSION and uses a
// writable column mapping mode ('none' or 'name'); in every other case fall back to
// the generic writer-version check.
private void checkSupportedDmlWriterVersion(ConnectorSession session, DeltaLakeTableHandle table)
{
    SchemaTableName schemaTableName = table.getSchemaTableName();
    int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion();
    ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
    boolean writableMapping = columnMappingMode == ColumnMappingMode.NONE || columnMappingMode == ColumnMappingMode.NAME;
    if (requiredWriterVersion != MAX_DML_WRITER_VERSION || !writableMapping) {
        checkSupportedWriterVersion(session, schemaTableName);
    }
}

private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
{
int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ public long getRowCount()
public DataFileInfo getDataFileInfo()
throws IOException
{
List<String> dataColumnNames = columnHandles.stream().map(DeltaLakeColumnHandle::getName).collect(toImmutableList());
List<Type> dataColumnTypes = columnHandles.stream().map(DeltaLakeColumnHandle::getType).collect(toImmutableList());
List<String> dataColumnNames = columnHandles.stream().map(DeltaLakeColumnHandle::getPhysicalName).collect(toImmutableList());
List<Type> dataColumnTypes = columnHandles.stream().map(DeltaLakeColumnHandle::getSupportedType).collect(toImmutableList());
return new DataFileInfo(
relativeFilePath,
getWrittenBytes(),
Expand Down
Loading