-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths #4158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
jojochuang
merged 3 commits into
apache:trunk
from
stiga-huang:fix-short-circuit-slot-release
Apr 18, 2022
Merged
Changes from 2 commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,6 +63,7 @@ | |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; | ||
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; | ||
| import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; | ||
| import org.apache.hadoop.hdfs.server.datanode.DataNode; | ||
| import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; | ||
| import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; | ||
| import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm; | ||
|
|
@@ -957,6 +958,83 @@ public void testDomainSocketClosedByDN() throws Exception { | |
| } | ||
| } | ||
|
|
||
| // Regression test for HDFS-16535 | ||
|
||
// Starts a two-DataNode MiniDFSCluster with short-circuit domain sockets,
// allocates shared-memory (shm) slots from both DataNodes through a single
// client ShortCircuitCache, then releases the slots one at a time and checks
// the shm counts reported by the client and by each DataNode after each step.
| @Test(timeout = 60000) | ||
| public void testDomainSocketClosedByMultipleDNs() throws Exception { | ||
| TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); | ||
| String testName = "testDomainSocketClosedByMultipleDNs"; | ||
| Configuration conf = createShortCircuitConf(testName, sockDir); | ||
// The configured path ends in "._PORT" while the peers below connect to
// testName + "." + getXferPort(); presumably _PORT is replaced per DataNode
// so each one listens on its own socket file -- TODO confirm.
| conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), | ||
| testName + "._PORT").getAbsolutePath()); | ||
| MiniDFSCluster cluster = | ||
| new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); | ||
|
|
||
| try { | ||
| cluster.waitActive(); | ||
| DistributedFileSystem fs = cluster.getFileSystem(); | ||
// Client-side cache under test; all slots below are allocated from it.
| final ShortCircuitCache cache = | ||
| fs.getClient().getClientContext().getShortCircuitCache(); | ||
|
|
||
// Block ids are constructed directly for the test; blockId0 is used with
// DataNode-0 and blockId1 with DataNode-1.
| ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz"); | ||
| ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz"); | ||
|
|
||
| DataNode dn0 = cluster.getDataNodes().get(0); | ||
| DataNode dn1 = cluster.getDataNodes().get(1); | ||
|
|
||
// One domain-socket peer per DataNode, each connected to that DataNode's
// own socket path (distinguished by the transfer port suffix).
| DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File( | ||
| sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath())); | ||
| DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File( | ||
| sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath())); | ||
|
|
||
| final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder() | ||
| .setNodeID(dn0.getDatanodeId()).build(); | ||
| final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder() | ||
| .setNodeID(dn1.getDatanodeId()).build(); | ||
|
|
||
| // Allocate 2 shm slots from DataNode-0 | ||
// The same usedPeer flag is passed to all three allocations; its value is
// never read by this test.
| MutableBoolean usedPeer = new MutableBoolean(false); | ||
| Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0, | ||
| "testDomainSocketClosedByMultipleDNs_client"); | ||
| dn0.getShortCircuitRegistry() | ||
| .registerSlot(blockId0, slot1.getSlotId(), false); | ||
|
|
||
// Second slot on DataNode-0, allocated and registered for the same blockId0.
| Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0, | ||
| "testDomainSocketClosedByMultipleDNs_client"); | ||
| dn0.getShortCircuitRegistry() | ||
| .registerSlot(blockId0, slot2.getSlotId(), false); | ||
|
|
||
| // Allocate 1 shm slot from DataNode-1 | ||
| Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1, | ||
| "testDomainSocketClosedByMultipleDNs_client"); | ||
| dn1.getShortCircuitRegistry() | ||
| .registerSlot(blockId1, slot3.getSlotId(), false); | ||
|
|
||
// Expected starting state: the client tracks two shm segments in total and
// each DataNode's registry reports one.
| Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum()); | ||
| Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum()); | ||
| Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum()); | ||
|
|
||
| // Release the slot of DataNode-1 first. | ||
// NOTE(review): the fixed sleep suggests the release runs asynchronously;
// this is timing-based and could be flaky on a slow host -- confirm.
| cache.scheduleSlotReleaser(slot3); | ||
| Thread.sleep(2000); | ||
| Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum()); | ||
|
|
||
| // Release the slots of DataNode-0. | ||
// slot2 is still outstanding after slot1 is released, so the client must
// still report one shm; per the assertion message, a count of 0 here would
// mean DataNode-0's shm was shut down because the release failed.
| cache.scheduleSlotReleaser(slot1); | ||
| Thread.sleep(2000); | ||
| Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" + | ||
| " due to slot release failures.", | ||
| 1, cache.getDfsClientShmManager().getShmNum()); | ||
| cache.scheduleSlotReleaser(slot2); | ||
| Thread.sleep(2000); | ||
|
|
||
// With every slot released, no shm should remain on either DataNode or on
// the client.
| Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum()); | ||
| Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum()); | ||
| Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum()); | ||
| } finally { | ||
| cluster.shutdown(); | ||
| } | ||
| } | ||
|
|
||
| @Test(timeout = 60000) | ||
| public void testDNRestart() throws Exception { | ||
| TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you remove line 243? If it is removed, line 228 looks redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`domainSocket` is now a local variable (defined at line 192). We don't need to update it at the end of this method. Note that this `finally` clause belongs to the try-clause that starts at line 195. For line 228, it's needed in the next retry of the while-loop. The check at line 199 will catch it and trigger a re-connect.