Skip to content

Commit 19ad0d8

Browse files
authored
Fix GeoIpDownloader startup during rolling upgrade (#84000) (#84032)
If rolling upgrade was used from version prior GeoIPv2 (<`7.14`) then geoip downloader wouldn't be started so no new databases were downloaded. This is especially troubling in `8.x` as we no longer provide default databases inside ES so after upgrade no geoip enrichment can take place until downloader is started with workaround (setting `ingest.geoip.downloader.enabled` to `false` and `true` again). This is because logic that was used to lower number of requests / cluster update listeners at the startup was too optimistic about order of actions / who can be elected master at what time. This change fixes that and also cleans up logs when there are some ignorable errors and adds debug logging on start and stop of the task to ease up troubleshooting. It also adds rolling upgrade test to make sure the fix works.
1 parent 04f4fb6 commit 19ad0d8

File tree

5 files changed

+81
-16
lines changed

5 files changed

+81
-16
lines changed

docs/changelog/84000.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 84000
2+
summary: Fix `GeoIpDownloader` startup during rolling upgrade
3+
area: Ingest
4+
type: bug
5+
issues: []

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,15 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
6666
Property.Dynamic,
6767
Property.NodeScope
6868
);
69+
70+
// for overriding in tests
71+
private static final String DEFAULT_ENDPOINT = System.getProperty(
72+
"ingest.geoip.downloader.endpoint.default",
73+
"https://geoip.elastic.co/v1/database"
74+
);
6975
public static final Setting<String> ENDPOINT_SETTING = Setting.simpleString(
7076
"ingest.geoip.downloader.endpoint",
71-
"https://geoip.elastic.co/v1/database",
77+
DEFAULT_ENDPOINT,
7278
Property.NodeScope
7379
);
7480

@@ -258,6 +264,7 @@ void runDownloader() {
258264
try {
259265
updateDatabases();
260266
} catch (Exception e) {
267+
stats = stats.failedDownload();
261268
logger.error("exception during geoip databases update", e);
262269
}
263270
try {

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.ResourceAlreadyExistsException;
1414
import org.elasticsearch.ResourceNotFoundException;
15+
import org.elasticsearch.Version;
1516
import org.elasticsearch.action.ActionListener;
1617
import org.elasticsearch.client.Client;
1718
import org.elasticsearch.client.OriginSettingClient;
1819
import org.elasticsearch.cluster.ClusterChangedEvent;
1920
import org.elasticsearch.cluster.ClusterStateListener;
21+
import org.elasticsearch.cluster.node.DiscoveryNode;
2022
import org.elasticsearch.cluster.service.ClusterService;
2123
import org.elasticsearch.common.settings.Setting;
2224
import org.elasticsearch.common.settings.Settings;
@@ -29,6 +31,7 @@
2931
import org.elasticsearch.persistent.PersistentTasksService;
3032
import org.elasticsearch.tasks.TaskId;
3133
import org.elasticsearch.threadpool.ThreadPool;
34+
import org.elasticsearch.transport.RemoteTransportException;
3235

3336
import java.util.Map;
3437
import java.util.concurrent.atomic.AtomicReference;
@@ -128,14 +131,18 @@ public void clusterChanged(ClusterChangedEvent event) {
128131
// wait for state recovered
129132
return;
130133
}
131-
// bootstrap downloader after first cluster start
134+
135+
DiscoveryNode masterNode = event.state().nodes().getMasterNode();
136+
if (masterNode == null || masterNode.getVersion().before(Version.V_7_14_0)) {
137+
// wait for master to be upgraded so it understands geoip task
138+
return;
139+
}
140+
132141
clusterService.removeListener(this);
133-
if (event.localNodeMaster()) {
134-
if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) {
135-
startTask(() -> clusterService.addListener(this));
136-
} else {
137-
stopTask(() -> clusterService.addListener(this));
138-
}
142+
if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) {
143+
startTask(() -> clusterService.addListener(this));
144+
} else {
145+
stopTask(() -> clusterService.addListener(this));
139146
}
140147
}
141148

