Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
Expand All @@ -28,6 +29,12 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow_coordinator.wait_for_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we come up with another name which indicates that this is a timeout setting for wait_for_metadata_version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is wait_for_metadata_timeout a better name?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's better :). I think we should also drop coordinator in the setting name.


/**
* The settings defined by CCR.
*
Expand All @@ -36,7 +43,8 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING);
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
Expand Down Expand Up @@ -72,6 +74,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final CcrLicenseChecker ccrLicenseChecker;
private final LongSupplier relativeMillisTimeProvider;

private volatile TimeValue waitForTimeOut;
private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();

// The following fields are read and updated under a lock:
Expand All @@ -81,6 +84,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;

public AutoFollowCoordinator(
Settings settings,
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker,
Expand All @@ -97,6 +101,15 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
return size() > MAX_AUTO_FOLLOW_ERRORS;
}
};

Consumer<TimeValue> updater = newWaitForTimeOut -> {
if (newWaitForTimeOut.equals(waitForTimeOut) == false) {
LOGGER.info("changing wait_for_timeout from [{}] to [{}]", waitForTimeOut, newWaitForTimeOut);
waitForTimeOut = newWaitForTimeOut;
}
};
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT, updater);
waitForTimeOut = CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.get(settings);
}

public synchronized AutoFollowStats getStats() {
Expand Down Expand Up @@ -180,6 +193,7 @@ void getRemoteClusterState(final String remoteCluster,
request.metaData(true);
request.routingTable(true);
request.waitForMetaDataVersion(metadataVersion);
request.waitForTimeout(waitForTimeOut);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
Expand Down Expand Up @@ -195,6 +196,8 @@ private NodeConfigurationSource createNodeConfigurationSource() {
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
return new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
Expand Down Expand Up @@ -41,6 +42,8 @@ protected Settings nodeSettings() {
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
Expand Down Expand Up @@ -530,8 +532,9 @@ public void testGetFollowerIndexName() {

public void testStats() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
mock(ClusterService.class),
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);

Expand Down Expand Up @@ -586,14 +589,15 @@ public void testStats() {
}

public void testUpdateAutoFollowers() {
ClusterService clusterService = mock(ClusterService.class);
ClusterService clusterService = mockClusterService();
// Return a cluster state with no patterns so that the auto followers never really execute:
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
.build();
when(clusterService.state()).thenReturn(followerState);
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
clusterService,
new CcrLicenseChecker(() -> true, () -> false),
Expand Down Expand Up @@ -648,8 +652,9 @@ public void testUpdateAutoFollowers() {

public void testUpdateAutoFollowersNoPatterns() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
mock(ClusterService.class),
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
Expand All @@ -662,8 +667,9 @@ public void testUpdateAutoFollowersNoPatterns() {

public void testUpdateAutoFollowersNoAutoFollowMetadata() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
mock(ClusterService.class),
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
Expand Down Expand Up @@ -840,4 +846,12 @@ private static Supplier<ClusterState> localClusterStateSupplier(ClusterState...
};
}

private ClusterService mockClusterService() {
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings =
new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
return clusterService;
}

}