@@ -947,10 +947,8 @@ private void innerExecute(
947947 boolean ensureNoSelfReferences = ingestDocument .doNoSelfReferencesCheck ();
948948 indexRequest .source (ingestDocument .getSource (), indexRequest .getContentType (), ensureNoSelfReferences );
949949 } catch (IllegalArgumentException ex ) {
950- // An IllegalArgumentException can be thrown when an ingest
951- // processor creates a source map that is self-referencing.
952- // In that case, we catch and wrap the exception so we can
953- // include which pipeline failed.
950+ // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
951+ // In that case, we catch and wrap the exception, so we can include which pipeline failed.
954952 totalMetrics .ingestFailed ();
955953 handler .accept (
956954 new IllegalArgumentException (
@@ -959,6 +957,19 @@ private void innerExecute(
959957 )
960958 );
961959 return ;
960+ } catch (Exception ex ) {
961+ // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
962+ // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
963+ // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
964+ // no self references.
965+ totalMetrics .ingestFailed ();
966+ handler .accept (
967+ new RuntimeException (
968+ "Failed to generate the source document for ingest pipeline [" + pipeline .getId () + "]" ,
969+ ex
970+ )
971+ );
972+ return ;
962973 }
963974 Map <String , String > map ;
964975 if ((map = metadata .getDynamicTemplates ()) != null ) {
0 commit comments