diff --git a/docs/changelog/84000.yaml b/docs/changelog/84000.yaml new file mode 100644 index 0000000000000..b24d357834e21 --- /dev/null +++ b/docs/changelog/84000.yaml @@ -0,0 +1,5 @@ +pr: 84000 +summary: Fix `GeoIpDownloader` startup during rolling upgrade +area: Ingest +type: bug +issues: [] diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 4d1c594ab7b7c..5ec08891981f6 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -66,9 +66,15 @@ public class GeoIpDownloader extends AllocatedPersistentTask { Property.Dynamic, Property.NodeScope ); + + // for overriding in tests + private static final String DEFAULT_ENDPOINT = System.getProperty( + "ingest.geoip.downloader.endpoint.default", + "https://geoip.elastic.co/v1/database" + ); public static final Setting ENDPOINT_SETTING = Setting.simpleString( "ingest.geoip.downloader.endpoint", - "https://geoip.elastic.co/v1/database", + DEFAULT_ENDPOINT, Property.NodeScope ); @@ -258,6 +264,7 @@ void runDownloader() { try { updateDatabases(); } catch (Exception e) { + stats = stats.failedDownload(); logger.error("exception during geoip databases update", e); } try { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 16cb86953003e..9d65b17bacc5e 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -12,11 +12,13 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -29,6 +31,7 @@ import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteTransportException; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -128,14 +131,18 @@ public void clusterChanged(ClusterChangedEvent event) { // wait for state recovered return; } - // bootstrap downloader after first cluster start + + DiscoveryNode masterNode = event.state().nodes().getMasterNode(); + if (masterNode == null || masterNode.getVersion().before(Version.V_7_14_0)) { + // wait for master to be upgraded so it understands geoip task + return; + } + clusterService.removeListener(this); - if (event.localNodeMaster()) { - if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) { - startTask(() -> clusterService.addListener(this)); - } else { - stopTask(() -> clusterService.addListener(this)); - } + if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) { + startTask(() -> clusterService.addListener(this)); + } else { + stopTask(() -> clusterService.addListener(this)); } } @@ -144,8 +151,9 @@ private void startTask(Runnable onFailure) { GEOIP_DOWNLOADER, GEOIP_DOWNLOADER, new GeoIpTaskParams(), - ActionListener.wrap(r -> {}, e -> { - if (e instanceof ResourceAlreadyExistsException == false) { + ActionListener.wrap(r -> logger.debug("Started geoip downloader task"), e -> { + Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; + if (t instanceof ResourceAlreadyExistsException == false) { logger.error("failed to create geoip downloader task", e); onFailure.run(); } @@ -154,18 +162,23 @@ private void startTask(Runnable onFailure) { } private void stopTask(Runnable onFailure) { - ActionListener> listener = ActionListener.wrap(r -> {}, e -> { - if (e instanceof ResourceNotFoundException == false) { - logger.error("failed to remove geoip downloader task", e); - onFailure.run(); + ActionListener> listener = ActionListener.wrap( + r -> logger.debug("Stopped geoip downloader task"), + e -> { + Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; + if (t instanceof ResourceNotFoundException == false) { + logger.error("failed to remove geoip downloader task", e); + onFailure.run(); + } } - }); + ); persistentTasksService.sendRemoveRequest( GEOIP_DOWNLOADER, ActionListener.runAfter( listener, () -> client.admin().indices().prepareDelete(DATABASES_INDEX).execute(ActionListener.wrap(rr -> {}, e -> { - if (e instanceof ResourceNotFoundException == false) { + Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; + if (t instanceof ResourceNotFoundException == false) { logger.warn("failed to remove " + DATABASES_INDEX, e); } })) diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index 93a9a99ce3e3f..a6db46c9d0d10 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -41,6 +41,13 @@ BuildParams.bwcVersions.withWireCompatible { bwcVersion, baseName -> versions = [oldVersion, project.version] numberOfNodes = 3 + systemProperty 'ingest.geoip.downloader.enabled.default', 'true' + //we don't want to hit real service from each test + systemProperty 'ingest.geoip.downloader.endpoint.default', 'http://invalid.endpoint' + if (bwcVersion.onOrAfter('7.14.0')) { + setting 'ingest.geoip.downloader.endpoint', 'http://invalid.endpoint' + } + setting 'repositories.url.allowed_urls', 'http://snapshot.test*' setting 'path.repo', "['${buildDir}/cluster/shared/repo/${baseName}', '${searchableSnapshotRepository}']" setting 'xpack.license.self_generated.type', 'trial' diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java new file mode 100644 index 0000000000000..3dedd041d6465 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.upgrades; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.hamcrest.Matchers; + +import java.nio.charset.StandardCharsets; + +public class GeoIpUpgradeIT extends AbstractUpgradeTestCase { + + public void testGeoIpDownloader() throws Exception { + if (CLUSTER_TYPE == ClusterType.UPGRADED) { + assertBusy(() -> { + Response response = client().performRequest(new Request("GET", "_cat/tasks")); + String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + assertThat(tasks, Matchers.containsString("geoip-downloader")); + }); + assertBusy(() -> { + Response response = client().performRequest(new Request("GET", "_ingest/geoip/stats")); + String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + assertThat(tasks, Matchers.containsString("failed_downloads\":1")); + }); + } + } +}