9 changes: 0 additions & 9 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -100,7 +100,6 @@ values. Typical usage does not require you to configure them.

* ``NONE``
* ``SNAPPY``
* ``LZ4``
* ``ZSTD``
* ``GZIP``
- ``SNAPPY``
@@ -220,11 +219,6 @@ connector.
* - ``parquet.max-read-block-row-count``
- Sets the maximum number of rows read in a batch.
- ``8192``
* - ``parquet.optimized-writer.enabled``
- Whether the optimized writer should be used when writing Parquet files.
The equivalent catalog session property is
``parquet_optimized_writer_enabled``.
- ``true``
* - ``parquet.optimized-reader.enabled``
- Whether batched column readers should be used when reading Parquet files
for improved performance. Set this property to ``false`` to disable the
@@ -243,9 +237,6 @@ configure processing of Parquet files.
* - Property name
- Description
- Default
* - ``parquet_optimized_writer_enabled``
- Whether the optimized writer should be used when writing Parquet files.
- ``true``
* - ``parquet_optimized_reader_enabled``
- Whether batched column readers should be used when reading Parquet files
for improved performance.
@@ -92,7 +92,6 @@ public void setup(Binder binder)
binder.bind(Key.get(boolean.class, TranslateHiveViews.class)).toInstance(false);
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);
configBinder(binder).bindConfigDefaults(ParquetWriterConfig.class, config -> config.setParquetOptimizedWriterEnabled(true));

install(new ConnectorAccessControlModule());
newOptionalBinder(binder, DeltaLakeAccessControlMetadataFactory.class).setDefault().toInstance(SYSTEM);
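The deleted `bindConfigDefaults` line above is the airlift config-binding idiom for overriding a single property default in one connector without touching the shared config class; once the optimized writer is the only code path, that override is no longer needed. A minimal sketch of the idiom, using a hypothetical `ExampleWriterConfig` and `example.writer.enabled` property rather than the real `ParquetWriterConfig`:

```java
import com.google.inject.Binder;
import com.google.inject.Module;
import io.airlift.configuration.Config;

import static io.airlift.configuration.ConfigBinder.configBinder;

public class ExampleWriterModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // Read "example.writer.*" properties from the catalog properties file
        configBinder(binder).bindConfig(ExampleWriterConfig.class);

        // Override the default of one property for this connector only; this is
        // the kind of call the PR deletes from the Delta Lake module.
        configBinder(binder).bindConfigDefaults(ExampleWriterConfig.class,
                config -> config.setWriterEnabled(true));
    }

    // Hypothetical config class, shown only to make the sketch self-contained
    public static class ExampleWriterConfig
    {
        private boolean writerEnabled;

        public boolean isWriterEnabled()
        {
            return writerEnabled;
        }

        @Config("example.writer.enabled")
        public ExampleWriterConfig setWriterEnabled(boolean writerEnabled)
        {
            this.writerEnabled = writerEnabled;
            return this;
        }
    }
}
```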
@@ -23,13 +23,10 @@
import io.airlift.slice.Slice;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.parquet.writer.ParquetSchemaConverter;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.hive.FileWriter;
import io.trino.plugin.hive.HivePartitionKey;
import io.trino.plugin.hive.RecordFileWriter;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.plugin.hive.util.HiveWriteUtils;
@@ -42,12 +39,8 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTimeZone;

import java.io.Closeable;
import java.io.IOException;
@@ -58,23 +51,16 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.hdfs.ConfigurationUtils.toJobConf;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE;
import static io.trino.plugin.deltalake.DeltaLakeSchemaProperties.buildHiveSchema;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getCompressionCodec;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterBlockSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedWriterEnabled;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.canonicalizeColumnName;
import static io.trino.plugin.hive.HiveStorageFormat.PARQUET;
import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
import static io.trino.plugin.hive.util.HiveUtil.escapePathName;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static java.lang.String.format;
@@ -110,8 +96,6 @@ public class DeltaLakePageSink
private final String outputPath;
private final ConnectorSession session;
private final DeltaLakeWriterStats stats;
private final JobConf conf;
private final TypeManager typeManager;
private final String trinoVersion;
private final long targetMaxFileSize;

@@ -125,14 +109,12 @@ public DeltaLakePageSink(
List<DeltaLakeColumnHandle> inputColumns,
List<String> originalPartitionColumns,
PageIndexerFactory pageIndexerFactory,
HdfsEnvironment hdfsEnvironment,
TrinoFileSystemFactory fileSystemFactory,
int maxOpenWriters,
JsonCodec<DataFileInfo> dataFileInfoCodec,
String outputPath,
ConnectorSession session,
DeltaLakeWriterStats stats,
TypeManager typeManager,
String trinoVersion)
{
requireNonNull(inputColumns, "inputColumns is null");
@@ -195,11 +177,6 @@ public DeltaLakePageSink(
this.outputPath = outputPath;
this.session = requireNonNull(session, "session is null");
this.stats = stats;

Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(outputPath));
configureCompression(conf, getCompressionCodec(session));
this.conf = toJobConf(conf);
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.targetMaxFileSize = DeltaLakeSessionProperties.getTargetMaxFileSize(session);
}
@@ -343,21 +320,15 @@ private int[] getWriterIndexes(Page page)
while (writers.size() <= pageIndexer.getMaxIndex()) {
writers.add(null);
}
boolean isOptimizedParquetWriter = isParquetOptimizedWriterEnabled(session);
// create missing writers
for (int position = 0; position < page.getPositionCount(); position++) {
int writerIndex = writerIndexes[position];
DeltaLakeWriter deltaLakeWriter = writers.get(writerIndex);
if (deltaLakeWriter != null) {
if (isOptimizedParquetWriter) {
if (deltaLakeWriter.getWrittenBytes() <= targetMaxFileSize) {
continue;
}
closeWriter(writerIndex);
}
else {
if (deltaLakeWriter.getWrittenBytes() <= targetMaxFileSize) {
continue;
}
closeWriter(writerIndex);
}

Path filePath = new Path(outputPath);
@@ -373,13 +344,7 @@ private int[] getWriterIndexes(Page page)
String fileName = session.getQueryId() + "-" + randomUUID();
filePath = new Path(filePath, fileName);

FileWriter fileWriter;
if (isOptimizedParquetWriter) {
fileWriter = createParquetFileWriter(filePath.toString());
}
else {
fileWriter = createRecordFileWriter(filePath);
}
FileWriter fileWriter = createParquetFileWriter(filePath.toString());

Path rootTableLocation = new Path(outputPath);
DeltaLakeWriter writer = new DeltaLakeWriter(
@@ -500,21 +465,6 @@ private FileWriter createParquetFileWriter(String path)
}
}

private FileWriter createRecordFileWriter(Path path)
{
Properties schema = buildHiveSchema(dataColumnNames, dataColumnTypes);
return new RecordFileWriter(
path,
dataColumnNames,
fromHiveStorageFormat(PARQUET),
schema,
PARQUET.getEstimatedWriterMemoryUsage(),
conf,
typeManager,
DateTimeZone.UTC,
session);
}

private Page getDataPage(Page page)
{
Block[] blocks = new Block[dataColumnInputIndex.length];
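In `getWriterIndexes` the two identical branches collapse into one: a writer is reused until its written bytes exceed the target max file size, then closed and replaced, and the replacement is always a Parquet writer now that `createRecordFileWriter` is gone. A standalone sketch of that size-based rollover, where the hypothetical `ExampleWriter` and `createParquetWriter` names stand in for the connector's `DeltaLakeWriter` and `createParquetFileWriter`:

```java
import java.util.ArrayList;
import java.util.List;

public class WriterRollover
{
    // Hypothetical stand-in for DeltaLakeWriter; only the size accounting matters here
    public interface ExampleWriter
    {
        long getWrittenBytes();

        void close();
    }

    private final List<ExampleWriter> writers = new ArrayList<>();
    private final long targetMaxFileSize;

    public WriterRollover(long targetMaxFileSize)
    {
        this.targetMaxFileSize = targetMaxFileSize;
    }

    // Reuse the writer for this partition until it grows past the target size,
    // then close it and start a new file; the same shape as the simplified loop above
    public ExampleWriter writerFor(int writerIndex)
    {
        while (writers.size() <= writerIndex) {
            writers.add(null);
        }
        ExampleWriter writer = writers.get(writerIndex);
        if (writer != null) {
            if (writer.getWrittenBytes() <= targetMaxFileSize) {
                return writer; // current file is still small enough
            }
            writer.close(); // roll over to a new file
        }
        // Always a Parquet writer in the connector; the RecordFileWriter branch is gone
        ExampleWriter newWriter = createParquetWriter();
        writers.set(writerIndex, newWriter);
        return newWriter;
    }

    private ExampleWriter createParquetWriter()
    {
        // Placeholder implementation; the real connector builds a ParquetFileWriter here
        return new ExampleWriter()
        {
            @Override
            public long getWrittenBytes()
            {
                return 0;
            }

            @Override
            public void close() {}
        };
    }
}
```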
@@ -15,7 +15,6 @@

import io.airlift.json.JsonCodec;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle;
import io.trino.plugin.deltalake.procedure.DeltaTableOptimizeHandle;
import io.trino.plugin.hive.NodeVersion;
@@ -30,7 +29,6 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.type.TypeManager;
import org.joda.time.DateTimeZone;

import javax.inject.Inject;
@@ -42,38 +40,32 @@ public class DeltaLakePageSinkProvider
{
private final PageIndexerFactory pageIndexerFactory;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final JsonCodec<DataFileInfo> dataFileInfoCodec;
private final JsonCodec<DeltaLakeMergeResult> mergeResultJsonCodec;
private final DeltaLakeWriterStats stats;
private final int maxPartitionsPerWriter;
private final DateTimeZone parquetDateTimeZone;
private final TypeManager typeManager;
private final String trinoVersion;
private final int domainCompactionThreshold;

@Inject
public DeltaLakePageSinkProvider(
PageIndexerFactory pageIndexerFactory,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
JsonCodec<DataFileInfo> dataFileInfoCodec,
JsonCodec<DeltaLakeMergeResult> mergeResultJsonCodec,
DeltaLakeWriterStats stats,
DeltaLakeConfig deltaLakeConfig,
TypeManager typeManager,
NodeVersion nodeVersion)
{
this.pageIndexerFactory = pageIndexerFactory;
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = hdfsEnvironment;
this.dataFileInfoCodec = dataFileInfoCodec;
this.mergeResultJsonCodec = requireNonNull(mergeResultJsonCodec, "mergeResultJsonCodec is null");
this.stats = stats;
this.maxPartitionsPerWriter = deltaLakeConfig.getMaxPartitionsPerWriter();
this.parquetDateTimeZone = deltaLakeConfig.getParquetDateTimeZone();
this.domainCompactionThreshold = deltaLakeConfig.getDomainCompactionThreshold();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoVersion = nodeVersion.toString();
}

@@ -85,14 +77,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
tableHandle.getInputColumns(),
tableHandle.getPartitionedBy(),
pageIndexerFactory,
hdfsEnvironment,
fileSystemFactory,
maxPartitionsPerWriter,
dataFileInfoCodec,
tableHandle.getLocation(),
session,
stats,
typeManager,
trinoVersion);
}

@@ -104,14 +94,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
tableHandle.getInputColumns(),
tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
pageIndexerFactory,
hdfsEnvironment,
fileSystemFactory,
maxPartitionsPerWriter,
dataFileInfoCodec,
tableHandle.getLocation(),
session,
stats,
typeManager,
trinoVersion);
}

@@ -126,14 +114,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
optimizeHandle.getTableColumns(),
optimizeHandle.getOriginalPartitionColumns(),
pageIndexerFactory,
hdfsEnvironment,
fileSystemFactory,
maxPartitionsPerWriter,
dataFileInfoCodec,
executeHandle.getTableLocation(),
session,
stats,
typeManager,
trinoVersion);
}

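The provider changes are mechanical: once `DeltaLakePageSink` stops using `HdfsEnvironment` and `TypeManager`, they are removed from the fields, the `@Inject` constructor, and every call site in the same change. A small sketch of that pattern with a hypothetical dependency, not the connector's actual types:

```java
import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

public class ExamplePageSinkProvider
{
    // Hypothetical dependency that the created sinks still need
    public interface ExampleFileSystemFactory {}

    private final ExampleFileSystemFactory fileSystemFactory;
    // A dependency the sinks no longer use would be deleted here, from the
    // constructor below, and from each place it was forwarded to the sink

    @Inject
    public ExamplePageSinkProvider(ExampleFileSystemFactory fileSystemFactory)
    {
        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
    }

    public ExampleFileSystemFactory fileSystemFactory()
    {
        // Only dependencies that are actually used end up forwarded to the sinks
        return fileSystemFactory;
    }
}
```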
@@ -54,7 +54,6 @@ public final class DeltaLakeSessionProperties
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled"; // = HiveSessionProperties#PARQUET_OPTIMIZED_WRITER_ENABLED
private static final String COMPRESSION_CODEC = "compression_codec";
// This property is not supported by Delta Lake and exists solely for technical reasons.
@Deprecated
@@ -137,11 +136,6 @@ public DeltaLakeSessionProperties(
"Target maximum size of written files; the actual size may be larger",
deltaLakeConfig.getTargetMaxFileSize(),
false),
booleanProperty(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false),
enumProperty(
TIMESTAMP_PRECISION,
"Internal Delta Lake connector property",
@@ -174,6 +168,11 @@ public DeltaLakeSessionProperties(
"Compression codec to use when writing new data files",
HiveCompressionCodec.class,
deltaLakeConfig.getCompressionCodec(),
value -> {
if (value == HiveCompressionCodec.LZ4) {
throw new TrinoException(INVALID_SESSION_PROPERTY, "Unsupported codec: LZ4");
}
},
false));
}

@@ -223,11 +222,6 @@ public static boolean isParquetOptimizedReaderEnabled(ConnectorSession session)
return session.getProperty(PARQUET_OPTIMIZED_READER_ENABLED, Boolean.class);
}

public static boolean isParquetOptimizedWriterEnabled(ConnectorSession session)
{
return session.getProperty(PARQUET_OPTIMIZED_WRITER_ENABLED, Boolean.class);
}

public static DataSize getParquetWriterBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class);
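The validator added to the `compression_codec` enum property is what replaces the dropped LZ4 support: the value is rejected as soon as the session property is set. A self-contained sketch of the same validate-then-accept shape in plain Java, with hypothetical `ExampleCompressionCodec` and `setCodec` names instead of Trino's `enumProperty` helper:

```java
import java.util.function.Consumer;

public class CodecValidation
{
    enum ExampleCompressionCodec
    {
        NONE, SNAPPY, ZSTD, GZIP, LZ4
    }

    // Mirrors the shape of the validator lambda passed to enumProperty in the diff:
    // it inspects the decoded enum value and throws for unsupported codecs
    static final Consumer<ExampleCompressionCodec> VALIDATOR = value -> {
        if (value == ExampleCompressionCodec.LZ4) {
            // The connector throws TrinoException(INVALID_SESSION_PROPERTY, ...) here
            throw new IllegalArgumentException("Unsupported codec: LZ4");
        }
    };

    static ExampleCompressionCodec setCodec(String name)
    {
        ExampleCompressionCodec value = ExampleCompressionCodec.valueOf(name);
        VALIDATOR.accept(value); // reject before the property value takes effect
        return value;
    }

    public static void main(String[] args)
    {
        System.out.println(setCodec("ZSTD")); // accepted
        System.out.println(setCodec("LZ4"));  // throws IllegalArgumentException
    }
}
```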