diff --git a/lib/trino-hive-formats/pom.xml b/lib/trino-hive-formats/pom.xml
index bcbc4a08cd74..79ed22f89578 100644
--- a/lib/trino-hive-formats/pom.xml
+++ b/lib/trino-hive-formats/pom.xml
@@ -23,11 +23,6 @@
      <artifactId>trino-filesystem</artifactId>
    </dependency>

-    <dependency>
-      <groupId>io.trino</groupId>
-      <artifactId>trino-hadoop-toolkit</artifactId>
-    </dependency>
-
    <dependency>
      <groupId>io.trino</groupId>
      <artifactId>trino-plugin-toolkit</artifactId>
@@ -91,6 +86,13 @@
      <artifactId>modernizer-maven-annotations</artifactId>
    </dependency>
+
+    <dependency>
+      <groupId>io.trino</groupId>
+      <artifactId>trino-hadoop-toolkit</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
    <dependency>
      <groupId>io.trino.hadoop</groupId>
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/AircompressorCodec.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/AircompressorCodec.java
deleted file mode 100644
index 6b7d740f2403..000000000000
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/AircompressorCodec.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.hive.formats.compression;
-
-import io.airlift.slice.DynamicSliceOutput;
-import io.airlift.slice.Slice;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UncheckedIOException;
-import java.util.function.Supplier;
-
-import static java.util.Objects.requireNonNull;
-
-public class AircompressorCodec
- implements Codec
-{
- // Airlift Codecs are assumed to not retain memory and are assumed to not be pooled
- private final CompressionCodec codec;
-
- public AircompressorCodec(CompressionCodec codec)
- {
- this.codec = requireNonNull(codec, "codec is null");
- }
-
- @Override
- public OutputStream createStreamCompressor(OutputStream outputStream)
- throws IOException
- {
- return codec.createOutputStream(outputStream);
- }
-
- @Override
- public ValueCompressor createValueCompressor()
- {
- return new AircompressorValueCompressor(codec);
- }
-
- private static class AircompressorValueCompressor
- implements ValueCompressor
- {
- private final CompressionCodec codec;
- private final DynamicSliceOutput buffer;
-
- private AircompressorValueCompressor(CompressionCodec codec)
- {
- this.codec = requireNonNull(codec, "codec is null");
- this.buffer = new DynamicSliceOutput(1024);
- }
-
- @Override
- public Slice compress(Slice slice)
- throws IOException
- {
- buffer.reset();
- try (CompressionOutputStream compressionStream = codec.createOutputStream(buffer, codec.createCompressor())) {
- slice.getInput().transferTo(compressionStream);
- }
- return buffer.slice();
- }
- }
-
- @Override
- public MemoryCompressedSliceOutput createMemoryCompressedSliceOutput(int minChunkSize, int maxChunkSize)
- {
- return new AircompressorCompressedSliceOutputSupplier(codec, minChunkSize, maxChunkSize).get();
- }
-
- // this can be dramatically simplified when actual hadoop codecs are dropped
- private static class AircompressorCompressedSliceOutputSupplier
- implements Supplier<MemoryCompressedSliceOutput>
- {
- private final CompressionCodec codec;
- private final ChunkedSliceOutput compressedOutput;
-
- public AircompressorCompressedSliceOutputSupplier(CompressionCodec codec, int minChunkSize, int maxChunkSize)
- {
- this.codec = requireNonNull(codec, "codec is null");
- this.compressedOutput = new ChunkedSliceOutput(minChunkSize, maxChunkSize);
- }
-
- @Override
- public MemoryCompressedSliceOutput get()
- {
- try {
- compressedOutput.reset();
- CompressionOutputStream compressionStream = codec.createOutputStream(compressedOutput);
- return new MemoryCompressedSliceOutput(compressionStream, compressedOutput, this, () -> {});
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
-
- @Override
- public InputStream createStreamDecompressor(InputStream inputStream)
- throws IOException
- {
- return codec.createInputStream(inputStream);
- }
-
- @Override
- public ValueDecompressor createValueDecompressor()
- {
- return new AircompressorValueDecompressor(codec);
- }
-
- private static class AircompressorValueDecompressor
- implements ValueDecompressor
- {
- private final CompressionCodec codec;
-
- private AircompressorValueDecompressor(CompressionCodec codec)
- {
- this.codec = requireNonNull(codec, "codec is null");
- }
-
- @Override
- public void decompress(Slice compressed, OutputStream uncompressed)
- throws IOException
- {
- try (CompressionInputStream decompressorStream = codec.createInputStream(compressed.getInput())) {
- decompressorStream.transferTo(uncompressed);
- }
- catch (IndexOutOfBoundsException | IOException e) {
- throw new IOException("Compressed stream is truncated", e);
- }
- }
-
- @Override
- public void decompress(Slice compressed, Slice uncompressed)
- throws IOException
- {
- try (CompressionInputStream decompressorStream = codec.createInputStream(compressed.getInput())) {
- uncompressed.setBytes(0, decompressorStream, uncompressed.length());
- }
- catch (IndexOutOfBoundsException | IOException e) {
- throw new IOException("Compressed stream is truncated", e);
- }
- }
- }
-}
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/Codec.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/Codec.java
index 9b55665110f9..ae57836822a5 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/Codec.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/Codec.java
@@ -13,22 +13,48 @@
*/
package io.trino.hive.formats.compression;
+import io.airlift.compress.hadoop.HadoopStreams;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-public interface Codec
+import static java.util.Objects.requireNonNull;
+
+public final class Codec
{
- OutputStream createStreamCompressor(OutputStream outputStream)
- throws IOException;
+ private final HadoopStreams hadoopStreams;
+
+ Codec(HadoopStreams hadoopStreams)
+ {
+ this.hadoopStreams = requireNonNull(hadoopStreams, "hadoopStreams is null");
+ }
+
+ public OutputStream createStreamCompressor(OutputStream outputStream)
+ throws IOException
+ {
+ return hadoopStreams.createOutputStream(outputStream);
+ }
- ValueCompressor createValueCompressor();
+ public ValueCompressor createValueCompressor()
+ {
+ return new ValueCompressor(hadoopStreams);
+ }
- MemoryCompressedSliceOutput createMemoryCompressedSliceOutput(int minChunkSize, int maxChunkSize)
- throws IOException;
+ public MemoryCompressedSliceOutput createMemoryCompressedSliceOutput(int minChunkSize, int maxChunkSize)
+ throws IOException
+ {
+ return new MemoryCompressedSliceOutput(hadoopStreams, minChunkSize, maxChunkSize);
+ }
- InputStream createStreamDecompressor(InputStream inputStream)
- throws IOException;
+ public InputStream createStreamDecompressor(InputStream inputStream)
+ throws IOException
+ {
+ return hadoopStreams.createInputStream(inputStream);
+ }
- ValueDecompressor createValueDecompressor();
+ public ValueDecompressor createValueDecompressor()
+ {
+ return new ValueDecompressor(hadoopStreams);
+ }
}
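
With this patch, Codec collapses from an interface with two implementations (AircompressorCodec, HadoopCodec) into the single concrete class above, delegating everything to an airlift HadoopStreams. A minimal usage sketch, not part of the patch — the entry point is CompressionKind.createCodec() from the next file, since the Codec constructor is package-private, and the payload here is invented:

    import io.trino.hive.formats.compression.Codec;
    import io.trino.hive.formats.compression.CompressionKind;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import static java.nio.charset.StandardCharsets.UTF_8;

    public class CodecRoundTrip
    {
        public static void main(String[] args)
                throws IOException
        {
            // Stateless codec; no Hadoop Compressor/Decompressor pooling involved
            Codec codec = CompressionKind.ZSTD.createCodec();

            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (OutputStream out = codec.createStreamCompressor(buffer)) {
                out.write("hello, zstd".getBytes(UTF_8));
            }

            try (InputStream in = codec.createStreamDecompressor(new ByteArrayInputStream(buffer.toByteArray()))) {
                System.out.println(new String(in.readAllBytes(), UTF_8)); // prints "hello, zstd"
            }
        }
    }
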
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java
index 60a09e4784e8..56123049a571 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java
@@ -15,12 +15,15 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import io.airlift.compress.gzip.JdkGzipCodec;
-import io.airlift.compress.lz4.Lz4Codec;
-import io.airlift.compress.lzo.LzoCodec;
-import io.airlift.compress.lzo.LzopCodec;
-import io.airlift.compress.snappy.SnappyCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
+import io.airlift.compress.bzip2.BZip2HadoopStreams;
+import io.airlift.compress.deflate.JdkDeflateHadoopStreams;
+import io.airlift.compress.gzip.JdkGzipHadoopStreams;
+import io.airlift.compress.hadoop.HadoopStreams;
+import io.airlift.compress.lz4.Lz4HadoopStreams;
+import io.airlift.compress.lzo.LzoHadoopStreams;
+import io.airlift.compress.lzo.LzopHadoopStreams;
+import io.airlift.compress.snappy.SnappyHadoopStreams;
+import io.airlift.compress.zstd.ZstdHadoopStreams;
import java.util.Arrays;
import java.util.List;
@@ -29,81 +32,30 @@
import java.util.function.Function;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
import static java.util.Objects.requireNonNull;
public enum CompressionKind
{
- SNAPPY(".snappy", "org.apache.hadoop.io.compress.SnappyCodec") {
- @Override
- public Codec createCodec()
- {
- return new AircompressorCodec(new SnappyCodec());
- }
- },
- LZO(".lzo_deflate", "org.apache.hadoop.io.compress.LzoCodec", "com.hadoop.compression.lzo.LzoCodec") {
- @Override
- public Codec createCodec()
- {
- return new AircompressorCodec(new LzoCodec());
- }
- },
- LZOP(".lzo", "com.hadoop.compression.lzo.LzopCodec") {
- @Override
- public Codec createCodec()
- {
- return new AircompressorCodec(new LzopCodec());
- }
- },
- LZ4(".lz4", "org.apache.hadoop.io.compress.Lz4Codec") {
- @Override
- public Codec createCodec()
- {
- return new AircompressorCodec(new Lz4Codec());
- }
- },
- GZIP(".gz", "org.apache.hadoop.io.compress.GzipCodec") {
- @Override
- public Codec createCodec()
- {
- return new AircompressorCodec(new JdkGzipCodec());
- }
- },
- DEFLATE(".deflate", "org.apache.hadoop.io.compress.DefaultCodec") {
- @Override
- public Codec createCodec()
- {
- DefaultCodec codec = new DefaultCodec();
- codec.setConf(newEmptyConfiguration());
- return new HadoopCodec(codec);
- }
- },
- ZSTD(".zst", "org.apache.hadoop.io.compress.ZStandardCodec") {
- @Override
- public Codec createCodec()
- {
- org.apache.hadoop.io.compress.ZStandardCodec codec = new org.apache.hadoop.io.compress.ZStandardCodec();
- codec.setConf(newEmptyConfiguration());
- return new HadoopCodec(codec);
- }
- },
- BZIP2(".bz2", "org.apache.hadoop.io.compress.BZip2Codec") {
- @Override
- public Codec createCodec()
- {
- org.apache.hadoop.io.compress.BZip2Codec codec = new org.apache.hadoop.io.compress.BZip2Codec();
- codec.setConf(newEmptyConfiguration());
- return new HadoopCodec(codec);
- }
- };
+ // These are in preference order
+ ZSTD(new ZstdHadoopStreams()),
+ LZ4(new Lz4HadoopStreams()),
+ SNAPPY(new SnappyHadoopStreams()),
+ GZIP(new JdkGzipHadoopStreams()),
+ DEFLATE(new JdkDeflateHadoopStreams()),
+ // These algorithms are only supported for backwards compatibility, and should be avoided at all costs
+ BZIP2(new BZip2HadoopStreams()),
+ LZO(new LzoHadoopStreams()),
+ LZOP(new LzopHadoopStreams());
+ private final HadoopStreams hadoopStreams;
private final List<String> hadoopClassNames;
private final String fileExtension;
- CompressionKind(String fileExtension, String... hadoopClassNames)
+ CompressionKind(HadoopStreams hadoopStreams)
{
- this.hadoopClassNames = ImmutableList.copyOf(hadoopClassNames);
- this.fileExtension = requireNonNull(fileExtension, "fileExtension is null");
+ this.hadoopStreams = requireNonNull(hadoopStreams, "hadoopStreams is null");
+ this.hadoopClassNames = ImmutableList.copyOf(hadoopStreams.getHadoopCodecName());
+ this.fileExtension = hadoopStreams.getDefaultFileExtension();
}
public String getHadoopClassName()
@@ -116,7 +68,10 @@ public String getFileExtension()
return fileExtension;
}
- public abstract Codec createCodec();
+ public Codec createCodec()
+ {
+ return new Codec(hadoopStreams);
+ }
private static final Map<String, CompressionKind> CODECS_BY_HADOOP_CLASS_NAME;
@@ -136,13 +91,6 @@ public static CompressionKind fromHadoopClassName(String hadoopClassName)
.orElseThrow(() -> new IllegalArgumentException("Unknown codec: " + hadoopClassName));
}
- public static Codec createCodecFromHadoopClassName(String hadoopClassName)
- {
- return Optional.ofNullable(CODECS_BY_HADOOP_CLASS_NAME.get(hadoopClassName))
- .orElseThrow(() -> new IllegalArgumentException("Unknown codec: " + hadoopClassName))
- .createCodec();
- }
-
private static final Map<String, CompressionKind> CODECS_BY_FILE_EXTENSION = Arrays.stream(values())
.filter(codec -> codec.fileExtension != null)
.collect(toImmutableMap(codec -> codec.fileExtension, Function.identity()));
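
With the table-driven enum, resolving the codec recorded in a file header becomes a map lookup plus a trivial constructor call. A fragment showing the intended call path (illustrative only; fromHadoopClassName and createCodec appear in the hunks above, and the class-name literal is the stock Hadoop gzip codec name):

    // Resolve the codec name stored in a SequenceFile/RCFile header
    CompressionKind kind = CompressionKind.fromHadoopClassName("org.apache.hadoop.io.compress.GzipCodec");
    Codec codec = kind.createCodec();
    ValueDecompressor decompressor = codec.createValueDecompressor();
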
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/HadoopCodec.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/HadoopCodec.java
deleted file mode 100644
index 62e365f17eb6..000000000000
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/HadoopCodec.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.hive.formats.compression;
-
-import io.airlift.slice.DynamicSliceOutput;
-import io.airlift.slice.Slice;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UncheckedIOException;
-import java.util.function.Supplier;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-public class HadoopCodec
- implements Codec
-{
- private final CompressionCodec codec;
-
- public HadoopCodec(CompressionCodec codec)
- {
- this.codec = requireNonNull(codec, "codec is null");
- }
-
- @Override
- public OutputStream createStreamCompressor(OutputStream outputStream)
- throws IOException
- {
- return codec.createOutputStream(outputStream);
- }
-
- @Override
- public ValueCompressor createValueCompressor()
- {
- return new HadoopValueCompressor(codec);
- }
-
- private static class HadoopValueCompressor
- implements ValueCompressor
- {
- private final CompressionCodec codec;
- private final Compressor compressor;
- private final DynamicSliceOutput buffer;
-
- private HadoopValueCompressor(CompressionCodec codec)
- {
- this.codec = requireNonNull(codec, "codec is null");
- this.compressor = CodecPool.getCompressor(requireNonNull(codec, "codec is null"));
- this.buffer = new DynamicSliceOutput(1024);
- }
-
- @Override
- public Slice compress(Slice slice)
- throws IOException
- {
- compressor.reset();
- buffer.reset();
- try (CompressionOutputStream compressionStream = codec.createOutputStream(buffer, compressor)) {
- slice.getInput().transferTo(compressionStream);
- }
- return buffer.slice();
- }
-
- @Override
- public void close()
- {
- CodecPool.returnCompressor(compressor);
- }
- }
-
- @Override
- public MemoryCompressedSliceOutput createMemoryCompressedSliceOutput(int minChunkSize, int maxChunkSize)
- {
- return new HadoopCompressedSliceOutputSupplier(codec, minChunkSize, maxChunkSize).get();
- }
-
- private static class HadoopCompressedSliceOutputSupplier
- implements Supplier<MemoryCompressedSliceOutput>
- {
- private final CompressionCodec codec;
- private final Compressor compressor;
- private final ChunkedSliceOutput bufferedOutput;
-
- public HadoopCompressedSliceOutputSupplier(CompressionCodec codec, int minChunkSize, int maxChunkSize)
- {
- this.codec = requireNonNull(codec, "codec is null");
- this.compressor = CodecPool.getCompressor(requireNonNull(codec, "codec is null"));
- this.bufferedOutput = new ChunkedSliceOutput(minChunkSize, maxChunkSize);
- }
-
- @Override
- public MemoryCompressedSliceOutput get()
- {
- try {
- compressor.reset();
- bufferedOutput.reset();
- CompressionOutputStream compressionStream = codec.createOutputStream(bufferedOutput, compressor);
- return new MemoryCompressedSliceOutput(compressionStream, bufferedOutput, this, () -> CodecPool.returnCompressor(compressor));
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
-
- @Override
- public InputStream createStreamDecompressor(InputStream inputStream)
- throws IOException
- {
- return codec.createInputStream(inputStream);
- }
-
- @Override
- public ValueDecompressor createValueDecompressor()
- {
- return new HadoopValueDecompressor(codec);
- }
-
- private static class HadoopValueDecompressor
- implements ValueDecompressor
- {
- private final CompressionCodec codec;
- private final Decompressor decompressor;
- private boolean closed;
-
- private HadoopValueDecompressor(CompressionCodec codec)
- {
- this.codec = requireNonNull(codec, "codec is null");
- decompressor = CodecPool.getDecompressor(codec);
- }
-
- @Override
- public void decompress(Slice compressed, OutputStream uncompressed)
- throws IOException
- {
- checkState(!closed, "Value decompressor has been closed");
- decompressor.reset();
- try (CompressionInputStream decompressorStream = codec.createInputStream(compressed.getInput(), decompressor)) {
- decompressorStream.transferTo(uncompressed);
- }
- catch (IndexOutOfBoundsException | IOException e) {
- throw new IOException("Compressed stream is truncated", e);
- }
- }
-
- @Override
- public void decompress(Slice compressed, Slice uncompressed)
- throws IOException
- {
- checkState(!closed, "Value decompressor has been closed");
- decompressor.reset();
- try (CompressionInputStream decompressorStream = codec.createInputStream(compressed.getInput(), decompressor)) {
- uncompressed.setBytes(0, decompressorStream, uncompressed.length());
- }
- catch (IndexOutOfBoundsException | IOException e) {
- throw new IOException("Compressed stream is truncated", e);
- }
- }
-
- @Override
- public void close()
- {
- if (closed) {
- return;
- }
- closed = true;
- CodecPool.returnDecompressor(decompressor);
- }
- }
-}
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/MemoryCompressedSliceOutput.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/MemoryCompressedSliceOutput.java
index 04b5a636084f..088489daddd7 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/MemoryCompressedSliceOutput.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/MemoryCompressedSliceOutput.java
@@ -13,12 +13,11 @@
*/
package io.trino.hive.formats.compression;
+import io.airlift.compress.hadoop.HadoopStreams;
import io.airlift.slice.Slice;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.List;
-import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
@@ -30,28 +29,23 @@
public final class MemoryCompressedSliceOutput
extends BufferedOutputStreamSliceOutput
{
+ private final HadoopStreams hadoopStreams;
private final ChunkedSliceOutput bufferedOutput;
- private final Supplier<MemoryCompressedSliceOutput> resetFactory;
- private final Runnable onDestroy;
private boolean closed;
private boolean destroyed;
- /**
- * @param compressionStream the compressed output stream to delegate to
- * @param bufferedOutput the output for the compressionStream
- * @param resetFactory the function to create a new CompressedSliceOutput that reuses the bufferedOutput
- * @param onDestroy used to cleanup the compression when done
- */
- public MemoryCompressedSliceOutput(
- OutputStream compressionStream,
- ChunkedSliceOutput bufferedOutput,
- Supplier<MemoryCompressedSliceOutput> resetFactory,
- Runnable onDestroy)
+ MemoryCompressedSliceOutput(HadoopStreams hadoopStreams, int minChunkSize, int maxChunkSize)
+ throws IOException
+ {
+ this(hadoopStreams, new ChunkedSliceOutput(minChunkSize, maxChunkSize));
+ }
+
+ private MemoryCompressedSliceOutput(HadoopStreams hadoopStreams, ChunkedSliceOutput bufferedOutput)
+ throws IOException
{
- super(compressionStream);
+ super(hadoopStreams == null ? bufferedOutput : hadoopStreams.createOutputStream(bufferedOutput));
+ this.hadoopStreams = hadoopStreams;
this.bufferedOutput = requireNonNull(bufferedOutput, "bufferedOutput is null");
- this.resetFactory = requireNonNull(resetFactory, "resetFactory is null");
- this.onDestroy = requireNonNull(onDestroy, "onDestroy is null");
}
@Override
@@ -75,11 +69,14 @@ public List<Slice> getCompressedSlices()
}
public MemoryCompressedSliceOutput createRecycledCompressedSliceOutput()
+ throws IOException
{
checkState(closed, "Stream has not been closed");
checkState(!destroyed, "Stream has been destroyed");
destroyed = true;
- return resetFactory.get();
+
+ bufferedOutput.reset();
+ return new MemoryCompressedSliceOutput(hadoopStreams, bufferedOutput);
}
@Override
@@ -97,35 +94,13 @@ public void destroy()
{
if (!destroyed) {
destroyed = true;
- try {
- close();
- }
- finally {
- onDestroy.run();
- }
+ close();
}
}
public static MemoryCompressedSliceOutput createUncompressedMemorySliceOutput(int minChunkSize, int maxChunkSize)
+ throws IOException
{
- return new UncompressedSliceOutputSupplier(minChunkSize, maxChunkSize).get();
- }
-
- private static class UncompressedSliceOutputSupplier
- implements Supplier<MemoryCompressedSliceOutput>
- {
- private final ChunkedSliceOutput chunkedSliceOutput;
-
- private UncompressedSliceOutputSupplier(int minChunkSize, int maxChunkSize)
- {
- chunkedSliceOutput = new ChunkedSliceOutput(minChunkSize, maxChunkSize);
- }
-
- @Override
- public MemoryCompressedSliceOutput get()
- {
- chunkedSliceOutput.reset();
- return new MemoryCompressedSliceOutput(chunkedSliceOutput, chunkedSliceOutput, this, () -> {});
- }
+ return new MemoryCompressedSliceOutput(null, new ChunkedSliceOutput(minChunkSize, maxChunkSize));
}
}
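
The recycling contract is now self-contained: instead of threading a resetFactory Supplier and an onDestroy hook through the constructor, the stream re-wraps its own ChunkedSliceOutput. A lifecycle fragment as a writer might drive it (illustrative; codec and rowGroupData are assumed to be in scope, and the chunk sizes are arbitrary):

    MemoryCompressedSliceOutput out = codec.createMemoryCompressedSliceOutput(4 * 1024, 256 * 1024);
    out.writeBytes(rowGroupData);
    out.close(); // finish the compressed stream so the slices are complete
    List<Slice> compressed = out.getCompressedSlices();

    // Reuses the underlying chunk buffers; the old instance is destroyed
    out = out.createRecycledCompressedSliceOutput();
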
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueCompressor.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueCompressor.java
index 0dd99b5b677b..b4ab8154fb9e 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueCompressor.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueCompressor.java
@@ -13,17 +13,33 @@
*/
package io.trino.hive.formats.compression;
+import io.airlift.compress.hadoop.HadoopStreams;
+import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
-import java.io.Closeable;
import java.io.IOException;
+import java.io.OutputStream;
-public interface ValueCompressor
- extends Closeable
+import static java.util.Objects.requireNonNull;
+
+public final class ValueCompressor
{
- Slice compress(Slice slice)
- throws IOException;
+ private final HadoopStreams hadoopStreams;
+ private final DynamicSliceOutput buffer;
+
+ ValueCompressor(HadoopStreams hadoopStreams)
+ {
+ this.hadoopStreams = requireNonNull(hadoopStreams, "hadoopStreams is null");
+ this.buffer = new DynamicSliceOutput(1024);
+ }
- @Override
- default void close() {}
+ public Slice compress(Slice slice)
+ throws IOException
+ {
+ buffer.reset();
+ try (OutputStream compressionStream = hadoopStreams.createOutputStream(buffer)) {
+ slice.getInput().transferTo(compressionStream);
+ }
+ return buffer.slice();
+ }
}
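
ValueCompressor turns from a Closeable interface over pooled Hadoop Compressor objects into a plain class holding a reusable DynamicSliceOutput, so callers no longer register it with a Closer. A fragment of the intended use (illustrative; utf8Slice is io.airlift.slice.Slices.utf8Slice):

    ValueCompressor compressor = CompressionKind.LZ4.createCodec().createValueCompressor();
    Slice compressed = compressor.compress(utf8Slice("a single record value"));
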
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueDecompressor.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueDecompressor.java
index e22ec5bb4b53..9ca81e49b875 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueDecompressor.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/ValueDecompressor.java
@@ -13,21 +13,43 @@
*/
package io.trino.hive.formats.compression;
+import io.airlift.compress.hadoop.HadoopStreams;
import io.airlift.slice.Slice;
-import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
-public interface ValueDecompressor
- extends Closeable
+import static java.util.Objects.requireNonNull;
+
+public final class ValueDecompressor
{
- void decompress(Slice compressed, OutputStream uncompressed)
- throws IOException;
+ private final HadoopStreams hadoopStreams;
+
+ ValueDecompressor(HadoopStreams hadoopStreams)
+ {
+ this.hadoopStreams = requireNonNull(hadoopStreams, "hadoopStreams is null");
+ }
- void decompress(Slice compressed, Slice uncompressed)
- throws IOException;
+ public void decompress(Slice compressed, OutputStream uncompressed)
+ throws IOException
+ {
+ try (InputStream decompressorStream = hadoopStreams.createInputStream(compressed.getInput())) {
+ decompressorStream.transferTo(uncompressed);
+ }
+ catch (IndexOutOfBoundsException | IOException e) {
+ throw new IOException("Compressed stream is truncated", e);
+ }
+ }
- @Override
- default void close() {}
+ public void decompress(Slice compressed, Slice uncompressed)
+ throws IOException
+ {
+ try (InputStream decompressorStream = hadoopStreams.createInputStream(compressed.getInput())) {
+ uncompressed.setBytes(0, decompressorStream, uncompressed.length());
+ }
+ catch (IndexOutOfBoundsException | IOException e) {
+ throw new IOException("Compressed stream is truncated", e);
+ }
+ }
}
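
ValueDecompressor likewise drops the Closeable contract; each decompress call opens a fresh stream over the compressed slice. A matching fragment, continuing the one above (DynamicSliceOutput doubles as the OutputStream target):

    ValueDecompressor decompressor = CompressionKind.LZ4.createCodec().createValueDecompressor();
    DynamicSliceOutput uncompressed = new DynamicSliceOutput(64);
    decompressor.decompress(compressed, uncompressed); // throws IOException if the stream is truncated
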
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java
index d95c6540f09d..38992147d886 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java
@@ -122,7 +122,6 @@ public SequenceFileReader(TrinoInputFile inputFile, long offset, long length)
checkArgument(compressionKind != LZOP, "LZOP cannot be used with SequenceFile. LZO compression can be used, but LZ4 is preferred.");
Codec codecFromHadoopClassName = compressionKind.createCodec();
decompressor = codecFromHadoopClassName.createValueDecompressor();
- closer.register(decompressor);
}
else {
decompressor = null;
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileWriter.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileWriter.java
index 37cfe663eb12..32d2f40d735c 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileWriter.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileWriter.java
@@ -193,7 +193,6 @@ public SingleValueWriter(DataOutputStream output, Optional<Codec> codec, long sy
requireNonNull(codec, "codec is null");
if (codec.isPresent()) {
this.valueCompressor = codec.get().createValueCompressor();
- closer.register(valueCompressor);
}
else {
valueCompressor = null;
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java
index 404dc784a349..f5800294407e 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java
@@ -294,14 +294,7 @@ public void close()
rowGroupPosition = 0;
rowGroupRowCount = 0;
currentChunkRowCount = 0;
- try {
- input.close();
- }
- finally {
- if (decompressor != null) {
- decompressor.close();
- }
- }
+ input.close();
if (writeChecksumBuilder.isPresent()) {
WriteChecksum actualChecksum = writeChecksumBuilder.get().build();
validateWrite(validation -> validation.getChecksum().getTotalRowCount() == actualChecksum.getTotalRowCount(), "Invalid row count");
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileWriter.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileWriter.java
index 9d36d1f5aa75..3546b831838a 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileWriter.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileWriter.java
@@ -412,6 +412,7 @@ public List<Slice> getCompressedData()
}
public void reset()
+ throws IOException
{
checkArgument(columnClosed, "Column is open");
lengthOutput.reset();
diff --git a/lib/trino-hive-formats/src/test/java/com/hadoop/compression/lzo/LzopCodec.java b/lib/trino-hive-formats/src/test/java/com/hadoop/compression/lzo/LzopCodec.java
index ec46a1b77345..7989890f53ab 100644
--- a/lib/trino-hive-formats/src/test/java/com/hadoop/compression/lzo/LzopCodec.java
+++ b/lib/trino-hive-formats/src/test/java/com/hadoop/compression/lzo/LzopCodec.java
@@ -13,14 +13,5 @@
*/
package com.hadoop.compression.lzo;
-import org.apache.hadoop.io.compress.Compressor;
-
public class LzopCodec
- extends io.airlift.compress.lzo.LzopCodec
-{
- @Override
- public Class<? extends Compressor> getCompressorType()
- {
- return null;
- }
-}
+ extends io.airlift.compress.lzo.LzopCodec {}
diff --git a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/rcfile/RcFileTester.java b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/rcfile/RcFileTester.java
index 51d55af024bd..77c344397923 100644
--- a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/rcfile/RcFileTester.java
+++ b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/rcfile/RcFileTester.java
@@ -184,7 +184,7 @@ public static RcFileTester quickTestRcFileReader()
rcFileTester.listTestsEnabled = true;
rcFileTester.complexStructuralTestsEnabled = false;
rcFileTester.readLastBatchOnlyEnabled = false;
- rcFileTester.compressions = ImmutableList.of(Optional.empty(), Optional.of(CompressionKind.SNAPPY));
+ rcFileTester.compressions = ImmutableList.of(Optional.empty(), Optional.of(CompressionKind.ZSTD));
return rcFileTester;
}
diff --git a/pom.xml b/pom.xml
index e4df97382032..b0a8c2accd26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -820,7 +820,7 @@
      <dependency>
        <groupId>io.airlift</groupId>
        <artifactId>aircompressor</artifactId>
-        <version>0.21</version>
+        <version>0.22</version>
      </dependency>