@@ -756,8 +756,7 @@ public synchronized Client client() {
      * Returns a node client to a data node in the cluster.
      * Note: use this with care tests should not rely on a certain nodes client.
      */
-    public synchronized Client dataNodeClient() {
-        ensureOpen();
+    public Client dataNodeClient() {
         /* Randomly return a client to one of the nodes in the cluster */
         return getRandomNodeAndClient(DATA_NODE_PREDICATE).client(random);
     }
@@ -766,8 +765,7 @@ public synchronized Client dataNodeClient() {
      * Returns a node client to the current master node.
      * Note: use this with care tests should not rely on a certain nodes client.
      */
-    public synchronized Client masterClient() {
-        ensureOpen();
+    public Client masterClient() {
         NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName()));
         if (randomNodeAndClient != null) {
             return randomNodeAndClient.nodeClient(); // ensure node client master is requested
@@ -778,8 +776,7 @@ public synchronized Client masterClient() {
     /**
      * Returns a node client to random node but not the master. This method will fail if no non-master client is available.
      */
-    public synchronized Client nonMasterClient() {
-        ensureOpen();
+    public Client nonMasterClient() {
         NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName()).negate());
         if (randomNodeAndClient != null) {
             return randomNodeAndClient.nodeClient(); // ensure node client non-master is requested
@@ -813,16 +810,14 @@ public synchronized String startCoordinatingOnlyNode(Settings settings) {
      * Returns a transport client
      */
     public synchronized Client transportClient() {
-        ensureOpen();
         // randomly return a transport client going to one of the nodes in the cluster
         return getOrBuildRandomNode().transportClient();
     }

     /**
      * Returns a node client to a given node.
      */
-    public synchronized Client client(String nodeName) {
-        ensureOpen();
+    public Client client(String nodeName) {
         NodeAndClient nodeAndClient = nodes.get(nodeName);
         if (nodeAndClient != null) {
             return nodeAndClient.client(random);
@@ -834,7 +829,7 @@ public synchronized Client client(String nodeName) {
     /**
      * Returns a "smart" node client to a random node in the cluster
      */
-    public synchronized Client smartClient() {
+    public Client smartClient() {
         NodeAndClient randomNodeAndClient = getRandomNodeAndClient();
         if (randomNodeAndClient != null) {
             return randomNodeAndClient.nodeClient();
@@ -897,17 +892,19 @@ public boolean isMasterEligible() {
         }
 
         Client client(Random random) {
-            if (closed.get()) {
-                throw new RuntimeException("already closed");
-            }
-            double nextDouble = random.nextDouble();
-            if (nextDouble < transportClientRatio) {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Using transport client for node [{}] sniff: [{}]", node.settings().get("node.name"), false);
+            synchronized (InternalTestCluster.this) {
+                if (closed.get()) {
+                    throw new RuntimeException("already closed");
+                }
+                double nextDouble = random.nextDouble();
+                if (nextDouble < transportClientRatio) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Using transport client for node [{}] sniff: [{}]", node.settings().get("node.name"), false);
+                    }
+                    return getOrBuildTransportClient();
+                } else {
+                    return getOrBuildNodeClient();
                 }
-                return getOrBuildTransportClient();
-            } else {
-                return getOrBuildNodeClient();
             }
         }

@@ -1260,7 +1257,7 @@ public synchronized void afterTest() {
     }
 
     @Override
-    public synchronized void beforeIndexDeletion() throws Exception {
+    public void beforeIndexDeletion() throws Exception {
         // Check that the operations counter on index shard has reached 0.
         // The assumption here is that after a test there are no ongoing write operations.
         // test that have ongoing write operations after the test (for example because ttl is used
@@ -1274,7 +1271,6 @@ public synchronized void beforeIndexDeletion() throws Exception {
     }
 
     private void assertSameSyncIdSameDocs() {
-        assert Thread.holdsLock(this);
         Map<String, Long> docsOnShards = new HashMap<>();
         final Collection<NodeAndClient> nodesAndClients = nodes.values();
         for (NodeAndClient nodeAndClient : nodesAndClients) {
@@ -1303,10 +1299,8 @@ private void assertSameSyncIdSameDocs() {
     }
 
     private void assertNoPendingIndexOperations() throws Exception {
-        assert Thread.holdsLock(this);
         assertBusy(() -> {
-            final Collection<NodeAndClient> nodesAndClients = nodes.values();
-            for (NodeAndClient nodeAndClient : nodesAndClients) {
+            for (NodeAndClient nodeAndClient : nodes.values()) {
                 IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
                 for (IndexService indexService : indexServices) {
                     for (IndexShard indexShard : indexService) {
@@ -1324,10 +1318,8 @@ private void assertNoPendingIndexOperations() throws Exception {
     }
 
     private void assertOpenTranslogReferences() throws Exception {
-        assert Thread.holdsLock(this);
         assertBusy(() -> {
-            final Collection<NodeAndClient> nodesAndClients = nodes.values();
-            for (NodeAndClient nodeAndClient : nodesAndClients) {
+            for (NodeAndClient nodeAndClient : nodes.values()) {
                 IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
                 for (IndexService indexService : indexServices) {
                     for (IndexShard indexShard : indexService) {
@@ -1345,7 +1337,6 @@ private void assertOpenTranslogReferences() throws Exception {
     }
 
     private void assertNoSnapshottedIndexCommit() throws Exception {
-        assert Thread.holdsLock(this);
         assertBusy(() -> {
             for (NodeAndClient nodeAndClient : nodes.values()) {
                 IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
@@ -1370,7 +1361,7 @@ private void assertNoSnapshottedIndexCommit() throws Exception {
      * Asserts that the document history in Lucene index is consistent with Translog's on every index shard of the cluster.
      * This assertion might be expensive, thus we prefer not to execute on every test but only interesting tests.
      */
-    public synchronized void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException {
+    public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException {
         for (NodeAndClient nodeAndClient : nodes.values()) {
             IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
             for (IndexService indexService : indexServices) {
@@ -1720,9 +1711,7 @@ private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) {
 
         for (NodeAndClient nodeAndClient: nodeAndClients) {
             removeDisruptionSchemeFromNode(nodeAndClient);
-            final NavigableMap<String, NodeAndClient> newNodes = new TreeMap<>(nodes);
-            final NodeAndClient previous = newNodes.remove(nodeAndClient.name);
-            nodes = Collections.unmodifiableNavigableMap(newNodes);
+            final NodeAndClient previous = removeNode(nodeAndClient);
             assert previous == nodeAndClient;
             nodeAndClient.close();
         }
@@ -1801,9 +1790,7 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
             success = true;
         } finally {
             if (success == false) {
-                final NavigableMap<String, NodeAndClient> newNodes = new TreeMap<>(nodes);
-                newNodes.remove(nodeAndClient.name);
-                nodes = Collections.unmodifiableNavigableMap(newNodes);
+                removeNode(nodeAndClient);
             }
         }
 
@@ -1824,6 +1811,13 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
         }
     }
 
+    private NodeAndClient removeNode(NodeAndClient nodeAndClient) {
+        final NavigableMap<String, NodeAndClient> newNodes = new TreeMap<>(nodes);
+        final NodeAndClient previous = newNodes.remove(nodeAndClient.name);
+        nodes = Collections.unmodifiableNavigableMap(newNodes);
+        return previous;
+    }
+
     private Set<String> excludeMasters(Collection<NodeAndClient> nodeAndClients) {
         assert Thread.holdsLock(this);
         final Set<String> excludedNodeIds = new HashSet<>();
@@ -2237,6 +2231,7 @@ public void clearDisruptionScheme() {
         clearDisruptionScheme(true);
     }
 
+    // synchronized to prevent concurrently modifying the cluster.
     public synchronized void clearDisruptionScheme(boolean ensureHealthyCluster) {
Contributor:
This method can wait for a healthy cluster. I think having the entire method synchronized while waiting for the cluster to become healthy could potentially lead to deadlocks?

Contributor Author:
Good question ... I couldn't find a possible deadlock in the implementations of our disruptions from a quick look over them. My thinking would be this: if we don't synchronize here, we allow manipulating the cluster while we "wait for healthy", which could lead to some pretty hard-to-debug issues. Also, we really don't want to manipulate anything about the cluster while this method is in progress.
=> If we create some unforeseen deadlock here, I'd rather try to fix the implementation of the disruption to prevent the deadlock than allow concurrent modification of the cluster while we clear the disruption.

Contributor:
If there is any chance of manipulation during this phase, I would rather guard against manipulating the cluster explicitly by adding an intermediate closing (or stopped) state.

Do we not risk something like what you described in #39118? If the disruption prevented the result from returning, the callback could be called at this time. If that in turn calls any of the synchronized methods, it could potentially deadlock if we have to create a new connection while becoming healthy.

At a minimum I think we should add a comment explaining why the synchronized is there.
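
A minimal, self-contained sketch of the lock-up being described, assuming the scenario above (all names here are hypothetical, not the InternalTestCluster API): the test thread holds the cluster monitor while it waits for health, a callback thread blocks trying to enter a synchronized cluster method, and "healthy" in turn depends on that callback finishing.

import java.util.concurrent.CountDownLatch;

public class SyncWaitDeadlockSketch {
    private static final Object clusterMonitor = new Object();            // stands in for the cluster's lock
    private static final CountDownLatch callbackDone = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        // "Main JUnit thread": holds the cluster monitor while clearing the disruption.
        synchronized (clusterMonitor) {
            // A response callback arrives on another thread and re-enters the cluster.
            Thread callback = new Thread(() -> {
                synchronized (clusterMonitor) { // blocks: the monitor is held above
                    // e.g. lazily building a client/connection inside a synchronized method
                }
                callbackDone.countDown();
            });
            callback.start();

            // "Wait for healthy": if health depends on the callback having completed,
            // this await never returns and both threads are stuck.
            callbackDone.await();
        }
    }
}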

Contributor Author:
I added a comment for now. But I'm starting to think we're attacking this from the wrong angle to some degree. Methods like this one (and a few others we've discussed) are currently only called from the main JUnit thread. Instead of worrying endlessly about how we synchronize things like clearing the disruption while closing, why not just assert that we're on the main JUnit thread and simply not allow manipulating the cluster from elsewhere? We don't currently seem to manipulate the cluster from other threads, and I don't see a good reason to start doing that either (and if someone needs it down the line, they're free to add it as needed).
IMO, that would make calls to e.g. InternalTestCluster#restartRandomDataNode(org.elasticsearch.test.InternalTestCluster.RestartCallback) a lot easier to follow/debug.
WDYT?
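
A rough sketch of what such an assertion could look like, assuming the guard is installed in every mutating method (names are invented for illustration; this is not what the PR implements):

public class MainThreadGuardSketch {
    // Captured once, on the JUnit thread that constructs the cluster.
    private final Thread creationThread = Thread.currentThread();

    private void assertOnTestThread() {
        assert Thread.currentThread() == creationThread
            : "cluster must only be manipulated from [" + creationThread.getName()
                + "] but was accessed from [" + Thread.currentThread().getName() + "]";
    }

    // Mutating methods would start with the guard instead of being synchronized:
    public void restartRandomDataNode() {
        assertOnTestThread();
        // ... restart logic ...
    }
}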

Contributor:
I also did not find any specific places where we deliberately manipulate the cluster on other threads (though I have not done an exhaustive search). However, it is not obvious that calling, for instance, client() on another thread would be invalid (if the client, or even the node, is lazily created, that implicitly manipulates the cluster). Also, I wonder whether disruptive restart tests would be good to add, and whether they would become harder to add if all changes had to happen on the main thread. I think the code is now much clearer with this PR and would prefer to leave the synchronized in place.
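
The lazy-creation point can be seen in miniature below (hypothetical names, not the real implementation): even an accessor that looks like a read mutates shared state on first use, which is why the thread it runs on matters.

public class LazyClientSketch {
    interface Client { }

    private Client client;                 // built on first access

    public synchronized Client client() {
        if (client == null) {
            // Implicit cluster manipulation: the first caller constructs the
            // client, so even a "read" can modify shared state.
            client = new Client() { };
        }
        return client;
    }
}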

         if (activeDisruptionScheme != null) {
             TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal();
@@ -2318,8 +2313,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) {
     }
 
     @Override
-    public synchronized Iterable<Client> getClients() {
-        ensureOpen();
+    public Iterable<Client> getClients() {
         return () -> {
             ensureOpen();
             final Iterator<NodeAndClient> iterator = nodes.values().iterator();