12 changes: 7 additions & 5 deletions lib/trino-hive-formats/pom.xml
@@ -23,11 +23,6 @@
         <artifactId>trino-filesystem</artifactId>
     </dependency>
 
-    <dependency>
-        <groupId>io.trino</groupId>
-        <artifactId>trino-hadoop-toolkit</artifactId>
-    </dependency>
-
     <dependency>
         <groupId>io.trino</groupId>
         <artifactId>trino-plugin-toolkit</artifactId>
@@ -91,6 +86,13 @@
         <artifactId>modernizer-maven-annotations</artifactId>
     </dependency>
 
+    <!-- used by tests but also needed transitively -->
+    <dependency>
+        <groupId>io.trino</groupId>
+        <artifactId>trino-hadoop-toolkit</artifactId>
+        <scope>runtime</scope>
+    </dependency>
+
     <!-- for hadoop compression -->
     <dependency>
         <groupId>io.trino.hadoop</groupId>
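
Note: with runtime scope, trino-hadoop-toolkit stays off this module's compile classpath (the main sources no longer reference Hadoop classes directly) while remaining visible to tests and to downstream consumers at run time. As a quick sanity check, a sketch (module path taken from the header above; dependency:tree output formatting varies by Maven version):

    mvn -pl lib/trino-hive-formats dependency:tree | grep trino-hadoop-toolkit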

This file was deleted.

lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/Codec.java
@@ -13,22 +13,48 @@
  */
 package io.trino.hive.formats.compression;
 
+import io.airlift.compress.hadoop.HadoopStreams;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-public interface Codec
+import static java.util.Objects.requireNonNull;
+
+public final class Codec
 {
-    OutputStream createStreamCompressor(OutputStream outputStream)
-            throws IOException;
+    private final HadoopStreams hadoopStreams;
+
+    Codec(HadoopStreams hadoopStreams)
+    {
+        this.hadoopStreams = requireNonNull(hadoopStreams, "hadoopStreams is null");
+    }
+
+    public OutputStream createStreamCompressor(OutputStream outputStream)
+            throws IOException
+    {
+        return hadoopStreams.createOutputStream(outputStream);
+    }
 
-    ValueCompressor createValueCompressor();
+    public ValueCompressor createValueCompressor()
+    {
+        return new ValueCompressor(hadoopStreams);
+    }
 
-    MemoryCompressedSliceOutput createMemoryCompressedSliceOutput(int minChunkSize, int maxChunkSize)
-            throws IOException;
+    public MemoryCompressedSliceOutput createMemoryCompressedSliceOutput(int minChunkSize, int maxChunkSize)
+            throws IOException
+    {
+        return new MemoryCompressedSliceOutput(hadoopStreams, minChunkSize, maxChunkSize);
+    }
 
-    InputStream createStreamDecompressor(InputStream inputStream)
-            throws IOException;
+    public InputStream createStreamDecompressor(InputStream inputStream)
+            throws IOException
+    {
+        return hadoopStreams.createInputStream(inputStream);
+    }
 
-    ValueDecompressor createValueDecompressor();
+    public ValueDecompressor createValueDecompressor()
+    {
+        return new ValueDecompressor(hadoopStreams);
+    }
 }
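
For reference, a minimal round-trip sketch against the reworked API (a hypothetical example, not part of this change: the GZIP kind and payload are arbitrary, and Codec instances come from CompressionKind.createCodec(), since the constructor is package-private):

import io.trino.hive.formats.compression.Codec;
import io.trino.hive.formats.compression.CompressionKind;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import static java.nio.charset.StandardCharsets.UTF_8;

public final class CodecRoundTripExample
{
    public static void main(String[] args)
            throws IOException
    {
        // Codec is now a concrete class wrapping an aircompressor HadoopStreams
        Codec codec = CompressionKind.GZIP.createCodec();

        // Compress a small payload into memory
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (OutputStream compressor = codec.createStreamCompressor(buffer)) {
            compressor.write("hello, gzip".getBytes(UTF_8));
        }

        // Decompress it back and print the original text
        try (InputStream decompressor = codec.createStreamDecompressor(
                new ByteArrayInputStream(buffer.toByteArray()))) {
            System.out.println(new String(decompressor.readAllBytes(), UTF_8));
        }
    }
}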
lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java
@@ -15,12 +15,15 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import io.airlift.compress.gzip.JdkGzipCodec;
-import io.airlift.compress.lz4.Lz4Codec;
-import io.airlift.compress.lzo.LzoCodec;
-import io.airlift.compress.lzo.LzopCodec;
-import io.airlift.compress.snappy.SnappyCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
+import io.airlift.compress.bzip2.BZip2HadoopStreams;
+import io.airlift.compress.deflate.JdkDeflateHadoopStreams;
+import io.airlift.compress.gzip.JdkGzipHadoopStreams;
+import io.airlift.compress.hadoop.HadoopStreams;
+import io.airlift.compress.lz4.Lz4HadoopStreams;
+import io.airlift.compress.lzo.LzoHadoopStreams;
+import io.airlift.compress.lzo.LzopHadoopStreams;
+import io.airlift.compress.snappy.SnappyHadoopStreams;
+import io.airlift.compress.zstd.ZstdHadoopStreams;
 
 import java.util.Arrays;
 import java.util.List;
@@ -29,81 +32,30 @@
 import java.util.function.Function;
 
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
 import static java.util.Objects.requireNonNull;
 
 public enum CompressionKind
 {
-    SNAPPY(".snappy", "org.apache.hadoop.io.compress.SnappyCodec") {
-        @Override
-        public Codec createCodec()
-        {
-            return new AircompressorCodec(new SnappyCodec());
-        }
-    },
-    LZO(".lzo_deflate", "org.apache.hadoop.io.compress.LzoCodec", "com.hadoop.compression.lzo.LzoCodec") {
-        @Override
-        public Codec createCodec()
-        {
-            return new AircompressorCodec(new LzoCodec());
-        }
-    },
-    LZOP(".lzo", "com.hadoop.compression.lzo.LzopCodec") {
-        @Override
-        public Codec createCodec()
-        {
-            return new AircompressorCodec(new LzopCodec());
-        }
-    },
-    LZ4(".lz4", "org.apache.hadoop.io.compress.Lz4Codec") {
-        @Override
-        public Codec createCodec()
-        {
-            return new AircompressorCodec(new Lz4Codec());
-        }
-    },
-    GZIP(".gz", "org.apache.hadoop.io.compress.GzipCodec") {
-        @Override
-        public Codec createCodec()
-        {
-            return new AircompressorCodec(new JdkGzipCodec());
-        }
-    },
-    DEFLATE(".deflate", "org.apache.hadoop.io.compress.DefaultCodec") {
-        @Override
-        public Codec createCodec()
-        {
-            DefaultCodec codec = new DefaultCodec();
-            codec.setConf(newEmptyConfiguration());
-            return new HadoopCodec(codec);
-        }
-    },
-    ZSTD(".zst", "org.apache.hadoop.io.compress.ZStandardCodec") {
-        @Override
-        public Codec createCodec()
-        {
-            org.apache.hadoop.io.compress.ZStandardCodec codec = new org.apache.hadoop.io.compress.ZStandardCodec();
-            codec.setConf(newEmptyConfiguration());
-            return new HadoopCodec(codec);
-        }
-    },
-    BZIP2(".bz2", "org.apache.hadoop.io.compress.BZip2Codec") {
-        @Override
-        public Codec createCodec()
-        {
-            org.apache.hadoop.io.compress.BZip2Codec codec = new org.apache.hadoop.io.compress.BZip2Codec();
-            codec.setConf(newEmptyConfiguration());
-            return new HadoopCodec(codec);
-        }
-    };
+    // These are in preference order
+    ZSTD(new ZstdHadoopStreams()),
+    LZ4(new Lz4HadoopStreams()),
+    SNAPPY(new SnappyHadoopStreams()),
+    GZIP(new JdkGzipHadoopStreams()),
+    DEFLATE(new JdkDeflateHadoopStreams()),
+    // These algorithms are only supported for backwards compatibility, and should be avoided at all costs
+    BZIP2(new BZip2HadoopStreams()),
+    LZO(new LzoHadoopStreams()),
+    LZOP(new LzopHadoopStreams());
 
+    private final HadoopStreams hadoopStreams;
     private final List<String> hadoopClassNames;
     private final String fileExtension;
 
-    CompressionKind(String fileExtension, String... hadoopClassNames)
+    CompressionKind(HadoopStreams hadoopStreams)
     {
-        this.hadoopClassNames = ImmutableList.copyOf(hadoopClassNames);
-        this.fileExtension = requireNonNull(fileExtension, "fileExtension is null");
+        this.hadoopStreams = requireNonNull(hadoopStreams, "hadoopStreams is null");
+        this.hadoopClassNames = ImmutableList.copyOf(hadoopStreams.getHadoopCodecName());
+        this.fileExtension = hadoopStreams.getDefaultFileExtension();
     }
 
     public String getHadoopClassName()
@@ -116,7 +68,10 @@ public String getFileExtension()
         return fileExtension;
     }
 
-    public abstract Codec createCodec();
+    public Codec createCodec()
+    {
+        return new Codec(hadoopStreams);
+    }
 
     private static final Map<String, CompressionKind> CODECS_BY_HADOOP_CLASS_NAME;
 
@@ -136,13 +91,6 @@ public static CompressionKind fromHadoopClassName(String hadoopClassName)
                 .orElseThrow(() -> new IllegalArgumentException("Unknown codec: " + hadoopClassName));
     }
 
-    public static Codec createCodecFromHadoopClassName(String hadoopClassName)
-    {
-        return Optional.ofNullable(CODECS_BY_HADOOP_CLASS_NAME.get(hadoopClassName))
-                .orElseThrow(() -> new IllegalArgumentException("Unknown codec: " + hadoopClassName))
-                .createCodec();
-    }
-
     private static final Map<String, CompressionKind> CODECS_BY_FILE_EXTENSION = Arrays.stream(values())
             .filter(codec -> codec.fileExtension != null)
             .collect(toImmutableMap(codec -> codec.fileExtension, Function.identity()));
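
With createCodecFromHadoopClassName removed, call sites can chain the remaining lookup with the now-concrete factory; a sketch (the GzipCodec class name is one of the Hadoop aliases previously registered by the enum, and rawInputStream is an illustrative placeholder):

// Resolve a Hadoop codec class name to a CompressionKind, then build the codec
Codec codec = CompressionKind.fromHadoopClassName("org.apache.hadoop.io.compress.GzipCodec").createCodec();
InputStream decompressed = codec.createStreamDecompressor(rawInputStream);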