Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.spi.connector.ConnectorSession;
import io.trino.testing.TestingConnectorSession;

/**
 * Shared testing {@link ConnectorSession} for Delta Lake tests, carrying the
 * Delta Lake session properties built from default configuration.
 */
public final class DeltaTestingConnectorSession
{
    public static final ConnectorSession SESSION;

    static {
        DeltaLakeSessionProperties sessionProperties = new DeltaLakeSessionProperties(
                new DeltaLakeConfig(),
                new ParquetReaderConfig(),
                new ParquetWriterConfig());
        SESSION = TestingConnectorSession.builder()
                .setPropertyMetadata(sessionProperties.getSessionProperties())
                .build();
    }

    private DeltaTestingConnectorSession() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.DoubleType.DOUBLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.airlift.json.JsonCodecFactory;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
Expand Down Expand Up @@ -49,9 +48,7 @@
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.PrincipalType;
Expand All @@ -62,7 +59,6 @@
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.TypeManager;
import io.trino.testing.TestingConnectorContext;
import io.trino.testing.TestingConnectorSession;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand All @@ -74,6 +70,7 @@

import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.PATH_PROPERTY;
import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY;
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
import static io.trino.spi.type.BigintType.BIGINT;
Expand All @@ -91,9 +88,6 @@
public class TestDeltaLakeMetastoreStatistics
{
private static final ColumnHandle COLUMN_HANDLE = new DeltaLakeColumnHandle("val", DoubleType.DOUBLE, REGULAR);
private static final ConnectorSession SESSION = TestingConnectorSession.builder()
.setPropertyMetadata(new DeltaLakeSessionProperties(new DeltaLakeConfig(), new ParquetReaderConfig(), new ParquetWriterConfig()).getSessionProperties())
.build();

private DeltaLakeMetastore deltaLakeMetastore;
private HiveMetastore hiveMetastore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@
import java.util.Set;

import static com.google.common.io.Resources.getResource;
import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.COMMIT;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,15 @@
import io.trino.plugin.hive.HdfsConfiguration;
import io.trino.plugin.hive.HdfsConfigurationInitializer;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HiveHdfsConfiguration;
import io.trino.plugin.hive.HiveSessionProperties;
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Int128;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.TypeManager;
import io.trino.testing.TestingConnectorSession;
import io.trino.util.DateTimeUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -61,12 +57,12 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION;
import static io.trino.plugin.hive.HiveTestUtils.getHiveSessionProperties;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
Expand All @@ -79,7 +75,6 @@
public class TestCheckpointWriter
{
private final TypeManager typeManager = TESTING_TYPE_MANAGER;
private ConnectorSession session;
private CheckpointSchemaManager checkpointSchemaManager;
private HdfsEnvironment hdfsEnvironment;

Expand All @@ -90,11 +85,6 @@ public void setUp()
HdfsConfig hdfsConfig = new HdfsConfig();
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), Set.of());
hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());

HiveSessionProperties hiveSessionProperties = getHiveSessionProperties(new HiveConfig());
session = TestingConnectorSession.builder()
.setPropertyMetadata(hiveSessionProperties.getSessionProperties())
.build();
}

@Test
Expand Down Expand Up @@ -277,7 +267,7 @@ public void testCheckpointWriteReadRoundtrip()

Path targetPath = new Path("file://" + targetFile.getAbsolutePath());
targetFile.delete(); // file must not exist when writer is called
writer.write(session, entries, targetPath);
writer.write(SESSION, entries, targetPath);

CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, true);
assertEquals(readEntries.getTransactionEntries(), entries.getTransactionEntries());
Expand Down Expand Up @@ -351,7 +341,7 @@ public void testDisablingRowStatistics()

Path targetPath = new Path("file://" + targetFile.getAbsolutePath());
targetFile.delete(); // file must not exist when writer is called
writer.write(session, entries, targetPath);
writer.write(SESSION, entries, targetPath);

CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, false);
AddFileEntry addFileEntry = getOnlyElement(readEntries.getAddFileEntries());
Expand Down Expand Up @@ -416,12 +406,12 @@ private Optional<Map<String, Object>> makeComparableStatistics(Optional<Map<Stri
private CheckpointEntries readCheckpoint(Path checkpointPath, MetadataEntry metadataEntry, boolean rowStatisticsEnabled)
throws IOException
{
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), checkpointPath);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(SESSION), checkpointPath);
FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);

Iterator<DeltaLakeTransactionLogEntry> checkpointEntryIterator = new CheckpointEntryIterator(
checkpointPath,
session,
SESSION,
fileStatus.getLen(),
checkpointSchemaManager,
typeManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.hive;

import io.trino.orc.metadata.CompressionKind;
import org.apache.avro.file.DataFileConstants;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
Expand All @@ -27,21 +28,30 @@

public enum HiveCompressionCodec
{
NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED),
SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY),
LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.LZ4),
ZSTD(ZStandardCodec.class, CompressionKind.ZSTD, CompressionCodecName.ZSTD),
GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP);
NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED, DataFileConstants.NULL_CODEC),
SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY, DataFileConstants.SNAPPY_CODEC),
LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.LZ4, null),
ZSTD(ZStandardCodec.class, CompressionKind.ZSTD, CompressionCodecName.ZSTD, DataFileConstants.ZSTANDARD_CODEC),
// Using DEFLATE for GZIP for Avro for now so Avro files can be written in default configuration
// TODO(https://github.com/trinodb/trino/issues/12580) change GZIP to be unsupported for Avro when we change Trino default compression to be storage format aware
GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP, DataFileConstants.DEFLATE_CODEC);

private final Optional<Class<? extends CompressionCodec>> codec;
private final CompressionKind orcCompressionKind;
private final CompressionCodecName parquetCompressionCodec;

