Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
Expand Down Expand Up @@ -85,8 +84,22 @@ public void testEmptyFileReadTail()
}
}

@Test(dataProvider = "validFileSizeAndPaddedFileSize")
public void testReadTailForFileSize(int fileSize, int paddedFileSize)
@Test
public void testReadTailForFileSize()
throws Exception
{
testReadTailForFileSize(0, 0);
testReadTailForFileSize(0, 1);
testReadTailForFileSize(0, 15);
testReadTailForFileSize(0, FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES - 1);
testReadTailForFileSize(0, FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES);
testReadTailForFileSize(63, 63);
testReadTailForFileSize(63, 64);
testReadTailForFileSize(64, 74);
testReadTailForFileSize(65, 65 + FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES);
}

private void testReadTailForFileSize(int fileSize, int paddedFileSize)
throws Exception
{
// Cleanup between each input run
Expand All @@ -105,10 +118,26 @@ public void testReadTailForFileSize(int fileSize, int paddedFileSize)
}
}

@Test(dataProvider = "validFileSizeAndPaddedFileSize")
public void testReadTailCompletely(int fileSize, int paddedFileSize)
@Test
public void testReadTailCompletely()
throws Exception
{
testReadTailCompletely(0, 0);
testReadTailCompletely(0, 1);
testReadTailCompletely(0, 15);
testReadTailCompletely(0, FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES - 1);
testReadTailCompletely(0, FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES);
testReadTailCompletely(63, 63);
testReadTailCompletely(63, 64);
testReadTailCompletely(64, 74);
testReadTailCompletely(65, 65 + FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES);
}

private void testReadTailCompletely(int fileSize, int paddedFileSize)
throws Exception
{
fs.truncate(tempFile, 0);

byte[] contents = countingTestFileContentsWithLength(fileSize);
if (contents.length > 0) {
try (FSDataOutputStream os = fs.append(tempFile)) {
Expand All @@ -123,7 +152,8 @@ public void testReadTailCompletely(int fileSize, int paddedFileSize)
assertEquals(tail.getFileSize(), fileSize);
Slice tailSlice = tail.getTailSlice();
assertEquals(tailSlice.length(), fileSize);
assertCountingTestFileContents(tailSlice.getBytes());
byte[] tailContents = tailSlice.getBytes();
assertEquals(tailContents, countingTestFileContentsWithLength(tailContents.length));
}
}

Expand Down Expand Up @@ -184,26 +214,6 @@ public void testReadTailForFileSizeNoEndOfFileFound()
}
}

@DataProvider(name = "validFileSizeAndPaddedFileSize")
public static Object[][] validFileSizeAndPaddedFileSize()
{
return new Object[][] {
{0, 0},
{0, 1},
{0, 15},
{0, FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES - 1},
{0, FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES},
{63, 63},
{63, 64},
{64, 74},
{65, 65 + FSDataInputStreamTail.MAX_SUPPORTED_PADDING_BYTES}};
}

private static void assertCountingTestFileContents(byte[] contents)
{
assertEquals(contents, countingTestFileContentsWithLength(contents.length));
}

private static byte[] countingTestFileContentsWithLength(int length)
{
byte[] contents = new byte[length];
Expand Down
223 changes: 117 additions & 106 deletions lib/trino-hdfs/src/test/java/io/trino/hdfs/rubix/TestRubixCaching.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import javax.management.MBeanServer;
Expand Down Expand Up @@ -276,12 +275,6 @@ private static void closeFileSystem(FileSystem fileSystem)
fileSystem.close();
}

@DataProvider
public static Object[][] readMode()
{
return new Object[][] {{ASYNC}, {READ_THROUGH}};
}

@Test
public void testCoordinatorNotJoining()
{
Expand Down Expand Up @@ -340,135 +333,153 @@ public void testGetBlockLocations()
assertEquals(file2Locations[0].getHosts()[0], "127.0.0.2");
}

