2727import org .elasticsearch .index .query .BoolQueryBuilder ;
2828import org .elasticsearch .index .query .MatchQueryBuilder ;
2929import org .elasticsearch .index .query .RangeQueryBuilder ;
30+ import org .elasticsearch .ingest .AbstractProcessor ;
3031import org .elasticsearch .ingest .IngestDocument ;
32+ import org .elasticsearch .ingest .Processor ;
3133import org .elasticsearch .ingest .geoip .stats .GeoIpDownloaderStatsAction ;
3234import org .elasticsearch .persistent .PersistentTaskParams ;
3335import org .elasticsearch .persistent .PersistentTasksCustomMetadata ;
36+ import org .elasticsearch .plugins .IngestPlugin ;
3437import org .elasticsearch .plugins .Plugin ;
3538import org .elasticsearch .reindex .ReindexPlugin ;
3639import org .elasticsearch .search .SearchHit ;
5154import java .util .ArrayList ;
5255import java .util .Arrays ;
5356import java .util .Collection ;
57+ import java .util .HashMap ;
5458import java .util .Iterator ;
5559import java .util .List ;
5660import java .util .Map ;
5761import java .util .Set ;
5862import java .util .concurrent .TimeUnit ;
63+ import java .util .function .BiConsumer ;
5964import java .util .stream .Collectors ;
6065import java .util .stream .Stream ;
6166import java .util .stream .StreamSupport ;
@@ -82,7 +87,12 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
8287
8388 @ Override
8489 protected Collection <Class <? extends Plugin >> nodePlugins () {
85- return Arrays .asList (ReindexPlugin .class , IngestGeoIpPlugin .class , GeoIpProcessorNonIngestNodeIT .IngestGeoIpSettingsPlugin .class );
90+ return Arrays .asList (
91+ ReindexPlugin .class ,
92+ IngestGeoIpPlugin .class ,
93+ GeoIpProcessorNonIngestNodeIT .IngestGeoIpSettingsPlugin .class ,
94+ NonGeoProcessorsPlugin .class
95+ );
8696 }
8797
8898 @ Override
@@ -104,7 +114,7 @@ public void cleanUp() throws Exception {
104114 .setPersistentSettings (
105115 Settings .builder ()
106116 .putNull (GeoIpDownloaderTaskExecutor .ENABLED_SETTING .getKey ())
107- .putNull (GeoIpDownloader .POLL_INTERVAL_SETTING .getKey ())
117+ .putNull (GeoIpDownloaderTaskExecutor .POLL_INTERVAL_SETTING .getKey ())
108118 .putNull ("ingest.geoip.database_validity" )
109119 )
110120 .get ();
@@ -149,6 +159,7 @@ public void cleanUp() throws Exception {
149159 @ TestLogging (value = "org.elasticsearch.ingest.geoip:TRACE" , reason = "https://github.com/elastic/elasticsearch/issues/75221" )
150160 public void testInvalidTimestamp () throws Exception {
151161 assumeTrue ("only test with fixture to have stable results" , ENDPOINT != null );
162+ putGeoIpPipeline ();
152163 ClusterUpdateSettingsResponse settingsResponse = client ().admin ()
153164 .cluster ()
154165 .prepareUpdateSettings ()
@@ -160,7 +171,7 @@ public void testInvalidTimestamp() throws Exception {
160171 assertEquals (Set .of ("GeoLite2-ASN.mmdb" , "GeoLite2-City.mmdb" , "GeoLite2-Country.mmdb" ), state .getDatabases ().keySet ());
161172 }, 2 , TimeUnit .MINUTES );
162173
163- putPipeline ();
174+ putGeoIpPipeline ();
164175 verifyUpdatedDatabase ();
165176
166177 settingsResponse = client ().admin ()
@@ -172,7 +183,9 @@ public void testInvalidTimestamp() throws Exception {
172183 settingsResponse = client ().admin ()
173184 .cluster ()
174185 .prepareUpdateSettings ()
175- .setPersistentSettings (Settings .builder ().put (GeoIpDownloader .POLL_INTERVAL_SETTING .getKey (), TimeValue .timeValueDays (2 )))
186+ .setPersistentSettings (
187+ Settings .builder ().put (GeoIpDownloaderTaskExecutor .POLL_INTERVAL_SETTING .getKey (), TimeValue .timeValueDays (2 ))
188+ )
176189 .get ();
177190 assertTrue (settingsResponse .isAcknowledged ());
178191 List <Path > geoIpTmpDirs = getGeoIpTmpDirs ();
@@ -186,7 +199,7 @@ public void testInvalidTimestamp() throws Exception {
186199 }
187200 }
188201 });
189- putPipeline ();
202+ putGeoIpPipeline ();
190203 assertBusy (() -> {
191204 SimulateDocumentBaseResult result = simulatePipeline ();
192205 assertThat (result .getFailure (), nullValue ());
@@ -221,14 +234,17 @@ public void testUpdatedTimestamp() throws Exception {
221234 ClusterUpdateSettingsResponse settingsResponse = client ().admin ()
222235 .cluster ()
223236 .prepareUpdateSettings ()
224- .setPersistentSettings (Settings .builder ().put (GeoIpDownloader .POLL_INTERVAL_SETTING .getKey (), TimeValue .timeValueDays (2 )))
237+ .setPersistentSettings (
238+ Settings .builder ().put (GeoIpDownloaderTaskExecutor .POLL_INTERVAL_SETTING .getKey (), TimeValue .timeValueDays (2 ))
239+ )
225240 .get ();
226241 assertTrue (settingsResponse .isAcknowledged ());
227242 assertBusy (() -> assertNotEquals (lastCheck , getGeoIpTaskState ().getDatabases ().get ("GeoLite2-ASN.mmdb" ).lastCheck ()));
228243 testGeoIpDatabasesDownload ();
229244 }
230245
231246 public void testGeoIpDatabasesDownload () throws Exception {
247+ putGeoIpPipeline ();
232248 ClusterUpdateSettingsResponse settingsResponse = client ().admin ()
233249 .cluster ()
234250 .prepareUpdateSettings ()
@@ -283,12 +299,34 @@ public void testGeoIpDatabasesDownload() throws Exception {
283299 }
284300 }
285301
302+ public void testGeoIpDatabasesDownloadNoGeoipProcessors () throws Exception {
303+ assumeTrue ("only test with fixture to have stable results" , ENDPOINT != null );
304+ String pipelineId = randomAlphaOfLength (10 );
305+ putGeoIpPipeline (pipelineId );
306+ ClusterUpdateSettingsResponse settingsResponse = client ().admin ()
307+ .cluster ()
308+ .prepareUpdateSettings ()
309+ .setPersistentSettings (Settings .builder ().put (GeoIpDownloaderTaskExecutor .ENABLED_SETTING .getKey (), true ))
310+ .get ();
311+ assertTrue (settingsResponse .isAcknowledged ());
312+ assertBusy (() -> { assertNull (getTask ().getState ()); });
313+ putNonGeoipPipeline (pipelineId );
314+ assertBusy (() -> { assertNull (getTask ().getState ()); });
315+ putNonGeoipPipeline (pipelineId );
316+ assertNull (getTask ().getState ());
317+ putGeoIpPipeline ();
318+ assertBusy (() -> {
319+ GeoIpTaskState state = getGeoIpTaskState ();
320+ assertEquals (Set .of ("GeoLite2-ASN.mmdb" , "GeoLite2-City.mmdb" , "GeoLite2-Country.mmdb" ), state .getDatabases ().keySet ());
321+ }, 2 , TimeUnit .MINUTES );
322+ }
323+
286324 @ TestLogging (value = "org.elasticsearch.ingest.geoip:TRACE" , reason = "https://github.com/elastic/elasticsearch/issues/69972" )
287325 public void testUseGeoIpProcessorWithDownloadedDBs () throws Exception {
288326 assumeTrue ("only test with fixture to have stable results" , ENDPOINT != null );
289327 setupDatabasesInConfigDirectory ();
290328 // setup:
291- putPipeline ();
329+ putGeoIpPipeline ();
292330
293331 // verify before updating dbs
294332 {
@@ -355,7 +393,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
355393 @ TestLogging (value = "org.elasticsearch.ingest.geoip:TRACE" , reason = "https://github.com/elastic/elasticsearch/issues/79074" )
356394 public void testStartWithNoDatabases () throws Exception {
357395 assumeTrue ("only test with fixture to have stable results" , ENDPOINT != null );
358- putPipeline ();
396+ putGeoIpPipeline ();
359397
360398 // Behaviour without any databases loaded:
361399 {
@@ -438,7 +476,21 @@ private SimulateDocumentBaseResult simulatePipeline() throws IOException {
438476 return (SimulateDocumentBaseResult ) simulateResponse .getResults ().get (0 );
439477 }
440478
441- private void putPipeline () throws IOException {
479+ /**
480+ * This creates a pipeline with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is enabled).
481+ * @throws IOException
482+ */
483+ private void putGeoIpPipeline () throws IOException {
484+ putGeoIpPipeline ("_id" );
485+ }
486+
487+ /**
488+ * This creates a pipeline named pipelineId with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is
489+ * enabled).
490+ * @param pipelineId The name of the new pipeline with a geoip processor
491+ * @throws IOException
492+ */
493+ private void putGeoIpPipeline (String pipelineId ) throws IOException {
442494 BytesReference bytes ;
443495 try (XContentBuilder builder = JsonXContent .contentBuilder ()) {
444496 builder .startObject ();
@@ -484,7 +536,45 @@ private void putPipeline() throws IOException {
484536 builder .endObject ();
485537 bytes = BytesReference .bytes (builder );
486538 }
487- assertAcked (client ().admin ().cluster ().preparePutPipeline ("_id" , bytes , XContentType .JSON ).get ());
539+ assertAcked (client ().admin ().cluster ().preparePutPipeline (pipelineId , bytes , XContentType .JSON ).get ());
540+ }
541+
542+ /**
543+ * This creates a pipeline named pipelineId that does _not_ have a geoip processor.
544+ * @throws IOException
545+ */
546+ private void putNonGeoipPipeline (String pipelineId ) throws IOException {
547+ BytesReference bytes ;
548+ try (XContentBuilder builder = JsonXContent .contentBuilder ()) {
549+ builder .startObject ();
550+ {
551+ builder .startArray ("processors" );
552+ {
553+ builder .startObject ();
554+ {
555+ builder .startObject (NonGeoProcessorsPlugin .NON_GEO_PROCESSOR_TYPE );
556+ builder .endObject ();
557+ }
558+ builder .endObject ();
559+ builder .startObject ();
560+ {
561+ builder .startObject (NonGeoProcessorsPlugin .NON_GEO_PROCESSOR_TYPE );
562+ builder .endObject ();
563+ }
564+ builder .endObject ();
565+ builder .startObject ();
566+ {
567+ builder .startObject (NonGeoProcessorsPlugin .NON_GEO_PROCESSOR_TYPE );
568+ builder .endObject ();
569+ }
570+ builder .endObject ();
571+ }
572+ builder .endArray ();
573+ }
574+ builder .endObject ();
575+ bytes = BytesReference .bytes (builder );
576+ }
577+ assertAcked (client ().admin ().cluster ().preparePutPipeline (pipelineId , bytes , XContentType .JSON ).get ());
488578 }
489579
490580 private List <Path > getGeoIpTmpDirs () throws IOException {
@@ -624,4 +714,32 @@ public int read(byte[] b, int off, int len) throws IOException {
624714 return read ;
625715 }
626716 }
717+
718+ /**
719+ * This class defines a processor of type "test".
720+ */
721+ public static final class NonGeoProcessorsPlugin extends Plugin implements IngestPlugin {
722+ public static final String NON_GEO_PROCESSOR_TYPE = "test" ;
723+
724+ @ Override
725+ public Map <String , Processor .Factory > getProcessors (Processor .Parameters parameters ) {
726+ Map <String , Processor .Factory > procMap = new HashMap <>();
727+ procMap .put (NON_GEO_PROCESSOR_TYPE , (factories , tag , description , config ) -> new AbstractProcessor (tag , description ) {
728+ @ Override
729+ public void execute (IngestDocument ingestDocument , BiConsumer <IngestDocument , Exception > handler ) {}
730+
731+ @ Override
732+ public String getType () {
733+ return NON_GEO_PROCESSOR_TYPE ;
734+ }
735+
736+ @ Override
737+ public boolean isAsync () {
738+ return false ;
739+ }
740+
741+ });
742+ return procMap ;
743+ }
744+ }
627745}
0 commit comments