diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java index 6ff4ea849b5a..ecea13089901 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java @@ -13,6 +13,8 @@ */ package io.trino.hive.formats.line; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import java.io.IOException; @@ -31,4 +33,10 @@ LineReader createLineReader( int headerCount, int footerCount) throws IOException; + + TrinoInputFile newInputFile( + TrinoFileSystem trinoFileSystem, + Location path, + long estimatedFileSize, + long fileModifiedTime); } diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java index b42e612a480c..2d298c6f012f 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java @@ -14,6 +14,8 @@ package io.trino.hive.formats.line.sequence; import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.hive.formats.line.FooterAwareLineReader; import io.trino.hive.formats.line.LineBuffer; @@ -76,6 +78,14 @@ public LineReader createLineReader( return lineReader; } + @Override + public TrinoInputFile newInputFile(TrinoFileSystem trinoFileSystem, Location path, long estimatedFileSize, long fileModifiedTime) + { + // estimatedFileSize contains padded bytes + // The reads in SequenceFileReader rely on non-padded file length to detect end of file + return trinoFileSystem.newInputFile(path); + } + private void skipHeader(LineReader lineReader, int headerCount) throws IOException { diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java index 7bd81bf96b02..675d61f319ed 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java @@ -14,6 +14,8 @@ package io.trino.hive.formats.line.text; import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.hive.formats.compression.Codec; import io.trino.hive.formats.compression.CompressionKind; @@ -25,6 +27,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.Optional; import java.util.Set; @@ -101,6 +104,12 @@ public LineReader createLineReader( } } + @Override + public TrinoInputFile newInputFile(TrinoFileSystem trinoFileSystem, Location path, long estimatedFileSize, long fileModifiedTime) + { + return trinoFileSystem.newInputFile(path, estimatedFileSize, Instant.ofEpochMilli(fileModifiedTime)); + } + private void skipHeader(LineReader lineReader, int headerCount) throws IOException { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java index 7e50be47b264..a52a2e9f195c 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java @@ -96,7 +96,7 @@ public Optional createPageSource( checkArgument(acidInfo.isEmpty(), "Acid is not supported"); - return Optional.of(projectColumnDereferences(columns, baseColumns -> createPageSource(session, path, start, length, estimatedFileSize, schema, baseColumns))); + return Optional.of(projectColumnDereferences(columns, baseColumns -> createPageSource(session, path, start, length, estimatedFileSize, fileModifiedTime, schema, baseColumns))); } private ConnectorPageSource createPageSource( @@ -105,6 +105,7 @@ private ConnectorPageSource createPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Schema schema, List columns) { @@ -129,7 +130,7 @@ private ConnectorPageSource createPageSource( } TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session); - TrinoInputFile inputFile = trinoFileSystem.newInputFile(path); + TrinoInputFile inputFile = lineReaderFactory.newInputFile(trinoFileSystem, path, estimatedFileSize, fileModifiedTime); try { // buffer file if small long smallFileReadTimeNanos = 0;