From b988867ce2d0cb099466201c74cf0f2a578adf81 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 19 Feb 2019 18:42:57 +0100 Subject: [PATCH 01/11] Simplify and Fix Synchronization in InternalTestCluster * Remove unnecessary `synchronized` statements * Make `Predicate`s constants where possible * Cleanup some stream usage * Make unsafe public methods `synchronized` --- .../test/InternalTestCluster.java | 212 +++++++----------- 1 file changed, 87 insertions(+), 125 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 2c02abab9dc1d..e3bca9c43fb00 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -187,6 +187,16 @@ public final class InternalTestCluster extends TestCluster { private final Logger logger = LogManager.getLogger(getClass()); + private static final Predicate DATA_NODE_PREDICATE = + nodeAndClient -> DiscoveryNode.isDataNode(nodeAndClient.node.settings()); + + private static final Predicate NO_DATA_NO_MASTER_PREDICATE = nodeAndClient -> + DiscoveryNode.isMasterNode(nodeAndClient.node.settings()) == false + && DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false; + + private static final Predicate MASTER_NODE_PREDICATE = + nodeAndClient -> DiscoveryNode.isMasterNode(nodeAndClient.node.settings()); + public static final int DEFAULT_LOW_NUM_MASTER_NODES = 1; public static final int DEFAULT_HIGH_NUM_MASTER_NODES = 3; @@ -208,7 +218,7 @@ public final class InternalTestCluster extends TestCluster { private final Settings defaultSettings; - private AtomicInteger nextNodeId = new AtomicInteger(0); + private final AtomicInteger nextNodeId = new AtomicInteger(0); /* Each shared node has a node seed that is used to start up the node and get default settings * this is important if a node is randomly shut down in a test since the next test relies on a @@ -240,7 +250,7 @@ public final class InternalTestCluster extends TestCluster { private final Path baseDir; private ServiceDisruptionScheme activeDisruptionScheme; - private Function clientWrapper; + private final Function clientWrapper; private int bootstrapMasterNodeIndex = -1; @@ -405,10 +415,6 @@ private static boolean usingZen1(Settings settings) { return ZEN_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings)); } - public int getBootstrapMasterNodeIndex() { - return bootstrapMasterNodeIndex; - } - /** * Sets {@link #bootstrapMasterNodeIndex} to the given value, see {@link #bootstrapMasterNodeWithSpecifiedIndex(List)} * for the description of how this field is used. @@ -431,7 +437,7 @@ public boolean getAutoManageMinMasterNode() { return autoManageMinMasterNodes; } - public String[] getNodeNames() { + public synchronized String[] getNodeNames() { return nodes.keySet().toArray(Strings.EMPTY_ARRAY); } @@ -460,7 +466,7 @@ public Collection> getPlugins() { return plugins; } - private Settings getRandomNodeSettings(long seed) { + private static Settings getRandomNodeSettings(long seed) { Random random = new Random(seed); Builder builder = Settings.builder(); builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), rarely(random)); @@ -566,11 +572,10 @@ private synchronized NodeAndClient getOrBuildRandomNode() { return buildNode; } - private synchronized NodeAndClient getRandomNodeAndClient() { + private NodeAndClient getRandomNodeAndClient() { return getRandomNodeAndClient(nc -> true); } - private synchronized NodeAndClient getRandomNodeAndClient(Predicate predicate) { ensureOpen(); List values = nodes.values().stream().filter(predicate).collect(Collectors.toList()); @@ -612,7 +617,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException { final Stream collection = n == 0 ? nodes.values().stream() : nodes.values().stream() - .filter(new DataNodePredicate().and(new NodeNamePredicate(getMasterName()).negate())); + .filter(DATA_NODE_PREDICATE.and(new NodeNamePredicate(getMasterName()).negate())); final Iterator values = collection.iterator(); logger.info("changing cluster size from {} data nodes to {}", size, n); @@ -676,7 +681,7 @@ private Settings getNodeSettings(final int nodeId, final long seed, final Settin * the method will return the existing one * @param onTransportServiceStarted callback to run when transport service is started */ - private NodeAndClient buildNode(int nodeId, Settings settings, + private synchronized NodeAndClient buildNode(int nodeId, Settings settings, boolean reuseExisting, Runnable onTransportServiceStarted) { assert Thread.holdsLock(this); ensureOpen(); @@ -726,7 +731,7 @@ private String buildNodeName(int id, Settings settings) { /** * returns a suffix string based on the node role. If no explicit role is defined, the suffix will be empty */ - private String getRoleSuffix(Settings settings) { + private static String getRoleSuffix(Settings settings) { String suffix = ""; if (Node.NODE_MASTER_SETTING.exists(settings) && Node.NODE_MASTER_SETTING.get(settings)) { suffix = suffix + Role.MASTER.getAbbreviation(); @@ -756,7 +761,7 @@ public synchronized Client client() { public synchronized Client dataNodeClient() { ensureOpen(); /* Randomly return a client to one of the nodes in the cluster */ - return getRandomNodeAndClient(new DataNodePredicate()).client(random); + return getRandomNodeAndClient(DATA_NODE_PREDICATE).client(random); } /** @@ -791,14 +796,14 @@ public synchronized Client nonMasterClient() { */ public synchronized Client coordOnlyNodeClient() { ensureOpen(); - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NoDataNoMasterNodePredicate()); + NodeAndClient randomNodeAndClient = getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE); if (randomNodeAndClient != null) { return randomNodeAndClient.client(random); } int nodeId = nextNodeId.getAndIncrement(); Settings settings = getSettings(nodeId, random.nextLong(), Settings.EMPTY); startCoordinatingOnlyNode(settings); - return getRandomNodeAndClient(new NoDataNoMasterNodePredicate()).client(random); + return getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE).client(random); } public synchronized String startCoordinatingOnlyNode(Settings settings) { @@ -859,7 +864,7 @@ public synchronized void close() throws IOException { } } - public static final int REMOVED_MINIMUM_MASTER_NODES = Integer.MAX_VALUE; + private static final int REMOVED_MINIMUM_MASTER_NODES = Integer.MAX_VALUE; private final class NodeAndClient implements Closeable { private MockNode node; @@ -875,7 +880,7 @@ private final class NodeAndClient implements Closeable { this.name = name; this.originalNodeSettings = originalNodeSettings; this.nodeAndClientId = nodeAndClientId; - markNodeDataDirsAsNotEligableForWipe(node); + markNodeDataDirsAsNotEligibleForWipe(node); } Node node() { @@ -1034,7 +1039,7 @@ public void afterStart() { } }); closed.set(false); - markNodeDataDirsAsNotEligableForWipe(node); + markNodeDataDirsAsNotEligibleForWipe(node); } @Override @@ -1047,11 +1052,27 @@ public void close() throws IOException { node.close(); } } + + private void markNodeDataDirsAsPendingForWipe(Node node) { + assert Thread.holdsLock(InternalTestCluster.this); + NodeEnvironment nodeEnv = node.getNodeEnvironment(); + if (nodeEnv.hasNodeFile()) { + dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataPaths())); + } + } + + private void markNodeDataDirsAsNotEligibleForWipe(Node node) { + assert Thread.holdsLock(InternalTestCluster.this); + NodeEnvironment nodeEnv = node.getNodeEnvironment(); + if (nodeEnv.hasNodeFile()) { + dataDirToClean.removeAll(Arrays.asList(nodeEnv.nodeDataPaths())); + } + } } public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_"; - static class TransportClientFactory { + private static class TransportClientFactory { private final boolean sniff; private final Settings settings; private final Path baseDir; @@ -1120,8 +1141,7 @@ private synchronized void reset(boolean wipeData) throws IOException { // trash all nodes with id >= sharedNodesSeeds.length - they are non shared final List toClose = new ArrayList<>(); - for (Iterator iterator = nodes.values().iterator(); iterator.hasNext();) { - NodeAndClient nodeAndClient = iterator.next(); + for (NodeAndClient nodeAndClient : nodes.values()) { if (nodeAndClient.nodeAndClientId() >= sharedNodesSeeds.length) { logger.debug("Close Node [{}] not shared", nodeAndClient.name); toClose.add(nodeAndClient); @@ -1213,7 +1233,7 @@ public synchronized void validateClusterFormed() { } /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */ - public synchronized void validateClusterFormed(String viaNode) { + private synchronized void validateClusterFormed(String viaNode) { Set expectedNodes = new HashSet<>(); for (NodeAndClient nodeAndClient : nodes.values()) { expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode()); @@ -1242,7 +1262,7 @@ public synchronized void validateClusterFormed(String viaNode) { } @Override - public synchronized void afterTest() throws IOException { + public synchronized void afterTest() { wipePendingDataDirectories(); randomlyResetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */ } @@ -1300,7 +1320,7 @@ private void assertNoPendingIndexOperations() throws Exception { if (operations.size() > 0) { throw new AssertionError( "shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n --> " + - operations.stream().collect(Collectors.joining("\n --> ")) + String.join("\n --> ", operations) ); } } @@ -1489,22 +1509,6 @@ public synchronized void wipePendingDataDirectories() { } } - private void markNodeDataDirsAsPendingForWipe(Node node) { - assert Thread.holdsLock(this); - NodeEnvironment nodeEnv = node.getNodeEnvironment(); - if (nodeEnv.hasNodeFile()) { - dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataPaths())); - } - } - - private void markNodeDataDirsAsNotEligableForWipe(Node node) { - assert Thread.holdsLock(this); - NodeEnvironment nodeEnv = node.getNodeEnvironment(); - if (nodeEnv.hasNodeFile()) { - dataDirToClean.removeAll(Arrays.asList(nodeEnv.nodeDataPaths())); - } - } - /** * Returns a reference to a random node's {@link ClusterService} */ @@ -1515,7 +1519,7 @@ public ClusterService clusterService() { /** * Returns a reference to a node's {@link ClusterService}. If the given node is null, a random node will be selected. */ - public synchronized ClusterService clusterService(@Nullable String node) { + public ClusterService clusterService(@Nullable String node) { return getInstance(ClusterService.class, node); } @@ -1533,8 +1537,8 @@ public synchronized Iterable getInstances(Class clazz) { /** * Returns an Iterable to all instances for the given class >T< across all data nodes in the cluster. */ - public synchronized Iterable getDataNodeInstances(Class clazz) { - return getInstances(clazz, new DataNodePredicate()); + public Iterable getDataNodeInstances(Class clazz) { + return getInstances(clazz, DATA_NODE_PREDICATE); } public synchronized T getCurrentMasterNodeInstance(Class clazz) { @@ -1545,8 +1549,8 @@ public synchronized T getCurrentMasterNodeInstance(Class clazz) { * Returns an Iterable to all instances for the given class >T< across all data and master nodes * in the cluster. */ - public synchronized Iterable getDataOrMasterNodeInstances(Class clazz) { - return getInstances(clazz, new DataNodePredicate().or(new MasterNodePredicate())); + public Iterable getDataOrMasterNodeInstances(Class clazz) { + return getInstances(clazz, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)); } private synchronized Iterable getInstances(Class clazz, Predicate predicate) { @@ -1561,16 +1565,16 @@ private synchronized Iterable getInstances(Class clazz, Predicate T getInstance(Class clazz, final String node) { + public T getInstance(Class clazz, final String node) { return getInstance(clazz, nc -> node == null || node.equals(nc.name)); } - public synchronized T getDataNodeInstance(Class clazz) { - return getInstance(clazz, new DataNodePredicate()); + public T getDataNodeInstance(Class clazz) { + return getInstance(clazz, DATA_NODE_PREDICATE); } - public synchronized T getMasterNodeInstance(Class clazz) { - return getInstance(clazz, new MasterNodePredicate()); + public T getMasterNodeInstance(Class clazz) { + return getInstance(clazz, MASTER_NODE_PREDICATE); } private synchronized T getInstance(Class clazz, Predicate predicate) { @@ -1582,11 +1586,11 @@ private synchronized T getInstance(Class clazz, Predicate /** * Returns a reference to a random nodes instances of the given class >T< */ - public synchronized T getInstance(Class clazz) { + public T getInstance(Class clazz) { return getInstance(clazz, nc -> true); } - private synchronized T getInstanceFromNode(Class clazz, Node node) { + private static T getInstanceFromNode(Class clazz, Node node) { return node.injector().getInstance(clazz); } @@ -1609,7 +1613,7 @@ public InetSocketAddress[] httpAddresses() { */ public synchronized boolean stopRandomDataNode() throws IOException { ensureOpen(); - NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate()); + NodeAndClient nodeAndClient = getRandomNodeAndClient(DATA_NODE_PREDICATE); if (nodeAndClient != null) { logger.info("Closing random node [{}] ", nodeAndClient.name); stopNodesAndClient(nodeAndClient); @@ -1714,7 +1718,7 @@ private void rebuildUnicastHostFiles(List newNodes) { } } - private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { + private void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { stopNodesAndClients(Collections.singleton(nodeAndClient)); } @@ -1742,7 +1746,7 @@ public void restartRandomDataNode() throws Exception { * Restarts a random data node in the cluster and calls the callback during restart. */ public void restartRandomDataNode(RestartCallback callback) throws Exception { - restartRandomNode(new DataNodePredicate(), callback); + restartRandomNode(DATA_NODE_PREDICATE, callback); } /** @@ -1894,7 +1898,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient); } - assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size(); + assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size(); // randomize start up order, but making sure that: // 1) A data folder that was assigned to a data node will stay so @@ -1912,7 +1916,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception final List nodesByRole = nodesByRoles.get(roles); startUpOrder.add(nodesByRole.remove(0)); } - assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; + assert nodesByRoles.values().stream().mapToInt(List::size).sum() == 0; for (NodeAndClient nodeAndClient : startUpOrder) { logger.info("creating node [{}] ", nodeAndClient.name); @@ -1959,7 +1963,7 @@ private synchronized Set nRandomDataNodes(int numNodes) { nodes .entrySet() .stream() - .filter(new EntryNodePredicate(new DataNodePredicate())) + .filter(entry -> DATA_NODE_PREDICATE.test(entry.getValue())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); final HashSet set = new HashSet<>(); final Iterator iterator = dataNodes.keySet().iterator(); @@ -1996,7 +2000,7 @@ public synchronized Set nodesInclude(String index) { * If {@link #bootstrapMasterNodeIndex} is -1 (default), this method does nothing. */ private List bootstrapMasterNodeWithSpecifiedIndex(List allNodesSettings) { - if (getBootstrapMasterNodeIndex() == -1) { // fast-path + if (bootstrapMasterNodeIndex == -1) { // fast-path return allNodesSettings; } @@ -2040,36 +2044,36 @@ private List bootstrapMasterNodeWithSpecifiedIndex(List allN /** * Starts a node with default settings and returns its name. */ - public synchronized String startNode() { + public String startNode() { return startNode(Settings.EMPTY); } /** * Starts a node with the given settings builder and returns its name. */ - public synchronized String startNode(Settings.Builder settings) { + public String startNode(Settings.Builder settings) { return startNode(settings.build()); } /** * Starts a node with the given settings and returns its name. */ - public synchronized String startNode(Settings settings) { + public String startNode(Settings settings) { return startNodes(settings).get(0); } /** * Starts multiple nodes with default settings and returns their names */ - public synchronized List startNodes(int numOfNodes) { + public List startNodes(int numOfNodes) { return startNodes(numOfNodes, Settings.EMPTY); } /** * Starts multiple nodes with the given settings and returns their names */ - public synchronized List startNodes(int numOfNodes, Settings settings) { - return startNodes(Collections.nCopies(numOfNodes, settings).stream().toArray(Settings[]::new)); + public List startNodes(int numOfNodes, Settings settings) { + return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0])); } /** @@ -2127,11 +2131,11 @@ public synchronized List startNodes(Settings... extraSettings) { return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList()); } - public synchronized List startMasterOnlyNodes(int numNodes) { + public List startMasterOnlyNodes(int numNodes) { return startMasterOnlyNodes(numNodes, Settings.EMPTY); } - public synchronized List startMasterOnlyNodes(int numNodes, Settings settings) { + public List startMasterOnlyNodes(int numNodes, Settings settings) { Settings settings1 = Settings.builder() .put(settings) .put(Node.NODE_MASTER_SETTING.getKey(), true) @@ -2140,16 +2144,12 @@ public synchronized List startMasterOnlyNodes(int numNodes, Settings set return startNodes(numNodes, settings1); } - public synchronized List startDataOnlyNodes(int numNodes) { - return startDataOnlyNodes(numNodes, Settings.EMPTY); - } - - public synchronized List startDataOnlyNodes(int numNodes, Settings settings) { + public List startDataOnlyNodes(int numNodes) { Settings settings1 = Settings.builder() - .put(settings) - .put(Node.NODE_MASTER_SETTING.getKey(), false) - .put(Node.NODE_DATA_SETTING.getKey(), true) - .build(); + .put(Settings.EMPTY) + .put(Node.NODE_MASTER_SETTING.getKey(), false) + .put(Node.NODE_DATA_SETTING.getKey(), true) + .build(); return startNodes(numNodes, settings1); } @@ -2177,7 +2177,7 @@ private int updateMinMasterNodes(int eligibleMasterNodeCount) { } /** calculates a min master nodes value based on the given number of master nodes */ - private int getMinMasterNodes(int eligibleMasterNodes) { + private static int getMinMasterNodes(int eligibleMasterNodes) { return eligibleMasterNodes / 2 + 1; } @@ -2185,11 +2185,11 @@ private int getMasterNodesCount() { return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); } - public synchronized String startMasterOnlyNode() { + public String startMasterOnlyNode() { return startMasterOnlyNode(Settings.EMPTY); } - public synchronized String startMasterOnlyNode(Settings settings) { + public String startMasterOnlyNode(Settings settings) { Settings settings1 = Settings.builder() .put(settings) .put(Node.NODE_MASTER_SETTING.getKey(), true) @@ -2198,10 +2198,11 @@ public synchronized String startMasterOnlyNode(Settings settings) { return startNode(settings1); } - public synchronized String startDataOnlyNode() { + public String startDataOnlyNode() { return startDataOnlyNode(Settings.EMPTY); } - public synchronized String startDataOnlyNode(Settings settings) { + + public String startDataOnlyNode(Settings settings) { Settings settings1 = Settings.builder() .put(settings) .put(Node.NODE_MASTER_SETTING.getKey(), false) @@ -2227,10 +2228,10 @@ public int numDataNodes() { @Override public int numDataAndMasterNodes() { - return dataAndMasterNodes().size(); + return filterNodes(nodes, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)).size(); } - public synchronized int numMasterNodes() { + public int numMasterNodes() { return filterNodes(nodes, NodeAndClient::isMasterEligible).size(); } @@ -2245,7 +2246,7 @@ public void clearDisruptionScheme() { clearDisruptionScheme(true); } - public void clearDisruptionScheme(boolean ensureHealthyCluster) { + public synchronized void clearDisruptionScheme(boolean ensureHealthyCluster) { if (activeDisruptionScheme != null) { TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal(); logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime); @@ -2273,11 +2274,7 @@ private void removeDisruptionSchemeFromNode(NodeAndClient nodeAndClient) { } private synchronized Collection dataNodeAndClients() { - return filterNodes(nodes, new DataNodePredicate()); - } - - private synchronized Collection dataAndMasterNodes() { - return filterNodes(nodes, new DataNodePredicate().or(new MasterNodePredicate())); + return filterNodes(nodes, DATA_NODE_PREDICATE); } private synchronized Collection filterNodes(Map map, @@ -2289,20 +2286,6 @@ private synchronized Collection filterNodes(Map { - @Override - public boolean test(NodeAndClient nodeAndClient) { - return DiscoveryNode.isDataNode(nodeAndClient.node.settings()); - } - } - - private static final class MasterNodePredicate implements Predicate { - @Override - public boolean test(NodeAndClient nodeAndClient) { - return DiscoveryNode.isMasterNode(nodeAndClient.node.settings()); - } - } - private static final class NodeNamePredicate implements Predicate { private final HashSet nodeNames; @@ -2316,27 +2299,6 @@ public boolean test(NodeAndClient nodeAndClient) { } } - private static final class NoDataNoMasterNodePredicate implements Predicate { - @Override - public boolean test(NodeAndClient nodeAndClient) { - return DiscoveryNode.isMasterNode(nodeAndClient.node.settings()) == false && - DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false; - } - } - - private static final class EntryNodePredicate implements Predicate> { - private final Predicate delegateNodePredicate; - - EntryNodePredicate(Predicate delegateNodePredicate) { - this.delegateNodePredicate = delegateNodePredicate; - } - - @Override - public boolean test(Map.Entry entry) { - return delegateNodePredicate.test(entry.getValue()); - } - } - synchronized String routingKeyForShard(Index index, int shard, Random random) { assertThat(shard, greaterThanOrEqualTo(0)); assertThat(shard, greaterThanOrEqualTo(0)); From 49123ec4b61776c1dcce1c16228d779917efa5a2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 20 Feb 2019 07:22:56 +0100 Subject: [PATCH 02/11] simpler --- .../test/InternalTestCluster.java | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index e3bca9c43fb00..c365a3148a3bc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -944,7 +944,7 @@ private Client getOrBuildTransportClient() { * since it might throw NoNodeAvailableException if nodes are * shut down. we first need support of transportClientRatio * as annotations or so */ - transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), + transportClient = new TransportClientFactory(nodeConfigurationSource.transportClientSettings(), baseDir, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName); } return clientWrapper.apply(transportClient); @@ -1073,13 +1073,11 @@ private void markNodeDataDirsAsNotEligibleForWipe(Node node) { public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_"; private static class TransportClientFactory { - private final boolean sniff; private final Settings settings; private final Path baseDir; private final Collection> plugins; - TransportClientFactory(boolean sniff, Settings settings, Path baseDir, Collection> plugins) { - this.sniff = sniff; + TransportClientFactory(Settings settings, Path baseDir, Collection> plugins) { this.settings = settings != null ? settings : Settings.EMPTY; this.baseDir = baseDir; this.plugins = plugins; @@ -1092,7 +1090,7 @@ public Client client(Node node, String clusterName) { .put("client.transport.nodes_sampler_interval", "1s") .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) .put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name")) - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", sniff) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", false) .put("logger.prefix", nodeSettings.get("logger.prefix", "")) .put("logger.level", nodeSettings.get("logger.level", "INFO")) .put(settings); @@ -1745,16 +1743,9 @@ public void restartRandomDataNode() throws Exception { /** * Restarts a random data node in the cluster and calls the callback during restart. */ - public void restartRandomDataNode(RestartCallback callback) throws Exception { - restartRandomNode(DATA_NODE_PREDICATE, callback); - } - - /** - * Restarts a random node in the cluster and calls the callback during restart. - */ - private synchronized void restartRandomNode(Predicate predicate, RestartCallback callback) throws Exception { + public synchronized void restartRandomDataNode(RestartCallback callback) throws Exception { ensureOpen(); - NodeAndClient nodeAndClient = getRandomNodeAndClient(predicate); + NodeAndClient nodeAndClient = getRandomNodeAndClient(InternalTestCluster.DATA_NODE_PREDICATE); if (nodeAndClient != null) { restartNode(nodeAndClient, callback); } @@ -1851,7 +1842,7 @@ private Set excludeMasters(Collection nodeAndClients) { logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeIds); try { client().execute(AddVotingConfigExclusionsAction.INSTANCE, - new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get(); + new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(Strings.EMPTY_ARRAY))).get(); } catch (InterruptedException | ExecutionException e) { throw new AssertionError("unexpected", e); } @@ -2158,7 +2149,7 @@ public List startDataOnlyNodes(int numNodes) { * * @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting */ - private int updateMinMasterNodes(int eligibleMasterNodeCount) { + private void updateMinMasterNodes(int eligibleMasterNodeCount) { assert autoManageMinMasterNodes; final int minMasterNodes = getMinMasterNodes(eligibleMasterNodeCount); if (getMasterNodesCount() > 0) { @@ -2173,7 +2164,6 @@ private int updateMinMasterNodes(int eligibleMasterNodeCount) { minMasterNodes, getMasterNodesCount()); } } - return minMasterNodes; } /** calculates a min master nodes value based on the given number of master nodes */ From 407a9d9e09339113b2a96596222cdb4898d48152 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 20 Feb 2019 09:27:56 +0100 Subject: [PATCH 03/11] more cleanups --- .../test/InternalTestCluster.java | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c365a3148a3bc..7a876b19a0b1a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -111,7 +111,6 @@ import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; -import org.junit.Assert; import java.io.Closeable; import java.io.IOException; @@ -551,8 +550,8 @@ private void ensureOpen() { } } - private synchronized NodeAndClient getOrBuildRandomNode() { - ensureOpen(); + private NodeAndClient getOrBuildRandomNode() { + assert Thread.holdsLock(this); final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient; @@ -774,8 +773,7 @@ public synchronized Client masterClient() { if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); // ensure node client master is requested } - Assert.fail("No master client found"); - return null; // can't happen + throw new AssertionError("No master client found"); } /** @@ -787,8 +785,7 @@ public synchronized Client nonMasterClient() { if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); // ensure node client non-master is requested } - Assert.fail("No non-master client found"); - return null; // can't happen + throw new AssertionError("No non-master client found"); } /** @@ -831,8 +828,7 @@ public synchronized Client client(String nodeName) { if (nodeAndClient != null) { return nodeAndClient.client(random); } - Assert.fail("No node found with name: [" + nodeName + "]"); - return null; // can't happen + throw new AssertionError("No node found with name: [" + nodeName + "]"); } @@ -844,8 +840,7 @@ public synchronized Client smartClient() { if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); } - Assert.fail("No smart client found"); - return null; // can't happen + throw new AssertionError("No smart client found"); } @Override @@ -1824,6 +1819,7 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) } private Set excludeMasters(Collection nodeAndClients) { + assert Thread.holdsLock(this); final Set excludedNodeIds = new HashSet<>(); if (autoManageMinMasterNodes && nodeAndClients.size() > 0) { @@ -1856,6 +1852,7 @@ private Set excludeMasters(Collection nodeAndClients) { } private void removeExclusions(Set excludedNodeIds) { + assert Thread.holdsLock(this); if (excludedNodeIds.isEmpty() == false) { logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds); try { @@ -1944,11 +1941,8 @@ public String getMasterName(@Nullable String viaNode) { } } - synchronized Set allDataNodesButN(int numNodes) { - return nRandomDataNodes(numDataNodes() - numNodes); - } - - private synchronized Set nRandomDataNodes(int numNodes) { + synchronized Set allDataNodesButN(int count) { + final int numNodes = numDataNodes() - count; assert size() >= numNodes; Map dataNodes = nodes @@ -1991,6 +1985,7 @@ public synchronized Set nodesInclude(String index) { * If {@link #bootstrapMasterNodeIndex} is -1 (default), this method does nothing. */ private List bootstrapMasterNodeWithSpecifiedIndex(List allNodesSettings) { + assert Thread.holdsLock(this); if (bootstrapMasterNodeIndex == -1) { // fast-path return allNodesSettings; } @@ -2277,15 +2272,15 @@ private synchronized Collection filterNodes(Map { - private final HashSet nodeNames; + private final String nodeName; - NodeNamePredicate(String... nodeNames) { - this.nodeNames = Sets.newHashSet(nodeNames); + NodeNamePredicate(String nodeName) { + this.nodeName = nodeName; } @Override public boolean test(NodeAndClient nodeAndClient) { - return nodeNames.contains(nodeAndClient.getName()); + return nodeName.equals(nodeAndClient.getName()); } } @@ -2444,7 +2439,7 @@ public void ensureEstimatedStats() { } @Override - public void assertAfterTest() throws IOException { + public synchronized void assertAfterTest() throws IOException { super.assertAfterTest(); assertRequestsFinished(); for (NodeAndClient nodeAndClient : nodes.values()) { @@ -2461,6 +2456,7 @@ public void assertAfterTest() throws IOException { } private void assertRequestsFinished() { + assert Thread.holdsLock(this); if (size() > 0) { for (NodeAndClient nodeAndClient : nodes.values()) { CircuitBreaker inFlightRequestsBreaker = getInstance(CircuitBreakerService.class, nodeAndClient.name) From 24f2848f35949105a066c0c70bc0393ea4d0caa4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 20 Feb 2019 10:55:13 +0100 Subject: [PATCH 04/11] more cleanups --- .../test/InternalTestCluster.java | 97 ++++++++++--------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 7a876b19a0b1a..c54bc13a3c0de 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -207,7 +207,7 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ - private final NavigableMap nodes = new TreeMap<>(); + private final NavigableMap nodes = Collections.synchronizedNavigableMap(new TreeMap<>()); private final Set dataDirToClean = new HashSet<>(); @@ -436,8 +436,8 @@ public boolean getAutoManageMinMasterNode() { return autoManageMinMasterNodes; } - public synchronized String[] getNodeNames() { - return nodes.keySet().toArray(Strings.EMPTY_ARRAY); + public String[] getNodeNames() { + return currentNodes().keySet().toArray(Strings.EMPTY_ARRAY); } private Settings getSettings(int nodeOrdinal, long nodeSeed, Settings others) { @@ -687,13 +687,12 @@ private synchronized NodeAndClient buildNode(int nodeId, Settings settings, Collection> plugins = getPlugins(); String name = settings.get("node.name"); - if (reuseExisting && nodes.containsKey(name)) { + final NodeAndClient nodeAndClient = nodes.get(name); + if (reuseExisting && nodeAndClient != null) { onTransportServiceStarted.run(); // reusing an existing node implies its transport service already started - return nodes.get(name); - } else { - assert reuseExisting == true || nodes.containsKey(name) == false : - "node name [" + name + "] already exists but not allowed to use it"; + return nodeAndClient; } + assert reuseExisting == true || nodeAndClient == null : "node name [" + name + "] already exists but not allowed to use it"; SecureSettings secureSettings = Settings.builder().put(settings).getSecureSettings(); if (secureSettings instanceof MockSecureSettings) { @@ -835,7 +834,7 @@ public synchronized Client client(String nodeName) { /** * Returns a "smart" node client to a random node in the cluster */ - public synchronized Client smartClient() { + public Client smartClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); @@ -1261,7 +1260,7 @@ public synchronized void afterTest() { } @Override - public void beforeIndexDeletion() throws Exception { + public synchronized void beforeIndexDeletion() throws Exception { // Check that the operations counter on index shard has reached 0. // The assumption here is that after a test there are no ongoing write operations. // test that have ongoing write operations after the test (for example because ttl is used @@ -1275,6 +1274,7 @@ public void beforeIndexDeletion() throws Exception { } private void assertSameSyncIdSameDocs() { + assert Thread.holdsLock(this); Map docsOnShards = new HashMap<>(); final Collection nodesAndClients = nodes.values(); for (NodeAndClient nodeAndClient : nodesAndClients) { @@ -1303,6 +1303,7 @@ private void assertSameSyncIdSameDocs() { } private void assertNoPendingIndexOperations() throws Exception { + assert Thread.holdsLock(this); assertBusy(() -> { final Collection nodesAndClients = nodes.values(); for (NodeAndClient nodeAndClient : nodesAndClients) { @@ -1323,6 +1324,7 @@ private void assertNoPendingIndexOperations() throws Exception { } private void assertOpenTranslogReferences() throws Exception { + assert Thread.holdsLock(this); assertBusy(() -> { final Collection nodesAndClients = nodes.values(); for (NodeAndClient nodeAndClient : nodesAndClients) { @@ -1343,9 +1345,9 @@ private void assertOpenTranslogReferences() throws Exception { } private void assertNoSnapshottedIndexCommit() throws Exception { + assert Thread.holdsLock(this); assertBusy(() -> { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1368,9 +1370,8 @@ private void assertNoSnapshottedIndexCommit() throws Exception { * Asserts that the document history in Lucene index is consistent with Translog's on every index shard of the cluster. * This assertion might be expensive, thus we prefer not to execute on every test but only interesting tests. */ - public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + public synchronized void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1384,6 +1385,12 @@ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOExce } } + private NavigableMap currentNodes() { + synchronized (nodes) { + return new TreeMap<>(nodes); + } + } + private IndexShard getShardOrNull(ClusterState clusterState, ShardRouting shardRouting) { if (shardRouting == null || shardRouting.assignedToNode() == false) { return null; @@ -1476,6 +1483,7 @@ public void assertSameDocIdsOnShards() throws Exception { } private void randomlyResetClients() { + assert Thread.holdsLock(this); // only reset the clients on nightly tests, it causes heavy load... if (RandomizedTest.isNightly() && rarely(random)) { final Collection nodesAndClients = nodes.values(); @@ -1519,12 +1527,10 @@ public ClusterService clusterService(@Nullable String node) { /** * Returns an Iterable to all instances for the given class >T< across all nodes in the cluster. */ - public synchronized Iterable getInstances(Class clazz) { - List instances = new ArrayList<>(nodes.size()); - for (NodeAndClient nodeAndClient : nodes.values()) { - instances.add(getInstanceFromNode(clazz, nodeAndClient.node)); + public Iterable getInstances(Class clazz) { + synchronized (nodes) { + return nodes.values().stream().map(node -> getInstanceFromNode(clazz, node.node)).collect(Collectors.toList()); } - return instances; } /** @@ -1546,8 +1552,8 @@ public Iterable getDataOrMasterNodeInstances(Class clazz) { return getInstances(clazz, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)); } - private synchronized Iterable getInstances(Class clazz, Predicate predicate) { - Iterable filteredNodes = nodes.values().stream().filter(predicate)::iterator; + private Iterable getInstances(Class clazz, Predicate predicate) { + Iterable filteredNodes = currentNodes().values().stream().filter(predicate)::iterator; List instances = new ArrayList<>(); for (NodeAndClient nodeAndClient : filteredNodes) { instances.add(getInstanceFromNode(clazz, nodeAndClient.node)); @@ -1588,8 +1594,8 @@ private static T getInstanceFromNode(Class clazz, Node node) { } @Override - public synchronized int size() { - return this.nodes.size(); + public int size() { + return nodes.size(); } @Override @@ -1635,9 +1641,10 @@ public synchronized void stopCurrentMasterNode() throws IOException { ensureOpen(); assert size() > 0; String masterNodeName = getMasterName(); - assert nodes.containsKey(masterNodeName); + final NodeAndClient masterNode = nodes.get(masterNodeName); + assert masterNode != null; logger.info("Closing master node [{}] ", masterNodeName); - stopNodesAndClient(nodes.get(masterNodeName)); + stopNodesAndClient(masterNode); } /** @@ -1691,19 +1698,21 @@ private void rebuildUnicastHostFiles(List newNodes) { // cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients() synchronized (discoveryFileMutex) { try { - Stream unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream()); - List discoveryFileContents = unicastHosts.map( + synchronized (nodes) { + Stream unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream()); + List discoveryFileContents = unicastHosts.map( nac -> nac.node.injector().getInstance(TransportService.class) ).filter(Objects::nonNull) - .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) - .map(n -> n.getAddress().toString()) - .distinct().collect(Collectors.toList()); - Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) - .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); - logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); - for (final Path configPath : configPaths) { - Files.createDirectories(configPath); - Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); + .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) + .map(n -> n.getAddress().toString()) + .distinct().collect(Collectors.toList()); + Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); + logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); + for (final Path configPath : configPaths) { + Files.createDirectories(configPath); + Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); + } } } catch (IOException e) { throw new AssertionError("failed to configure file-based discovery", e); @@ -1871,7 +1880,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception int numNodesRestarted = 0; final Settings[] newNodeSettings = new Settings[nextNodeId.get()]; Map, List> nodesByRoles = new HashMap<>(); - Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()]; + Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()]; final int minMasterNodes = autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1; for (NodeAndClient nodeAndClient : nodes.values()) { callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); @@ -2213,11 +2222,11 @@ public int numDataNodes() { @Override public int numDataAndMasterNodes() { - return filterNodes(nodes, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)).size(); + return filterNodes(currentNodes(), DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)).size(); } public int numMasterNodes() { - return filterNodes(nodes, NodeAndClient::isMasterEligible).size(); + return filterNodes(currentNodes(), NodeAndClient::isMasterEligible).size(); } public void setDisruptionScheme(ServiceDisruptionScheme scheme) { @@ -2258,11 +2267,11 @@ private void removeDisruptionSchemeFromNode(NodeAndClient nodeAndClient) { } } - private synchronized Collection dataNodeAndClients() { - return filterNodes(nodes, DATA_NODE_PREDICATE); + private Collection dataNodeAndClients() { + return filterNodes(currentNodes(), DATA_NODE_PREDICATE); } - private synchronized Collection filterNodes(Map map, + private static Collection filterNodes(Map map, Predicate predicate) { return map .values() @@ -2394,7 +2403,7 @@ public void ensureEstimatedStats() { // Checks that the breakers have been reset without incurring a // network request, because a network request can increment one // of the breakers - for (NodeAndClient nodeAndClient : nodes.values()) { + for (NodeAndClient nodeAndClient : currentNodes().values()) { final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesService.class, nodeAndClient.node).getIndicesFieldDataCache(); // Clean up the cache, ensuring that entries' listeners have been called From 93b75fb582bd5b01c35725baa1303a472cebc01f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 20 Feb 2019 14:38:54 +0100 Subject: [PATCH 05/11] re-add sync --- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c54bc13a3c0de..8eefeed49bac0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -834,7 +834,7 @@ public synchronized Client client(String nodeName) { /** * Returns a "smart" node client to a random node in the cluster */ - public Client smartClient() { + public synchronized Client smartClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); From 8a597fc765dd7b268ac473c885cab4dce8b452e0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 20 Feb 2019 15:21:43 +0100 Subject: [PATCH 06/11] move to immutable nodes map --- .../test/InternalTestCluster.java | 85 +++++++++---------- 1 file changed, 41 insertions(+), 44 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 8eefeed49bac0..771ccd3e9381a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -206,8 +206,8 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MIN_NUM_CLIENT_NODES = 0; static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; - /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ - private final NavigableMap nodes = Collections.synchronizedNavigableMap(new TreeMap<>()); + /* sorted map to make traverse order reproducible.*/ + private volatile NavigableMap nodes = Collections.emptyNavigableMap(); private final Set dataDirToClean = new HashSet<>(); @@ -437,7 +437,7 @@ public boolean getAutoManageMinMasterNode() { } public String[] getNodeNames() { - return currentNodes().keySet().toArray(Strings.EMPTY_ARRAY); + return nodes.keySet().toArray(Strings.EMPTY_ARRAY); } private Settings getSettings(int nodeOrdinal, long nodeSeed, Settings others) { @@ -852,7 +852,7 @@ public synchronized void close() throws IOException { try { IOUtils.close(nodes.values()); } finally { - nodes.clear(); + nodes = Collections.emptyNavigableMap(); executor.shutdownNow(); } } @@ -1385,12 +1385,6 @@ public synchronized void assertConsistentHistoryBetweenTranslogAndLuceneIndex() } } - private NavigableMap currentNodes() { - synchronized (nodes) { - return new TreeMap<>(nodes); - } - } - private IndexShard getShardOrNull(ClusterState clusterState, ShardRouting shardRouting) { if (shardRouting == null || shardRouting.assignedToNode() == false) { return null; @@ -1528,9 +1522,7 @@ public ClusterService clusterService(@Nullable String node) { * Returns an Iterable to all instances for the given class >T< across all nodes in the cluster. */ public Iterable getInstances(Class clazz) { - synchronized (nodes) { - return nodes.values().stream().map(node -> getInstanceFromNode(clazz, node.node)).collect(Collectors.toList()); - } + return nodes.values().stream().map(node -> getInstanceFromNode(clazz, node.node)).collect(Collectors.toList()); } /** @@ -1553,7 +1545,7 @@ public Iterable getDataOrMasterNodeInstances(Class clazz) { } private Iterable getInstances(Class clazz, Predicate predicate) { - Iterable filteredNodes = currentNodes().values().stream().filter(predicate)::iterator; + Iterable filteredNodes = nodes.values().stream().filter(predicate)::iterator; List instances = new ArrayList<>(); for (NodeAndClient nodeAndClient : filteredNodes) { instances.add(getInstanceFromNode(clazz, nodeAndClient.node)); @@ -1698,21 +1690,20 @@ private void rebuildUnicastHostFiles(List newNodes) { // cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients() synchronized (discoveryFileMutex) { try { - synchronized (nodes) { - Stream unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream()); - List discoveryFileContents = unicastHosts.map( - nac -> nac.node.injector().getInstance(TransportService.class) - ).filter(Objects::nonNull) - .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) - .map(n -> n.getAddress().toString()) - .distinct().collect(Collectors.toList()); - Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) - .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); - logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); - for (final Path configPath : configPaths) { - Files.createDirectories(configPath); - Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); - } + final Collection currentNodes = nodes.values(); + Stream unicastHosts = Stream.concat(currentNodes.stream(), newNodes.stream()); + List discoveryFileContents = unicastHosts.map( + nac -> nac.node.injector().getInstance(TransportService.class) + ).filter(Objects::nonNull) + .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) + .map(n -> n.getAddress().toString()) + .distinct().collect(Collectors.toList()); + Set configPaths = Stream.concat(currentNodes.stream(), newNodes.stream()) + .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); + logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); + for (final Path configPath : configPaths) { + Files.createDirectories(configPath); + Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); } } catch (IOException e) { throw new AssertionError("failed to configure file-based discovery", e); @@ -1729,7 +1720,9 @@ private synchronized void stopNodesAndClients(Collection nodeAndC for (NodeAndClient nodeAndClient: nodeAndClients) { removeDisruptionSchemeFromNode(nodeAndClient); - NodeAndClient previous = nodes.remove(nodeAndClient.name); + final NavigableMap newNodes = new TreeMap<>(nodes); + final NodeAndClient previous = newNodes.remove(nodeAndClient.name); + nodes = Collections.unmodifiableNavigableMap(newNodes); assert previous == nodeAndClient; nodeAndClient.close(); } @@ -1787,6 +1780,7 @@ public synchronized void rollingRestart(RestartCallback callback) throws Excepti } private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception { + assert Thread.holdsLock(this); logger.info("Restarting node [{}] ", nodeAndClient.name); if (activeDisruptionScheme != null) { @@ -1806,8 +1800,11 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) nodeAndClient.startNode(); success = true; } finally { - if (success == false) - nodes.remove(nodeAndClient.name); + if (success == false) { + final NavigableMap newNodes = new TreeMap<>(nodes); + newNodes.remove(nodeAndClient.name); + nodes = Collections.unmodifiableNavigableMap(newNodes); + } } if (activeDisruptionScheme != null) { @@ -2140,12 +2137,10 @@ public List startMasterOnlyNodes(int numNodes, Settings settings) { } public List startDataOnlyNodes(int numNodes) { - Settings settings1 = Settings.builder() - .put(Settings.EMPTY) - .put(Node.NODE_MASTER_SETTING.getKey(), false) - .put(Node.NODE_DATA_SETTING.getKey(), true) - .build(); - return startNodes(numNodes, settings1); + return startNodes( + numNodes, + Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false) + .put(Node.NODE_DATA_SETTING.getKey(), true).build()); } /** @@ -2176,7 +2171,7 @@ private static int getMinMasterNodes(int eligibleMasterNodes) { } private int getMasterNodesCount() { - return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); + return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); } public String startMasterOnlyNode() { @@ -2207,7 +2202,9 @@ public String startDataOnlyNode(Settings settings) { private synchronized void publishNode(NodeAndClient nodeAndClient) { assert !nodeAndClient.node().isClosed(); - nodes.put(nodeAndClient.name, nodeAndClient); + final NavigableMap newNodes = new TreeMap<>(nodes); + newNodes.put(nodeAndClient.name, nodeAndClient); + nodes = Collections.unmodifiableNavigableMap(newNodes); applyDisruptionSchemeToNode(nodeAndClient); } @@ -2222,11 +2219,11 @@ public int numDataNodes() { @Override public int numDataAndMasterNodes() { - return filterNodes(currentNodes(), DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)).size(); + return filterNodes(nodes, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)).size(); } public int numMasterNodes() { - return filterNodes(currentNodes(), NodeAndClient::isMasterEligible).size(); + return filterNodes(nodes, NodeAndClient::isMasterEligible).size(); } public void setDisruptionScheme(ServiceDisruptionScheme scheme) { @@ -2268,7 +2265,7 @@ private void removeDisruptionSchemeFromNode(NodeAndClient nodeAndClient) { } private Collection dataNodeAndClients() { - return filterNodes(currentNodes(), DATA_NODE_PREDICATE); + return filterNodes(nodes, DATA_NODE_PREDICATE); } private static Collection filterNodes(Map map, @@ -2403,7 +2400,7 @@ public void ensureEstimatedStats() { // Checks that the breakers have been reset without incurring a // network request, because a network request can increment one // of the breakers - for (NodeAndClient nodeAndClient : currentNodes().values()) { + for (NodeAndClient nodeAndClient : nodes.values()) { final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesService.class, nodeAndClient.node).getIndicesFieldDataCache(); // Clean up the cache, ensuring that entries' listeners have been called From fcd056bb456326beab970aa157421985c10f38a0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 20 Feb 2019 19:40:31 +0100 Subject: [PATCH 07/11] CR: sync client lazy init + remove sync on assertions --- .../test/InternalTestCluster.java | 55 ++++++++----------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 771ccd3e9381a..697e19df87b7d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -756,8 +756,7 @@ public synchronized Client client() { * Returns a node client to a data node in the cluster. * Note: use this with care tests should not rely on a certain nodes client. */ - public synchronized Client dataNodeClient() { - ensureOpen(); + public Client dataNodeClient() { /* Randomly return a client to one of the nodes in the cluster */ return getRandomNodeAndClient(DATA_NODE_PREDICATE).client(random); } @@ -766,8 +765,7 @@ public synchronized Client dataNodeClient() { * Returns a node client to the current master node. * Note: use this with care tests should not rely on a certain nodes client. */ - public synchronized Client masterClient() { - ensureOpen(); + public Client masterClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName())); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); // ensure node client master is requested @@ -778,8 +776,7 @@ public synchronized Client masterClient() { /** * Returns a node client to random node but not the master. This method will fail if no non-master client is available. */ - public synchronized Client nonMasterClient() { - ensureOpen(); + public Client nonMasterClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName()).negate()); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); // ensure node client non-master is requested @@ -813,7 +810,6 @@ public synchronized String startCoordinatingOnlyNode(Settings settings) { * Returns a transport client */ public synchronized Client transportClient() { - ensureOpen(); // randomly return a transport client going to one of the nodes in the cluster return getOrBuildRandomNode().transportClient(); } @@ -821,8 +817,7 @@ public synchronized Client transportClient() { /** * Returns a node client to a given node. */ - public synchronized Client client(String nodeName) { - ensureOpen(); + public Client client(String nodeName) { NodeAndClient nodeAndClient = nodes.get(nodeName); if (nodeAndClient != null) { return nodeAndClient.client(random); @@ -834,7 +829,7 @@ public synchronized Client client(String nodeName) { /** * Returns a "smart" node client to a random node in the cluster */ - public synchronized Client smartClient() { + public Client smartClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); @@ -897,17 +892,19 @@ public boolean isMasterEligible() { } Client client(Random random) { - if (closed.get()) { - throw new RuntimeException("already closed"); - } - double nextDouble = random.nextDouble(); - if (nextDouble < transportClientRatio) { - if (logger.isTraceEnabled()) { - logger.trace("Using transport client for node [{}] sniff: [{}]", node.settings().get("node.name"), false); + synchronized (InternalTestCluster.this) { + if (closed.get()) { + throw new RuntimeException("already closed"); + } + double nextDouble = random.nextDouble(); + if (nextDouble < transportClientRatio) { + if (logger.isTraceEnabled()) { + logger.trace("Using transport client for node [{}] sniff: [{}]", node.settings().get("node.name"), false); + } + return getOrBuildTransportClient(); + } else { + return getOrBuildNodeClient(); } - return getOrBuildTransportClient(); - } else { - return getOrBuildNodeClient(); } } @@ -1260,7 +1257,7 @@ public synchronized void afterTest() { } @Override - public synchronized void beforeIndexDeletion() throws Exception { + public void beforeIndexDeletion() throws Exception { // Check that the operations counter on index shard has reached 0. // The assumption here is that after a test there are no ongoing write operations. // test that have ongoing write operations after the test (for example because ttl is used @@ -1274,7 +1271,6 @@ public synchronized void beforeIndexDeletion() throws Exception { } private void assertSameSyncIdSameDocs() { - assert Thread.holdsLock(this); Map docsOnShards = new HashMap<>(); final Collection nodesAndClients = nodes.values(); for (NodeAndClient nodeAndClient : nodesAndClients) { @@ -1303,10 +1299,8 @@ private void assertSameSyncIdSameDocs() { } private void assertNoPendingIndexOperations() throws Exception { - assert Thread.holdsLock(this); assertBusy(() -> { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1324,10 +1318,8 @@ private void assertNoPendingIndexOperations() throws Exception { } private void assertOpenTranslogReferences() throws Exception { - assert Thread.holdsLock(this); assertBusy(() -> { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1345,7 +1337,6 @@ private void assertOpenTranslogReferences() throws Exception { } private void assertNoSnapshottedIndexCommit() throws Exception { - assert Thread.holdsLock(this); assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); @@ -1370,7 +1361,7 @@ private void assertNoSnapshottedIndexCommit() throws Exception { * Asserts that the document history in Lucene index is consistent with Translog's on every index shard of the cluster. * This assertion might be expensive, thus we prefer not to execute on every test but only interesting tests. */ - public synchronized void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException { + public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException { for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { @@ -2237,6 +2228,7 @@ public void clearDisruptionScheme() { clearDisruptionScheme(true); } + // synchronized to prevent concurrently modifying the cluster. public synchronized void clearDisruptionScheme(boolean ensureHealthyCluster) { if (activeDisruptionScheme != null) { TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal(); @@ -2318,8 +2310,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) { } @Override - public synchronized Iterable getClients() { - ensureOpen(); + public Iterable getClients() { return () -> { ensureOpen(); final Iterator iterator = nodes.values().iterator(); From 9dfa77961f77482ea161250c763463e8129dcf34 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 20 Feb 2019 19:46:26 +0100 Subject: [PATCH 08/11] CR: remove duplication --- .../elasticsearch/test/InternalTestCluster.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 697e19df87b7d..1181c738d370b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1711,9 +1711,7 @@ private synchronized void stopNodesAndClients(Collection nodeAndC for (NodeAndClient nodeAndClient: nodeAndClients) { removeDisruptionSchemeFromNode(nodeAndClient); - final NavigableMap newNodes = new TreeMap<>(nodes); - final NodeAndClient previous = newNodes.remove(nodeAndClient.name); - nodes = Collections.unmodifiableNavigableMap(newNodes); + final NodeAndClient previous = removeNode(nodeAndClient); assert previous == nodeAndClient; nodeAndClient.close(); } @@ -1792,9 +1790,7 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) success = true; } finally { if (success == false) { - final NavigableMap newNodes = new TreeMap<>(nodes); - newNodes.remove(nodeAndClient.name); - nodes = Collections.unmodifiableNavigableMap(newNodes); + removeNode(nodeAndClient); } } @@ -1815,6 +1811,13 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) } } + private NodeAndClient removeNode(NodeAndClient nodeAndClient) { + final NavigableMap newNodes = new TreeMap<>(nodes); + final NodeAndClient previous = newNodes.remove(nodeAndClient.name); + nodes = Collections.unmodifiableNavigableMap(newNodes); + return previous; + } + private Set excludeMasters(Collection nodeAndClients) { assert Thread.holdsLock(this); final Set excludedNodeIds = new HashSet<>(); From 2c03cc5aa624fedb42c7dba5d063661a571554cf Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 21 Feb 2019 09:29:38 +0100 Subject: [PATCH 09/11] CR: doc copy-on-wirte --- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1181c738d370b..4c8b7318b819b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -206,7 +206,9 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MIN_NUM_CLIENT_NODES = 0; static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; - /* sorted map to make traverse order reproducible.*/ + /* Sorted map to make traverse order reproducible. + * The map of nodes is never mutated so it is safe to read without synchronization. + * Updates are intended to follow a copy-on-write approach. */ private volatile NavigableMap nodes = Collections.emptyNavigableMap(); private final Set dataDirToClean = new HashSet<>(); From cf09759d7c86cda9858df6623c202e489b6ff527 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 21 Feb 2019 09:30:54 +0100 Subject: [PATCH 10/11] CR: doc copy-on-wirte --- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 4c8b7318b819b..1cc01310cd12e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -207,7 +207,7 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; /* Sorted map to make traverse order reproducible. - * The map of nodes is never mutated so it is safe to read without synchronization. + * The map of nodes is never mutated so individual reads are safe without synchronization. * Updates are intended to follow a copy-on-write approach. */ private volatile NavigableMap nodes = Collections.emptyNavigableMap(); From e649b40128097be4959de36b162eef211bd4c45c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 21 Feb 2019 10:58:08 +0100 Subject: [PATCH 11/11] CR: fix syn on node --- .../test/InternalTestCluster.java | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1cc01310cd12e..0bd976420f07d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -894,19 +894,14 @@ public boolean isMasterEligible() { } Client client(Random random) { - synchronized (InternalTestCluster.this) { - if (closed.get()) { - throw new RuntimeException("already closed"); - } - double nextDouble = random.nextDouble(); - if (nextDouble < transportClientRatio) { - if (logger.isTraceEnabled()) { - logger.trace("Using transport client for node [{}] sniff: [{}]", node.settings().get("node.name"), false); - } - return getOrBuildTransportClient(); - } else { - return getOrBuildNodeClient(); + double nextDouble = random.nextDouble(); + if (nextDouble < transportClientRatio) { + if (logger.isTraceEnabled()) { + logger.trace("Using transport client for node [{}] sniff: [{}]", node.settings().get("node.name"), false); } + return getOrBuildTransportClient(); + } else { + return getOrBuildNodeClient(); } } @@ -925,22 +920,32 @@ Client transportClient() { } private Client getOrBuildNodeClient() { - if (nodeClient == null) { - nodeClient = node.client(); + synchronized (InternalTestCluster.this) { + if (closed.get()) { + throw new RuntimeException("already closed"); + } + if (nodeClient == null) { + nodeClient = node.client(); + } + return clientWrapper.apply(nodeClient); } - return clientWrapper.apply(nodeClient); } private Client getOrBuildTransportClient() { - if (transportClient == null) { - /* don't sniff client for now - doesn't work will all tests - * since it might throw NoNodeAvailableException if nodes are - * shut down. we first need support of transportClientRatio - * as annotations or so */ - transportClient = new TransportClientFactory(nodeConfigurationSource.transportClientSettings(), + synchronized (InternalTestCluster.this) { + if (closed.get()) { + throw new RuntimeException("already closed"); + } + if (transportClient == null) { + /* don't sniff client for now - doesn't work will all tests + * since it might throw NoNodeAvailableException if nodes are + * shut down. we first need support of transportClientRatio + * as annotations or so */ + transportClient = new TransportClientFactory(nodeConfigurationSource.transportClientSettings(), baseDir, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName); + } + return clientWrapper.apply(transportClient); } - return clientWrapper.apply(transportClient); } void resetClient() { @@ -1037,6 +1042,7 @@ public void afterStart() { @Override public void close() throws IOException { + assert Thread.holdsLock(InternalTestCluster.this); try { resetClient(); } finally { @@ -1814,6 +1820,7 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) } private NodeAndClient removeNode(NodeAndClient nodeAndClient) { + assert Thread.holdsLock(this); final NavigableMap newNodes = new TreeMap<>(nodes); final NodeAndClient previous = newNodes.remove(nodeAndClient.name); nodes = Collections.unmodifiableNavigableMap(newNodes);