Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletableFuture;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -99,6 +100,7 @@ public void setup() throws Exception {
}

@Override
@AfterEach
public void teardown() throws Exception {
MultipartUploader uploader = getUploader(1);
if (uploader != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.tags.RootFilesystemTest;

import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
Expand All @@ -52,6 +53,7 @@
* Only subclass this for tests against transient filesystems where
* you don't care about the data.
*/
@RootFilesystemTest
public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.tags.FlakyTest;

import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;

/**
* Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
* Some of these test cases can fail if the FS read() call returns less
* than requested, which is a valid (possibly correct) implementation
* of {@code InputStream.read(buffer[])} which may return only those bytes
* which can be returned without blocking for more data.
*/
@FlakyTest("buffer underflow")
public abstract class AbstractContractUnbufferTest extends AbstractFSContractTestBase {

private Path file;
Expand Down Expand Up @@ -105,6 +111,7 @@ public void testMultipleUnbuffers() throws IOException {
}
}


@Test
public void testUnbufferMultipleReads() throws IOException {
describe("unbuffer a file multiple times");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.function.IntFunction;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -68,6 +70,8 @@
* Both the original readVectored(allocator) and the readVectored(allocator, release)
* operations are tested.
*/
@ParameterizedClass(name="buffer-{0}")
@MethodSource("params")
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {

private static final Logger LOG =
Expand All @@ -80,15 +84,15 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
/**
* Buffer allocator for vector IO.
*/
protected IntFunction<ByteBuffer> allocate;
private final IntFunction<ByteBuffer> allocate;

/**
* Buffer pool for vector IO.
*/
protected final ElasticByteBufferPool pool =
private final ElasticByteBufferPool pool =
new WeakReferencedElasticByteBufferPool();

protected String bufferType;
private final String bufferType;

/**
* Path to the vector file.
Expand All @@ -106,8 +110,8 @@ public static List<String> params() {
return Arrays.asList("direct", "array");
}

public void initAbstractContractVectoredReadTest(String pBufferType) {
this.bufferType = pBufferType;
protected AbstractContractVectoredReadTest(String bufferType) {
this.bufferType = bufferType;
final boolean isDirect = !"array".equals(bufferType);
this.allocate = size -> pool.getBuffer(isDirect, size);
}
Expand Down Expand Up @@ -147,6 +151,7 @@ public void setup() throws Exception {
createFile(fs, vectorPath, true, DATASET);
}

@AfterEach
@Override
public void teardown() throws Exception {
pool.release();
Expand Down Expand Up @@ -177,10 +182,8 @@ protected FSDataInputStream openVectorFile(final FileSystem fs) throws IOExcepti
.build());
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testVectoredReadMultipleRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testVectoredReadMultipleRanges() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
for (int i = 0; i < 10; i++) {
FileRange fileRange = FileRange.createFileRange(i * 100, 100);
Expand All @@ -201,10 +204,8 @@ public void testVectoredReadMultipleRanges(String pBufferType) throws Exception
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testVectoredReadAndReadFully(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testVectoredReadAndReadFully() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 100, 100);
try (FSDataInputStream in = openVectorFile()) {
Expand All @@ -219,10 +220,8 @@ public void testVectoredReadAndReadFully(String pBufferType) throws Exception {
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testVectoredReadWholeFile(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testVectoredReadWholeFile() throws Exception {
describe("Read the whole file in one single vectored read");
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 0, DATASET_LEN);
Expand All @@ -240,10 +239,8 @@ public void testVectoredReadWholeFile(String pBufferType) throws Exception {
* As the minimum seek value is 4*1024,none of the below ranges
* will get merged.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testDisjointRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testDisjointRanges() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 0, 100);
range(fileRanges, 4_000 + 101, 100);
Expand All @@ -259,10 +256,8 @@ public void testDisjointRanges(String pBufferType) throws Exception {
* As the minimum seek value is 4*1024, all the below ranges
* will get merged into one.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testAllRangesMergedIntoOne(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testAllRangesMergedIntoOne() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
final int length = 100;
range(fileRanges, 0, length);
Expand All @@ -279,10 +274,8 @@ public void testAllRangesMergedIntoOne(String pBufferType) throws Exception {
* As the minimum seek value is 4*1024, the first three ranges will be
* merged into and other two will remain as it is.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testSomeRangesMergedSomeUnmerged(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testSomeRangesMergedSomeUnmerged() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 8 * 1024, 100);
Expand All @@ -306,10 +299,8 @@ public void testSomeRangesMergedSomeUnmerged(String pBufferType) throws Exceptio
* Most file systems won't support overlapping ranges.
* Currently, only Raw Local supports it.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testOverlappingRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testOverlappingRanges() throws Exception {
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
verifyExceptionalVectoredRead(
getSampleOverlappingRanges(),
Expand All @@ -327,10 +318,8 @@ public void testOverlappingRanges(String pBufferType) throws Exception {
/**
* Same ranges are special case of overlapping.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testSameRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testSameRanges() throws Exception {
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
verifyExceptionalVectoredRead(
getSampleSameRanges(),
Expand All @@ -348,10 +337,8 @@ public void testSameRanges(String pBufferType) throws Exception {
/**
* A null range is not permitted.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testNullRange(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testNullRange() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 500, 100);
fileRanges.add(null);
Expand All @@ -362,19 +349,15 @@ public void testNullRange(String pBufferType) throws Exception {
/**
* A null range is not permitted.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testNullRangeList(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testNullRangeList() throws Exception {
verifyExceptionalVectoredRead(
null,
NullPointerException.class);
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testSomeRandomNonOverlappingRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testSomeRandomNonOverlappingRanges() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 500, 100);
range(fileRanges, 1000, 200);
Expand All @@ -387,10 +370,8 @@ public void testSomeRandomNonOverlappingRanges(String pBufferType) throws Except
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testConsecutiveRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testConsecutiveRanges() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
final int offset = 500;
final int length = 2011;
Expand All @@ -403,10 +384,8 @@ public void testConsecutiveRanges(String pBufferType) throws Exception {
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testEmptyRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testEmptyRanges() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, allocate);
Expand All @@ -425,10 +404,8 @@ public void testEmptyRanges(String pBufferType) throws Exception {
* The contract option {@link ContractOptions#VECTOR_IO_EARLY_EOF_CHECK} is used
* to determine which check to perform.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testEOFRanges(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testEOFRanges() throws Exception {
describe("Testing reading with an offset past the end of the file");
List<FileRange> fileRanges = range(DATASET_LEN + 1, 100);

Expand All @@ -441,10 +418,8 @@ public void testEOFRanges(String pBufferType) throws Exception {
}


@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testVectoredReadWholeFilePlusOne(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testVectoredReadWholeFilePlusOne() throws Exception {
describe("Try to read whole file plus 1 byte");
List<FileRange> fileRanges = range(0, DATASET_LEN + 1);

Expand All @@ -471,35 +446,29 @@ private void expectEOFinRead(final List<FileRange> fileRanges) throws Exception
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testNegativeLengthRange(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testNegativeLengthRange() throws Exception {

verifyExceptionalVectoredRead(range(0, -50), IllegalArgumentException.class);
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testNegativeOffsetRange(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testNegativeOffsetRange() throws Exception {
verifyExceptionalVectoredRead(range(-1, 50), EOFException.class);
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testNullReleaseOperation(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testNullReleaseOperation() throws Exception {

final List<FileRange> range = range(0, 10);
try (FSDataInputStream in = openVectorFile()) {
intercept(NullPointerException.class, () ->
in.readVectored(range, allocate, null));
intercept(NullPointerException.class, () ->
in.readVectored(range, allocate, null));
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testNormalReadAfterVectoredRead(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testNormalReadAfterVectoredRead() throws Exception {
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, allocate);
Expand All @@ -514,10 +483,8 @@ public void testNormalReadAfterVectoredRead(String pBufferType) throws Exception
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testVectoredReadAfterNormalRead(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testVectoredReadAfterNormalRead() throws Exception {
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = openVectorFile()) {
// read starting 200 bytes
Expand All @@ -532,10 +499,8 @@ public void testVectoredReadAfterNormalRead(String pBufferType) throws Exception
}
}

@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testMultipleVectoredReads(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testMultipleVectoredReads() throws Exception {
List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
try (FSDataInputStream in = openVectorFile()) {
Expand All @@ -553,10 +518,8 @@ public void testMultipleVectoredReads(String pBufferType) throws Exception {
* operation and then uses a separate thread pool to process the
* results asynchronously.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testVectoredIOEndToEnd(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
@Test
public void testVectoredIOEndToEnd() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 8 * 1024, 100);
range(fileRanges, 14 * 1024, 100);
Expand Down
Loading