3030import org .elasticsearch .cluster .ClusterStateApplier ;
3131import org .elasticsearch .cluster .ClusterStateTaskConfig ;
3232import org .elasticsearch .cluster .ClusterStateTaskExecutor ;
33- import org .elasticsearch .cluster .ClusterStateUpdateTask ;
33+ import org .elasticsearch .cluster .ClusterStateTaskListener ;
3434import org .elasticsearch .cluster .metadata .DataStream .TimestampField ;
3535import org .elasticsearch .cluster .metadata .IndexAbstraction ;
3636import org .elasticsearch .cluster .metadata .IndexMetadata ;
4242import org .elasticsearch .cluster .service .ClusterService ;
4343import org .elasticsearch .common .Priority ;
4444import org .elasticsearch .common .bytes .BytesReference ;
45- import org .elasticsearch .common .collect .ImmutableOpenMap ;
4645import org .elasticsearch .common .regex .Regex ;
4746import org .elasticsearch .common .settings .Settings ;
4847import org .elasticsearch .common .util .CollectionUtils ;
4948import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
5049import org .elasticsearch .common .xcontent .XContentHelper ;
51- import org .elasticsearch .core .SuppressForbidden ;
5250import org .elasticsearch .core .TimeValue ;
5351import org .elasticsearch .core .Tuple ;
5452import org .elasticsearch .env .Environment ;
@@ -110,38 +108,37 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
110108 /**
111109 * Cluster state task executor for ingest pipeline operations
112110 */
113- private static final ClusterStateTaskExecutor <PipelineClusterStateUpdateTask > PIPELINE_TASK_EXECUTOR = (currentState , taskContexts ) -> {
114- ClusterState state = currentState ;
111+ static final ClusterStateTaskExecutor <PipelineClusterStateUpdateTask > PIPELINE_TASK_EXECUTOR = (currentState , taskContexts ) -> {
112+ final var allIndexMetadata = currentState .metadata ().indices ().values ();
113+ final IngestMetadata initialIngestMetadata = currentState .metadata ().custom (IngestMetadata .TYPE );
114+ var currentIngestMetadata = initialIngestMetadata ;
115115 for (final var taskContext : taskContexts ) {
116116 try {
117117 final var task = taskContext .getTask ();
118- state = task .execute (state );
119- taskContext .success (new ClusterStateTaskExecutor . LegacyClusterTaskResultActionListener ( task , currentState ));
118+ currentIngestMetadata = task .execute (currentIngestMetadata , allIndexMetadata );
119+ taskContext .success (task . listener . map ( ignored -> AcknowledgedResponse . TRUE ));
120120 } catch (Exception e ) {
121121 taskContext .onFailure (e );
122122 }
123123 }
124- return state ;
124+ final var finalIngestMetadata = currentIngestMetadata ;
125+ return finalIngestMetadata == initialIngestMetadata
126+ ? currentState
127+ : currentState .copyAndUpdateMetadata (b -> b .putCustom (IngestMetadata .TYPE , finalIngestMetadata ));
125128 };
126129
127130 /**
128131 * Specialized cluster state update task specifically for ingest pipeline operations.
129132 * These operations all receive an AcknowledgedResponse.
130133 */
131- private abstract static class PipelineClusterStateUpdateTask extends ClusterStateUpdateTask {
132- private final ActionListener <AcknowledgedResponse > listener ;
134+ abstract static class PipelineClusterStateUpdateTask implements ClusterStateTaskListener {
135+ final ActionListener <AcknowledgedResponse > listener ;
133136
134- PipelineClusterStateUpdateTask (TimeValue timeout , ActionListener <AcknowledgedResponse > listener ) {
135- super (timeout );
137+ PipelineClusterStateUpdateTask (ActionListener <AcknowledgedResponse > listener ) {
136138 this .listener = listener ;
137139 }
138140
139- public abstract ClusterState doExecute (ClusterState currentState ) throws Exception ;
140-
141- @ Override
142- public ClusterState execute (ClusterState currentState ) throws Exception {
143- return doExecute (currentState );
144- }
141+ public abstract IngestMetadata execute (IngestMetadata currentIngestMetadata , Collection <IndexMetadata > allIndexMetadata );
145142
146143 @ Override
147144 public void onFailure (Exception e ) {
@@ -150,7 +147,7 @@ public void onFailure(Exception e) {
150147
151148 @ Override
152149 public void clusterStateProcessed (ClusterState oldState , ClusterState newState ) {
153- listener . onResponse ( AcknowledgedResponse . TRUE ) ;
150+ assert false : "should not be called" ;
154151 }
155152 }
156153
@@ -336,56 +333,51 @@ public ScriptService getScriptService() {
336333 public void delete (DeletePipelineRequest request , ActionListener <AcknowledgedResponse > listener ) {
337334 clusterService .submitStateUpdateTask (
338335 "delete-pipeline-" + request .getId (),
339- new PipelineClusterStateUpdateTask (request .timeout (), listener ) {
340- @ Override
341- public ClusterState doExecute (ClusterState currentState ) {
342- return innerDelete (request , currentState );
343- }
344- },
336+ new DeletePipelineClusterStateUpdateTask (listener , request ),
345337 ClusterStateTaskConfig .build (Priority .NORMAL , request .masterNodeTimeout ()),
346338 PIPELINE_TASK_EXECUTOR
347339 );
348340 }
349341
350- @ SuppressForbidden (reason = "legacy usage of unbatched task" ) // TODO add support for batching here
351- private void submitUnbatchedTask (@ SuppressWarnings ("SameParameterValue" ) String source , ClusterStateUpdateTask task ) {
352- clusterService .submitUnbatchedStateUpdateTask (source , task );
353- }
342+ // visible for testing
343+ static class DeletePipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
344+ private final DeletePipelineRequest request ;
354345
355- static ClusterState innerDelete (DeletePipelineRequest request , ClusterState currentState ) {
356- IngestMetadata currentIngestMetadata = currentState .metadata ().custom (IngestMetadata .TYPE );
357- if (currentIngestMetadata == null ) {
358- return currentState ;
346+ DeletePipelineClusterStateUpdateTask (ActionListener <AcknowledgedResponse > listener , DeletePipelineRequest request ) {
347+ super (listener );
348+ this .request = request ;
359349 }
360- Map < String , PipelineConfiguration > pipelines = currentIngestMetadata . getPipelines ();
361- Set < String > toRemove = new HashSet <>();
362- for ( String pipelineKey : pipelines . keySet () ) {
363- if (Regex . simpleMatch ( request . getId (), pipelineKey ) ) {
364- toRemove . add ( pipelineKey ) ;
350+
351+ @ Override
352+ public IngestMetadata execute ( IngestMetadata currentIngestMetadata , Collection < IndexMetadata > allIndexMetadata ) {
353+ if (currentIngestMetadata == null ) {
354+ return null ;
365355 }
356+ Map <String , PipelineConfiguration > pipelines = currentIngestMetadata .getPipelines ();
357+ Set <String > toRemove = new HashSet <>();
358+ for (String pipelineKey : pipelines .keySet ()) {
359+ if (Regex .simpleMatch (request .getId (), pipelineKey )) {
360+ toRemove .add (pipelineKey );
361+ }
362+ }
363+ if (toRemove .isEmpty () && Regex .isMatchAllPattern (request .getId ()) == false ) {
364+ throw new ResourceNotFoundException ("pipeline [{}] is missing" , request .getId ());
365+ } else if (toRemove .isEmpty ()) {
366+ return currentIngestMetadata ;
367+ }
368+ final Map <String , PipelineConfiguration > pipelinesCopy = new HashMap <>(pipelines );
369+ for (String key : toRemove ) {
370+ validateNotInUse (key , allIndexMetadata );
371+ pipelinesCopy .remove (key );
372+ }
373+ return new IngestMetadata (pipelinesCopy );
366374 }
367- if (toRemove .isEmpty () && Regex .isMatchAllPattern (request .getId ()) == false ) {
368- throw new ResourceNotFoundException ("pipeline [{}] is missing" , request .getId ());
369- } else if (toRemove .isEmpty ()) {
370- return currentState ;
371- }
372- final Map <String , PipelineConfiguration > pipelinesCopy = new HashMap <>(pipelines );
373- ImmutableOpenMap <String , IndexMetadata > indices = currentState .metadata ().indices ();
374- for (String key : toRemove ) {
375- validateNotInUse (key , indices );
376- pipelinesCopy .remove (key );
377- }
378- ClusterState .Builder newState = ClusterState .builder (currentState );
379- newState .metadata (
380- Metadata .builder (currentState .getMetadata ()).putCustom (IngestMetadata .TYPE , new IngestMetadata (pipelinesCopy )).build ()
381- );
382- return newState .build ();
383375 }
384376
385- static void validateNotInUse (String pipeline , ImmutableOpenMap < String , IndexMetadata > indices ) {
377+ static void validateNotInUse (String pipeline , Collection < IndexMetadata > allIndexMetadata ) {
386378 List <String > defaultPipelineIndices = new ArrayList <>();
387379 List <String > finalPipelineIndices = new ArrayList <>();
388- for (IndexMetadata indexMetadata : indices . values () ) {
380+ for (IndexMetadata indexMetadata : allIndexMetadata ) {
389381 String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetadata .getSettings ());
390382 String finalPipeline = IndexSettings .FINAL_PIPELINE .get (indexMetadata .getSettings ());
391383 if (pipeline .equals (defaultPipeline )) {
@@ -499,12 +491,7 @@ public void putPipeline(
499491 validatePipeline (ingestInfos , request .getId (), config );
500492 clusterService .submitStateUpdateTask (
501493 "put-pipeline-" + request .getId (),
502- new PipelineClusterStateUpdateTask (request .timeout (), listener ) {
503- @ Override
504- public ClusterState doExecute (ClusterState currentState ) {
505- return innerPut (request , currentState );
506- }
507- },
494+ new PutPipelineClusterStateUpdateTask (listener , request ),
508495 ClusterStateTaskConfig .build (Priority .NORMAL , request .masterNodeTimeout ()),
509496 PIPELINE_TASK_EXECUTOR
510497 );
@@ -570,74 +557,78 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
570557 }
571558
572559 // visible for testing
573- static ClusterState innerPut (PutPipelineRequest request , ClusterState currentState ) {
574- IngestMetadata currentIngestMetadata = currentState .metadata ().custom (IngestMetadata .TYPE );
575-
576- BytesReference pipelineSource = request .getSource ();
577- if (request .getVersion () != null ) {
578- var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata .getPipelines ().get (request .getId ()) : null ;
579- if (currentPipeline == null ) {
580- throw new IllegalArgumentException (
581- String .format (
582- Locale .ROOT ,
583- "version conflict, required version [%s] for pipeline [%s] but no pipeline was found" ,
584- request .getVersion (),
585- request .getId ()
586- )
587- );
588- }
560+ static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
561+ private final PutPipelineRequest request ;
589562
590- final Integer currentVersion = currentPipeline .getVersion ();
591- if (Objects .equals (request .getVersion (), currentVersion ) == false ) {
592- throw new IllegalArgumentException (
593- String .format (
594- Locale .ROOT ,
595- "version conflict, required version [%s] for pipeline [%s] but current version is [%s]" ,
596- request .getVersion (),
597- request .getId (),
598- currentVersion
599- )
600- );
601- }
563+ PutPipelineClusterStateUpdateTask (ActionListener <AcknowledgedResponse > listener , PutPipelineRequest request ) {
564+ super (listener );
565+ this .request = request ;
566+ }
602567
603- var pipelineConfig = XContentHelper .convertToMap (request .getSource (), false , request .getXContentType ()).v2 ();
604- final Integer specifiedVersion = (Integer ) pipelineConfig .get ("version" );
605- if (pipelineConfig .containsKey ("version" ) && Objects .equals (specifiedVersion , currentVersion )) {
606- throw new IllegalArgumentException (
607- String .format (
608- Locale .ROOT ,
609- "cannot update pipeline [%s] with the same version [%s]" ,
610- request .getId (),
611- request .getVersion ()
612- )
613- );
614- }
568+ @ Override
569+ public IngestMetadata execute (IngestMetadata currentIngestMetadata , Collection <IndexMetadata > allIndexMetadata ) {
570+ BytesReference pipelineSource = request .getSource ();
571+ if (request .getVersion () != null ) {
572+ var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata .getPipelines ().get (request .getId ()) : null ;
573+ if (currentPipeline == null ) {
574+ throw new IllegalArgumentException (
575+ String .format (
576+ Locale .ROOT ,
577+ "version conflict, required version [%s] for pipeline [%s] but no pipeline was found" ,
578+ request .getVersion (),
579+ request .getId ()
580+ )
581+ );
582+ }
615583
616- // if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
617- if (specifiedVersion == null ) {
618- pipelineConfig .put ("version" , request .getVersion () == null ? 1 : request .getVersion () + 1 );
619- try {
620- var builder = XContentBuilder .builder (request .getXContentType ().xContent ()).map (pipelineConfig );
621- pipelineSource = BytesReference .bytes (builder );
622- } catch (IOException e ) {
623- throw new IllegalStateException (e );
584+ final Integer currentVersion = currentPipeline .getVersion ();
585+ if (Objects .equals (request .getVersion (), currentVersion ) == false ) {
586+ throw new IllegalArgumentException (
587+ String .format (
588+ Locale .ROOT ,
589+ "version conflict, required version [%s] for pipeline [%s] but current version is [%s]" ,
590+ request .getVersion (),
591+ request .getId (),
592+ currentVersion
593+ )
594+ );
595+ }
596+
597+ var pipelineConfig = XContentHelper .convertToMap (request .getSource (), false , request .getXContentType ()).v2 ();
598+ final Integer specifiedVersion = (Integer ) pipelineConfig .get ("version" );
599+ if (pipelineConfig .containsKey ("version" ) && Objects .equals (specifiedVersion , currentVersion )) {
600+ throw new IllegalArgumentException (
601+ String .format (
602+ Locale .ROOT ,
603+ "cannot update pipeline [%s] with the same version [%s]" ,
604+ request .getId (),
605+ request .getVersion ()
606+ )
607+ );
608+ }
609+
610+ // if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
611+ if (specifiedVersion == null ) {
612+ pipelineConfig .put ("version" , request .getVersion () == null ? 1 : request .getVersion () + 1 );
613+ try {
614+ var builder = XContentBuilder .builder (request .getXContentType ().xContent ()).map (pipelineConfig );
615+ pipelineSource = BytesReference .bytes (builder );
616+ } catch (IOException e ) {
617+ throw new IllegalStateException (e );
618+ }
624619 }
625620 }
626- }
627621
628- Map <String , PipelineConfiguration > pipelines ;
629- if (currentIngestMetadata != null ) {
630- pipelines = new HashMap <>(currentIngestMetadata .getPipelines ());
631- } else {
632- pipelines = new HashMap <>();
633- }
622+ Map <String , PipelineConfiguration > pipelines ;
623+ if (currentIngestMetadata != null ) {
624+ pipelines = new HashMap <>(currentIngestMetadata .getPipelines ());
625+ } else {
626+ pipelines = new HashMap <>();
627+ }
634628
635- pipelines .put (request .getId (), new PipelineConfiguration (request .getId (), pipelineSource , request .getXContentType ()));
636- ClusterState .Builder newState = ClusterState .builder (currentState );
637- newState .metadata (
638- Metadata .builder (currentState .getMetadata ()).putCustom (IngestMetadata .TYPE , new IngestMetadata (pipelines )).build ()
639- );
640- return newState .build ();
629+ pipelines .put (request .getId (), new PipelineConfiguration (request .getId (), pipelineSource , request .getXContentType ()));
630+ return new IngestMetadata (pipelines );
631+ }
641632 }
642633
643634 void validatePipeline (Map <DiscoveryNode , IngestInfo > ingestInfos , String pipelineId , Map <String , Object > pipelineConfig )
0 commit comments