Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -31,9 +30,11 @@
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -44,22 +45,31 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
* Test the LocalDistributedCacheManager using mocking.
* This suite is brittle to changes in the class under test.
*/
@SuppressWarnings("deprecation")
public class TestLocalDistributedCacheManager {

private static final byte[] TEST_DATA = "This is a test file\n".getBytes();

private static FileSystem mockfs;

public static class MockFileSystem extends FilterFileSystem {
Expand All @@ -70,6 +80,14 @@ public MockFileSystem() {

private File localDir;

/**
* Recursive delete of a path.
* For safety, paths of length under 5 are rejected.
* @param file path to delete.
* @throws IOException never, it is just "a dummy in the method signature"
* @throws IllegalArgumentException path too short
* @throws RuntimeException File.delete() failed.
*/
private static void delete(File file) throws IOException {
if (file.getAbsolutePath().length() < 5) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -109,9 +127,9 @@ public void cleanup() throws Exception {
* Mock input stream based on a byte array so that it can be used by a
* FSDataInputStream.
*/
private static class MockInputStream extends ByteArrayInputStream
private static final class MockInputStream extends ByteArrayInputStream
implements Seekable, PositionedReadable {
public MockInputStream(byte[] buf) {
private MockInputStream(byte[] buf) {
super(buf);
}

Expand All @@ -134,47 +152,45 @@ public void testDownload() throws Exception {
when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/");
when(mockfs.getWorkingDirectory()).thenReturn(working);
when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock args) throws Throwable {
return (Path) args.getArguments()[0];
}
});
when(mockfs.resolvePath(any(Path.class))).thenAnswer(
(Answer<Path>) args -> (Path) args.getArguments()[0]);

final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
final Path filePath = new Path(file);
File link = new File("link");

// return a filestatus for the file "*/file.txt"; raise FNFE for anything else
when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
@Override
public FileStatus answer(InvocationOnMock args) throws Throwable {
Path p = (Path)args.getArguments()[0];
if("file.txt".equals(p.getName())) {
return new FileStatus(201, false, 1, 500, 101, 101,
FsPermission.getDefault(), "me", "me", filePath);
return createMockTestFileStatus(filePath);
} else {
throw new FileNotFoundException(p+" not supported by mocking");
throw notMocked(p);
}
}
});

when(mockfs.getConf()).thenReturn(conf);
final FSDataInputStream in =
new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
@Override
public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
Path src = (Path)args.getArguments()[0];
if ("file.txt".equals(src.getName())) {
return in;
} else {
throw new FileNotFoundException(src+" not supported by mocking");
}
}
});
new FSDataInputStream(new MockInputStream(TEST_DATA));

// file.txt: return an openfile builder which will eventually return the data,
// anything else: FNFE
when(mockfs.openFile(any(Path.class))).thenAnswer(
(Answer<FutureDataInputStreamBuilder>) args -> {
Path src = (Path)args.getArguments()[0];
if ("file.txt".equals(src.getName())) {
return new MockOpenFileBuilder(mockfs, src,
() -> CompletableFuture.completedFuture(in));
} else {
throw notMocked(src);
}
});

Job.addCacheFile(file, conf);
Map<String, Boolean> policies = new HashMap<String, Boolean>();
Map<String, Boolean> policies = new HashMap<>();
policies.put(file.toString(), true);
Job.setFileSharedCacheUploadPolicies(conf, policies);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
Expand All @@ -191,6 +207,12 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
assertFalse(link.exists());
}

/**
* This test case sets the mock FS to raise FNFE
* on any getFileStatus/openFile calls.
* If the manager successfully starts up, it means that
* no files were probed for/opened.
*/
@Test
public void testEmptyDownload() throws Exception {
JobID jobId = new JobID();
Expand All @@ -201,30 +223,21 @@ public void testEmptyDownload() throws Exception {
when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/");
when(mockfs.getWorkingDirectory()).thenReturn(working);
when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock args) throws Throwable {
return (Path) args.getArguments()[0];
}
});
when(mockfs.resolvePath(any(Path.class))).thenAnswer(
(Answer<Path>) args -> (Path) args.getArguments()[0]);

when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
@Override
public FileStatus answer(InvocationOnMock args) throws Throwable {
Path p = (Path)args.getArguments()[0];
throw new FileNotFoundException(p+" not supported by mocking");
}
});
when(mockfs.getFileStatus(any(Path.class))).thenAnswer(
(Answer<FileStatus>) args -> {
Path p = (Path)args.getArguments()[0];
throw notMocked(p);
});

when(mockfs.getConf()).thenReturn(conf);
when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
@Override
public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
Path src = (Path)args.getArguments()[0];
throw new FileNotFoundException(src+" not supported by mocking");
}
});

