From c08aef225e80a58d00a12a1e53528b735fe23c64 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 19 Apr 2023 10:58:31 +0200 Subject: [PATCH] PARQUET-2289: Avoid using `hasCapability` We should avoid using `hasCapability` to maintain compatibility with Hadoop 2. I think we can remove the whole check since Hadoop 1 isn't supported anymore. This allows us to use Hadoop 2. --- .../hadoop/util/H1SeekableInputStream.java | 2 + .../parquet/hadoop/util/HadoopStreams.java | 41 +------------------ .../util/TestHadoop2ByteBufferReads.java | 16 -------- 3 files changed, 3 insertions(+), 56 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java index 0f8cdbb551..71eea23ecc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java @@ -26,7 +26,9 @@ /** * SeekableInputStream implementation that implements read(ByteBuffer) for * Hadoop 1 FSDataInputStream. + * @deprecated will be removed in 2.0.0. */ +@Deprecated class H1SeekableInputStream extends DelegatingSeekableInputStream { private final FSDataInputStream stream; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index bafb45ad3f..a1178c440c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -19,7 +19,6 @@ package org.apache.parquet.hadoop.util; -import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.io.PositionOutputStream; @@ -27,7 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.InputStream; import java.util.Objects; /** @@ -46,46 +44,9 @@ public class HadoopStreams { */ public static SeekableInputStream wrap(FSDataInputStream stream) { Objects.requireNonNull(stream, "Cannot wrap a null input stream"); - if (isWrappedStreamByteBufferReadable(stream)) { - return new H2SeekableInputStream(stream); - } else { - return new H1SeekableInputStream(stream); - } + return new H2SeekableInputStream(stream); } - /** - * Is the inner stream byte buffer readable? - * The test is "the stream is not FSDataInputStream - * and implements ByteBufferReadable' - * - * That is: all streams which implement ByteBufferReadable - * other than FSDataInputStream successfuly support read(ByteBuffer). - * This is true for all filesytem clients the hadoop codebase. - * - * In hadoop 3.3.0+, the StreamCapabilities probe can be used to - * check this: only those streams which provide the read(ByteBuffer) - * semantics MAY return true for the probe "in:readbytebuffer"; - * FSDataInputStream will pass the probe down to the underlying stream. - * - * @param stream stream to probe - * @return true if it is safe to a H2SeekableInputStream to access the data - */ - private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { - if (stream.hasCapability("in:readbytebuffer")) { - // stream is issuing the guarantee that it implements the - // API. Holds for all implementations in hadoop-* - // since Hadoop 3.3.0 (HDFS-14111). - return true; - } - InputStream wrapped = stream.getWrappedStream(); - if (wrapped instanceof FSDataInputStream) { - LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); - return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped)); - } - return wrapped instanceof ByteBufferReadable; - } - - /** * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream} * implementation for Parquet writers. diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java index b514febcbd..843eefa971 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java @@ -400,22 +400,6 @@ public void testDirectReadFullyPositionAndLimit() throws Exception { ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); } - @Test - public void testCreateStreamNoByteBufferReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream( - new MockHadoopInputStream())); - Assert.assertTrue("Wrong wrapper: " + s, - s instanceof H1SeekableInputStream); - } - - @Test - public void testDoubleWrapNoByteBufferReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream( - new FSDataInputStream(new MockHadoopInputStream()))); - Assert.assertTrue("Wrong wrapper: " + s, - s instanceof H1SeekableInputStream); - } - @Test public void testCreateStreamWithByteBufferReadable() { final SeekableInputStream s = wrap(new FSDataInputStream(