@@ -144,8 +151,9 @@ private void startTask(Runnable onFailure) {
144151
GEOIP_DOWNLOADER,
145152
GEOIP_DOWNLOADER,
146153
new GeoIpTaskParams(),
147-
ActionListener.wrap(r -> {}, e -> {
148-
if (e instanceof ResourceAlreadyExistsException == false) {
154+
ActionListener.wrap(r -> logger.debug("Started geoip downloader task"), e -> {
155+
Throwable t = e instanceof RemoteTransportException ? e.getCause() : e;
156+
if (t instanceof ResourceAlreadyExistsException == false) {
149157
logger.error("failed to create geoip downloader task", e);
150158
onFailure.run();
151159
}
@@ -154,18 +162,23 @@ private void startTask(Runnable onFailure) {
154162
}
155163

156164
private void stopTask(Runnable onFailure) {
157-
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(r -> {}, e -> {
158-
if (e instanceof ResourceNotFoundException == false) {
159-
logger.error("failed to remove geoip downloader task", e);
160-
onFailure.run();
165+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(
166+
r -> logger.debug("Stopped geoip downloader task"),
167+
e -> {
168+
Throwable t = e instanceof RemoteTransportException ? e.getCause() : e;
169+
if (t instanceof ResourceNotFoundException == false) {
170+
logger.error("failed to remove geoip downloader task", e);
171+
onFailure.run();
172+
}
161173
}
162-
});
174+
);
163175
persistentTasksService.sendRemoveRequest(
164176
GEOIP_DOWNLOADER,
165177
ActionListener.runAfter(
166178
listener,
167179
() -> client.admin().indices().prepareDelete(DATABASES_INDEX).execute(ActionListener.wrap(rr -> {}, e -> {
168-
if (e instanceof ResourceNotFoundException == false) {
180+
Throwable t = e instanceof RemoteTransportException ? e.getCause() : e;
181+
if (t instanceof ResourceNotFoundException == false) {
169182
logger.warn("failed to remove " + DATABASES_INDEX, e);
170183
}
171184
}))

x-pack/qa/rolling-upgrade/build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ BuildParams.bwcVersions.withWireCompatible { bwcVersion, baseName ->
4141
versions = [oldVersion, project.version]
4242
numberOfNodes = 3
4343

44+
systemProperty 'ingest.geoip.downloader.enabled.default', 'true'
45+
//we don't want to hit real service from each test
46+
systemProperty 'ingest.geoip.downloader.endpoint.default', 'http://invalid.endpoint'
47+
if (bwcVersion.onOrAfter('7.14.0')) {
48+
setting 'ingest.geoip.downloader.endpoint', 'http://invalid.endpoint'
49+
}
50+
4451
setting 'repositories.url.allowed_urls', 'http://snapshot.test*'
4552
setting 'path.repo', "['${buildDir}/cluster/shared/repo/${baseName}', '${searchableSnapshotRepository}']"
4653
setting 'xpack.license.self_generated.type', 'trial'
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.upgrades;
9+
10+
import org.apache.http.util.EntityUtils;
11+
import org.elasticsearch.client.Request;
12+
import org.elasticsearch.client.Response;
13+
import org.hamcrest.Matchers;
14+
15+
import java.nio.charset.StandardCharsets;
16+
17+
public class GeoIpUpgradeIT extends AbstractUpgradeTestCase {
18+
19+
public void testGeoIpDownloader() throws Exception {
20+
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
21+
assertBusy(() -> {
22+
Response response = client().performRequest(new Request("GET", "_cat/tasks"));
23+
String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
24+
assertThat(tasks, Matchers.containsString("geoip-downloader"));
25+
});
26+
assertBusy(() -> {
27+
Response response = client().performRequest(new Request("GET", "_ingest/geoip/stats"));
28+
String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
29+
assertThat(tasks, Matchers.containsString("failed_downloads\":1"));
30+
});
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)