diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index bafb45ad3f..fe7b4c5a88 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.util.DynMethods; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,13 @@ public class HadoopStreams { private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class); + private static final DynMethods.UnboundMethod hasCapabilitiesMethod = + new DynMethods + .Builder("hasCapabilities") + .impl(FSDataInputStream.class, "hasCapabilities", String.class) + .orNoop() + .build(); + /** * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream} * implementation for Parquet readers. @@ -46,7 +54,39 @@ public class HadoopStreams { */ public static SeekableInputStream wrap(FSDataInputStream stream) { Objects.requireNonNull(stream, "Cannot wrap a null input stream"); - if (isWrappedStreamByteBufferReadable(stream)) { + + // Try to check using hasCapabilities(str) + Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream); + + // If it is null, then fall back to the old method + if (hasCapabilitiesResult != null) { + if (hasCapabilitiesResult) { + return new H2SeekableInputStream(stream); + } else { + return new H1SeekableInputStream(stream); + } + } + + return unwrapByteBufferReadableLegacy(stream); + } + + /** + * Is the inner stream byte buffer readable? + * The test is 'the stream is not FSDataInputStream + * and implements ByteBufferReadable' + * + * This logic is only used for Hadoop <2.9.x, and <3.x.x + * + * @param stream stream to probe + * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable + */ + private static SeekableInputStream unwrapByteBufferReadableLegacy(FSDataInputStream stream) { + InputStream wrapped = stream.getWrappedStream(); + if (wrapped instanceof FSDataInputStream) { + LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); + return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped)); + } + if (stream.getWrappedStream() instanceof ByteBufferReadable) { return new H2SeekableInputStream(stream); } else { return new H1SeekableInputStream(stream); @@ -55,12 +95,12 @@ public static SeekableInputStream wrap(FSDataInputStream stream) { /** * Is the inner stream byte buffer readable? - * The test is "the stream is not FSDataInputStream + * The test is 'the stream is not FSDataInputStream * and implements ByteBufferReadable' * * That is: all streams which implement ByteBufferReadable - * other than FSDataInputStream successfuly support read(ByteBuffer). - * This is true for all filesytem clients the hadoop codebase. + * other than FSDataInputStream successfully support read(ByteBuffer). + * This is true for all filesystem clients the hadoop codebase. * * In hadoop 3.3.0+, the StreamCapabilities probe can be used to * check this: only those streams which provide the read(ByteBuffer) @@ -68,10 +108,18 @@ public static SeekableInputStream wrap(FSDataInputStream stream) { * FSDataInputStream will pass the probe down to the underlying stream. * * @param stream stream to probe - * @return true if it is safe to a H2SeekableInputStream to access the data + * @return true if it is safe to a H2SeekableInputStream to access + * the data, null when it cannot be determined because of missing hasCapabilities */ - private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { - if (stream.hasCapability("in:readbytebuffer")) { + private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { + if (hasCapabilitiesMethod.isNoop()) { + // When the method is not available, just return a null + return null; + } + + boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer"); + + if (isByteBufferReadable) { // stream is issuing the guarantee that it implements the // API. Holds for all implementations in hadoop-* // since Hadoop 3.3.0 (HDFS-14111). diff --git a/pom.xml b/pom.xml index d837f7b6b0..998e815b63 100644 --- a/pom.xml +++ b/pom.xml @@ -592,7 +592,7 @@ hadoop2 - 2.9.2 + 2.7.3