@Test(dataProvider = "readMode")
public void testCacheRead(ReadMode readMode)
@Test
public void testCacheRead()
throws Exception
{
RubixConfig rubixConfig = new RubixConfig().setReadMode(readMode);
initializeCachingFileSystem(rubixConfig);
byte[] randomData = new byte[toIntExact(SMALL_FILE_SIZE.toBytes())];
new Random().nextBytes(randomData);
for (ReadMode readMode : ReadMode.values()) {
deinitializeRubix();

RubixConfig rubixConfig = new RubixConfig().setReadMode(readMode);
initializeCachingFileSystem(rubixConfig);
byte[] randomData = new byte[toIntExact(SMALL_FILE_SIZE.toBytes())];
new Random().nextBytes(randomData);

Path file = getStoragePath("some_file");
writeFile(nonCachingFileSystem.create(file), randomData);
Path file = getStoragePath("some_file");
writeFile(nonCachingFileSystem.create(file), randomData);

long beforeRemoteReadsCount = getRemoteReadsCount();
long beforeCachedReadsCount = getCachedReadsCount();
long beforeAsyncDownloadedMb = getAsyncDownloadedMb(readMode);
long beforeRemoteReadsCount = getRemoteReadsCount();
long beforeCachedReadsCount = getCachedReadsCount();
long beforeAsyncDownloadedMb = getAsyncDownloadedMb(readMode);

assertFileContents(cachingFileSystem, file, randomData);
assertFileContents(cachingFileSystem, file, randomData);

if (readMode == ASYNC) {
// wait for async Rubix requests to complete
if (readMode == ASYNC) {
// wait for async Rubix requests to complete
assertEventually(
new Duration(10, SECONDS),
() -> assertEquals(getAsyncDownloadedMb(ASYNC), beforeAsyncDownloadedMb + 1));
}

// stats are propagated asynchronously
assertEventually(
new Duration(10, SECONDS),
() -> assertEquals(getAsyncDownloadedMb(ASYNC), beforeAsyncDownloadedMb + 1));
}
() -> {
// data should be read from remote source only
assertGreaterThan(getRemoteReadsCount(), beforeRemoteReadsCount);
assertEquals(getCachedReadsCount(), beforeCachedReadsCount);
});

// stats are propagated asynchronously
assertEventually(
new Duration(10, SECONDS),
() -> {
// data should be read from remote source only
assertGreaterThan(getRemoteReadsCount(), beforeRemoteReadsCount);
assertEquals(getCachedReadsCount(), beforeCachedReadsCount);
});

// ensure that subsequent read uses cache exclusively
assertEventually(
new Duration(10, SECONDS),
() -> {
long remoteReadsCount = getRemoteReadsCount();
assertFileContents(cachingFileSystem, file, randomData);
assertGreaterThan(getCachedReadsCount(), beforeCachedReadsCount);
assertEquals(getRemoteReadsCount(), remoteReadsCount);
});
// ensure that subsequent read uses cache exclusively
assertEventually(
new Duration(10, SECONDS),
() -> {
long remoteReadsCount = getRemoteReadsCount();
assertFileContents(cachingFileSystem, file, randomData);
assertGreaterThan(getCachedReadsCount(), beforeCachedReadsCount);
assertEquals(getRemoteReadsCount(), remoteReadsCount);
});

closeRubix();
}
}

@Test(dataProvider = "readMode")
public void testCacheWrite(ReadMode readMode)
@Test
public void testCacheWrite()
throws Exception
{
initializeCachingFileSystem(new RubixConfig().setReadMode(readMode));
Path file = getStoragePath("some_file_write");
for (ReadMode readMode : ReadMode.values()) {
deinitializeRubix();

initializeCachingFileSystem(new RubixConfig().setReadMode(readMode));
Path file = getStoragePath("some_file_write");

byte[] data = "Hello world".getBytes(UTF_8);
writeFile(cachingFileSystem.create(file), data);
assertFileContents(cachingFileSystem, file, data);
byte[] data = "Hello world".getBytes(UTF_8);
writeFile(cachingFileSystem.create(file), data);
assertFileContents(cachingFileSystem, file, data);

closeRubix();
}
}

