diff --git a/docs/changelog/92335.yaml b/docs/changelog/92335.yaml new file mode 100644 index 0000000000000..9dc21fdcdc511 --- /dev/null +++ b/docs/changelog/92335.yaml @@ -0,0 +1,6 @@ +pr: 92335 +summary: Download the geoip databases only when needed +area: Ingest Node +type: bug +issues: + - 90673 diff --git a/docs/reference/ingest/processors/geoip.asciidoc b/docs/reference/ingest/processors/geoip.asciidoc index fa31a3bbe2543..d39f3be82d2b0 100644 --- a/docs/reference/ingest/processors/geoip.asciidoc +++ b/docs/reference/ingest/processors/geoip.asciidoc @@ -11,7 +11,10 @@ IPv4 or IPv6 address. By default, the processor uses the GeoLite2 City, GeoLite2 Country, and GeoLite2 ASN GeoIP2 databases from http://dev.maxmind.com/geoip/geoip2/geolite2/[MaxMind], shared under the -CC BY-SA 4.0 license. {es} automatically downloads updates for +CC BY-SA 4.0 license. It automatically downloads these databases if either +`ingest.geoip.downloader.eager.download` is set to true, or your cluster +has at least one pipeline with a `geoip` processor. {es} +automatically downloads updates for these databases from the Elastic GeoIP endpoint: https://geoip.elastic.co/v1/database. To get download statistics for these updates, use the <>. @@ -412,6 +415,13 @@ If `true`, {es} automatically downloads and manages updates for GeoIP2 databases from the `ingest.geoip.downloader.endpoint`. If `false`, {es} does not download updates and deletes all downloaded databases. Defaults to `true`. +[[ingest-geoip-downloader-eager-download]] +(<>, Boolean) +If `true`, {es} downloads GeoIP2 databases immediately, regardless of whether a +pipeline exists with a geoip processor. If `false`, {es} only begins downloading +the databases if a pipeline with a geoip processor exists or is added. Defaults +to `false`. + [[ingest-geoip-downloader-endpoint]] `ingest.geoip.downloader.endpoint`:: (<>, string) diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index 0e164cab818b2..f3f37f50147fb 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -27,10 +27,13 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.search.SearchHit; @@ -51,11 +54,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -82,7 +87,12 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT { @Override protected Collection> nodePlugins() { - return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class); + return Arrays.asList( + ReindexPlugin.class, + IngestGeoIpPlugin.class, + GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class, + NonGeoProcessorsPlugin.class + ); } @Override @@ -104,7 +114,7 @@ public void cleanUp() throws Exception { .setPersistentSettings( Settings.builder() .putNull(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey()) - .putNull(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey()) + .putNull(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey()) .putNull("ingest.geoip.database_validity") ) .get(); @@ -149,6 +159,7 @@ public void cleanUp() throws Exception { @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/75221") public void testInvalidTimestamp() throws Exception { assumeTrue("only test with fixture to have stable results", ENDPOINT != null); + putGeoIpPipeline(); ClusterUpdateSettingsResponse settingsResponse = client().admin() .cluster() .prepareUpdateSettings() @@ -160,7 +171,7 @@ public void testInvalidTimestamp() throws Exception { assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet()); }, 2, TimeUnit.MINUTES); - putPipeline(); + putGeoIpPipeline(); verifyUpdatedDatabase(); settingsResponse = client().admin() @@ -172,7 +183,9 @@ public void testInvalidTimestamp() throws Exception { settingsResponse = client().admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2))) + .setPersistentSettings( + Settings.builder().put(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2)) + ) .get(); assertTrue(settingsResponse.isAcknowledged()); List geoIpTmpDirs = getGeoIpTmpDirs(); @@ -186,7 +199,7 @@ public void testInvalidTimestamp() throws Exception { } } }); - putPipeline(); + putGeoIpPipeline(); assertBusy(() -> { SimulateDocumentBaseResult result = simulatePipeline(); assertThat(result.getFailure(), nullValue()); @@ -221,7 +234,9 @@ public void testUpdatedTimestamp() throws Exception { ClusterUpdateSettingsResponse settingsResponse = client().admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2))) + .setPersistentSettings( + Settings.builder().put(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2)) + ) .get(); assertTrue(settingsResponse.isAcknowledged()); assertBusy(() -> assertNotEquals(lastCheck, getGeoIpTaskState().getDatabases().get("GeoLite2-ASN.mmdb").lastCheck())); @@ -229,6 +244,7 @@ public void testUpdatedTimestamp() throws Exception { } public void testGeoIpDatabasesDownload() throws Exception { + putGeoIpPipeline(); ClusterUpdateSettingsResponse settingsResponse = client().admin() .cluster() .prepareUpdateSettings() @@ -283,12 +299,34 @@ public void testGeoIpDatabasesDownload() throws Exception { } } + public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception { + assumeTrue("only test with fixture to have stable results", ENDPOINT != null); + String pipelineId = randomAlphaOfLength(10); + putGeoIpPipeline(pipelineId); + ClusterUpdateSettingsResponse settingsResponse = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + assertBusy(() -> { assertNull(getTask().getState()); }); + putNonGeoipPipeline(pipelineId); + assertBusy(() -> { assertNull(getTask().getState()); }); + putNonGeoipPipeline(pipelineId); + assertNull(getTask().getState()); + putGeoIpPipeline(); + assertBusy(() -> { + GeoIpTaskState state = getGeoIpTaskState(); + assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet()); + }, 2, TimeUnit.MINUTES); + } + @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972") public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { assumeTrue("only test with fixture to have stable results", ENDPOINT != null); setupDatabasesInConfigDirectory(); // setup: - putPipeline(); + putGeoIpPipeline(); // verify before updating dbs { @@ -355,7 +393,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/79074") public void testStartWithNoDatabases() throws Exception { assumeTrue("only test with fixture to have stable results", ENDPOINT != null); - putPipeline(); + putGeoIpPipeline(); // Behaviour without any databases loaded: { @@ -438,7 +476,21 @@ private SimulateDocumentBaseResult simulatePipeline() throws IOException { return (SimulateDocumentBaseResult) simulateResponse.getResults().get(0); } - private void putPipeline() throws IOException { + /** + * This creates a pipeline with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is enabled). + * @throws IOException + */ + private void putGeoIpPipeline() throws IOException { + putGeoIpPipeline("_id"); + } + + /** + * This creates a pipeline named pipelineId with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is + * enabled). + * @param pipelineId The name of the new pipeline with a geoip processor + * @throws IOException + */ + private void putGeoIpPipeline(String pipelineId) throws IOException { BytesReference bytes; try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); @@ -484,7 +536,45 @@ private void putPipeline() throws IOException { builder.endObject(); bytes = BytesReference.bytes(builder); } - assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get()); + assertAcked(client().admin().cluster().preparePutPipeline(pipelineId, bytes, XContentType.JSON).get()); + } + + /** + * This creates a pipeline named pipelineId that does _not_ have a geoip processor. + * @throws IOException + */ + private void putNonGeoipPipeline(String pipelineId) throws IOException { + BytesReference bytes; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.startArray("processors"); + { + builder.startObject(); + { + builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); + builder.endObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); + builder.endObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + assertAcked(client().admin().cluster().preparePutPipeline(pipelineId, bytes, XContentType.JSON).get()); } private List getGeoIpTmpDirs() throws IOException { @@ -624,4 +714,32 @@ public int read(byte[] b, int off, int len) throws IOException { return read; } } + + /** + * This class defines a processor of type "test". + */ + public static final class NonGeoProcessorsPlugin extends Plugin implements IngestPlugin { + public static final String NON_GEO_PROCESSOR_TYPE = "test"; + + @Override + public Map getProcessors(Processor.Parameters parameters) { + Map procMap = new HashMap<>(); + procMap.put(NON_GEO_PROCESSOR_TYPE, (factories, tag, description, config) -> new AbstractProcessor(tag, description) { + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) {} + + @Override + public String getType() { + return NON_GEO_PROCESSOR_TYPE; + } + + @Override + public boolean isAsync() { + return false; + } + + }); + return procMap; + } + } } diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java index 6076063a38b5c..eea763351dd09 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java @@ -20,6 +20,8 @@ import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.After; import java.io.IOException; @@ -29,6 +31,7 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -80,7 +83,7 @@ public void testStats() throws Exception { assertThat(jsonMapView.get("stats.databases_count"), equalTo(0)); assertThat(jsonMapView.get("stats.total_download_time"), equalTo(0)); assertEquals(0, jsonMapView.>get("nodes").size()); - + putPipeline(); ClusterUpdateSettingsResponse settingsResponse = client().admin() .cluster() .prepareUpdateSettings() @@ -108,6 +111,33 @@ public void testStats() throws Exception { }); } + private void putPipeline() throws IOException { + BytesReference bytes; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.startArray("processors"); + { + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "GeoLite2-City.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get()); + } + public static Map convertToMap(ToXContent part) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); part.toXContent(builder, EMPTY_PARAMS); diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskIT.java index 60be668272b2c..83fde48b39f3d 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskIT.java @@ -48,7 +48,7 @@ public void cleanUp() throws Exception { .setPersistentSettings( Settings.builder() .putNull(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey()) - .putNull(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey()) + .putNull(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey()) .putNull("ingest.geoip.database_validity") ) .get() diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 6776ab9d629a2..0732674632b34 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -48,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Supplier; /** * Main component responsible for downloading new GeoIP databases. @@ -59,14 +59,6 @@ public class GeoIpDownloader extends AllocatedPersistentTask { private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); - public static final Setting POLL_INTERVAL_SETTING = Setting.timeSetting( - "ingest.geoip.downloader.poll.interval", - TimeValue.timeValueDays(3), - TimeValue.timeValueDays(1), - Property.Dynamic, - Property.NodeScope - ); - // for overriding in tests private static final String DEFAULT_ENDPOINT = System.getProperty( "ingest.geoip.downloader.endpoint.default", @@ -91,9 +83,16 @@ public class GeoIpDownloader extends AllocatedPersistentTask { // visible for testing protected volatile GeoIpTaskState state; - private volatile TimeValue pollInterval; private volatile Scheduler.ScheduledCancellable scheduled; private volatile GeoIpDownloaderStats stats = GeoIpDownloaderStats.EMPTY; + private final Supplier pollIntervalSupplier; + private final Supplier eagerDownloadSupplier; + /* + * This variable tells us whether we have at least one pipeline with a geoip processor. If there are no geoip processors then we do + * not download geoip databases (unless configured to eagerly download). Access is not protected because it is set in the constructor + * and then only ever updated on the cluster state update thread (it is also read on the generic thread). Non-private for unit testing. + */ + private final Supplier atLeastOneGeoipProcessorSupplier; GeoIpDownloader( Client client, @@ -106,7 +105,10 @@ public class GeoIpDownloader extends AllocatedPersistentTask { String action, String description, TaskId parentTask, - Map headers + Map headers, + Supplier pollIntervalSupplier, + Supplier eagerDownloadSupplier, + Supplier atLeastOneGeoipProcessorSupplier ) { super(id, type, action, description, parentTask, headers); this.httpClient = httpClient; @@ -114,15 +116,9 @@ public class GeoIpDownloader extends AllocatedPersistentTask { this.clusterService = clusterService; this.threadPool = threadPool; endpoint = ENDPOINT_SETTING.get(settings); - pollInterval = POLL_INTERVAL_SETTING.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(POLL_INTERVAL_SETTING, this::setPollInterval); - } - - public void setPollInterval(TimeValue pollInterval) { - this.pollInterval = pollInterval; - if (scheduled != null && scheduled.cancel()) { - scheduleNextRun(TimeValue.ZERO); - } + this.pollIntervalSupplier = pollIntervalSupplier; + this.eagerDownloadSupplier = eagerDownloadSupplier; + this.atLeastOneGeoipProcessorSupplier = atLeastOneGeoipProcessorSupplier; } // visible for testing @@ -130,6 +126,7 @@ void updateDatabases() throws IOException { var clusterState = clusterService.state(); var geoipIndex = clusterState.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); if (geoipIndex != null) { + logger.trace("The {} index is not null", GeoIpDownloader.DATABASES_INDEX); if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) { throw new ElasticsearchException("not all primary shards of [" + DATABASES_INDEX + "] index are active"); } @@ -138,13 +135,18 @@ void updateDatabases() throws IOException { throw blockException; } } - - logger.debug("updating geoip databases"); - List> response = fetchDatabasesOverview(); - for (Map res : response) { - if (res.get("name").toString().endsWith(".tgz")) { - processDatabase(res); + if (eagerDownloadSupplier.get() || atLeastOneGeoipProcessorSupplier.get()) { + logger.trace("Updating geoip databases"); + List> response = fetchDatabasesOverview(); + for (Map res : response) { + if (res.get("name").toString().endsWith(".tgz")) { + processDatabase(res); + } } + } else { + logger.trace( + "Not updating geoip databases because no geoip processors exist in the cluster and eager downloading is not configured" + ); } } @@ -186,7 +188,7 @@ void processDatabase(Map databaseInfo) { } } catch (Exception e) { stats = stats.failedDownload(); - logger.error((Supplier) () -> "error downloading geoip database [" + name + "]", e); + logger.error((org.apache.logging.log4j.util.Supplier) () -> "error downloading geoip database [" + name + "]", e); } } @@ -266,6 +268,9 @@ void setState(GeoIpTaskState state) { this.state = state; } + /** + * Downloads the geoip databases now, and schedules them to be downloaded again after pollInterval. + */ void runDownloader() { if (isCancelled() || isCompleted()) { return; @@ -281,7 +286,22 @@ void runDownloader() { } catch (Exception e) { logger.error("exception during geoip databases cleanup", e); } - scheduleNextRun(pollInterval); + scheduleNextRun(pollIntervalSupplier.get()); + } + + /** + * This method requests that the downloader be rescheduled to run immediately (presumably because a dynamic property supplied by + * pollIntervalSupplier or eagerDownloadSupplier has changed, or a pipeline with a geoip processor has been added). This method does + * nothing if this task is cancelled, completed, or has not yet been scheduled to run for the first time. It cancels any existing + * scheduled run. + */ + public void requestReschedule() { + if (isCancelled() || isCompleted()) { + return; + } + if (scheduled != null && scheduled.cancel()) { + scheduleNextRun(TimeValue.ZERO); + } } private void cleanDatabases() { @@ -321,4 +341,5 @@ private void scheduleNextRun(TimeValue time) { scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC); } } + } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index c56fd9c2d0c53..7457738b75301 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -17,15 +17,20 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; +import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -35,7 +40,10 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; @@ -56,6 +64,20 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor POLL_INTERVAL_SETTING = Setting.timeSetting( + "ingest.geoip.downloader.poll.interval", + TimeValue.timeValueDays(3), + TimeValue.timeValueDays(1), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting EAGER_DOWNLOAD_SETTING = Setting.boolSetting( + "ingest.geoip.downloader.eager.download", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); @@ -66,6 +88,10 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor currentTask = new AtomicReference<>(); + private volatile TimeValue pollInterval; + private volatile boolean eagerDownload; + private volatile boolean atLeastOneGeoipProcessor; + private final AtomicBoolean taskIsBootstrapped = new AtomicBoolean(false); GeoIpDownloaderTaskExecutor(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool) { super(GEOIP_DOWNLOADER, ThreadPool.Names.GENERIC); @@ -75,9 +101,18 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor pollInterval, + () -> eagerDownload, + () -> atLeastOneGeoipProcessor ); } @@ -140,12 +198,65 @@ public void clusterChanged(ClusterChangedEvent event) { return; } - clusterService.removeListener(this); - if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) { - startTask(() -> clusterService.addListener(this)); - } else { - stopTask(() -> clusterService.addListener(this)); + if (taskIsBootstrapped.getAndSet(true) == false) { + this.atLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state()); + if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) { + startTask(() -> taskIsBootstrapped.set(false)); + } else { + stopTask(() -> taskIsBootstrapped.set(false)); + } } + + if (event.metadataChanged() && event.changedCustomMetadataSet().contains(IngestMetadata.TYPE)) { + boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state()); + if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) { + atLeastOneGeoipProcessor = true; + logger.trace("Scheduling runDownloader because a geoip processor has been added"); + GeoIpDownloader currentDownloader = getCurrentTask(); + if (currentDownloader != null) { + currentDownloader.requestReschedule(); + } + } else { + atLeastOneGeoipProcessor = newAtLeastOneGeoipProcessor; + } + } + } + + @SuppressWarnings("unchecked") + static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) { + List pipelineDefinitions = IngestService.getPipelines(clusterState); + return pipelineDefinitions.stream().anyMatch(pipelineDefinition -> { + Map pipelineMap = pipelineDefinition.getConfigAsMap(); + return hasAtLeastOneGeoipProcessor((List>) pipelineMap.get(Pipeline.PROCESSORS_KEY)); + }); + } + + private static boolean hasAtLeastOneGeoipProcessor(List> processors) { + return processors != null && processors.stream().anyMatch(GeoIpDownloaderTaskExecutor::hasAtLeastOneGeoipProcessor); + } + + private static boolean hasAtLeastOneGeoipProcessor(Map processor) { + return processor != null + && (processor.containsKey(GeoIpProcessor.TYPE) + || isProcessorWithOnFailureGeoIpProcessor(processor) + || isForeachProcessorWithGeoipProcessor(processor)); + } + + @SuppressWarnings("unchecked") + private static boolean isProcessorWithOnFailureGeoIpProcessor(Map processor) { + return processor != null + && processor.values() + .stream() + .anyMatch( + value -> value instanceof Map + && hasAtLeastOneGeoipProcessor(((Map>>) value).get("on_failure")) + ); + } + + @SuppressWarnings("unchecked") + private static boolean isForeachProcessorWithGeoipProcessor(Map processor) { + return processor.containsKey("foreach") + && hasAtLeastOneGeoipProcessor(((Map>) processor.get("foreach")).get("processor")); } private void startTask(Runnable onFailure) { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index f69558171fd44..8aaf476b353ea 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -85,9 +85,10 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd public List> getSettings() { return Arrays.asList( CACHE_SIZE, + GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING, + GeoIpDownloaderTaskExecutor.ENABLED_SETTING, GeoIpDownloader.ENDPOINT_SETTING, - GeoIpDownloader.POLL_INTERVAL_SETTING, - GeoIpDownloaderTaskExecutor.ENABLED_SETTING + GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING ); } @@ -126,6 +127,7 @@ public Collection createComponents( } geoIpDownloaderTaskExecutor = new GeoIpDownloaderTaskExecutor(client, new HttpClient(), clusterService, threadPool); + geoIpDownloaderTaskExecutor.init(); return List.of(databaseRegistry.get(), geoIpDownloaderTaskExecutor); } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java new file mode 100644 index 0000000000000..5cbe205f5c9c7 --- /dev/null +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java @@ -0,0 +1,253 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.ingest.IngestMetadata; +import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { + public void testHasAtLeastOneGeoipProcessor() { + Map configs = new HashMap<>(); + IngestMetadata ingestMetadata = new IngestMetadata(configs); + ClusterState clusterState = mock(ClusterState.class); + Metadata metadata = mock(Metadata.class); + when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); + when(clusterState.getMetadata()).thenReturn(metadata); + List expectHitsInputs = getPipelinesWithGeoIpProcessors(); + List expectMissesInputs = getPipelinesWithoutGeoIpProcessors(); + { + // Test that hasAtLeastOneGeoipProcessor returns true for any pipeline with a geoip processor: + for (String pipeline : expectHitsInputs) { + configs.clear(); + configs.put("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON)); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + } + } + { + // Test that hasAtLeastOneGeoipProcessor returns false for any pipeline without a geoip processor: + for (String pipeline : expectMissesInputs) { + configs.clear(); + configs.put("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON)); + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + } + } + { + /* + * Now test that hasAtLeastOneGeoipProcessor returns true for a mix of pipelines, some which have geoip processors and some + * which do not: + */ + configs.clear(); + for (String pipeline : expectHitsInputs) { + String id = randomAlphaOfLength(20); + configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON)); + } + for (String pipeline : expectMissesInputs) { + String id = randomAlphaOfLength(20); + configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON)); + } + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + } + } + + /* + * This method returns an assorted list of pipelines that have geoip processors -- ones that ought to cause hasAtLeastOneGeoipProcessor + * to return true. + */ + private List getPipelinesWithGeoIpProcessors() { + String simpleGeoIpProcessor = """ + { + "processors":[ + { + "geoip":{ + "field":"provider" + } + } + ] + } + """; + String onFailureWithGeoIpProcessor = """ + { + "processors":[ + { + "rename":{ + "field":"provider", + "target_field":"cloud.provider", + "on_failure":[ + { + "geoip":{ + "field":"error.message" + } + } + ] + } + } + ] + } + """; + String foreachWithGeoIpProcessor = """ + { + "processors":[ + { + "foreach":{ + "field":"values", + "processor": + { + "geoip":{ + "field":"someField" + } + } + } + } + ] + } + """; + String nestedForeachWithGeoIpProcessor = """ + { + "processors":[ + { + "foreach":{ + "field":"values", + "processor": + { + "foreach":{ + "field":"someField", + "processor": + { + "geoip":{ + "field":"someField" + } + } + } + } + } + } + ] + } + """; + String nestedForeachWithOnFailureWithGeoIpProcessor = """ + { + "processors":[ + { + "foreach":{ + "field":"values", + "processor": + { + "foreach":{ + "field":"someField", + "processor": + { + "rename":{ + "field":"provider", + "target_field":"cloud.provider", + "on_failure":[ + { + "geoip":{ + "field":"error.message" + } + } + ] + } + } + } + } + } + } + ] + } + """; + String onFailureWithForeachWithGeoIp = """ + { + "processors":[ + { + "rename":{ + "field":"provider", + "target_field":"cloud.provider", + "on_failure":[ + { + "foreach":{ + "field":"values", + "processor": + { + "geoip":{ + "field":"someField" + } + } + } + } + ] + } + } + ] + } + """; + return List.of( + simpleGeoIpProcessor, + onFailureWithGeoIpProcessor, + foreachWithGeoIpProcessor, + nestedForeachWithGeoIpProcessor, + nestedForeachWithOnFailureWithGeoIpProcessor, + onFailureWithForeachWithGeoIp + ); + } + + /* + * This method returns an assorted list of pipelines that _do not_ have geoip processors -- ones that ought to cause + * hasAtLeastOneGeoipProcessor to return false. + */ + private List getPipelinesWithoutGeoIpProcessors() { + String empty = """ + { + } + """; + String noProcessors = """ + { + "processors":[ + ] + } + """; + String onFailureWithForeachWithSet = """ + { + "processors":[ + { + "rename":{ + "field":"provider", + "target_field":"cloud.provider", + "on_failure":[ + { + "foreach":{ + "field":"values", + "processor": + { + "set":{ + "field":"someField" + } + } + } + } + ] + } + } + ] + } + """; + return List.of(empty, noProcessors, onFailureWithForeachWithSet); + } +} diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java index 16088deb86b3d..9f3334a07d8f3 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -81,7 +82,12 @@ public void setup() { when(clusterService.getClusterSettings()).thenReturn( new ClusterSettings( Settings.EMPTY, - Set.of(GeoIpDownloader.ENDPOINT_SETTING, GeoIpDownloader.POLL_INTERVAL_SETTING, GeoIpDownloaderTaskExecutor.ENABLED_SETTING) + Set.of( + GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING, + GeoIpDownloader.ENDPOINT_SETTING, + GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING, + GeoIpDownloaderTaskExecutor.ENABLED_SETTING + ) ) ); ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of())); @@ -98,7 +104,10 @@ public void setup() { "", "", EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), + () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), + () -> true ); } @@ -252,7 +261,10 @@ public void testProcessDatabaseNew() throws IOException { "", "", EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), + () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), + () -> true ) { @Override void updateTaskState() { @@ -298,7 +310,10 @@ public void testProcessDatabaseUpdate() throws IOException { "", "", EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), + () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), + () -> true ) { @Override void updateTaskState() { @@ -346,7 +361,10 @@ public void testProcessDatabaseSame() throws IOException { "", "", EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), + () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), + () -> true ) { @Override void updateTaskState() { @@ -387,7 +405,10 @@ public void testUpdateTaskState() { "", "", EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), + () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), + () -> true ) { @Override public void updatePersistentTaskState(PersistentTaskState state, ActionListener> listener) { @@ -414,7 +435,10 @@ public void testUpdateTaskStateError() { "", "", EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), + () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), + () -> true ) { @Override public void updatePersistentTaskState(PersistentTaskState state, ActionListener> listener) { @@ -440,6 +464,7 @@ public void testUpdateDatabases() throws IOException { builder.close(); when(httpClient.getBytes("a.b?elastic_geoip_service_tos=agree")).thenReturn(baos.toByteArray()); Iterator> it = maps.iterator(); + final AtomicBoolean atLeastOneGeoipProcessor = new AtomicBoolean(false); geoIpDownloader = new GeoIpDownloader( client, httpClient, @@ -451,7 +476,10 @@ public void testUpdateDatabases() throws IOException { "", "", EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), + () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), + atLeastOneGeoipProcessor::get ) { @Override void processDatabase(Map databaseInfo) { @@ -459,6 +487,9 @@ void processDatabase(Map databaseInfo) { } }; geoIpDownloader.updateDatabases(); + assertTrue(it.hasNext()); + atLeastOneGeoipProcessor.set(true); + geoIpDownloader.updateDatabases(); assertFalse(it.hasNext()); } diff --git a/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java b/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java index 5b40f4a6ada43..8584229ec171e 100644 --- a/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java +++ b/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java @@ -11,11 +11,17 @@ import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; import org.elasticsearch.client.Request; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.Before; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -36,6 +42,7 @@ public static Iterable parameters() throws Exception { @Before public void waitForDatabases() throws Exception { + putGeoipPipeline(); assertBusy(() -> { Request request = new Request("GET", "/_ingest/geoip/stats"); Map response = entityAsMap(client().performRequest(request)); @@ -53,4 +60,37 @@ public void waitForDatabases() throws Exception { }); } + /** + * This creates a pipeline with a geoip processor so that the GeoipDownloader will download its databases. + * @throws IOException + */ + private void putGeoipPipeline() throws IOException { + final BytesReference bytes; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.startArray("processors"); + { + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "GeoLite2-City.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/pipeline-with-geoip"); + putPipelineRequest.setEntity(new ByteArrayEntity(bytes.array(), ContentType.APPLICATION_JSON)); + client().performRequest(putPipelineRequest); + } + } diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java index 3dedd041d6465..eb0e97e1ecce1 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java @@ -26,7 +26,9 @@ public void testGeoIpDownloader() throws Exception { assertBusy(() -> { Response response = client().performRequest(new Request("GET", "_ingest/geoip/stats")); String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - assertThat(tasks, Matchers.containsString("failed_downloads\":1")); + // The geoip downloader doesn't actually do anything since there are no geoip processors: + assertThat(tasks, Matchers.containsString("failed_downloads\":0")); + assertThat(tasks, Matchers.containsString("successful_downloads\":0")); }); } }