@@ -95,9 +95,12 @@ public static CompressionKind fromHadoopClassName(String hadoopClassName)
             .filter(codec -> codec.fileExtension != null)
             .collect(toImmutableMap(codec -> codec.fileExtension, Function.identity()));
 
-    public static Optional<Codec> createCodecFromExtension(String extension)
+    public static Optional<CompressionKind> forFile(String fileName)
     {
-        return Optional.ofNullable(CODECS_BY_FILE_EXTENSION.get(extension))
-                .map(CompressionKind::createCodec);
+        int position = fileName.lastIndexOf('.');
+        if (position < 0) {
+            return Optional.empty();
+        }
+        return Optional.ofNullable(CODECS_BY_FILE_EXTENSION.get(fileName.substring(position)));
     }
 }
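
A minimal usage sketch of the renamed lookup, assuming ".gz" is the gzip
extension registered in CODECS_BY_FILE_EXTENSION (the file names here are
hypothetical):

    // forFile matches on the trailing extension, dot included
    Optional<CompressionKind> gz = CompressionKind.forFile("part-0.csv.gz"); // a codec kind, assuming ".gz" is registered
    Optional<CompressionKind> csv = CompressionKind.forFile("part-0.csv");   // empty: ".csv" is not a codec extension
    Optional<CompressionKind> bare = CompressionKind.forFile("README");      // empty: no '.' in the name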
@@ -65,10 +65,10 @@ public LineReader createLineReader(
     {
         InputStream inputStream = inputFile.newStream();
         try {
-            Optional<Codec> codec = getExtension(inputFile.location())
-                    .flatMap(CompressionKind::createCodecFromExtension);
+            Optional<Codec> codec = CompressionKind.forFile(inputFile.location())
+                    .map(CompressionKind::createCodec);
             if (codec.isPresent()) {
-                checkArgument(start == 0 && length == inputFile.length(), "Compressed files are not splittable");
+                checkArgument(start == 0, "Compressed files are not splittable");
                 // for compressed input, we do not know the length of the uncompressed text
                 length = Long.MAX_VALUE;
                 inputStream = codec.get().createStreamDecompressor(inputStream);
@@ -103,13 +103,4 @@ private void skipHeader(LineReader lineReader, int headerCount)
             }
         }
     }
-
-    private static Optional<String> getExtension(String location)
-    {
-        int position = location.lastIndexOf('.');
-        if (position < 0) {
-            return Optional.empty();
-        }
-        return Optional.of(location.substring(position));
-    }
 }
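
Why length is forced to Long.MAX_VALUE here: the compressed file size says
nothing about the decompressed text size, so the reader must consume the
decompressor stream to EOF instead of stopping at a byte budget. A
self-contained sketch of the same pattern using the JDK's gzip stream (the
file name is a placeholder):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.zip.GZIPInputStream;

    static void readAllLines() throws IOException
    {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(Path.of("data.csv.gz")))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // read until EOF; no reliable byte bound exists for the uncompressed text
                System.out.println(line);
            }
        }
    }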
@@ -117,12 +117,12 @@ public Optional<ReaderPageSource> createPageSource(
 
         // get header and footer count
         int headerCount = getHeaderCount(schema);
-        if (headerCount > 0) {
-            checkArgument(estimatedFileSize == start + length, "Header not supported for a split file");
+        if (headerCount > 1) {
+            checkArgument(start == 0, "Multiple header rows are not supported for a split file");
         }
         int footerCount = getFooterCount(schema);
         if (footerCount > 0) {
-            checkArgument(estimatedFileSize == start + length, "Footer not supported for a split file");
+            checkArgument(start == 0, "Footer not supported for a split file");
         }
 
         // setup projected columns
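
The relaxation here: a single header row always lands in the split that starts
at byte 0, so any split may be read and only that first split skips it;
multiple header rows could spill past the first split boundary, so they still
pin the read to start == 0, as does a footer, whose trailing rows cannot be
identified mid-stream. A hedged sketch of the per-split skip this relies on,
reusing the skipHeader helper from the reader above (the surrounding variables
are assumed, this is not the PR's exact code):

    // Only the first split contains the header line(s), so only it skips them
    if (start == 0) {
        skipHeader(lineReader, headerCount);
    }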
@@ -27,6 +27,7 @@
 import io.airlift.slice.SliceUtf8;
 import io.airlift.slice.Slices;
 import io.trino.hadoop.TextLineLengthLimitExceededException;
+import io.trino.hive.formats.compression.CompressionKind;
 import io.trino.orc.OrcWriterOptions;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HivePartitionKey;
@@ -415,9 +416,16 @@ public static long parseHiveTimestamp(String value)
 
     public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path)
     {
-        // ORC uses a custom InputFormat but is always splittable
-        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
-            return true;
+        // TODO move this to HiveStorageFormat when Hadoop library is removed
+        switch (inputFormat.getClass().getSimpleName()) {
+            case "OrcInputFormat", "MapredParquetInputFormat", "AvroContainerInputFormat", "RCFileInputFormat", "SequenceFileInputFormat" -> {
+                // These formats have splitting built into the format
+                return true;
+            }
+            case "TextInputFormat" -> {
+                // Only uncompressed text input format is splittable
+                return CompressionKind.forFile(path.getName()).isEmpty();
+            }
         }
 
         // use reflection to get isSplittable method on FileInputFormat
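
A quick sanity check of the new TextInputFormat branch (hypothetical file
names): splittability follows directly from whether the extension maps to a
registered codec.

    // Uncompressed text: no codec detected, so the file can be split
    boolean plain = CompressionKind.forFile("data.txt").isEmpty();      // true -> splittable
    // Gzipped text: a codec is detected, so the whole file goes to one split
    boolean gzipped = CompressionKind.forFile("data.txt.gz").isEmpty(); // false -> not splittable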