Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -958,13 +959,12 @@ public void testMoverWithStripedFile() throws Exception {
new String[] { "-p", barDir });
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);

// verify storage types and locations
// Verify storage types and locations.
// Wait until Namenode confirms ARCHIVE storage type for all blocks of
// fooFile.
waitForUpdatedStorageType(client, fooFile, fileLen, StorageType.ARCHIVE);

locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){
for( StorageType type : lb.getStorageTypes()){
Assert.assertEquals(StorageType.ARCHIVE, type);
}
}
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);

Expand Down Expand Up @@ -1005,6 +1005,43 @@ public void testMoverWithStripedFile() throws Exception {
}
}

/**
* Wait until Namenode reports expected storage type for all blocks of
* given file.
*
* @param client handle all RPC calls to Namenode.
* @param file file for which we are expecting same storage type of all
* located blocks.
* @param fileLen length of the file.
* @param expectedStorageType storage type to expect for all blocks of the
* given file.
* @throws TimeoutException if the wait timed out.
* @throws InterruptedException if interrupted while waiting for the response.
*/
private void waitForUpdatedStorageType(ClientProtocol client, String file,
long fileLen, StorageType expectedStorageType)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
LocatedBlocks blocks;
try {
blocks = client.getBlockLocations(file, 0, fileLen);
} catch (IOException e) {
throw new RuntimeException(e);
}
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
for (StorageType type : lb.getStorageTypes()) {
if (!expectedStorageType.equals(type)) {
LOG.info("Block {} has StorageType: {}. It might not have been "
+ "updated yet, awaiting the latest update.",
lb.getBlock().toString(), type);
return false;
}
}
}
return true;
}, 500, 5000, "Blocks storage type must be ARCHIVE");
}

private void initSecureConf(Configuration conf) throws Exception {
String username = "mover";
File baseDir = GenericTestUtils.getTestDir(TestMover.class.getSimpleName());
Expand Down