@Test(dataProvider = "readMode")
public void testLargeFile(ReadMode readMode)
@Test
public void testLargeFile()
throws Exception
{
initializeCachingFileSystem(new RubixConfig().setReadMode(readMode));
byte[] randomData = new byte[toIntExact(LARGE_FILE_SIZE.toBytes())];
new Random().nextBytes(randomData);
for (ReadMode readMode : ReadMode.values()) {
deinitializeRubix();

initializeCachingFileSystem(new RubixConfig().setReadMode(readMode));
byte[] randomData = new byte[toIntExact(LARGE_FILE_SIZE.toBytes())];
new Random().nextBytes(randomData);

Path file = getStoragePath("large_file");
writeFile(nonCachingFileSystem.create(file), randomData);
Path file = getStoragePath("large_file");
writeFile(nonCachingFileSystem.create(file), randomData);

long beforeRemoteReadsCount = getRemoteReadsCount();
long beforeCachedReadsCount = getCachedReadsCount();
long beforeAsyncDownloadedMb = getAsyncDownloadedMb(readMode);
long beforeRemoteReadsCount = getRemoteReadsCount();
long beforeCachedReadsCount = getCachedReadsCount();
long beforeAsyncDownloadedMb = getAsyncDownloadedMb(readMode);

assertFileContents(cachingFileSystem, file, randomData);
assertFileContents(cachingFileSystem, file, randomData);

if (readMode == ASYNC) {
// wait for async Rubix requests to complete
if (readMode == ASYNC) {
// wait for async Rubix requests to complete
assertEventually(
new Duration(10, SECONDS),
() -> assertEquals(getAsyncDownloadedMb(ASYNC), beforeAsyncDownloadedMb + 100));
}

// stats are propagated asynchronously
assertEventually(
new Duration(10, SECONDS),
() -> assertEquals(getAsyncDownloadedMb(ASYNC), beforeAsyncDownloadedMb + 100));
}
() -> {
// data should be fetched from remote source
assertGreaterThan(getRemoteReadsCount(), beforeRemoteReadsCount);
});

// stats are propagated asynchronously
assertEventually(
new Duration(10, SECONDS),
() -> {
// data should be fetched from remote source
assertGreaterThan(getRemoteReadsCount(), beforeRemoteReadsCount);
});

// ensure that subsequent read uses cache exclusively
assertEventually(
new Duration(10, SECONDS),
() -> {
long remoteReadsCount = getRemoteReadsCount();
assertFileContents(cachingFileSystem, file, randomData);
assertGreaterThan(getCachedReadsCount(), beforeCachedReadsCount);
assertEquals(getRemoteReadsCount(), remoteReadsCount);
});
long secondCachedReadsCount = getCachedReadsCount();
long secondRemoteReadsCount = getRemoteReadsCount();

// make sure parallel reading of large file works
ExecutorService executorService = newFixedThreadPool(3);
try {
List<Callable<?>> reads = nCopies(
3,
// ensure that subsequent read uses cache exclusively
assertEventually(
new Duration(10, SECONDS),
() -> {
long remoteReadsCount = getRemoteReadsCount();
assertFileContents(cachingFileSystem, file, randomData);
return null;
assertGreaterThan(getCachedReadsCount(), beforeCachedReadsCount);
assertEquals(getRemoteReadsCount(), remoteReadsCount);
});
List<Future<?>> futures = reads.stream()
.map(executorService::submit)
.collect(toImmutableList());
for (Future<?> future : futures) {
future.get();
long secondCachedReadsCount = getCachedReadsCount();
long secondRemoteReadsCount = getRemoteReadsCount();

// make sure parallel reading of large file works
ExecutorService executorService = newFixedThreadPool(3);
try {
List<Callable<?>> reads = nCopies(
3,
() -> {
assertFileContents(cachingFileSystem, file, randomData);
return null;
});
List<Future<?>> futures = reads.stream()
.map(executorService::submit)
.collect(toImmutableList());
for (Future<?> future : futures) {
future.get();
}
}
}
finally {
executorService.shutdownNow();
}
finally {
executorService.shutdownNow();
}

// stats are propagated asynchronously
assertEventually(
new Duration(10, SECONDS),
() -> {
// data should be read from cache only
assertGreaterThan(getCachedReadsCount(), secondCachedReadsCount);
assertEquals(getRemoteReadsCount(), secondRemoteReadsCount);
});

// stats are propagated asynchronously
assertEventually(
new Duration(10, SECONDS),
() -> {
// data should be read from cache only
assertGreaterThan(getCachedReadsCount(), secondCachedReadsCount);
assertEquals(getRemoteReadsCount(), secondRemoteReadsCount);
});
closeRubix();
}
}

@SuppressModernizer
Expand Down
Loading