Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
Expand Down Expand Up @@ -226,7 +225,6 @@ public void setUp() throws IOException {
assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
assertEquals(0, dataset.getNumFailedVolumes());
}

@Test(timeout=10000)
public void testReadLockEnabledByDefault()
throws Exception {
Expand Down Expand Up @@ -268,11 +266,12 @@ public void run() {
waiter.join();
// The holder thread is still holding the lock, but the waiter can still
// run as the lock is a shared read lock.
// Otherwise test will timeout with deadlock.
assertEquals(true, accessed.get());
holder.interrupt();
}

@Test(timeout=10000)
@Test(timeout=20000)
public void testReadLockCanBeDisabledByConfig()
throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
Expand All @@ -281,58 +280,58 @@ public void testReadLockCanBeDisabledByConfig()
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
try {
AtomicBoolean accessed = new AtomicBoolean(false);
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);

CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1);
// create a synchronized list and verify the order of elements.
List<Integer> syncList =
Collections.synchronizedList(new ArrayList<>());


Thread holder = new Thread() {
public void run() {
latch.countDown();
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
syncList.add(0);
} catch (Exception e) {
return;
}
try {
latch.countDown();
// wait for the waiter thread to access the lock.
waiterLatch.await();
syncList.add(2);
} catch (InterruptedException e) {
} catch (Exception e) {
}
}
};

Thread waiter = new Thread() {
public void run() {
try {
// wait for holder to get into the critical section.
// Wait for holder to get ds read lock.
latch.await();
} catch (InterruptedException e) {
waiterLatch.countDown();
return;
}
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
syncList.add(1);
accessed.getAndSet(true);
// signal the holder thread.
waiterLatch.countDown();
} catch (Exception e) {
}
}
};
waiter.start();
holder.start();

waiter.join();
// Wait for sometime to make sure we are in deadlock,
try {
GenericTestUtils.waitFor(() ->
accessed.get(),
100, 10000);
fail("Waiter thread should not execute.");
} catch (TimeoutException e) {
}
// Release waiterLatch to exit deadlock.
waiterLatch.countDown();
holder.join();

// verify that the synchronized list has the correct sequence.
assertEquals(
"The sequence of checkpoints does not correspond to shared lock",
syncList, Arrays.asList(0, 1, 2));
waiter.join();
// After releasing waiterLatch water
// thread will be able to execute.
assertTrue(accessed.get());
} finally {
cluster.shutdown();
}
Expand Down