Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ impl ElasticsearchCommon {
)
.await
{
Ok(version) => version,
Ok(version) => {
debug!(message = "Auto-detected Elasticsearch API version.", %version);
version
}
// This error should be fatal, but for now we only emit it as a warning
// to make the transition smoother.
Err(error) => {
Expand All @@ -156,11 +159,11 @@ impl ElasticsearchCommon {
// This is by no means a perfect assumption but it's the best we can
// make with the data we have.
let assumed_version = if config.suppress_type_name { 6 } else { 8 };
debug!(message = "Assumed ElasticsearchApi based on config setting suppress_type_name.",
debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
%assumed_version,
%config.suppress_type_name
);
warn!(message = "Failed to determine Elasticsearch version from `/_cluster/state/version`. Please fix the reported error or set an API version explicitly via `api_version`.",
warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
%assumed_version,
%error
);
Expand Down Expand Up @@ -277,28 +280,34 @@ async fn get_version(
proxy_config: &ProxyConfig,
) -> crate::Result<usize> {
#[derive(Deserialize)]
struct ClusterState {
version: Option<usize>,
struct Version {
number: Option<String>,
}
#[derive(Deserialize)]
struct ResponsePayload {
version: Option<Version>,
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(
base_url,
http_auth,
aws_auth,
region,
request,
client,
"/_cluster/state/version",
)
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

let (_, body) = response.into_parts();
let mut body = body::aggregate(body).await?;
let body = body.copy_to_bytes(body.remaining());
let ClusterState { version } = serde_json::from_slice(&body)?;
version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into())
let ResponsePayload { version } = serde_json::from_slice(&body)?;
if let Some(version) = version {
if let Some(number) = version.number {
let v: Vec<&str> = number.split('.').collect();
if !v.is_empty() {
if let Ok(major_version) = v[0].parse::<usize>() {
return Ok(major_version);
}
}
}
}
Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
}

async fn get(
Expand Down