HiveCompressionCodec(Class<? extends CompressionCodec> codec, CompressionKind orcCompressionKind, CompressionCodecName parquetCompressionCodec)
private final Optional<String> avroCompressionCodec;

HiveCompressionCodec(
Class<? extends CompressionCodec> codec,
CompressionKind orcCompressionKind,
CompressionCodecName parquetCompressionCodec,
String avroCompressionCodec)
{
this.codec = Optional.ofNullable(codec);
this.orcCompressionKind = requireNonNull(orcCompressionKind, "orcCompressionKind is null");
this.parquetCompressionCodec = requireNonNull(parquetCompressionCodec, "parquetCompressionCodec is null");
this.avroCompressionCodec = Optional.ofNullable(avroCompressionCodec);
}

public Optional<Class<? extends CompressionCodec>> getCodec()
Expand All @@ -58,4 +68,9 @@ public CompressionCodecName getParquetCompressionCodec()
{
return parquetCompressionCodec;
}

public Optional<String> getAvroCompressionCodec()
{
return avroCompressionCodec;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;

/**
 * Maps the user-selected {@link HiveCompressionOption} to the concrete
 * {@link HiveCompressionCodec} used for writing, validating that the selected
 * codec is supported by the target storage format.
 */
public final class HiveCompressionCodecs
{
    private HiveCompressionCodecs() {}

    /**
     * Selects the compression codec for a generic {@link StorageFormat}, which may or may not
     * correspond to a known {@link HiveStorageFormat}. Unknown formats skip format-specific
     * validation.
     */
    public static HiveCompressionCodec selectCompressionCodec(ConnectorSession session, StorageFormat storageFormat)
    {
        HiveCompressionOption compressionOption = HiveSessionProperties.getCompressionCodec(session);
        return HiveStorageFormat.getHiveStorageFormat(storageFormat)
                .map(format -> selectCompressionCodec(compressionOption, format))
                // orElseGet: do not compute the fallback eagerly when the format is known
                .orElseGet(() -> selectCompressionCodecForUnknownStorageFormat(compressionOption));
    }

    public static HiveCompressionCodec selectCompressionCodec(ConnectorSession session, HiveStorageFormat storageFormat)
    {
        return selectCompressionCodec(HiveSessionProperties.getCompressionCodec(session), storageFormat);
    }

    /**
     * @throws TrinoException when the selected codec cannot be used with {@code storageFormat}
     *         (currently only Avro rejects codecs that have no Avro codec name, e.g. LZ4)
     */
    public static HiveCompressionCodec selectCompressionCodec(HiveCompressionOption compressionOption, HiveStorageFormat storageFormat)
    {
        HiveCompressionCodec selectedCodec = toCompressionCodec(compressionOption);

        // perform codec vs format validation
        if (storageFormat == HiveStorageFormat.AVRO && selectedCodec.getAvroCompressionCodec().isEmpty()) {
            throw new TrinoException(HiveErrorCode.HIVE_UNSUPPORTED_FORMAT, "Compression codec " + selectedCodec + " not supported for " + storageFormat);
        }

        return selectedCodec;
    }

    // One-to-one mapping from the user-facing option to the codec; format validation is done by callers
    private static HiveCompressionCodec toCompressionCodec(HiveCompressionOption compressionOption)
    {
        switch (compressionOption) {
            case NONE:
                return HiveCompressionCodec.NONE;
            case SNAPPY:
                return HiveCompressionCodec.SNAPPY;
            case LZ4:
                return HiveCompressionCodec.LZ4;
            case ZSTD:
                return HiveCompressionCodec.ZSTD;
            case GZIP:
                return HiveCompressionCodec.GZIP;
        }
        throw new IllegalArgumentException("Unknown compressionOption " + compressionOption);
    }

    private static HiveCompressionCodec selectCompressionCodecForUnknownStorageFormat(HiveCompressionOption compressionOption)
    {
        // No format-specific validation is possible for an unknown storage format;
        // the mapping itself is identical to the generic one (previously duplicated verbatim)
        return toCompressionCodec(compressionOption);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

/**
 * User-facing compression codec selection, set via the {@code hive.compression-codec}
 * config property or the corresponding session property. It is resolved to a concrete
 * {@link HiveCompressionCodec} per storage format (e.g. GZIP maps to DEFLATE for Avro,
 * and LZ4 is rejected for Avro).
 */
public enum HiveCompressionOption
{
    NONE,
    SNAPPY,
    LZ4,
    ZSTD,
    GZIP,
    /**/;
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class HiveConfig
private long perTransactionMetastoreCacheMaximumSize = 1000;

private HiveStorageFormat hiveStorageFormat = HiveStorageFormat.ORC;
private HiveCompressionCodec hiveCompressionCodec = HiveCompressionCodec.GZIP;
private HiveCompressionOption hiveCompressionCodec = HiveCompressionOption.GZIP;
private boolean respectTableFormat = true;
private boolean immutablePartitions;
private Optional<InsertExistingPartitionsBehavior> insertExistingPartitionsBehavior = Optional.empty();
Expand Down Expand Up @@ -475,13 +475,13 @@ public HiveConfig setHiveStorageFormat(HiveStorageFormat hiveStorageFormat)
return this;
}

public HiveCompressionCodec getHiveCompressionCodec()
public HiveCompressionOption getHiveCompressionCodec()
{
return hiveCompressionCodec;
}

@Config("hive.compression-codec")
public HiveConfig setHiveCompressionCodec(HiveCompressionCodec hiveCompressionCodec)
public HiveConfig setHiveCompressionCodec(HiveCompressionOption hiveCompressionCodec)
{
this.hiveCompressionCodec = hiveCompressionCodec;
return this;
Expand Down
Loading