4444import org .elasticsearch .common .bytes .BytesReference ;
4545import org .elasticsearch .common .regex .Regex ;
4646import org .elasticsearch .common .settings .Settings ;
47+ import org .elasticsearch .common .util .CollectionUtils ;
4748import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
4849import org .elasticsearch .common .xcontent .XContentHelper ;
50+ import org .elasticsearch .core .Releasable ;
4951import org .elasticsearch .core .TimeValue ;
5052import org .elasticsearch .core .Tuple ;
5153import org .elasticsearch .env .Environment ;
@@ -713,15 +715,37 @@ protected void doRun() {
713715 continue ;
714716 }
715717
716- executePipelines (
717- i ,
718- pipelines .iterator (),
719- hasFinalPipeline ,
720- indexRequest ,
721- onDropped ,
722- onFailure ,
723- refs .acquireListener ()
724- );
718+ // start the stopwatch and acquire a ref to indicate that we're working on this document
719+ final long startTimeInNanos = System .nanoTime ();
720+ totalMetrics .preIngest ();
721+ final int slot = i ;
722+ final Releasable ref = refs .acquire ();
723+ // the document listener gives us three-way logic: a document can fail processing (1), or it can
724+ // be successfully processed. a successfully processed document can be kept (2) or dropped (3).
725+ final ActionListener <Boolean > documentListener = ActionListener .runAfter (new ActionListener <>() {
726+ @ Override
727+ public void onResponse (Boolean kept ) {
728+ assert kept != null ;
729+ if (kept == false ) {
730+ onDropped .accept (slot );
731+ }
732+ }
733+
734+ @ Override
735+ public void onFailure (Exception e ) {
736+ totalMetrics .ingestFailed ();
737+ onFailure .accept (slot , e );
738+ }
739+ }, () -> {
740+ // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate
741+ // that we're finished with this document
742+ final long ingestTimeInNanos = System .nanoTime () - startTimeInNanos ;
743+ totalMetrics .postIngest (ingestTimeInNanos );
744+ ref .close ();
745+ });
746+
747+ IngestDocument ingestDocument = newIngestDocument (indexRequest );
748+ executePipelines (pipelines .iterator (), hasFinalPipeline , indexRequest , ingestDocument , documentListener );
725749
726750 i ++;
727751 }
@@ -731,30 +755,25 @@ protected void doRun() {
731755 }
732756
733757 private void executePipelines (
734- final int slot ,
735- final Iterator <String > it ,
758+ final Iterator <String > pipelineIds ,
736759 final boolean hasFinalPipeline ,
737760 final IndexRequest indexRequest ,
738- final IntConsumer onDropped ,
739- final BiConsumer <Integer , Exception > onFailure ,
740- final ActionListener <Void > onFinished
761+ final IngestDocument ingestDocument ,
762+ final ActionListener <Boolean > listener
741763 ) {
742- assert it .hasNext ();
743- final String pipelineId = it .next ();
764+ assert pipelineIds .hasNext ();
765+ final String pipelineId = pipelineIds .next ();
744766 try {
745- PipelineHolder holder = pipelines .get (pipelineId );
767+ final PipelineHolder holder = pipelines .get (pipelineId );
746768 if (holder == null ) {
747769 throw new IllegalArgumentException ("pipeline with id [" + pipelineId + "] does not exist" );
748770 }
749- Pipeline pipeline = holder .pipeline ;
750- String originalIndex = indexRequest .indices ()[0 ];
751- long startTimeInNanos = System .nanoTime ();
752- totalMetrics .preIngest ();
753- innerExecute (slot , indexRequest , pipeline , onDropped , e -> {
754- long ingestTimeInNanos = System .nanoTime () - startTimeInNanos ;
755- totalMetrics .postIngest (ingestTimeInNanos );
771+ final Pipeline pipeline = holder .pipeline ;
772+ final String originalIndex = indexRequest .indices ()[0 ];
773+ executePipeline (ingestDocument , pipeline , (keep , e ) -> {
774+ assert keep != null ;
775+
756776 if (e != null ) {
757- totalMetrics .ingestFailed ();
758777 logger .debug (
759778 () -> format (
760779 "failed to execute pipeline [%s] for document [%s/%s]" ,
@@ -764,54 +783,105 @@ private void executePipelines(
764783 ),
765784 e
766785 );
767- onFailure .accept (slot , e );
768- // document failed! no further processing of this doc
769- onFinished .onResponse (null );
770- return ;
786+ listener .onFailure (e );
787+ return ; // document failed!
788+ }
789+
790+ if (keep == false ) {
791+ listener .onResponse (false );
792+ return ; // document dropped!
771793 }
772794
773- Iterator <String > newIt = it ;
795+ // update the index request so that we can execute additional pipelines (if any), etc
796+ updateIndexRequestMetadata (indexRequest , ingestDocument .getMetadata ());
797+ try {
798+ // check for self-references if necessary, (i.e. if a script processor has run), and clear the bit
799+ if (ingestDocument .doNoSelfReferencesCheck ()) {
800+ CollectionUtils .ensureNoSelfReferences (ingestDocument .getSource (), null );
801+ ingestDocument .doNoSelfReferencesCheck (false );
802+ }
803+ } catch (IllegalArgumentException ex ) {
804+ // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
805+ // In that case, we catch and wrap the exception, so we can include more details
806+ listener .onFailure (
807+ new IllegalArgumentException (
808+ format (
809+ "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]" ,
810+ pipelineId ,
811+ indexRequest .index (),
812+ indexRequest .id ()
813+ ),
814+ ex
815+ )
816+ );
817+ return ; // document failed!
818+ }
819+
820+ Iterator <String > newPipelineIds = pipelineIds ;
774821 boolean newHasFinalPipeline = hasFinalPipeline ;
775- String newIndex = indexRequest .indices ()[0 ];
822+ final String newIndex = indexRequest .indices ()[0 ];
776823
777824 if (Objects .equals (originalIndex , newIndex ) == false ) {
778- if (hasFinalPipeline && it .hasNext () == false ) {
779- totalMetrics .ingestFailed ();
780- onFailure .accept (
781- slot ,
782- new IllegalStateException ("final pipeline [" + pipelineId + "] can't change the target index" )
825+ if (hasFinalPipeline && pipelineIds .hasNext () == false ) {
826+ listener .onFailure (
827+ new IllegalStateException (
828+ format (
829+ "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]" ,
830+ pipelineId ,
831+ originalIndex ,
832+ newIndex ,
833+ indexRequest .id ()
834+ )
835+ )
783836 );
784- // document failed! no further processing of this doc
785- onFinished .onResponse (null );
786- return ;
837+ return ; // document failed!
787838 } else {
788839 indexRequest .isPipelineResolved (false );
789840 resolvePipelines (null , indexRequest , state .metadata ());
790841 if (IngestService .NOOP_PIPELINE_NAME .equals (indexRequest .getFinalPipeline ()) == false ) {
791- newIt = Collections .singleton (indexRequest .getFinalPipeline ()).iterator ();
842+ newPipelineIds = Collections .singleton (indexRequest .getFinalPipeline ()).iterator ();
792843 newHasFinalPipeline = true ;
793844 } else {
794- newIt = Collections .emptyIterator ();
845+ newPipelineIds = Collections .emptyIterator ();
795846 }
796847 }
797848 }
798849
799- if (newIt .hasNext ()) {
800- executePipelines (slot , newIt , newHasFinalPipeline , indexRequest , onDropped , onFailure , onFinished );
850+ if (newPipelineIds .hasNext ()) {
851+ executePipelines (newPipelineIds , newHasFinalPipeline , indexRequest , ingestDocument , listener );
801852 } else {
802- onFinished .onResponse (null );
853+ // update the index request's source and (potentially) cache the timestamp for TSDB
854+ updateIndexRequestSource (indexRequest , ingestDocument );
855+ cacheRawTimestamp (indexRequest , ingestDocument );
856+ listener .onResponse (true ); // document succeeded!
803857 }
804858 });
805859 } catch (Exception e ) {
806860 logger .debug (
807861 () -> format ("failed to execute pipeline [%s] for document [%s/%s]" , pipelineId , indexRequest .index (), indexRequest .id ()),
808862 e
809863 );
810- onFailure .accept (slot , e );
811- onFinished .onResponse (null );
864+ listener .onFailure (e ); // document failed!
812865 }
813866 }
814867
868+ private void executePipeline (
869+ final IngestDocument ingestDocument ,
870+ final Pipeline pipeline ,
871+ final BiConsumer <Boolean , Exception > handler
872+ ) {
873+ // adapt our {@code BiConsumer<Boolean, Exception>} handler shape to the
874+ // {@code BiConsumer<IngestDocument, Exception>} handler shape used internally
875+ // by ingest pipelines and processors
876+ ingestDocument .executePipeline (pipeline , (result , e ) -> {
877+ if (e != null ) {
878+ handler .accept (true , e );
879+ } else {
880+ handler .accept (result != null , null );
881+ }
882+ });
883+ }
884+
815885 public IngestStats stats () {
816886 IngestStats .Builder statsBuilder = new IngestStats .Builder ();
817887 statsBuilder .addTotalMetrics (totalMetrics );
@@ -863,56 +933,6 @@ static String getProcessorName(Processor processor) {
863933 return sb .toString ();
864934 }
865935
// NOTE: every line of this block carries the '-' marker, i.e. this is the removed side of the diff:
// the previous single-pipeline execution helper.
//
// What the removed method did, in order:
//   1. If the pipeline had no processors, it completed immediately via handler.accept(null).
//   2. Otherwise it built an IngestDocument from the index request and executed the pipeline on it.
//   3. In the completion callback:
//        - a non-null exception was forwarded to handler;
//        - a null result invoked itemDroppedHandler with the slot, then handler.accept(null);
//        - otherwise the index request's metadata and source were updated from the document, the raw
//          timestamp was cached, and handler.accept(null) signalled completion. Exceptions thrown while
//          updating the source were wrapped (IllegalArgumentException -> IllegalArgumentException,
//          anything else -> RuntimeException) with the pipeline id in the message and passed to handler.
866- private void innerExecute (
867- final int slot ,
868- final IndexRequest indexRequest ,
869- final Pipeline pipeline ,
870- final IntConsumer itemDroppedHandler ,
871- final Consumer <Exception > handler
872- ) {
873- if (pipeline .getProcessors ().isEmpty ()) {
874- handler .accept (null );
875- return ;
876- }
877-
878- IngestDocument ingestDocument = newIngestDocument (indexRequest );
879- ingestDocument .executePipeline (pipeline , (result , e ) -> {
880- if (e != null ) {
881- handler .accept (e );
882- } else if (result == null ) {
883- itemDroppedHandler .accept (slot );
884- handler .accept (null );
885- } else {
886- updateIndexRequestMetadata (indexRequest , ingestDocument .getMetadata ());
887- try {
888- updateIndexRequestSource (indexRequest , ingestDocument );
889- } catch (IllegalArgumentException ex ) {
890- // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
891- // In that case, we catch and wrap the exception, so we can include which pipeline failed.
892- handler .accept (
893- new IllegalArgumentException (
894- "Failed to generate the source document for ingest pipeline [" + pipeline .getId () + "]" ,
895- ex
896- )
897- );
898- return ;
899- } catch (Exception ex ) {
900- // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
901- // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
902- // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
903- // no self references.
904- handler .accept (
905- new RuntimeException ("Failed to generate the source document for ingest pipeline [" + pipeline .getId () + "]" , ex )
906- );
907- return ;
908- }
909- cacheRawTimestamp (indexRequest , ingestDocument );
910-
911- handler .accept (null );
912- }
913- });
914- }
915-
916936 /**
917937 * Builds a new ingest document from the passed-in index request.
918938 */
@@ -960,6 +980,9 @@ private static void updateIndexRequestMetadata(final IndexRequest request, final
960980 */
961981 private static void updateIndexRequestSource (final IndexRequest request , final IngestDocument document ) {
962982 boolean ensureNoSelfReferences = document .doNoSelfReferencesCheck ();
983+ // we already check for self references elsewhere (and clear the bit), so this should always be false,
984+ // keeping the check and assert as a guard against extraordinarily surprising circumstances
985+ assert ensureNoSelfReferences == false ;
963986 request .source (document .getSource (), request .getContentType (), ensureNoSelfReferences );
964987 }
965988
0 commit comments