diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java
index 56123049a571..b064791561a5 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/compression/CompressionKind.java
@@ -95,9 +95,12 @@ public static CompressionKind fromHadoopClassName(String hadoopClassName)
             .filter(codec -> codec.fileExtension != null)
             .collect(toImmutableMap(codec -> codec.fileExtension, Function.identity()));
 
-    public static Optional<Codec> createCodecFromExtension(String extension)
+    public static Optional<CompressionKind> forFile(String fileName)
     {
-        return Optional.ofNullable(CODECS_BY_FILE_EXTENSION.get(extension))
-                .map(CompressionKind::createCodec);
+        int position = fileName.lastIndexOf('.');
+        if (position < 0) {
+            return Optional.empty();
+        }
+        return Optional.ofNullable(CODECS_BY_FILE_EXTENSION.get(fileName.substring(position)));
     }
 }
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java
index dad81ac82256..593fa6b23f2b 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java
@@ -65,10 +65,10 @@ public LineReader createLineReader(
     {
         InputStream inputStream = inputFile.newStream();
         try {
-            Optional<Codec> codec = getExtension(inputFile.location())
-                    .flatMap(CompressionKind::createCodecFromExtension);
+            Optional<Codec> codec = CompressionKind.forFile(inputFile.location())
+                    .map(CompressionKind::createCodec);
             if (codec.isPresent()) {
-                checkArgument(start == 0 && length == inputFile.length(), "Compressed files are not splittable");
+                checkArgument(start == 0, "Compressed files are not splittable");
                 // for compressed input, we do not know the length of the uncompressed text
                 length = Long.MAX_VALUE;
                 inputStream = codec.get().createStreamDecompressor(inputStream);
@@ -103,13 +103,4 @@ private void skipHeader(LineReader lineReader, int headerCount)
             }
         }
     }
-
-    private static Optional<String> getExtension(String location)
-    {
-        int position = location.lastIndexOf('.');
-        if (position < 0) {
-            return Optional.empty();
-        }
-        return Optional.of(location.substring(position));
-    }
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java
index 4d1910da9598..66d8b0da07af 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java
@@ -117,12 +117,12 @@ public Optional<ReaderPageSource> createPageSource(
 
         // get header and footer count
         int headerCount = getHeaderCount(schema);
-        if (headerCount > 0) {
-            checkArgument(estimatedFileSize == start + length, "Header not supported for a split file");
+        if (headerCount > 1) {
+            checkArgument(start == 0, "Multiple header rows are not supported for a split file");
         }
         int footerCount = getFooterCount(schema);
         if (footerCount > 0) {
-            checkArgument(estimatedFileSize == start + length, "Footer not supported for a split file");
+            checkArgument(start == 0, "Footer not supported for a split file");
         }
 
         // setup projected columns
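// A minimal, self-contained sketch of the lookup that the new CompressionKind.forFile
// performs and of how the TextInputFormat splittability check below builds on its result.
// The class name, the String codec values, and the map contents here are illustrative
// stand-ins; the real code keys a map of CompressionKind values by file extension.
import java.util.Map;
import java.util.Optional;

public class ExtensionLookupSketch
{
    // Hypothetical subset of extension-to-codec mappings, for illustration only
    private static final Map<String, String> CODECS_BY_FILE_EXTENSION = Map.of(
            ".gz", "GZIP",
            ".bz2", "BZIP2",
            ".lz4", "LZ4",
            ".zst", "ZSTD");

    // Same shape as CompressionKind.forFile: only the substring starting at the last '.'
    // is consulted, so "part-00000.csv.gz" resolves via ".gz" and "part-00000" matches nothing
    public static Optional<String> forFile(String fileName)
    {
        int position = fileName.lastIndexOf('.');
        if (position < 0) {
            return Optional.empty();
        }
        return Optional.ofNullable(CODECS_BY_FILE_EXTENSION.get(fileName.substring(position)));
    }

    // Mirrors the TextInputFormat branch added to HiveUtil.isSplittable: text is splittable
    // only when no compression codec can be inferred from the file name
    public static boolean isSplittableText(String fileName)
    {
        return forFile(fileName).isEmpty();
    }

    public static void main(String[] args)
    {
        System.out.println(isSplittableText("part-00000.csv.gz")); // false, ".gz" maps to GZIP
        System.out.println(isSplittableText("part-00000.csv"));    // true, ".csv" is not a codec extension
        System.out.println(isSplittableText("part-00000"));        // true, no extension at all
    }
}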
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
index f0d61d59c141..3a158d19a1e7 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
@@ -27,6 +27,7 @@
 import io.airlift.slice.SliceUtf8;
 import io.airlift.slice.Slices;
 import io.trino.hadoop.TextLineLengthLimitExceededException;
+import io.trino.hive.formats.compression.CompressionKind;
 import io.trino.orc.OrcWriterOptions;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HivePartitionKey;
@@ -415,9 +416,16 @@ public static long parseHiveTimestamp(String value)
 
     public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path)
     {
-        // ORC uses a custom InputFormat but is always splittable
-        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
-            return true;
+        // TODO move this to HiveStorageFormat when Hadoop library is removed
+        switch (inputFormat.getClass().getSimpleName()) {
+            case "OrcInputFormat", "MapredParquetInputFormat", "AvroContainerInputFormat", "RCFileInputFormat", "SequenceFileInputFormat" -> {
+                // These formats have splitting built into the format
+                return true;
+            }
+            case "TextInputFormat" -> {
+                // Only uncompressed text input format is splittable
+                return CompressionKind.forFile(path.getName()).isEmpty();
+            }
         }
 
         // use reflection to get isSplittable method on FileInputFormat