From e6d948f6933822521da6900612ae7371f50fb1dd Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 29 Apr 2023 19:55:52 +0200 Subject: [PATCH 1/4] Bring back support for Hadoop 2.7.3 --- .../parquet/hadoop/util/HadoopStreams.java | 117 +++++++++++++++--- pom.xml | 2 +- 2 files changed, 103 insertions(+), 16 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index bafb45ad3f..a4bf79d436 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -20,6 +20,7 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.io.PositionOutputStream; @@ -27,7 +28,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.io.InputStream; +import java.lang.reflect.Method; import java.util.Objects; /** @@ -37,6 +41,39 @@ public class HadoopStreams { private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class); + private static final Class byteBufferReadableClass = getReadableClass(); + static final Constructor h2SeekableConstructor = getH2SeekableConstructor(); + + private static Class getReadableClass() { + try { + return Class.forName("org.apache.hadoop.fs.ByteBufferReadable"); + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return null; + } + } + + @SuppressWarnings("unchecked") + private static Class getH2SeekableClass() { + try { + return (Class) Class.forName( + "org.apache.parquet.hadoop.util.H2SeekableInputStream"); + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return null; + } + } + + private static Constructor getH2SeekableConstructor() { + Class h2SeekableClass = getH2SeekableClass(); + if (h2SeekableClass != null) { + try { + return h2SeekableClass.getConstructor(FSDataInputStream.class); + } catch (NoSuchMethodException e) { + return null; + } + } + return null; + } + /** * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream} * implementation for Parquet readers. @@ -46,21 +83,60 @@ public class HadoopStreams { */ public static SeekableInputStream wrap(FSDataInputStream stream) { Objects.requireNonNull(stream, "Cannot wrap a null input stream"); - if (isWrappedStreamByteBufferReadable(stream)) { - return new H2SeekableInputStream(stream); - } else { - return new H1SeekableInputStream(stream); + + // Try to check using hasCapabilities(str) + Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadableHasCapabilities(stream); + + // If it is null, then fall back to the old method + if (hasCapabilitiesResult != null) { + if (hasCapabilitiesResult) { + return new H2SeekableInputStream(stream); + } else { + return new H1SeekableInputStream(stream); + } + } + + return isWrappedStreamByteBufferReadableLegacy(stream); + } + + /** + * Is the inner stream byte buffer readable? + * The test is 'the stream is not FSDataInputStream + * and implements ByteBufferReadable' + * + * This logic is only used for Hadoop <2.9.x, and <3.x.x + * + * @param stream stream to probe + * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable + */ + private static SeekableInputStream isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) { + InputStream wrapped = stream.getWrappedStream(); + if (wrapped instanceof FSDataInputStream) { + LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); + return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) wrapped)); + } + if (byteBufferReadableClass != null && h2SeekableConstructor != null && + byteBufferReadableClass.isInstance(stream.getWrappedStream())) { + try { + return h2SeekableConstructor.newInstance(stream); + } catch (InstantiationException | IllegalAccessException e) { + LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e); + } catch (InvocationTargetException e) { + throw new ParquetDecodingException( + "Could not instantiate H2SeekableInputStream", e.getTargetException()); + } } + return new H1SeekableInputStream(stream); } /** * Is the inner stream byte buffer readable? - * The test is "the stream is not FSDataInputStream + * The test is 'the stream is not FSDataInputStream * and implements ByteBufferReadable' * * That is: all streams which implement ByteBufferReadable - * other than FSDataInputStream successfuly support read(ByteBuffer). - * This is true for all filesytem clients the hadoop codebase. + * other than FSDataInputStream successfully support read(ByteBuffer). + * This is true for all filesystem clients the hadoop codebase. * * In hadoop 3.3.0+, the StreamCapabilities probe can be used to * check this: only those streams which provide the read(ByteBuffer) @@ -68,19 +144,30 @@ public static SeekableInputStream wrap(FSDataInputStream stream) { * FSDataInputStream will pass the probe down to the underlying stream. * * @param stream stream to probe - * @return true if it is safe to a H2SeekableInputStream to access the data + * @return true if it is safe to a H2SeekableInputStream to access + * the data, null when it cannot be determined */ - private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { - if (stream.hasCapability("in:readbytebuffer")) { - // stream is issuing the guarantee that it implements the - // API. Holds for all implementations in hadoop-* - // since Hadoop 3.3.0 (HDFS-14111). - return true; + private static Boolean isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) { + Method methodHasCapabilities; + try { + methodHasCapabilities = stream.getClass().getMethod("hasCapability", String.class); + } catch (Exception e) { + return null; + } + try { + if ((Boolean) methodHasCapabilities.invoke(stream, "in:readbytebuffer")) { + // stream is issuing the guarantee that it implements the + // API. Holds for all implementations in hadoop-* + // since Hadoop 3.3.0 (HDFS-14111). + return true; + } + } catch (IllegalAccessException | InvocationTargetException e) { + return null; } InputStream wrapped = stream.getWrappedStream(); if (wrapped instanceof FSDataInputStream) { LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); - return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped)); + return isWrappedStreamByteBufferReadableHasCapabilities(((FSDataInputStream) wrapped)); } return wrapped instanceof ByteBufferReadable; } diff --git a/pom.xml b/pom.xml index d837f7b6b0..998e815b63 100644 --- a/pom.xml +++ b/pom.xml @@ -592,7 +592,7 @@ hadoop2 - 2.9.2 + 2.7.3 From cef627ae0aa7db916c26c5c69ed76a0f1cd520d3 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 2 May 2023 23:40:44 +0200 Subject: [PATCH 2/4] Simplify the code --- .../parquet/hadoop/util/HadoopStreams.java | 83 +++++-------------- 1 file changed, 22 insertions(+), 61 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index a4bf79d436..11f690e4dd 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -20,18 +20,15 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.parquet.io.ParquetDecodingException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.util.DynMethods; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.io.InputStream; -import java.lang.reflect.Method; import java.util.Objects; /** @@ -41,38 +38,12 @@ public class HadoopStreams { private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class); - private static final Class byteBufferReadableClass = getReadableClass(); - static final Constructor h2SeekableConstructor = getH2SeekableConstructor(); - - private static Class getReadableClass() { - try { - return Class.forName("org.apache.hadoop.fs.ByteBufferReadable"); - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return null; - } - } - - @SuppressWarnings("unchecked") - private static Class getH2SeekableClass() { - try { - return (Class) Class.forName( - "org.apache.parquet.hadoop.util.H2SeekableInputStream"); - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return null; - } - } - - private static Constructor getH2SeekableConstructor() { - Class h2SeekableClass = getH2SeekableClass(); - if (h2SeekableClass != null) { - try { - return h2SeekableClass.getConstructor(FSDataInputStream.class); - } catch (NoSuchMethodException e) { - return null; - } - } - return null; - } + private static final DynMethods.UnboundMethod hasCapabilitiesMethod = + new DynMethods + .Builder("hasCapabilities") + .impl(FSDataInputStream.class, "hasCapabilities", String.class) + .orNoop() + .build(); /** * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream} @@ -115,18 +86,11 @@ private static SeekableInputStream isWrappedStreamByteBufferReadableLegacy(FSDat LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) wrapped)); } - if (byteBufferReadableClass != null && h2SeekableConstructor != null && - byteBufferReadableClass.isInstance(stream.getWrappedStream())) { - try { - return h2SeekableConstructor.newInstance(stream); - } catch (InstantiationException | IllegalAccessException e) { - LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e); - } catch (InvocationTargetException e) { - throw new ParquetDecodingException( - "Could not instantiate H2SeekableInputStream", e.getTargetException()); - } + if (stream.getWrappedStream() instanceof ByteBufferReadable) { + return new H2SeekableInputStream(stream); + } else { + return new H1SeekableInputStream(stream); } - return new H1SeekableInputStream(stream); } /** @@ -145,24 +109,21 @@ private static SeekableInputStream isWrappedStreamByteBufferReadableLegacy(FSDat * * @param stream stream to probe * @return true if it is safe to a H2SeekableInputStream to access - * the data, null when it cannot be determined + * the data, null when it cannot be determined because of missing hasCapabilities */ private static Boolean isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) { - Method methodHasCapabilities; - try { - methodHasCapabilities = stream.getClass().getMethod("hasCapability", String.class); - } catch (Exception e) { + if (hasCapabilitiesMethod.isNoop()) { + // When the method is not available, just return a null return null; } - try { - if ((Boolean) methodHasCapabilities.invoke(stream, "in:readbytebuffer")) { - // stream is issuing the guarantee that it implements the - // API. Holds for all implementations in hadoop-* - // since Hadoop 3.3.0 (HDFS-14111). - return true; - } - } catch (IllegalAccessException | InvocationTargetException e) { - return null; + + Boolean hasCapabilities = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer"); + + if (hasCapabilities) { + // stream is issuing the guarantee that it implements the + // API. Holds for all implementations in hadoop-* + // since Hadoop 3.3.0 (HDFS-14111). + return true; } InputStream wrapped = stream.getWrappedStream(); if (wrapped instanceof FSDataInputStream) { From 350167f1c83e9aff8647182a9c90096381b72aa7 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 4 May 2023 20:09:39 +0200 Subject: [PATCH 3/4] Fix the naming --- .../java/org/apache/parquet/hadoop/util/HadoopStreams.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index 11f690e4dd..f0692a3522 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -56,7 +56,7 @@ public static SeekableInputStream wrap(FSDataInputStream stream) { Objects.requireNonNull(stream, "Cannot wrap a null input stream"); // Try to check using hasCapabilities(str) - Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadableHasCapabilities(stream); + Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream); // If it is null, then fall back to the old method if (hasCapabilitiesResult != null) { @@ -111,7 +111,7 @@ private static SeekableInputStream isWrappedStreamByteBufferReadableLegacy(FSDat * @return true if it is safe to a H2SeekableInputStream to access * the data, null when it cannot be determined because of missing hasCapabilities */ - private static Boolean isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) { + private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { if (hasCapabilitiesMethod.isNoop()) { // When the method is not available, just return a null return null; @@ -128,7 +128,7 @@ private static Boolean isWrappedStreamByteBufferReadableHasCapabilities(FSDataIn InputStream wrapped = stream.getWrappedStream(); if (wrapped instanceof FSDataInputStream) { LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); - return isWrappedStreamByteBufferReadableHasCapabilities(((FSDataInputStream) wrapped)); + return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped)); } return wrapped instanceof ByteBufferReadable; } From 9552742084de64248ed5bf5501edbe106e40cc25 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 5 May 2023 23:40:05 +0200 Subject: [PATCH 4/4] Comments --- .../org/apache/parquet/hadoop/util/HadoopStreams.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index f0692a3522..fe7b4c5a88 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -67,7 +67,7 @@ public static SeekableInputStream wrap(FSDataInputStream stream) { } } - return isWrappedStreamByteBufferReadableLegacy(stream); + return unwrapByteBufferReadableLegacy(stream); } /** @@ -80,11 +80,11 @@ public static SeekableInputStream wrap(FSDataInputStream stream) { * @param stream stream to probe * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable */ - private static SeekableInputStream isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) { + private static SeekableInputStream unwrapByteBufferReadableLegacy(FSDataInputStream stream) { InputStream wrapped = stream.getWrappedStream(); if (wrapped instanceof FSDataInputStream) { LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); - return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) wrapped)); + return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped)); } if (stream.getWrappedStream() instanceof ByteBufferReadable) { return new H2SeekableInputStream(stream); @@ -117,9 +117,9 @@ private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream strea return null; } - Boolean hasCapabilities = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer"); + boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer"); - if (hasCapabilities) { + if (isByteBufferReadable) { // stream is issuing the guarantee that it implements the // API. Holds for all implementations in hadoop-* // since Hadoop 3.3.0 (HDFS-14111).