diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/FileEntry.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/FileEntry.java index 5c106f9a93c6..397ea27d1bc1 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/FileEntry.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/FileEntry.java @@ -26,19 +26,19 @@ import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; -public record FileEntry(String path, long length, Instant lastModified, Optional> blockLocations) +public record FileEntry(String location, long length, Instant lastModified, Optional> blocks) { public FileEntry { checkArgument(length >= 0, "length is negative"); - requireNonNull(path, "path is null"); - requireNonNull(blockLocations, "blockLocations is null"); - blockLocations = blockLocations.map(locations -> validatedBlockLocations(locations, length)); + requireNonNull(location, "location is null"); + requireNonNull(blocks, "blocks is null"); + blocks = blocks.map(locations -> validatedBlocks(locations, length)); } - public record BlockLocation(List hosts, long offset, long length) + public record Block(List hosts, long offset, long length) { - public BlockLocation + public Block { hosts = ImmutableList.copyOf(requireNonNull(hosts, "hosts is null")); checkArgument(offset >= 0, "offset is negative"); @@ -46,20 +46,20 @@ public record BlockLocation(List hosts, long offset, long length) } } - private static List validatedBlockLocations(List blockLocations, long length) + private static List validatedBlocks(List blocks, long length) { - checkArgument(!blockLocations.isEmpty(), "blockLocations is empty"); - blockLocations = blockLocations.stream() - .sorted(comparing(BlockLocation::offset)) + checkArgument(!blocks.isEmpty(), "blocks is empty"); + blocks = blocks.stream() + .sorted(comparing(Block::offset)) .collect(toImmutableList()); long position = 0; - for (BlockLocation location : blockLocations) { - checkArgument(location.offset() <= position, "blockLocations has a gap"); - position = max(position, addExact(location.offset(), location.length())); + for (Block block : blocks) { + checkArgument(block.offset() <= position, "blocks have a gap"); + position = max(position, addExact(block.offset(), block.length())); } - checkArgument(position >= length, "blockLocations does not cover file"); + checkArgument(position >= length, "blocks do not cover file"); - return blockLocations; + return blocks; } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index ed969382fd9c..3da75b379ce3 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -18,30 +18,28 @@ public interface TrinoFileSystem { - TrinoInputFile newInputFile(String path); + TrinoInputFile newInputFile(String location); - TrinoInputFile newInputFile(String path, long length); + TrinoInputFile newInputFile(String location, long length); - TrinoOutputFile newOutputFile(String path); + TrinoOutputFile newOutputFile(String location); - void deleteFile(String path) + void deleteFile(String location) throws IOException; /** - * Delete paths in batches, it is not guaranteed to be atomic. - * - * @param paths collection of paths to be deleted - * @throws IOException when there is a problem with deletion of one or more specific paths + * Delete files in batches, possibly non-atomically. + * If an error occurs, some files may have been deleted. */ - void deleteFiles(Collection paths) + void deleteFiles(Collection locations) throws IOException; - void deleteDirectory(String path) + void deleteDirectory(String location) throws IOException; void renameFile(String source, String target) throws IOException; - FileIterator listFiles(String path) + FileIterator listFiles(String location) throws IOException; } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoInputFile.java index 93f6f4e1c671..eaefaaccd664 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoInputFile.java @@ -21,7 +21,7 @@ public interface TrinoInputFile TrinoInput newInput() throws IOException; - SeekableInputStream newStream() + TrinoInputStream newStream() throws IOException; long length() diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/SeekableInputStream.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoInputStream.java similarity index 95% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/SeekableInputStream.java rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoInputStream.java index bf4c269e9ffe..7a9e6c462b3a 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/SeekableInputStream.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoInputStream.java @@ -16,7 +16,7 @@ import java.io.IOException; import java.io.InputStream; -public abstract class SeekableInputStream +public abstract class TrinoInputStream extends InputStream { /** diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/FileSeekableInputStream.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/FileTrinoInputStream.java similarity index 91% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/local/FileSeekableInputStream.java rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/local/FileTrinoInputStream.java index ce6ea95663fe..9bb31e69a966 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/FileSeekableInputStream.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/FileTrinoInputStream.java @@ -14,19 +14,19 @@ package io.trino.filesystem.local; import com.google.common.primitives.Ints; -import io.trino.filesystem.SeekableInputStream; +import io.trino.filesystem.TrinoInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; -class FileSeekableInputStream - extends SeekableInputStream +class FileTrinoInputStream + extends TrinoInputStream { private final RandomAccessFile input; - public FileSeekableInputStream(File file) + public FileTrinoInputStream(File file) throws FileNotFoundException { this.input = new RandomAccessFile(file, "r"); diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java index b74d86a55692..1deb68863941 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java @@ -13,9 +13,9 @@ */ package io.trino.filesystem.local; -import io.trino.filesystem.SeekableInputStream; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; import java.io.File; import java.io.IOException; @@ -41,10 +41,10 @@ public TrinoInput newInput() } @Override - public SeekableInputStream newStream() + public TrinoInputStream newStream() throws IOException { - return new FileSeekableInputStream(file); + return new FileTrinoInputStream(file); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java index 23195e81f2c2..7285451741a4 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java @@ -14,9 +14,9 @@ package io.trino.filesystem.memory; import io.airlift.slice.Slice; -import io.trino.filesystem.SeekableInputStream; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; import java.io.IOException; import java.time.Instant; @@ -43,10 +43,10 @@ public TrinoInput newInput() } @Override - public SeekableInputStream newStream() + public TrinoInputStream newStream() throws IOException { - return new MemorySeekableInputStream(data); + return new MemoryTrinoInputStream(data); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemorySeekableInputStream.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryTrinoInputStream.java similarity index 88% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemorySeekableInputStream.java rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryTrinoInputStream.java index 4afa813edb03..56ade34e1685 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemorySeekableInputStream.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryTrinoInputStream.java @@ -15,16 +15,16 @@ import io.airlift.slice.Slice; import io.airlift.slice.SliceInput; -import io.trino.filesystem.SeekableInputStream; +import io.trino.filesystem.TrinoInputStream; import java.io.IOException; -public class MemorySeekableInputStream - extends SeekableInputStream +public class MemoryTrinoInputStream + extends TrinoInputStream { private final SliceInput input; - public MemorySeekableInputStream(Slice data) + public MemoryTrinoInputStream(Slice data) { input = data.getInput(); } diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestFileEntry.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestFileEntry.java index b1a6317ad334..24ab1852e228 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestFileEntry.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestFileEntry.java @@ -13,7 +13,7 @@ */ package io.trino.filesystem; -import io.trino.filesystem.FileEntry.BlockLocation; +import io.trino.filesystem.FileEntry.Block; import org.junit.jupiter.api.Test; import java.time.Instant; @@ -28,72 +28,72 @@ public class TestFileEntry private static final Instant MODIFIED = Instant.ofEpochSecond(1234567890); @Test - public void testEmptyBlockLocations() + public void testEmptyBlocks() { assertThat(new FileEntry("/test", 123, MODIFIED, Optional.empty())) .satisfies(entry -> { - assertThat(entry.path()).isEqualTo("/test"); + assertThat(entry.location()).isEqualTo("/test"); assertThat(entry.length()).isEqualTo(123); assertThat(entry.lastModified()).isEqualTo(MODIFIED); - assertThat(entry.blockLocations()).isEmpty(); + assertThat(entry.blocks()).isEmpty(); }); } @Test - public void testPresentBlockLocations() + public void testPresentBlocks() { - List locations = List.of( - new BlockLocation(List.of(), 0, 50), - new BlockLocation(List.of(), 50, 70), - new BlockLocation(List.of(), 100, 150)); + List locations = List.of( + new Block(List.of(), 0, 50), + new Block(List.of(), 50, 70), + new Block(List.of(), 100, 150)); assertThat(new FileEntry("/test", 200, MODIFIED, Optional.of(locations))) - .satisfies(entry -> assertThat(entry.blockLocations()).contains(locations)); + .satisfies(entry -> assertThat(entry.blocks()).contains(locations)); } @Test - public void testMissingBlockLocations() + public void testMissingBlocks() { assertThatThrownBy(() -> new FileEntry("/test", 0, MODIFIED, Optional.of(List.of()))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("blockLocations is empty"); + .hasMessage("blocks is empty"); } @Test - public void testBlockLocationsEmptyFile() + public void testBlocksEmptyFile() { - List locations = List.of(new BlockLocation(List.of(), 0, 0)); + List locations = List.of(new Block(List.of(), 0, 0)); assertThat(new FileEntry("/test", 0, MODIFIED, Optional.of(locations))) - .satisfies(entry -> assertThat(entry.blockLocations()).contains(locations)); + .satisfies(entry -> assertThat(entry.blocks()).contains(locations)); } @Test - public void testBlockLocationsGapAtStart() + public void testBlocksGapAtStart() { - List locations = List.of(new BlockLocation(List.of(), 50, 50)); + List locations = List.of(new Block(List.of(), 50, 50)); assertThatThrownBy(() -> new FileEntry("/test", 100, MODIFIED, Optional.of(locations))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("blockLocations has a gap"); + .hasMessage("blocks have a gap"); } @Test - public void testBlockLocationsGapInMiddle() + public void testBlocksGapInMiddle() { - List locations = List.of( - new BlockLocation(List.of(), 0, 50), - new BlockLocation(List.of(), 100, 100)); + List locations = List.of( + new Block(List.of(), 0, 50), + new Block(List.of(), 100, 100)); assertThatThrownBy(() -> new FileEntry("/test", 200, MODIFIED, Optional.of(locations))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("blockLocations has a gap"); + .hasMessage("blocks have a gap"); } @Test - public void testBlockLocationsGapAtEnd() + public void testBlocksGapAtEnd() { - List locations = List.of( - new BlockLocation(List.of(), 0, 50), - new BlockLocation(List.of(), 50, 49)); + List locations = List.of( + new Block(List.of(), 0, 50), + new Block(List.of(), 50, 49)); assertThatThrownBy(() -> new FileEntry("/test", 100, MODIFIED, Optional.of(locations))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("blockLocations does not cover file"); + .hasMessage("blocks do not cover file"); } } diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileIterator.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileIterator.java index 887666c546d4..422d285199fb 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileIterator.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileIterator.java @@ -15,8 +15,9 @@ import com.google.common.collect.ImmutableList; import io.trino.filesystem.FileEntry; -import io.trino.filesystem.FileEntry.BlockLocation; +import io.trino.filesystem.FileEntry.Block; import io.trino.filesystem.FileIterator; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -75,23 +76,21 @@ public FileEntry next() path += relativeUri.getPath(); } - List locations = Stream.of(status.getBlockLocations()) - .map(HdfsFileIterator::toTrinoBlockLocation) + List blocks = Stream.of(status.getBlockLocations()) + .map(HdfsFileIterator::toTrinoBlock) .collect(toImmutableList()); - Optional> blockLocations = locations.isEmpty() ? Optional.empty() : Optional.of(locations); - return new FileEntry( path, status.getLen(), Instant.ofEpochMilli(status.getModificationTime()), - blockLocations); + blocks.isEmpty() ? Optional.empty() : Optional.of(blocks)); } - private static BlockLocation toTrinoBlockLocation(org.apache.hadoop.fs.BlockLocation location) + private static Block toTrinoBlock(BlockLocation location) { try { - return new BlockLocation(ImmutableList.copyOf(location.getHosts()), location.getOffset(), location.getLength()); + return new Block(ImmutableList.copyOf(location.getHosts()), location.getOffset(), location.getLength()); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java index 08a6e856a446..7ef300a9383a 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java @@ -51,28 +51,28 @@ public HdfsFileSystem(HdfsEnvironment environment, HdfsContext context) } @Override - public TrinoInputFile newInputFile(String path) + public TrinoInputFile newInputFile(String location) { - return new HdfsInputFile(path, null, environment, context); + return new HdfsInputFile(location, null, environment, context); } @Override - public TrinoInputFile newInputFile(String path, long length) + public TrinoInputFile newInputFile(String location, long length) { - return new HdfsInputFile(path, length, environment, context); + return new HdfsInputFile(location, length, environment, context); } @Override - public TrinoOutputFile newOutputFile(String path) + public TrinoOutputFile newOutputFile(String location) { - return new HdfsOutputFile(path, environment, context); + return new HdfsOutputFile(location, environment, context); } @Override - public void deleteFile(String path) + public void deleteFile(String location) throws IOException { - Path file = hadoopPath(path); + Path file = hadoopPath(location); FileSystem fileSystem = environment.getFileSystem(context, file); environment.doAs(context.getIdentity(), () -> { if (!fileSystem.delete(file, false)) { @@ -83,10 +83,10 @@ public void deleteFile(String path) } @Override - public void deleteFiles(Collection paths) + public void deleteFiles(Collection locations) throws IOException { - Map> pathsGroupedByDirectory = paths.stream().collect( + Map> pathsGroupedByDirectory = locations.stream().collect( groupingBy( path -> hadoopPath(path.replaceFirst("/[^/]*$", "")), mapping(HadoopPaths::hadoopPath, toList()))); @@ -107,10 +107,10 @@ public void deleteFiles(Collection paths) } @Override - public void deleteDirectory(String path) + public void deleteDirectory(String location) throws IOException { - Path directory = hadoopPath(path); + Path directory = hadoopPath(location); FileSystem fileSystem = environment.getFileSystem(context, directory); environment.doAs(context.getIdentity(), () -> { if (!fileSystem.delete(directory, true) && fileSystem.exists(directory)) { @@ -136,14 +136,14 @@ public void renameFile(String source, String target) } @Override - public FileIterator listFiles(String path) + public FileIterator listFiles(String location) throws IOException { - Path directory = hadoopPath(path); + Path directory = hadoopPath(location); FileSystem fileSystem = environment.getFileSystem(context, directory); return environment.doAs(context.getIdentity(), () -> { try { - return new HdfsFileIterator(path, fileSystem, fileSystem.listFiles(directory, true)); + return new HdfsFileIterator(location, fileSystem, fileSystem.listFiles(directory, true)); } catch (FileNotFoundException e) { return FileIterator.empty(); diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java index e40373b21487..a38876e0ec63 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java @@ -13,9 +13,9 @@ */ package io.trino.filesystem.hdfs; -import io.trino.filesystem.SeekableInputStream; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import org.apache.hadoop.fs.FSDataInputStream; @@ -58,10 +58,10 @@ public TrinoInput newInput() } @Override - public SeekableInputStream newStream() + public TrinoInputStream newStream() throws IOException { - return new HdfsSeekableInputStream(openFile()); + return new HdfsTrinoInputStream(openFile()); } @Override diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsSeekableInputStream.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsTrinoInputStream.java similarity index 90% rename from lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsSeekableInputStream.java rename to lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsTrinoInputStream.java index b6a69bc590c4..bc606d209c4e 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsSeekableInputStream.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsTrinoInputStream.java @@ -13,19 +13,19 @@ */ package io.trino.filesystem.hdfs; -import io.trino.filesystem.SeekableInputStream; +import io.trino.filesystem.TrinoInputStream; import org.apache.hadoop.fs.FSDataInputStream; import java.io.IOException; import static java.util.Objects.requireNonNull; -class HdfsSeekableInputStream - extends SeekableInputStream +class HdfsTrinoInputStream + extends TrinoInputStream { private final FSDataInputStream stream; - HdfsSeekableInputStream(FSDataInputStream stream) + HdfsTrinoInputStream(FSDataInputStream stream) { this.stream = requireNonNull(stream, "stream is null"); } diff --git a/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystem.java b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystem.java index 43a62740befe..f154b93ffb79 100644 --- a/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystem.java +++ b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystem.java @@ -92,7 +92,7 @@ private static List listFiles(TrinoFileSystem fileSystem, String path) FileIterator iterator = fileSystem.listFiles(path); ImmutableList.Builder files = ImmutableList.builder(); while (iterator.hasNext()) { - files.add(iterator.next().path()); + files.add(iterator.next().location()); } return files.build(); } diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ReadWriteUtils.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ReadWriteUtils.java index 2dff38ea9d00..ca0d80c305f0 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ReadWriteUtils.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ReadWriteUtils.java @@ -67,7 +67,7 @@ public static boolean isNegativeVInt(byte value) return value < -120 || (value >= -112 && value < 0); } - public static long readVInt(DataSeekableInputStream in) + public static long readVInt(TrinoDataInputStream in) throws IOException { byte firstByte = in.readByte(); diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/DataSeekableInputStream.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/TrinoDataInputStream.java similarity index 96% rename from lib/trino-hive-formats/src/main/java/io/trino/hive/formats/DataSeekableInputStream.java rename to lib/trino-hive-formats/src/main/java/io/trino/hive/formats/TrinoDataInputStream.java index 53975b8fbd85..8cfcb3e6ebc8 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/DataSeekableInputStream.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/TrinoDataInputStream.java @@ -15,7 +15,7 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; -import io.trino.filesystem.SeekableInputStream; +import io.trino.filesystem.TrinoInputStream; import java.io.DataInput; import java.io.EOFException; @@ -33,15 +33,15 @@ import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; -public final class DataSeekableInputStream +public final class TrinoDataInputStream extends InputStream implements DataInput { - private static final int INSTANCE_SIZE = instanceSize(DataSeekableInputStream.class); + private static final int INSTANCE_SIZE = instanceSize(TrinoDataInputStream.class); private static final int DEFAULT_BUFFER_SIZE = 4 * 1024; private static final int MINIMUM_CHUNK_SIZE = 1024; - private final SeekableInputStream inputStream; + private final TrinoInputStream inputStream; private long readTimeNanos; private long readBytes; @@ -58,12 +58,12 @@ public final class DataSeekableInputStream private int bufferFill; - public DataSeekableInputStream(SeekableInputStream inputStream) + public TrinoDataInputStream(TrinoInputStream inputStream) { this(inputStream, DEFAULT_BUFFER_SIZE); } - public DataSeekableInputStream(SeekableInputStream inputStream, int bufferSize) + public TrinoDataInputStream(TrinoInputStream inputStream, int bufferSize) { requireNonNull(inputStream, "inputStream is null"); checkArgument(bufferSize >= MINIMUM_CHUNK_SIZE, "minimum buffer size of " + MINIMUM_CHUNK_SIZE + " required"); diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java index 38992147d886..f779b417c648 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReader.java @@ -21,8 +21,8 @@ import io.airlift.slice.SliceInput; import io.airlift.slice.Slices; import io.trino.filesystem.TrinoInputFile; -import io.trino.hive.formats.DataSeekableInputStream; import io.trino.hive.formats.FileCorruptionException; +import io.trino.hive.formats.TrinoDataInputStream; import io.trino.hive.formats.compression.Codec; import io.trino.hive.formats.compression.CompressionKind; import io.trino.hive.formats.compression.ValueDecompressor; @@ -61,7 +61,7 @@ public final class SequenceFileReader private static final int MAX_METADATA_STRING_LENGTH = 1024 * 1024; private final String location; - private final DataSeekableInputStream input; + private final TrinoDataInputStream input; private final String keyClassName; private final String valueClassName; @@ -84,7 +84,7 @@ public SequenceFileReader(TrinoInputFile inputFile, long offset, long length) try { requireNonNull(inputFile, "inputFile is null"); this.location = inputFile.location(); - this.input = new DataSeekableInputStream(inputFile.newStream()); + this.input = new TrinoDataInputStream(inputFile.newStream()); closer.register(input); verify(offset >= 0, "offset is negative"); @@ -282,7 +282,7 @@ private static class SingleValueReader private final String location; private final long fileSize; - private final DataSeekableInputStream input; + private final TrinoDataInputStream input; private final ValueDecompressor decompressor; private final long end; @@ -295,7 +295,7 @@ private static class SingleValueReader public SingleValueReader( String location, long fileSize, - DataSeekableInputStream input, + TrinoDataInputStream input, ValueDecompressor decompressor, long end, long syncFirst, @@ -395,7 +395,7 @@ private static class BlockCompressedValueReader private final String location; private final long fileSize; - private final DataSeekableInputStream input; + private final TrinoDataInputStream input; private final long end; private final long syncFirst; @@ -409,7 +409,7 @@ private static class BlockCompressedValueReader public BlockCompressedValueReader( String location, long fileSize, - DataSeekableInputStream input, + TrinoDataInputStream input, ValueDecompressor decompressor, long end, long syncFirst, @@ -545,12 +545,12 @@ private static class ReadBuffer { private static final int INSTANCE_SIZE = instanceSize(ReadBuffer.class); - private final DataSeekableInputStream input; + private final TrinoDataInputStream input; private final ValueDecompressor decompressor; private final DynamicSliceOutput compressedBuffer = new DynamicSliceOutput(0); private final DynamicSliceOutput uncompressedBuffer = new DynamicSliceOutput(0); - public ReadBuffer(DataSeekableInputStream input, ValueDecompressor decompressor) + public ReadBuffer(TrinoDataInputStream input, ValueDecompressor decompressor) { this.input = requireNonNull(input, "input is null"); this.decompressor = decompressor; @@ -578,7 +578,7 @@ public Slice readBlock(int length) } } - private Slice readLengthPrefixedString(DataSeekableInputStream in) + private Slice readLengthPrefixedString(TrinoDataInputStream in) throws IOException { int length = toIntExact(readVInt(in)); diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java index f5800294407e..37a0c8e359e5 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/rcfile/RcFileReader.java @@ -18,9 +18,9 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.filesystem.TrinoInputFile; -import io.trino.hive.formats.DataSeekableInputStream; import io.trino.hive.formats.FileCorruptionException; import io.trino.hive.formats.ReadWriteUtils; +import io.trino.hive.formats.TrinoDataInputStream; import io.trino.hive.formats.compression.Codec; import io.trino.hive.formats.compression.CompressionKind; import io.trino.hive.formats.compression.ValueDecompressor; @@ -76,7 +76,7 @@ public class RcFileReader private final String location; private final long fileSize; private final Map readColumns; - private final DataSeekableInputStream input; + private final TrinoDataInputStream input; private final long length; private final byte version; @@ -131,7 +131,7 @@ private RcFileReader( this.location = inputFile.location(); this.fileSize = inputFile.length(); this.readColumns = ImmutableMap.copyOf(requireNonNull(readColumns, "readColumns is null")); - this.input = new DataSeekableInputStream(inputFile.newStream()); + this.input = new TrinoDataInputStream(inputFile.newStream()); this.writeValidation = requireNonNull(writeValidation, "writeValidation is null"); this.writeChecksumBuilder = writeValidation.map(validation -> WriteChecksumBuilder.createWriteChecksumBuilder(readColumns)); @@ -452,7 +452,7 @@ private void closeQuietly() } } - private Slice readLengthPrefixedString(DataSeekableInputStream in) + private Slice readLengthPrefixedString(TrinoDataInputStream in) throws IOException { int length = toIntExact(ReadWriteUtils.readVInt(in)); diff --git a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/TestDataSeekableInputStream.java b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/TestTrinoDataInputStream.java similarity index 74% rename from lib/trino-hive-formats/src/test/java/io/trino/hive/formats/TestDataSeekableInputStream.java rename to lib/trino-hive-formats/src/test/java/io/trino/hive/formats/TestTrinoDataInputStream.java index ad696e81a30d..1807e75ac27f 100644 --- a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/TestDataSeekableInputStream.java +++ b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/TestTrinoDataInputStream.java @@ -18,8 +18,8 @@ import com.google.common.io.ByteStreams; import io.airlift.slice.Slice; import io.airlift.slice.Slices; -import io.trino.filesystem.SeekableInputStream; -import io.trino.filesystem.memory.MemorySeekableInputStream; +import io.trino.filesystem.TrinoInputStream; +import io.trino.filesystem.memory.MemoryTrinoInputStream; import org.testng.annotations.Test; import java.io.ByteArrayOutputStream; @@ -43,7 +43,7 @@ import static org.testng.Assert.fail; @SuppressWarnings("resource") -public class TestDataSeekableInputStream +public class TestTrinoDataInputStream { private static final int BUFFER_SIZE = 129; @@ -70,7 +70,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readBoolean(), valueIndex % 2 == 0); @@ -92,7 +92,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readByte(), (byte) valueIndex); @@ -114,14 +114,14 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.read(), valueIndex & 0xFF); } @Override - public void verifyReadOffEnd(DataSeekableInputStream input) + public void verifyReadOffEnd(TrinoDataInputStream input) throws IOException { assertEquals(input.read(), -1); @@ -143,7 +143,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readShort(), (short) valueIndex); @@ -165,7 +165,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readUnsignedShort(), valueIndex & 0xFFF); @@ -187,7 +187,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readInt(), valueIndex); @@ -209,7 +209,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readUnsignedInt(), valueIndex); @@ -231,7 +231,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readLong(), valueIndex); @@ -253,7 +253,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readFloat(), valueIndex + 0.12f); @@ -275,7 +275,7 @@ public void loadValue(DataOutputStream output, int valueIndex) } @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { assertEquals(input.readDouble(), valueIndex + 0.12); @@ -292,14 +292,14 @@ public void testSkip() testDataInput(new SkipDataInputTester(readSize) { @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { input.skip(valueSize()); } @Override - public void verifyReadOffEnd(DataSeekableInputStream input) + public void verifyReadOffEnd(TrinoDataInputStream input) throws IOException { assertEquals(input.skip(valueSize()), valueSize() - 1); @@ -308,14 +308,14 @@ public void verifyReadOffEnd(DataSeekableInputStream input) testDataInput(new SkipDataInputTester(readSize) { @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { input.skipBytes(valueSize()); } @Override - public void verifyReadOffEnd(DataSeekableInputStream input) + public void verifyReadOffEnd(TrinoDataInputStream input) throws IOException { assertEquals(input.skip(valueSize()), valueSize() - 1); @@ -326,7 +326,7 @@ public void verifyReadOffEnd(DataSeekableInputStream input) testDataInput(new SkipDataInputTester(readSize) { @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { int length = valueSize(); @@ -344,7 +344,7 @@ public void verifyValue(DataSeekableInputStream input, int valueIndex) testDataInput(new SkipDataInputTester(readSize) { @Override - public void verifyValue(DataSeekableInputStream input, int valueIndex) + public void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { long length = valueSize(); @@ -370,7 +370,7 @@ public void testReadSlice() testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { return input.readSlice(valueSize()).toStringUtf8(); @@ -387,7 +387,7 @@ public void testReadFully() testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { Slice slice = Slices.allocate(valueSize()); @@ -398,7 +398,7 @@ public String readActual(DataSeekableInputStream input) testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { Slice slice = Slices.allocate(valueSize() + 10); @@ -409,7 +409,7 @@ public String readActual(DataSeekableInputStream input) testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { byte[] bytes = new byte[valueSize()]; @@ -420,7 +420,7 @@ public String readActual(DataSeekableInputStream input) testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { byte[] bytes = new byte[valueSize() + 10]; @@ -431,7 +431,7 @@ public String readActual(DataSeekableInputStream input) testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { byte[] bytes = new byte[valueSize()]; @@ -447,7 +447,7 @@ public String readActual(DataSeekableInputStream input) testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { byte[] bytes = new byte[valueSize() + 10]; @@ -458,7 +458,7 @@ public String readActual(DataSeekableInputStream input) testDataInput(new StringDataInputTester(readSize) { @Override - public String readActual(DataSeekableInputStream input) + public String readActual(TrinoDataInputStream input) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -473,7 +473,7 @@ public String readActual(DataSeekableInputStream input) public void testEmptyInput() throws Exception { - DataSeekableInputStream input = createDataSeekableInputStream(new byte[0]); + TrinoDataInputStream input = createTrinoDataInputStream(new byte[0]); assertEquals(input.getPos(), 0); } @@ -481,7 +481,7 @@ public void testEmptyInput() public void testEmptyRead() throws Exception { - DataSeekableInputStream input = createDataSeekableInputStream(new byte[0]); + TrinoDataInputStream input = createTrinoDataInputStream(new byte[0]); assertEquals(input.read(), -1); } @@ -489,7 +489,7 @@ public void testEmptyRead() public void testReadByteBeyondEnd() throws Exception { - DataSeekableInputStream input = createDataSeekableInputStream(new byte[0]); + TrinoDataInputStream input = createTrinoDataInputStream(new byte[0]); input.readByte(); } @@ -497,7 +497,7 @@ public void testReadByteBeyondEnd() public void testReadShortBeyondEnd() throws Exception { - DataSeekableInputStream input = createDataSeekableInputStream(new byte[1]); + TrinoDataInputStream input = createTrinoDataInputStream(new byte[1]); input.readShort(); } @@ -505,7 +505,7 @@ public void testReadShortBeyondEnd() public void testReadIntBeyondEnd() throws Exception { - DataSeekableInputStream input = createDataSeekableInputStream(new byte[3]); + TrinoDataInputStream input = createTrinoDataInputStream(new byte[3]); input.readInt(); } @@ -513,7 +513,7 @@ public void testReadIntBeyondEnd() public void testReadLongBeyondEnd() throws Exception { - DataSeekableInputStream input = createDataSeekableInputStream(new byte[7]); + TrinoDataInputStream input = createTrinoDataInputStream(new byte[7]); input.readLong(); } @@ -521,78 +521,78 @@ public void testReadLongBeyondEnd() public void testEncodingBoolean() throws Exception { - assertTrue(createDataSeekableInputStream(new byte[] {1}).readBoolean()); - assertFalse(createDataSeekableInputStream(new byte[] {0}).readBoolean()); + assertTrue(createTrinoDataInputStream(new byte[] {1}).readBoolean()); + assertFalse(createTrinoDataInputStream(new byte[] {0}).readBoolean()); } @Test public void testEncodingByte() throws Exception { - assertEquals(createDataSeekableInputStream(new byte[] {92}).readByte(), 92); - assertEquals(createDataSeekableInputStream(new byte[] {-100}).readByte(), -100); - assertEquals(createDataSeekableInputStream(new byte[] {-17}).readByte(), -17); + assertEquals(createTrinoDataInputStream(new byte[] {92}).readByte(), 92); + assertEquals(createTrinoDataInputStream(new byte[] {-100}).readByte(), -100); + assertEquals(createTrinoDataInputStream(new byte[] {-17}).readByte(), -17); - assertEquals(createDataSeekableInputStream(new byte[] {92}).readUnsignedByte(), 92); - assertEquals(createDataSeekableInputStream(new byte[] {-100}).readUnsignedByte(), 156); - assertEquals(createDataSeekableInputStream(new byte[] {-17}).readUnsignedByte(), 239); + assertEquals(createTrinoDataInputStream(new byte[] {92}).readUnsignedByte(), 92); + assertEquals(createTrinoDataInputStream(new byte[] {-100}).readUnsignedByte(), 156); + assertEquals(createTrinoDataInputStream(new byte[] {-17}).readUnsignedByte(), 239); } @Test public void testEncodingShort() throws Exception { - assertEquals(createDataSeekableInputStream(new byte[] {109, 92}).readShort(), 23661); - assertEquals(createDataSeekableInputStream(new byte[] {109, -100}).readShort(), -25491); - assertEquals(createDataSeekableInputStream(new byte[] {-52, -107}).readShort(), -27188); + assertEquals(createTrinoDataInputStream(new byte[] {109, 92}).readShort(), 23661); + assertEquals(createTrinoDataInputStream(new byte[] {109, -100}).readShort(), -25491); + assertEquals(createTrinoDataInputStream(new byte[] {-52, -107}).readShort(), -27188); - assertEquals(createDataSeekableInputStream(new byte[] {109, -100}).readUnsignedShort(), 40045); - assertEquals(createDataSeekableInputStream(new byte[] {-52, -107}).readUnsignedShort(), 38348); + assertEquals(createTrinoDataInputStream(new byte[] {109, -100}).readUnsignedShort(), 40045); + assertEquals(createTrinoDataInputStream(new byte[] {-52, -107}).readUnsignedShort(), 38348); } @Test public void testEncodingInteger() throws Exception { - assertEquals(createDataSeekableInputStream(new byte[] {109, 92, 75, 58}).readInt(), 978017389); - assertEquals(createDataSeekableInputStream(new byte[] {-16, -60, -120, -1}).readInt(), -7813904); + assertEquals(createTrinoDataInputStream(new byte[] {109, 92, 75, 58}).readInt(), 978017389); + assertEquals(createTrinoDataInputStream(new byte[] {-16, -60, -120, -1}).readInt(), -7813904); } @Test public void testEncodingLong() throws Exception { - assertEquals(createDataSeekableInputStream(new byte[] {49, -114, -96, -23, -32, -96, -32, 127}).readLong(), 9214541725452766769L); - assertEquals(createDataSeekableInputStream(new byte[] {109, 92, 75, 58, 18, 120, -112, -17}).readLong(), -1184314682315678611L); + assertEquals(createTrinoDataInputStream(new byte[] {49, -114, -96, -23, -32, -96, -32, 127}).readLong(), 9214541725452766769L); + assertEquals(createTrinoDataInputStream(new byte[] {109, 92, 75, 58, 18, 120, -112, -17}).readLong(), -1184314682315678611L); } @Test public void testEncodingDouble() throws Exception { - assertEquals(createDataSeekableInputStream(new byte[] {31, -123, -21, 81, -72, 30, 9, 64}).readDouble(), 3.14); - assertEquals(createDataSeekableInputStream(new byte[] {0, 0, 0, 0, 0, 0, -8, 127}).readDouble(), Double.NaN); - assertEquals(createDataSeekableInputStream(new byte[] {0, 0, 0, 0, 0, 0, -16, -1}).readDouble(), Double.NEGATIVE_INFINITY); - assertEquals(createDataSeekableInputStream(new byte[] {0, 0, 0, 0, 0, 0, -16, 127}).readDouble(), Double.POSITIVE_INFINITY); + assertEquals(createTrinoDataInputStream(new byte[] {31, -123, -21, 81, -72, 30, 9, 64}).readDouble(), 3.14); + assertEquals(createTrinoDataInputStream(new byte[] {0, 0, 0, 0, 0, 0, -8, 127}).readDouble(), Double.NaN); + assertEquals(createTrinoDataInputStream(new byte[] {0, 0, 0, 0, 0, 0, -16, -1}).readDouble(), Double.NEGATIVE_INFINITY); + assertEquals(createTrinoDataInputStream(new byte[] {0, 0, 0, 0, 0, 0, -16, 127}).readDouble(), Double.POSITIVE_INFINITY); } @Test public void testEncodingFloat() throws Exception { - assertEquals(createDataSeekableInputStream(new byte[] {-61, -11, 72, 64}).readFloat(), 3.14f); - assertEquals(createDataSeekableInputStream(new byte[] {0, 0, -64, 127}).readFloat(), Float.NaN); - assertEquals(createDataSeekableInputStream(new byte[] {0, 0, -128, -1}).readFloat(), Float.NEGATIVE_INFINITY); - assertEquals(createDataSeekableInputStream(new byte[] {0, 0, -128, 127}).readFloat(), Float.POSITIVE_INFINITY); + assertEquals(createTrinoDataInputStream(new byte[] {-61, -11, 72, 64}).readFloat(), 3.14f); + assertEquals(createTrinoDataInputStream(new byte[] {0, 0, -64, 127}).readFloat(), Float.NaN); + assertEquals(createTrinoDataInputStream(new byte[] {0, 0, -128, -1}).readFloat(), Float.NEGATIVE_INFINITY); + assertEquals(createTrinoDataInputStream(new byte[] {0, 0, -128, 127}).readFloat(), Float.POSITIVE_INFINITY); } @Test public void testRetainedSize() { int bufferSize = 1024; - SeekableInputStream inputStream = new MemorySeekableInputStream(Slices.wrappedBuffer(new byte[] {0, 1})); - DataSeekableInputStream input = new DataSeekableInputStream(inputStream, bufferSize); - assertEquals(input.getRetainedSize(), instanceSize(DataSeekableInputStream.class) + sizeOfByteArray(bufferSize)); + TrinoInputStream inputStream = new MemoryTrinoInputStream(Slices.wrappedBuffer(new byte[] {0, 1})); + TrinoDataInputStream input = new TrinoDataInputStream(inputStream, bufferSize); + assertEquals(input.getRetainedSize(), instanceSize(TrinoDataInputStream.class) + sizeOfByteArray(bufferSize)); } private static void testDataInput(DataInputTester tester) @@ -615,7 +615,7 @@ private static void testDataInput(DataInputTester tester) private static void testReadForward(DataInputTester tester, byte[] bytes) throws IOException { - DataSeekableInputStream input = createDataSeekableInputStream(bytes); + TrinoDataInputStream input = createTrinoDataInputStream(bytes); for (int i = 0; i < bytes.length / tester.valueSize(); i++) { int position = i * tester.valueSize(); assertEquals(input.getPos(), position); @@ -626,7 +626,7 @@ private static void testReadForward(DataInputTester tester, byte[] bytes) private static void testReadReverse(DataInputTester tester, byte[] bytes) throws IOException { - DataSeekableInputStream input = createDataSeekableInputStream(bytes); + TrinoDataInputStream input = createTrinoDataInputStream(bytes); for (int i = bytes.length / tester.valueSize() - 1; i >= 0; i--) { int position = i * tester.valueSize(); input.seek(position); @@ -638,7 +638,7 @@ private static void testReadReverse(DataInputTester tester, byte[] bytes) private static void testReadOffEnd(DataInputTester tester, byte[] bytes) throws IOException { - DataSeekableInputStream input = createDataSeekableInputStream(bytes); + TrinoDataInputStream input = createTrinoDataInputStream(bytes); ByteStreams.skipFully(input, bytes.length - tester.valueSize() + 1); tester.verifyReadOffEnd(input); } @@ -666,10 +666,10 @@ public final int valueSize() public abstract void loadValue(DataOutputStream slice, int valueIndex) throws IOException; - public abstract void verifyValue(DataSeekableInputStream input, int valueIndex) + public abstract void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException; - public void verifyReadOffEnd(DataSeekableInputStream input) + public void verifyReadOffEnd(TrinoDataInputStream input) throws IOException { try { @@ -713,7 +713,7 @@ public final void loadValue(DataOutputStream output, int valueIndex) } @Override - public final void verifyValue(DataSeekableInputStream input, int valueIndex) + public final void verifyValue(TrinoDataInputStream input, int valueIndex) throws IOException { String actual = readActual(input); @@ -721,13 +721,13 @@ public final void verifyValue(DataSeekableInputStream input, int valueIndex) assertEquals(actual, expected); } - protected abstract String readActual(DataSeekableInputStream input) + protected abstract String readActual(TrinoDataInputStream input) throws IOException; } - private static DataSeekableInputStream createDataSeekableInputStream(byte[] bytes) + private static TrinoDataInputStream createTrinoDataInputStream(byte[] bytes) { - SeekableInputStream inputStream = new MemorySeekableInputStream(Slices.wrappedBuffer(bytes)); - return new DataSeekableInputStream(inputStream, 16 * 1024); + TrinoInputStream inputStream = new MemoryTrinoInputStream(Slices.wrappedBuffer(bytes)); + return new TrinoDataInputStream(inputStream, 16 * 1024); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 65c582121122..07fcdfc81aff 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -2410,7 +2410,7 @@ private void cleanExtraOutputFiles(ConnectorSession session, String queryId, Str FileIterator iterator = fileSystem.listFiles(location); while (iterator.hasNext()) { FileEntry file = iterator.next(); - String fileName = new Path(file.path()).getName(); + String fileName = new Path(file.location()).getName(); if (isFileCreatedByQuery(fileName, queryId) && !filesToKeep.contains(location + "/" + fileName)) { filesToDelete.add(fileName); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index 357713cb2ef5..b3902be75fdf 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -215,13 +215,13 @@ private void doVacuum( FileIterator listing = fileSystem.listFiles(tableLocation.toString()); while (listing.hasNext()) { FileEntry entry = listing.next(); - String path = entry.path(); + String location = entry.location(); checkState( - path.startsWith(commonPathPrefix), + location.startsWith(commonPathPrefix), "Unexpected path [%s] returned when listing files under [%s]", - path, + location, tableLocation); - String relativePath = path.substring(commonPathPrefix.length()); + String relativePath = location.substring(commonPathPrefix.length()); if (relativePath.isEmpty()) { // A file returned for "tableLocation/", might be possible on S3. continue; @@ -230,14 +230,14 @@ private void doVacuum( // ignore tableLocation/_delta_log/** if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) { - log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, path); + log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location); transactionLogFiles++; continue; } // skip retained files if (retainedPaths.contains(relativePath)) { - log.debug("[%s] retaining a known file: %s", queryId, path); + log.debug("[%s] retaining a known file: %s", queryId, location); retainedKnownFiles++; continue; } @@ -245,13 +245,13 @@ private void doVacuum( // ignore recently created files Instant modificationTime = entry.lastModified(); if (!modificationTime.isBefore(threshold)) { - log.debug("[%s] retaining an unknown file %s with modification time %s", queryId, path, modificationTime); + log.debug("[%s] retaining an unknown file %s with modification time %s", queryId, location, modificationTime); retainedUnknownFiles++; continue; } - log.debug("[%s] deleting file [%s] with modification time %s", queryId, path, modificationTime); - filesToDelete.add(path); + log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime); + filesToDelete.add(location); if (filesToDelete.size() == DELETE_BATCH_SIZE) { fileSystem.deleteFiles(filesToDelete); removedFiles += filesToDelete.size(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/S3NativeTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/S3NativeTransactionLogSynchronizer.java index 088c339c35c2..5f623d489cfd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/S3NativeTransactionLogSynchronizer.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/S3NativeTransactionLogSynchronizer.java @@ -189,9 +189,9 @@ private List listLockInfos(TrinoFileSystem fileSystem, Path lockDirect while (files.hasNext()) { FileEntry entry = files.next(); - String name = entry.path().substring(entry.path().lastIndexOf('/') + 1); + String name = entry.location().substring(entry.location().lastIndexOf('/') + 1); if (LOCK_FILENAME_PATTERN.matcher(name).matches()) { - Optional lockInfo = parseLockFile(fileSystem, entry.path(), name); + Optional lockInfo = parseLockFile(fileSystem, entry.location(), name); lockInfo.ifPresent(lockInfos::add); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java index efd5a064db27..f3c8e682b766 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java @@ -17,11 +17,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multiset; import io.trino.filesystem.FileIterator; -import io.trino.filesystem.SeekableInputStream; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; import io.trino.filesystem.TrinoOutputFile; import io.trino.spi.security.ConnectorIdentity; @@ -75,39 +75,39 @@ public TrackingFileSystem(TrinoFileSystem delegate, Consumer fileOpened) } @Override - public TrinoInputFile newInputFile(String path) + public TrinoInputFile newInputFile(String location) { - TrinoInputFile inputFile = delegate.newInputFile(path); + TrinoInputFile inputFile = delegate.newInputFile(location); return new TrackingInputFile(inputFile, fileOpened); } @Override - public TrinoInputFile newInputFile(String path, long length) + public TrinoInputFile newInputFile(String location, long length) { - TrinoInputFile inputFile = delegate.newInputFile(path, length); + TrinoInputFile inputFile = delegate.newInputFile(location, length); return new TrackingInputFile(inputFile, fileOpened); } @Override - public TrinoOutputFile newOutputFile(String path) + public TrinoOutputFile newOutputFile(String location) { throw new UnsupportedOperationException(); } @Override - public void deleteFile(String path) + public void deleteFile(String location) { throw new UnsupportedOperationException(); } @Override - public void deleteFiles(Collection paths) + public void deleteFiles(Collection locations) { throw new UnsupportedOperationException(); } @Override - public void deleteDirectory(String path) + public void deleteDirectory(String location) { throw new UnsupportedOperationException(); } @@ -119,7 +119,7 @@ public void renameFile(String source, String target) } @Override - public FileIterator listFiles(String path) + public FileIterator listFiles(String location) { throw new UnsupportedOperationException(); } @@ -146,7 +146,7 @@ public TrinoInput newInput() } @Override - public SeekableInputStream newStream() + public TrinoInputStream newStream() throws IOException { fileOpened.accept(location()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java index ff7b1cfd77a1..6d6094cb7789 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java @@ -20,7 +20,6 @@ import com.google.common.io.Resources; import com.google.common.reflect.ClassPath; import io.airlift.log.Logger; -import io.trino.filesystem.FileEntry; import io.trino.filesystem.FileIterator; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoOutputFile; @@ -207,16 +206,13 @@ protected List listCheckpointFiles(String transactionLogDirectory) private List listAllFilesRecursive(String directory) { - String path = bucketUrl() + directory; - ImmutableList.Builder paths = ImmutableList.builder(); - + ImmutableList.Builder locations = ImmutableList.builder(); try { - FileIterator files = fileSystem.listFiles(path); + FileIterator files = fileSystem.listFiles(bucketUrl() + directory); while (files.hasNext()) { - FileEntry file = files.next(); - paths.add(file.path()); + locations.add(files.next().location()); } - return paths.build(); + return locations.build(); } catch (FileNotFoundException e) { return ImmutableList.of(); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java index 2647c793b61e..c04abcbad402 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java @@ -720,7 +720,7 @@ private ListenableFuture getTransactionalSplits(Path path, boolean splitta for (FileEntry entry : acidState.originalFiles()) { // Hive requires "original" files of transactional tables to conform to the bucketed tables naming pattern, to match them with delete deltas. - acidInfoBuilder.addOriginalFile(new Path(entry.path()), entry.length(), getRequiredBucketNumber(entry.path())); + acidInfoBuilder.addOriginalFile(new Path(entry.location()), entry.length(), getRequiredBucketNumber(entry.location())); } if (tableBucketInfo.isPresent()) { @@ -728,7 +728,7 @@ private ListenableFuture getTransactionalSplits(Path path, boolean splitta for (FileEntry entry : acidState.originalFiles()) { List fileStatuses = ImmutableList.of(new TrinoFileStatus(entry)); - Optional acidInfo = acidInfoForOriginalFiles(fullAcid, acidInfoBuilder, entry.path()); + Optional acidInfo = acidInfoForOriginalFiles(fullAcid, acidInfoBuilder, entry.location()); hiveSplitSource.addToQueue(getBucketedSplits(fileStatuses, splitFactory, bucketInfo, bucketConversion, splittable, acidInfo)); } @@ -755,7 +755,7 @@ private static Iterator generateOriginalFilesSplits( .map(entry -> createInternalHiveSplit( splitFactory, splittable, - acidInfoForOriginalFiles(fullAcid, acidInfoBuilder, entry.path()), + acidInfoForOriginalFiles(fullAcid, acidInfoBuilder, entry.location()), new TrinoFileStatus(entry))) .flatMap(Optional::stream) .iterator(); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MonitoredTrinoInputFile.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MonitoredTrinoInputFile.java index 4731ddfe8c07..b1b9f20c97b1 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MonitoredTrinoInputFile.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MonitoredTrinoInputFile.java @@ -13,9 +13,9 @@ */ package io.trino.plugin.hive; -import io.trino.filesystem.SeekableInputStream; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; import java.io.IOException; import java.time.Instant; @@ -42,10 +42,10 @@ public TrinoInput newInput() } @Override - public SeekableInputStream newStream() + public TrinoInputStream newStream() throws IOException { - return new MonitoredSeekableInputStream(stats, delegate.newStream()); + return new MonitoredTrinoInputStream(stats, delegate.newStream()); } @Override @@ -126,13 +126,13 @@ public String toString() } } - private static final class MonitoredSeekableInputStream - extends SeekableInputStream + private static final class MonitoredTrinoInputStream + extends TrinoInputStream { private final FileFormatDataSourceStats stats; - private final SeekableInputStream delegate; + private final TrinoInputStream delegate; - public MonitoredSeekableInputStream(FileFormatDataSourceStats stats, SeekableInputStream delegate) + public MonitoredTrinoInputStream(FileFormatDataSourceStats stats, TrinoInputStream delegate) { this.stats = requireNonNull(stats, "stats is null"); this.delegate = requireNonNull(delegate, "delegate is null"); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/BlockLocation.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/BlockLocation.java index 4be672ac2452..631ec3b4de31 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/BlockLocation.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/BlockLocation.java @@ -14,7 +14,7 @@ package io.trino.plugin.hive.fs; import com.google.common.collect.ImmutableList; -import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileEntry.Block; import javax.annotation.Nullable; @@ -45,11 +45,11 @@ public static List fromHiveBlockLocations(@Nullable org.apache.ha .collect(toImmutableList()); } - public BlockLocation(FileEntry.BlockLocation blockLocation) + public BlockLocation(Block block) { - this.hosts = ImmutableList.copyOf(blockLocation.hosts()); - this.offset = blockLocation.offset(); - this.length = blockLocation.length(); + this.hosts = ImmutableList.copyOf(block.hosts()); + this.offset = block.offset(); + this.length = block.length(); } public BlockLocation(org.apache.hadoop.fs.BlockLocation blockLocation) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java index 22f00f2ff40c..98f57e3ff707 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileEntry.Block; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -36,13 +37,12 @@ public class TrinoFileStatus public TrinoFileStatus(FileEntry entry) { - this( - entry.blockLocations() - .orElseGet(() -> List.of(new FileEntry.BlockLocation(List.of(), 0, entry.length()))) + this(entry.blocks() + .orElseGet(() -> List.of(new Block(List.of(), 0, entry.length()))) .stream() .map(BlockLocation::new) .collect(toImmutableList()), - new Path(entry.path()), + new Path(entry.location()), false, entry.length(), entry.lastModified().toEpochMilli()); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java index 0023b71053f9..830b79c2c36f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java @@ -126,14 +126,14 @@ public static AcidState getAcidState(TrinoFileSystem fileSystem, String director List originalFiles = new ArrayList<>(); for (FileEntry file : listFiles(fileSystem, directory)) { - String suffix = listingSuffix(directory, file.path()); + String suffix = listingSuffix(directory, file.location()); int slash = suffix.indexOf('/'); String name = (slash == -1) ? "" : suffix.substring(0, slash); if (name.startsWith("base_") || name.startsWith("delta_") || name.startsWith("delete_delta_")) { if (suffix.indexOf('/', slash + 1) != -1) { - throw new TrinoException(HIVE_INVALID_BUCKET_FILES, "Found file in sub-directory of ACID directory: " + file.path()); + throw new TrinoException(HIVE_INVALID_BUCKET_FILES, "Found file in sub-directory of ACID directory: " + file.location()); } groupedFiles.put(name, file); } @@ -188,7 +188,7 @@ else if (file.length() > 0) { originalFiles.clear(); } - originalFiles.sort(comparing(FileEntry::path)); + originalFiles.sort(comparing(FileEntry::location)); workingDeltas.sort(null); List deltas = new ArrayList<>(); @@ -312,7 +312,7 @@ private static List listFiles(TrinoFileSystem fileSystem, String dire FileIterator iterator = fileSystem.listFiles(directory); while (iterator.hasNext()) { FileEntry file = iterator.next(); - String name = new Path(file.path()).getName(); + String name = new Path(file.location()).getName(); if (!name.startsWith("_") && !name.startsWith(".")) { files.add(file); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java index a957750c1657..369ee0745feb 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java @@ -128,13 +128,13 @@ public void testOriginal() List files = state.originalFiles(); assertEquals(files.size(), 7); - assertEquals(files.get(0).path(), "mock:/tbl/part1/000000_0"); - assertEquals(files.get(1).path(), "mock:/tbl/part1/000000_0_copy_1"); - assertEquals(files.get(2).path(), "mock:/tbl/part1/000000_0_copy_2"); - assertEquals(files.get(3).path(), "mock:/tbl/part1/000001_1"); - assertEquals(files.get(4).path(), "mock:/tbl/part1/000002_0"); - assertEquals(files.get(5).path(), "mock:/tbl/part1/random"); - assertEquals(files.get(6).path(), "mock:/tbl/part1/subdir/000000_0"); + assertEquals(files.get(0).location(), "mock:/tbl/part1/000000_0"); + assertEquals(files.get(1).location(), "mock:/tbl/part1/000000_0_copy_1"); + assertEquals(files.get(2).location(), "mock:/tbl/part1/000000_0_copy_2"); + assertEquals(files.get(3).location(), "mock:/tbl/part1/000001_1"); + assertEquals(files.get(4).location(), "mock:/tbl/part1/000002_0"); + assertEquals(files.get(5).location(), "mock:/tbl/part1/random"); + assertEquals(files.get(6).location(), "mock:/tbl/part1/subdir/000000_0"); } @Test @@ -162,11 +162,11 @@ public void testOriginalDeltas() List files = state.originalFiles(); assertEquals(files.size(), 5); - assertEquals(files.get(0).path(), "mock:/tbl/part1/000000_0"); - assertEquals(files.get(1).path(), "mock:/tbl/part1/000001_1"); - assertEquals(files.get(2).path(), "mock:/tbl/part1/000002_0"); - assertEquals(files.get(3).path(), "mock:/tbl/part1/random"); - assertEquals(files.get(4).path(), "mock:/tbl/part1/subdir/000000_0"); + assertEquals(files.get(0).location(), "mock:/tbl/part1/000000_0"); + assertEquals(files.get(1).location(), "mock:/tbl/part1/000001_1"); + assertEquals(files.get(2).location(), "mock:/tbl/part1/000002_0"); + assertEquals(files.get(3).location(), "mock:/tbl/part1/random"); + assertEquals(files.get(4).location(), "mock:/tbl/part1/subdir/000000_0"); List deltas = state.deltas(); assertEquals(deltas.size(), 2); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index ebe0f429e7d8..476faf59bdce 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -910,7 +910,7 @@ private static void cleanExtraOutputFiles(TrinoFileSystem fileSystem, String que FileIterator iterator = fileSystem.listFiles(location); while (iterator.hasNext()) { FileEntry entry = iterator.next(); - String name = fileName(entry.path()); + String name = fileName(entry.location()); if (name.startsWith(queryId + "-") && !fileNamesToKeep.contains(name)) { filesToDelete.add(name); } @@ -1416,8 +1416,8 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc FileIterator allFiles = fileSystem.listFiles(table.location() + "/" + subfolder); while (allFiles.hasNext()) { FileEntry entry = allFiles.next(); - if (entry.lastModified().isBefore(expiration) && !validFiles.contains(fileName(entry.path()))) { - filesToDelete.add(entry.path()); + if (entry.lastModified().isBefore(expiration) && !validFiles.contains(fileName(entry.location()))) { + filesToDelete.add(entry.location()); if (filesToDelete.size() >= DELETE_BATCH_SIZE) { log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, filesToDelete); fileSystem.deleteFiles(filesToDelete); @@ -1425,7 +1425,7 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc } } else { - log.debug("%s file retained while removing orphan files %s", entry.path(), schemaTableName.getTableName()); + log.debug("%s file retained while removing orphan files %s", entry.location(), schemaTableName.getTableName()); } } if (!filesToDelete.isEmpty()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingSeekableInputStream.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingSeekableInputStream.java index 8c4affd0e9e1..729ebb615561 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingSeekableInputStream.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingSeekableInputStream.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg.fileio; +import io.trino.filesystem.TrinoInputStream; import org.apache.iceberg.io.SeekableInputStream; import java.io.IOException; @@ -23,9 +24,9 @@ public class ForwardingSeekableInputStream extends SeekableInputStream { - private final io.trino.filesystem.SeekableInputStream stream; + private final TrinoInputStream stream; - public ForwardingSeekableInputStream(io.trino.filesystem.SeekableInputStream stream) + public ForwardingSeekableInputStream(TrinoInputStream stream) { this.stream = requireNonNull(stream, "stream is null"); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RegisterTableProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RegisterTableProcedure.java index 1e6ccd3be286..c274de1bd89c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RegisterTableProcedure.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RegisterTableProcedure.java @@ -182,17 +182,17 @@ public static String getLatestMetadataLocation(TrinoFileSystem fileSystem, Strin FileIterator fileIterator = fileSystem.listFiles(metadataDirectoryLocation); while (fileIterator.hasNext()) { FileEntry fileEntry = fileIterator.next(); - if (fileEntry.path().contains(METADATA_FILE_EXTENSION)) { - OptionalInt version = parseVersion(fileEntry.path()); + if (fileEntry.location().contains(METADATA_FILE_EXTENSION)) { + OptionalInt version = parseVersion(fileEntry.location()); if (version.isPresent()) { int versionNumber = version.getAsInt(); if (versionNumber > latestMetadataVersion) { latestMetadataVersion = versionNumber; latestMetadataLocations.clear(); - latestMetadataLocations.add(fileEntry.path()); + latestMetadataLocations.add(fileEntry.location()); } else if (versionNumber == latestMetadataVersion) { - latestMetadataLocations.add(fileEntry.path()); + latestMetadataLocations.add(fileEntry.location()); } } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergRegisterTableProcedure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergRegisterTableProcedure.java index 7ce8b9c6ac8c..841050742272 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergRegisterTableProcedure.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergRegisterTableProcedure.java @@ -306,8 +306,8 @@ public void testRegisterTableWithInvalidMetadataFile() String invalidMetadataFileName = "invalid-default.avro"; while (fileIterator.hasNext()) { FileEntry fileEntry = fileIterator.next(); - if (fileEntry.path().endsWith(".avro")) { - String file = fileEntry.path(); + if (fileEntry.location().endsWith(".avro")) { + String file = fileEntry.location(); invalidMetadataFileName = file.substring(file.lastIndexOf("/") + 1); break; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java index 28f43f5b9be3..ae7ed049bdf9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java @@ -15,11 +15,11 @@ import com.google.common.collect.ImmutableMap; import io.trino.filesystem.FileIterator; -import io.trino.filesystem.SeekableInputStream; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; import io.trino.filesystem.TrinoOutputFile; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.spi.security.ConnectorIdentity; @@ -109,51 +109,51 @@ private TrackingFileSystem(TrinoFileSystem delegate, Tracker tracker) } @Override - public TrinoInputFile newInputFile(String path) + public TrinoInputFile newInputFile(String location) { int nextId = fileId.incrementAndGet(); return new TrackingInputFile( - delegate.newInputFile(path), - operation -> tracker.track(path, nextId, operation)); + delegate.newInputFile(location), + operation -> tracker.track(location, nextId, operation)); } @Override - public TrinoInputFile newInputFile(String path, long length) + public TrinoInputFile newInputFile(String location, long length) { int nextId = fileId.incrementAndGet(); return new TrackingInputFile( - delegate.newInputFile(path, length), - operation -> tracker.track(path, nextId, operation)); + delegate.newInputFile(location, length), + operation -> tracker.track(location, nextId, operation)); } @Override - public TrinoOutputFile newOutputFile(String path) + public TrinoOutputFile newOutputFile(String location) { int nextId = fileId.incrementAndGet(); return new TrackingOutputFile( - delegate.newOutputFile(path), - operationType -> tracker.track(path, nextId, operationType)); + delegate.newOutputFile(location), + operationType -> tracker.track(location, nextId, operationType)); } @Override - public void deleteFile(String path) + public void deleteFile(String location) throws IOException { - delegate.deleteFile(path); + delegate.deleteFile(location); } @Override - public void deleteFiles(Collection paths) + public void deleteFiles(Collection locations) throws IOException { - delegate.deleteFiles(paths); + delegate.deleteFiles(locations); } @Override - public void deleteDirectory(String path) + public void deleteDirectory(String location) throws IOException { - delegate.deleteDirectory(path); + delegate.deleteDirectory(location); } @Override @@ -164,10 +164,10 @@ public void renameFile(String source, String target) } @Override - public FileIterator listFiles(String path) + public FileIterator listFiles(String location) throws IOException { - return delegate.listFiles(path); + return delegate.listFiles(location); } } @@ -200,7 +200,7 @@ public TrinoInput newInput() } @Override - public SeekableInputStream newStream() + public TrinoInputStream newStream() throws IOException { tracker.accept(INPUT_FILE_NEW_STREAM); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCreateTableFailure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCreateTableFailure.java index 6855bce4d8f5..e0c8fc6dff82 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCreateTableFailure.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCreateTableFailure.java @@ -170,8 +170,8 @@ protected void assertMetadataLocation(String tableName, boolean shouldMetadataFi boolean metadataFileFound = false; while (fileIterator.hasNext()) { FileEntry fileEntry = fileIterator.next(); - String path = fileEntry.path(); - if (path.startsWith(tableLocationPrefix) && path.endsWith(".metadata.json")) { + String location = fileEntry.location(); + if (location.startsWith(tableLocationPrefix) && location.endsWith(".metadata.json")) { metadataFileFound = true; break; }