when(mockfs.openFile(any(Path.class))).thenAnswer(
(Answer<FutureDataInputStreamBuilder>) args -> {
Path src = (Path)args.getArguments()[0];
throw notMocked(src);
});
conf.set(MRJobConfig.CACHE_FILES, "");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
Expand All @@ -236,6 +249,9 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
}


/**
* The same file can be added to the cache twice.
*/
@Test
public void testDuplicateDownload() throws Exception {
JobID jobId = new JobID();
Expand All @@ -246,12 +262,8 @@ public void testDuplicateDownload() throws Exception {
when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/");
when(mockfs.getWorkingDirectory()).thenReturn(working);
when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock args) throws Throwable {
return (Path) args.getArguments()[0];
}
});
when(mockfs.resolvePath(any(Path.class))).thenAnswer(
(Answer<Path>) args -> (Path) args.getArguments()[0]);

final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
final Path filePath = new Path(file);
Expand All @@ -262,32 +274,30 @@ public Path answer(InvocationOnMock args) throws Throwable {
public FileStatus answer(InvocationOnMock args) throws Throwable {
Path p = (Path)args.getArguments()[0];
if("file.txt".equals(p.getName())) {
return new FileStatus(201, false, 1, 500, 101, 101,
FsPermission.getDefault(), "me", "me", filePath);
return createMockTestFileStatus(filePath);
} else {
throw new FileNotFoundException(p+" not supported by mocking");
throw notMocked(p);
}
}
});

when(mockfs.getConf()).thenReturn(conf);
final FSDataInputStream in =
new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
@Override
public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
Path src = (Path)args.getArguments()[0];
if ("file.txt".equals(src.getName())) {
return in;
} else {
throw new FileNotFoundException(src+" not supported by mocking");
}
}
});
new FSDataInputStream(new MockInputStream(TEST_DATA));
when(mockfs.openFile(any(Path.class))).thenAnswer(
(Answer<FutureDataInputStreamBuilder>) args -> {
Path src = (Path)args.getArguments()[0];
if ("file.txt".equals(src.getName())) {
return new MockOpenFileBuilder(mockfs, src,
() -> CompletableFuture.completedFuture(in));
} else {
throw notMocked(src);
}
});

Job.addCacheFile(file, conf);
Job.addCacheFile(file, conf);
Map<String, Boolean> policies = new HashMap<String, Boolean>();
Map<String, Boolean> policies = new HashMap<>();
policies.put(file.toString(), true);
Job.setFileSharedCacheUploadPolicies(conf, policies);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
Expand All @@ -306,7 +316,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {

/**
* This test tries to replicate the issue with the previous version of
* {@ref LocalDistributedCacheManager} when the resulting timestamp is
* {@link LocalDistributedCacheManager} when the resulting timestamp is
* identical as that in another process. Unfortunately, it is difficult
* to mimic such behavior in a single process unit test. And mocking
* the unique id (timestamp previously, UUID otherwise) won't prove the
Expand All @@ -321,7 +331,7 @@ public void testMultipleCacheSetup() throws Exception {
final int threadCount = 10;
final CyclicBarrier barrier = new CyclicBarrier(threadCount);

ArrayList<Callable<Void>> setupCallable = new ArrayList<>();
List<Callable<Void>> setupCallable = new ArrayList<>();
for (int i = 0; i < threadCount; ++i) {
setupCallable.add(() -> {
barrier.await();
Expand All @@ -340,4 +350,58 @@ public void testMultipleCacheSetup() throws Exception {
manager.close();
}
}

/**
* Create test file status using test data as the length.
* @param filePath path to the file
* @return a file status.
*/
private FileStatus createMockTestFileStatus(final Path filePath) {
return new FileStatus(TEST_DATA.length, false, 1, 500, 101, 101,
FsPermission.getDefault(), "me", "me", filePath);
}

/**
* Exception to throw on a not mocked path.
* @return a FileNotFoundException
*/
private FileNotFoundException notMocked(final Path p) {
return new FileNotFoundException(p + " not supported by mocking");
}

/**
* Openfile builder where the build operation is a l-expression
* supplied in the constructor.
*/
private static final class MockOpenFileBuilder extends
FutureDataInputStreamBuilderImpl {

/**
* Operation to invoke to build the result.
*/
private final CallableRaisingIOE<CompletableFuture<FSDataInputStream>>
buildTheResult;

/**
* Create the builder. the FS and path must be non-null.
* FileSystem.getConf() is the only method invoked of the FS by
* the superclass.
* @param fileSystem fs
* @param path path to open
* @param buildTheResult builder operation.
*/
private MockOpenFileBuilder(final FileSystem fileSystem, Path path,
final CallableRaisingIOE<CompletableFuture<FSDataInputStream>> buildTheResult) {
super(fileSystem, path);
this.buildTheResult = buildTheResult;
}

@Override
public CompletableFuture<FSDataInputStream> build()
throws IllegalArgumentException, UnsupportedOperationException,
IOException {
return buildTheResult.apply();
}
}

}