diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index 2994ca829c..4bbbb8ed1f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -84,7 +84,7 @@ public int read(ByteBuffer buf) throws IOException {
   }
 
   public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
-    // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read
+    // unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read
     // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we
     // have to loop to ensure we read them.
     while (buf.hasRemaining()) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index 40f12fefd1..bafb45ad3f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -19,16 +19,15 @@
 
 package org.apache.parquet.hadoop.util;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.InputStream;
 import java.util.Objects;
 
 /**
@@ -38,9 +37,6 @@ public class HadoopStreams {
 
   private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);
 
-  private static final Class byteBufferReadableClass = getReadableClass();
-  static final Constructor h2SeekableConstructor = getH2SeekableConstructor();
-
   /**
    * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
    * implementation for Parquet readers.
@@ -50,51 +46,45 @@ public class HadoopStreams {
    */
   public static SeekableInputStream wrap(FSDataInputStream stream) {
     Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-    if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
-        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
-      try {
-        return h2SeekableConstructor.newInstance(stream);
-      } catch (InstantiationException | IllegalAccessException e) {
-        LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
-        return new H1SeekableInputStream(stream);
-      } catch (InvocationTargetException e) {
-        throw new ParquetDecodingException(
-            "Could not instantiate H2SeekableInputStream", e.getTargetException());
-      }
+    if (isWrappedStreamByteBufferReadable(stream)) {
+      return new H2SeekableInputStream(stream);
     } else {
       return new H1SeekableInputStream(stream);
     }
   }
 
-  private static Class getReadableClass() {
-    try {
-      return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return null;
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is "the stream is not FSDataInputStream
+   * and implements ByteBufferReadable".
+   *
+   * That is: all streams which implement ByteBufferReadable
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients in the hadoop codebase.
+   *
+   * In hadoop 3.3.0+, the StreamCapabilities probe can be used to
+   * check this: only those streams which provide the read(ByteBuffer)
+   * semantics MAY return true for the probe "in:readbytebuffer";
+   * FSDataInputStream will pass the probe down to the underlying stream.
+   *
+   * @param stream stream to probe
+   * @return true if it is safe to use an H2SeekableInputStream to access the data
+   */
+  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+    if (stream.hasCapability("in:readbytebuffer")) {
+      // stream is issuing the guarantee that it implements the
+      // API. Holds for all implementations in hadoop-*
+      // since Hadoop 3.3.0 (HDFS-14111).
+      return true;
     }
-  }
-
-  @SuppressWarnings("unchecked")
-  private static Class getH2SeekableClass() {
-    try {
-      return (Class) Class.forName(
-          "org.apache.parquet.hadoop.util.H2SeekableInputStream");
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return null;
+    InputStream wrapped = stream.getWrappedStream();
+    if (wrapped instanceof FSDataInputStream) {
+      LOG.debug("Checking whether wrapped stream {} of {} is ByteBufferReadable", wrapped, stream);
+      return isWrappedStreamByteBufferReadable((FSDataInputStream) wrapped);
+    }
+    return wrapped instanceof ByteBufferReadable;
   }
 
-  private static Constructor getH2SeekableConstructor() {
-    Class h2SeekableClass = getH2SeekableClass();
-    if (h2SeekableClass != null) {
-      try {
-        return h2SeekableClass.getConstructor(FSDataInputStream.class);
-      } catch (NoSuchMethodException e) {
-        return null;
-      }
-    }
-    return null;
-  }
 
   /**
    * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 1b1e37354b..b514febcbd 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -19,8 +19,10 @@
 package org.apache.parquet.hadoop.util;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.parquet.hadoop.TestUtils;
+import org.apache.parquet.io.SeekableInputStream;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.EOFException;
@@ -28,6 +30,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 
+import static org.apache.parquet.hadoop.util.HadoopStreams.wrap;
 import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
 
 public class TestHadoop2ByteBufferReads {
@@ -396,4 +399,48 @@ public void testDirectReadFullyPositionAndLimit() throws Exception {
     Assert.assertEquals("Buffer contents should match",
         ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
   }
+
+  @Test
+  public void testCreateStreamNoByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+        new MockHadoopInputStream()));
+    Assert.assertTrue("Wrong wrapper: " + s,
+        s instanceof H1SeekableInputStream);
+  }
+
+  @Test
+  public void testDoubleWrapNoByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+        new FSDataInputStream(new MockHadoopInputStream())));
+    Assert.assertTrue("Wrong wrapper: " + s,
+        s instanceof H1SeekableInputStream);
+  }
+
+  @Test
+  public void testCreateStreamWithByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+        new MockByteBufferInputStream()));
+    Assert.assertTrue("Wrong wrapper: " + s,
+        s instanceof H2SeekableInputStream);
+  }
+
+  @Test
+  public void testDoubleWrapByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+        new FSDataInputStream(new MockByteBufferInputStream())));
+    Assert.assertTrue("Wrong wrapper: " + s,
+        s instanceof H2SeekableInputStream);
+  }
+
+  /**
+   * Input stream which claims to implement ByteBufferReadable.
+   */
+  private static final class MockByteBufferInputStream
+      extends MockHadoopInputStream implements ByteBufferReadable {
+
+    @Override
+    public int read(final ByteBuffer buf) {
+      return 0;
+    }
+  }
 }
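
Two review notes with small illustrative sketches follow; neither is part of the patch itself.

First, the comment corrected in H2SeekableInputStream.readFully describes a real contract: Hadoop's read(ByteBuffer) may return after transferring only part of the requested bytes, so a single call is not a readFully. A minimal sketch of the looping pattern, using a simplified stand-in for the Reader callback in the patched file (the exact EOF message in production may differ):

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

final class ReadFullyLoopSketch {

  /** Simplified stand-in for the Reader callback in H2SeekableInputStream. */
  interface Reader {
    // returns the number of bytes transferred, or -1 at end of stream
    int read(ByteBuffer buf) throws IOException;
  }

  /** Loop until the buffer is full, since any single read may be partial. */
  static void readFully(Reader reader, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      int readCount = reader.read(buf);
      if (readCount == -1) {
        // the stream ended before the buffer was filled: surface it as EOF
        throw new EOFException("Reached the end of stream with "
            + buf.remaining() + " bytes left to read");
      }
    }
  }
}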
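
Second, to see the new reflection-free wrap() selection end to end, here is a minimal usage sketch. The WrapSelectionDemo class, the local file path, and the default Configuration are illustrative assumptions, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;

public class WrapSelectionDemo {
  public static void main(String[] args) throws IOException {
    // Illustrative local file; any Hadoop-supported URI behaves the same way.
    Path path = new Path("file:///tmp/example.parquet");
    FileSystem fs = path.getFileSystem(new Configuration());

    try (FSDataInputStream in = fs.open(path)) {
      // wrap() now decides without reflection: it first asks the stream for
      // the "in:readbytebuffer" capability (Hadoop 3.3.0+, HDFS-14111), then
      // unwraps nested FSDataInputStreams and finally falls back to an
      // instanceof ByteBufferReadable check on the innermost stream.
      SeekableInputStream seekable = HadoopStreams.wrap(in);

      // H2SeekableInputStream services this with read(ByteBuffer);
      // H1SeekableInputStream falls back to byte[]-based reads.
      ByteBuffer buf = ByteBuffer.allocate(4);
      seekable.readFully(buf);
    }
  }
}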