Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;

import java.util.AbstractMap;
Expand All @@ -38,6 +39,7 @@ public final class AutoFollowStats {
new ParseField("number_of_failed_remote_cluster_state_requests");
static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors");
static final ParseField LEADER_INDEX = new ParseField("leader_index");
static final ParseField TIMESTAMP = new ParseField("timestamp");
static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception");
static final ParseField AUTO_FOLLOWED_CLUSTERS = new ParseField("auto_followed_clusters");
static final ParseField CLUSTER_NAME = new ParseField("cluster_name");
Expand All @@ -51,7 +53,7 @@ public final class AutoFollowStats {
(Long) args[1],
(Long) args[2],
new TreeMap<>(
((List<Map.Entry<String, ElasticsearchException>>) args[3])
((List<Map.Entry<String, Tuple<Long, ElasticsearchException>>>) args[3])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
new TreeMap<>(
Expand All @@ -60,10 +62,10 @@ public final class AutoFollowStats {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
));

private static final ConstructingObjectParser<Map.Entry<String, ElasticsearchException>, Void> AUTO_FOLLOW_EXCEPTIONS_PARSER =
static final ConstructingObjectParser<Map.Entry<String, Tuple<Long, ElasticsearchException>>, Void> AUTO_FOLLOW_EXCEPTIONS_PARSER =
new ConstructingObjectParser<>(
"auto_follow_stats_errors",
args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1]));
args -> new AbstractMap.SimpleEntry<>((String) args[0], Tuple.tuple((Long) args[1], (ElasticsearchException) args[2])));

