diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index d5c5b214b5449..d6bd3881e01d2 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -100,7 +100,9 @@ impl ElasticsearchCommon { ); if let Some(pipeline) = &config.pipeline { - query_params.insert("pipeline".into(), pipeline.into()); + if !pipeline.is_empty() { + query_params.insert("pipeline".into(), pipeline.into()); + } } let bulk_url = { diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs index 5860576522a94..7e2d7400d0d5b 100644 --- a/src/sinks/elasticsearch/integration_tests.rs +++ b/src/sinks/elasticsearch/integration_tests.rs @@ -126,6 +126,28 @@ async fn ensure_pipeline_in_params() { assert_eq!(common.query_params["pipeline"], pipeline); } +#[tokio::test] +async fn ensure_empty_pipeline_not_in_params() { + let index = gen_index(); + let pipeline = String::from(""); + + let config = ElasticsearchConfig { + endpoints: vec![http_server()], + bulk: BulkConfig { + index, + ..Default::default() + }, + pipeline: Some(pipeline.clone()), + batch: batch_settings(), + ..Default::default() + }; + let common = ElasticsearchCommon::parse_single(&config) + .await + .expect("Config error"); + + assert_eq!(common.query_params.get("pipeline"), None); +} + #[tokio::test] async fn structures_events_correctly() { let index = gen_index();