Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,40 @@
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;

public record FileEntry(String path, long length, Instant lastModified, Optional<List<BlockLocation>> blockLocations)
public record FileEntry(String location, long length, Instant lastModified, Optional<List<Block>> blocks)
{
public FileEntry
{
checkArgument(length >= 0, "length is negative");
requireNonNull(path, "path is null");
requireNonNull(blockLocations, "blockLocations is null");
blockLocations = blockLocations.map(locations -> validatedBlockLocations(locations, length));
requireNonNull(location, "location is null");
requireNonNull(blocks, "blocks is null");
blocks = blocks.map(locations -> validatedBlocks(locations, length));
}

public record BlockLocation(List<String> hosts, long offset, long length)
public record Block(List<String> hosts, long offset, long length)
{
public BlockLocation
public Block
{
hosts = ImmutableList.copyOf(requireNonNull(hosts, "hosts is null"));
checkArgument(offset >= 0, "offset is negative");
checkArgument(length >= 0, "length is negative");
}
}

private static List<BlockLocation> validatedBlockLocations(List<BlockLocation> blockLocations, long length)
private static List<Block> validatedBlocks(List<Block> blocks, long length)
{
checkArgument(!blockLocations.isEmpty(), "blockLocations is empty");
blockLocations = blockLocations.stream()
.sorted(comparing(BlockLocation::offset))
checkArgument(!blocks.isEmpty(), "blocks is empty");
blocks = blocks.stream()
.sorted(comparing(Block::offset))
.collect(toImmutableList());

long position = 0;
for (BlockLocation location : blockLocations) {
checkArgument(location.offset() <= position, "blockLocations has a gap");
position = max(position, addExact(location.offset(), location.length()));
for (Block block : blocks) {
checkArgument(block.offset() <= position, "blocks have a gap");
position = max(position, addExact(block.offset(), block.length()));
}
checkArgument(position >= length, "blockLocations does not cover file");
checkArgument(position >= length, "blocks do not cover file");

return blockLocations;
return blocks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,28 @@

public interface TrinoFileSystem
{
TrinoInputFile newInputFile(String path);
TrinoInputFile newInputFile(String location);

TrinoInputFile newInputFile(String path, long length);
TrinoInputFile newInputFile(String location, long length);

TrinoOutputFile newOutputFile(String path);
TrinoOutputFile newOutputFile(String location);

void deleteFile(String path)
void deleteFile(String location)
throws IOException;

/**
* Delete paths in batches, it is not guaranteed to be atomic.
*
* @param paths collection of paths to be deleted
* @throws IOException when there is a problem with deletion of one or more specific paths
* Delete files in batches, possibly non-atomically.
* If an error occurs, some files may have been deleted.
*/
void deleteFiles(Collection<String> paths)
void deleteFiles(Collection<String> locations)
throws IOException;

void deleteDirectory(String path)
void deleteDirectory(String location)
throws IOException;

void renameFile(String source, String target)
throws IOException;

FileIterator listFiles(String path)
FileIterator listFiles(String location)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public interface TrinoInputFile
TrinoInput newInput()
throws IOException;

SeekableInputStream newStream()
TrinoInputStream newStream()
throws IOException;

long length()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.io.IOException;
import java.io.InputStream;

public abstract class SeekableInputStream
public abstract class TrinoInputStream
extends InputStream
{
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@
package io.trino.filesystem.local;

import com.google.common.primitives.Ints;
import io.trino.filesystem.SeekableInputStream;
import io.trino.filesystem.TrinoInputStream;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

class FileSeekableInputStream
extends SeekableInputStream
class FileTrinoInputStream
extends TrinoInputStream
{
private final RandomAccessFile input;

public FileSeekableInputStream(File file)
public FileTrinoInputStream(File file)
throws FileNotFoundException
{
this.input = new RandomAccessFile(file, "r");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
*/
package io.trino.filesystem.local;

import io.trino.filesystem.SeekableInputStream;
import io.trino.filesystem.TrinoInput;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;

import java.io.File;
import java.io.IOException;
Expand All @@ -41,10 +41,10 @@ public TrinoInput newInput()
}

@Override
public SeekableInputStream newStream()
public TrinoInputStream newStream()
throws IOException
{
return new FileSeekableInputStream(file);
return new FileTrinoInputStream(file);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package io.trino.filesystem.memory;

import io.airlift.slice.Slice;
import io.trino.filesystem.SeekableInputStream;
import io.trino.filesystem.TrinoInput;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -43,10 +43,10 @@ public TrinoInput newInput()
}

@Override
public SeekableInputStream newStream()
public TrinoInputStream newStream()
throws IOException
{
return new MemorySeekableInputStream(data);
return new MemoryTrinoInputStream(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@

import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.trino.filesystem.SeekableInputStream;
import io.trino.filesystem.TrinoInputStream;

import java.io.IOException;

public class MemorySeekableInputStream
extends SeekableInputStream
public class MemoryTrinoInputStream
extends TrinoInputStream
{
private final SliceInput input;

public MemorySeekableInputStream(Slice data)
public MemoryTrinoInputStream(Slice data)
{
input = data.getInput();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package io.trino.filesystem;

import io.trino.filesystem.FileEntry.BlockLocation;
import io.trino.filesystem.FileEntry.Block;
import org.junit.jupiter.api.Test;

import java.time.Instant;
Expand All @@ -28,72 +28,72 @@ public class TestFileEntry
private static final Instant MODIFIED = Instant.ofEpochSecond(1234567890);

@Test
public void testEmptyBlockLocations()
public void testEmptyBlocks()
{
assertThat(new FileEntry("/test", 123, MODIFIED, Optional.empty()))
.satisfies(entry -> {
assertThat(entry.path()).isEqualTo("/test");
assertThat(entry.location()).isEqualTo("/test");
assertThat(entry.length()).isEqualTo(123);
assertThat(entry.lastModified()).isEqualTo(MODIFIED);
assertThat(entry.blockLocations()).isEmpty();
assertThat(entry.blocks()).isEmpty();
});
}

@Test
public void testPresentBlockLocations()
public void testPresentBlocks()
{
List<BlockLocation> locations = List.of(
new BlockLocation(List.of(), 0, 50),
new BlockLocation(List.of(), 50, 70),
new BlockLocation(List.of(), 100, 150));
List<Block> locations = List.of(
new Block(List.of(), 0, 50),
new Block(List.of(), 50, 70),
new Block(List.of(), 100, 150));
assertThat(new FileEntry("/test", 200, MODIFIED, Optional.of(locations)))
.satisfies(entry -> assertThat(entry.blockLocations()).contains(locations));
.satisfies(entry -> assertThat(entry.blocks()).contains(locations));
}

@Test
public void testMissingBlockLocations()
public void testMissingBlocks()
{
assertThatThrownBy(() -> new FileEntry("/test", 0, MODIFIED, Optional.of(List.of())))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("blockLocations is empty");
.hasMessage("blocks is empty");
}

@Test
public void testBlockLocationsEmptyFile()
public void testBlocksEmptyFile()
{
List<BlockLocation> locations = List.of(new BlockLocation(List.of(), 0, 0));
List<Block> locations = List.of(new Block(List.of(), 0, 0));
assertThat(new FileEntry("/test", 0, MODIFIED, Optional.of(locations)))
.satisfies(entry -> assertThat(entry.blockLocations()).contains(locations));
.satisfies(entry -> assertThat(entry.blocks()).contains(locations));
}

@Test
public void testBlockLocationsGapAtStart()
public void testBlocksGapAtStart()
{
List<BlockLocation> locations = List.of(new BlockLocation(List.of(), 50, 50));
List<Block> locations = List.of(new Block(List.of(), 50, 50));
assertThatThrownBy(() -> new FileEntry("/test", 100, MODIFIED, Optional.of(locations)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("blockLocations has a gap");
.hasMessage("blocks have a gap");
}

@Test
public void testBlockLocationsGapInMiddle()
public void testBlocksGapInMiddle()
{
List<BlockLocation> locations = List.of(
new BlockLocation(List.of(), 0, 50),
new BlockLocation(List.of(), 100, 100));
List<Block> locations = List.of(
new Block(List.of(), 0, 50),
new Block(List.of(), 100, 100));
assertThatThrownBy(() -> new FileEntry("/test", 200, MODIFIED, Optional.of(locations)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("blockLocations has a gap");
.hasMessage("blocks have a gap");
}

@Test
public void testBlockLocationsGapAtEnd()
public void testBlocksGapAtEnd()
{
List<BlockLocation> locations = List.of(
new BlockLocation(List.of(), 0, 50),
new BlockLocation(List.of(), 50, 49));
List<Block> locations = List.of(
new Block(List.of(), 0, 50),
new Block(List.of(), 50, 49));
assertThatThrownBy(() -> new FileEntry("/test", 100, MODIFIED, Optional.of(locations)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("blockLocations does not cover file");
.hasMessage("blocks do not cover file");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

import com.google.common.collect.ImmutableList;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileEntry.BlockLocation;
import io.trino.filesystem.FileEntry.Block;
import io.trino.filesystem.FileIterator;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -75,23 +76,21 @@ public FileEntry next()
path += relativeUri.getPath();
}

List<BlockLocation> locations = Stream.of(status.getBlockLocations())
.map(HdfsFileIterator::toTrinoBlockLocation)
List<Block> blocks = Stream.of(status.getBlockLocations())
.map(HdfsFileIterator::toTrinoBlock)
.collect(toImmutableList());

Optional<List<BlockLocation>> blockLocations = locations.isEmpty() ? Optional.empty() : Optional.of(locations);

return new FileEntry(
path,
status.getLen(),
Instant.ofEpochMilli(status.getModificationTime()),
blockLocations);
blocks.isEmpty() ? Optional.empty() : Optional.of(blocks));
}

private static BlockLocation toTrinoBlockLocation(org.apache.hadoop.fs.BlockLocation location)
private static Block toTrinoBlock(BlockLocation location)
{
try {
return new BlockLocation(ImmutableList.copyOf(location.getHosts()), location.getOffset(), location.getLength());
return new Block(ImmutableList.copyOf(location.getHosts()), location.getOffset(), location.getLength());
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Loading