private static final ConstructingObjectParser<Map.Entry<String, AutoFollowedCluster>, Void> AUTO_FOLLOWED_CLUSTERS_PARSER =
new ConstructingObjectParser<>(
Expand All @@ -72,6 +74,7 @@ public final class AutoFollowStats {

static {
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP);
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
Expand All @@ -93,13 +96,13 @@ public final class AutoFollowStats {
private final long numberOfFailedFollowIndices;
private final long numberOfFailedRemoteClusterStateRequests;
private final long numberOfSuccessfulFollowIndices;
private final NavigableMap<String, ElasticsearchException> recentAutoFollowErrors;
private final NavigableMap<String, Tuple<Long, ElasticsearchException>> recentAutoFollowErrors;
private final NavigableMap<String, AutoFollowedCluster> autoFollowedClusters;

AutoFollowStats(long numberOfFailedFollowIndices,
long numberOfFailedRemoteClusterStateRequests,
long numberOfSuccessfulFollowIndices,
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors,
NavigableMap<String, Tuple<Long, ElasticsearchException>> recentAutoFollowErrors,
NavigableMap<String, AutoFollowedCluster> autoFollowedClusters) {
this.numberOfFailedFollowIndices = numberOfFailedFollowIndices;
this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests;
Expand All @@ -120,7 +123,7 @@ public long getNumberOfSuccessfulFollowIndices() {
return numberOfSuccessfulFollowIndices;
}

public NavigableMap<String, ElasticsearchException> getRecentAutoFollowErrors() {
public NavigableMap<String, Tuple<Long, ElasticsearchException>> getRecentAutoFollowErrors() {
return recentAutoFollowErrors;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ private static void assertEqualInstances(CcrStatsResponse expectedInstance, CcrS
equalTo(expectedAutoFollowStats.getRecentAutoFollowErrors().size()));
assertThat(newAutoFollowStats.getRecentAutoFollowErrors().keySet(),
equalTo(expectedAutoFollowStats.getRecentAutoFollowErrors().keySet()));
for (final Map.Entry<String, ElasticsearchException> entry : newAutoFollowStats.getRecentAutoFollowErrors().entrySet()) {
for (final Map.Entry<String, Tuple<Long, ElasticsearchException>> entry :
newAutoFollowStats.getRecentAutoFollowErrors().entrySet()) {
// x-content loses the exception
final ElasticsearchException expected = expectedAutoFollowStats.getRecentAutoFollowErrors().get(entry.getKey());
assertThat(entry.getValue().getMessage(), containsString(expected.getMessage()));
assertNotNull(entry.getValue().getCause());
final Tuple<Long, ElasticsearchException> expected =
expectedAutoFollowStats.getRecentAutoFollowErrors().get(entry.getKey());
assertThat(entry.getValue().v2().getMessage(), containsString(expected.v2().getMessage()));
assertThat(entry.getValue().v1(), equalTo(expected.v1()));
assertNotNull(entry.getValue().v2().getCause());
assertThat(
entry.getValue().getCause(),
entry.getValue().v2().getCause(),
anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class)));
assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage()));
assertThat(entry.getValue().v2().getCause().getMessage(), containsString(expected.v2().getCause().getMessage()));
}
}
{
Expand Down Expand Up @@ -172,14 +175,16 @@ private static void toXContent(CcrStatsResponse response, XContentBuilder builde
builder.field(AutoFollowStats.NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(),
autoFollowStats.getNumberOfFailedFollowIndices());
builder.startArray(AutoFollowStats.RECENT_AUTO_FOLLOW_ERRORS.getPreferredName());
for (Map.Entry<String, ElasticsearchException> entry : autoFollowStats.getRecentAutoFollowErrors().entrySet()) {
for (Map.Entry<String, Tuple<Long, ElasticsearchException>> entry :
autoFollowStats.getRecentAutoFollowErrors().entrySet()) {
builder.startObject();
{
builder.field(AutoFollowStats.LEADER_INDEX.getPreferredName(), entry.getKey());
builder.field(AutoFollowStats.TIMESTAMP.getPreferredName(), entry.getValue().v1());
builder.field(AutoFollowStats.AUTO_FOLLOW_EXCEPTION.getPreferredName());
builder.startObject();
{
ElasticsearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, entry.getValue());
ElasticsearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, entry.getValue().v2());
}
builder.endObject();
}
Expand Down Expand Up @@ -325,9 +330,10 @@ private static CcrStatsResponse createTestInstance() {

private static AutoFollowStats randomAutoFollowStats() {
final int count = randomIntBetween(0, 16);
final NavigableMap<String, ElasticsearchException> readExceptions = new TreeMap<>();
final NavigableMap<String, Tuple<Long, ElasticsearchException>> readExceptions = new TreeMap<>();
for (int i = 0; i < count; i++) {
readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
readExceptions.put("" + i, Tuple.tuple(randomNonNegativeLong(),
new ElasticsearchException(new IllegalStateException("index [" + i + "]"))));
}
final NavigableMap<String, AutoFollowedCluster> autoFollowClusters = new TreeMap<>();
for (int i = 0; i < count; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public Collection<Object> createComponents(
ccrLicenseChecker,
restoreSourceService,
new CcrRepositoryManager(settings, clusterService, client),
new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker,
threadPool::relativeTimeInMillis, threadPool::absoluteTimeInMillis)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;
private final LongSupplier relativeMillisTimeProvider;
private final LongSupplier absoluteMillisTimeProvider;

private volatile TimeValue waitForMetadataTimeOut;
private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();
Expand All @@ -82,23 +83,25 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private long numberOfSuccessfulIndicesAutoFollowed = 0;
private long numberOfFailedIndicesAutoFollowed = 0;
private long numberOfFailedRemoteClusterStateRequests = 0;
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;
private final LinkedHashMap<String, Tuple<Long, ElasticsearchException>> recentAutoFollowErrors;

public AutoFollowCoordinator(
Settings settings,
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker,
LongSupplier relativeMillisTimeProvider) {
Settings settings,
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker,
LongSupplier relativeMillisTimeProvider,
LongSupplier absoluteMillisTimeProvider) {

this.client = client;
this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
this.relativeMillisTimeProvider = relativeMillisTimeProvider;
this.absoluteMillisTimeProvider = absoluteMillisTimeProvider;
clusterService.addListener(this);
this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
this.recentAutoFollowErrors = new LinkedHashMap<String, Tuple<Long, ElasticsearchException>>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchException> eldest) {
protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, ElasticsearchException>> eldest) {
return size() > MAX_AUTO_FOLLOW_ERRORS;
}
};
Expand Down Expand Up @@ -138,10 +141,11 @@ public synchronized AutoFollowStats getStats() {
}

synchronized void updateStats(List<AutoFollowResult> results) {
long newStatsReceivedTimeStamp = absoluteMillisTimeProvider.getAsLong();
for (AutoFollowResult result : results) {
if (result.clusterStateFetchException != null) {
recentAutoFollowErrors.put(result.autoFollowPatternName,
new ElasticsearchException(result.clusterStateFetchException));
Tuple.tuple(newStatsReceivedTimeStamp, new ElasticsearchException(result.clusterStateFetchException)));
numberOfFailedRemoteClusterStateRequests++;
LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state for auto follow pattern [{}]",
result.autoFollowPatternName), result.clusterStateFetchException);
Expand All @@ -150,7 +154,7 @@ synchronized void updateStats(List<AutoFollowResult> results) {
if (entry.getValue() != null) {
numberOfFailedIndicesAutoFollowed++;
recentAutoFollowErrors.put(result.autoFollowPatternName + ":" + entry.getKey().getName(),
ExceptionsHelper.convertToElastic(entry.getValue()));
Tuple.tuple(newStatsReceivedTimeStamp, ExceptionsHelper.convertToElastic(entry.getValue())));
LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] for auto follow " +
"pattern [{}]", entry.getKey(), result.autoFollowPatternName), entry.getValue());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ public void testConflictingPatterns() throws Exception {
assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(0L));

assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(2));
ElasticsearchException autoFollowError1 = autoFollowStats.getRecentAutoFollowErrors().get("my-pattern1:logs-201801");
ElasticsearchException autoFollowError1 = autoFollowStats.getRecentAutoFollowErrors().get("my-pattern1:logs-201801").v2();
assertThat(autoFollowError1, notNullValue());
assertThat(autoFollowError1.getRootCause().getMessage(), equalTo("index to follow [logs-201801] for pattern [my-pattern1] " +
"matches with other patterns [my-pattern2]"));

ElasticsearchException autoFollowError2 = autoFollowStats.getRecentAutoFollowErrors().get("my-pattern2:logs-201801");
ElasticsearchException autoFollowError2 = autoFollowStats.getRecentAutoFollowErrors().get("my-pattern2:logs-201801").v2();
assertThat(autoFollowError2, notNullValue());
assertThat(autoFollowError2.getRootCause().getMessage(), equalTo("index to follow [logs-201801] for pattern [my-pattern2] " +
"matches with other patterns [my-pattern1]"));
Expand All @@ -311,7 +311,7 @@ public void testAutoFollowSoftDeletesDisabled() throws Exception {
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L));
assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(1L));
assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(1));
ElasticsearchException failure = autoFollowStats.getRecentAutoFollowErrors().firstEntry().getValue();
ElasticsearchException failure = autoFollowStats.getRecentAutoFollowErrors().firstEntry().getValue().v2();
assertThat(failure.getMessage(), equalTo("index [logs-20200101] cannot be followed, " +
"because soft deletes are not enabled"));
IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-20200101");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ public void testStats() {
null,
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
() -> 1L, () -> 1L);

autoFollowCoordinator.updateStats(Collections.singletonList(
new AutoFollowCoordinator.AutoFollowResult("_alias1"))
Expand All @@ -558,7 +558,7 @@ public void testStats() {
assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L));
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L));
assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(1));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").v2().getCause().getMessage(), equalTo("error"));

autoFollowCoordinator.updateStats(Arrays.asList(
new AutoFollowCoordinator.AutoFollowResult("_alias1",
Expand All @@ -571,9 +571,9 @@ public void testStats() {
assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L));
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L));
assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").v2().getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").v2().getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").v2().getCause().getMessage(), equalTo("error"));

autoFollowCoordinator.updateStats(Arrays.asList(
new AutoFollowCoordinator.AutoFollowResult("_alias1",
Expand All @@ -586,9 +586,9 @@ public void testStats() {
assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L));
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(2L));
assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").v2().getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").v2().getCause().getMessage(), equalTo("error"));
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").v2().getCause().getMessage(), equalTo("error"));
}

public void testUpdateAutoFollowers() {
Expand All @@ -604,7 +604,7 @@ public void testUpdateAutoFollowers() {
null,
clusterService,
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
() -> 1L, () -> 1L);
// Add 3 patterns:
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null,
Expand Down Expand Up @@ -659,7 +659,7 @@ public void testUpdateAutoFollowersNoPatterns() {
null,
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
() -> 1L, () -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
Expand All @@ -674,7 +674,7 @@ public void testUpdateAutoFollowersNoAutoFollowMetadata() {
null,
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
() -> 1L, () -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
autoFollowCoordinator.updateAutoFollowers(clusterState);
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0));
Expand Down
Loading