Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add GCP organization and project details to ECS cloud fields. {pull}40461[40461]
- Add support for specifying a custom endpoint for GCP service clients. {issue}40848[40848] {pull}40918[40918]
- Fix incorrect handling of types in SQL module. {issue}40090[40090] {pull}41607[41607]
- Added `creation_date` and `tier_preference` fields to the Elasticsearch module. {pull}41652[41652]

*Osquerybeat*

Expand Down
14 changes: 14 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32130,6 +32130,20 @@ type: keyword

--

*`elasticsearch.index.tier_preference`*::
+
--
type: keyword

--

*`elasticsearch.index.creation_date`*::
+
--
type: date

--

*`elasticsearch.index.name`*::
+
--
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/cluster_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func eventMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.I
}

clusterStateMetrics := []string{"version", "master_node", "nodes", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, []string{})
if err != nil {
return fmt.Errorf("failed to get cluster state from Elasticsearch: %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,21 @@ func GetLicense(http *helper.HTTP, resetURI string) (*License, error) {
}

// GetClusterState returns cluster state information.
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (mapstr.M, error) {
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string, filterPaths []string) (mapstr.M, error) {
queryParams := []string{"local=true"}
clusterStateURI := "_cluster/state"
if len(metrics) > 0 {
clusterStateURI += "/" + strings.Join(metrics, ",")
}

content, err := fetchPath(http, resetURI, clusterStateURI, "local=true")
if len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, clusterStateURI, queryString)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/fields.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@
}
},
"status": "green",
"tier_preference": "data_content",
"creation_date": 1731657995821,
"hidden": true,
"shards": {
"total": 1,
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
type: keyword
- name: status
type: keyword
- name: tier_preference
type: keyword
- name: creation_date
type: date
- name: name
type: keyword
description: >
Expand Down
94 changes: 85 additions & 9 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package index
import (
"encoding/json"
"fmt"
"strconv"

"github.com/joeshaw/multierror"

Expand All @@ -40,9 +41,11 @@ type Index struct {
Primaries primaries `json:"primaries"`
Total total `json:"total"`

Index string `json:"index"`
Status string `json:"status"`
Shards shardStats `json:"shards"`
Index string `json:"index"`
Status string `json:"status"`
TierPreference string `json:"tier_preference"`
CreationDate int `json:"creation_date"`
Shards shardStats `json:"shards"`
}

type primaries struct {
Expand Down Expand Up @@ -179,8 +182,18 @@ type bulkStats struct {
}

func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.Info, content []byte, isXpack bool) error {
clusterStateMetrics := []string{"routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterStateMetrics := []string{"routing_table", "metadata"}
filterPaths := []string{
"routing_table",
"metadata.indices.**.settings.index.**._tier_preference",
"metadata.indices.**.settings.index.creation_date",
}
clusterState, err := elasticsearch.GetClusterState(
httpClient,
httpClient.GetURI(),
clusterStateMetrics,
filterPaths,
)
if err != nil {
return fmt.Errorf("failure retrieving cluster state from Elasticsearch: %w", err)
}
Expand Down Expand Up @@ -249,6 +262,23 @@ func addClusterStateFields(idx *Index, clusterState mapstr.M) error {
return fmt.Errorf("failed to get index routing table from cluster state: %w", err)
}

indexMetadata, err := getClusterStateMetricForIndex(clusterState, idx.Index, "metadata")
if err != nil {
return fmt.Errorf("failed to get index metadata from cluster state: %w", err)
}

indexTierPreference, err := getIndexTierPreferenceFromMetadata(indexMetadata)
if err == nil {
// Tier preference is optional, so only set it if it exists
idx.TierPreference = indexTierPreference
}

indexCreationDate, err := getIndexCreationDateFromMetadata(indexMetadata)
if err != nil {
return fmt.Errorf("failed to get index creation date from metadata: %w", err)
}
idx.CreationDate = indexCreationDate

shards, err := getShardsFromRoutingTable(indexRoutingTable)
if err != nil {
return fmt.Errorf("failed to get shards from routing table: %w", err)
Expand Down Expand Up @@ -285,6 +315,38 @@ func getClusterStateMetricForIndex(clusterState mapstr.M, index, metricKey strin
return mapstr.M(metric), nil
}

func getIndexTierPreferenceFromMetadata(indexMetadata mapstr.M) (string, error) {
fieldKey := "settings.index.routing.allocation.include._tier_preference"
value, err := indexMetadata.GetValue(fieldKey)
if err != nil {
fieldKey = "settings.index.routing.allocation.require._tier_preference"
value, err = indexMetadata.GetValue(fieldKey)
if err != nil {
return "", fmt.Errorf("'"+fieldKey+"': %w", err)
}
}

tierPreference, ok := value.(string)
if !ok {
return "", elastic.MakeErrorForMissingField(fieldKey, elastic.Elasticsearch)
}
return tierPreference, nil
}

func getIndexCreationDateFromMetadata(indexMetadata mapstr.M) (int, error) {
fieldKey := "settings.index.creation_date"
value, err := indexMetadata.GetValue(fieldKey)
if err != nil {
return 0, fmt.Errorf("'"+fieldKey+"': %w", err)
}

creationDate, err := strconv.Atoi(value.(string))
if err != nil {
return 0, fmt.Errorf("failed to parse index creation date: %w", err)
}
return creationDate, nil
}

func getIndexStatus(shards map[string]interface{}) (string, error) {
if len(shards) == 0 {
// No shards, index is red
Expand All @@ -308,8 +370,15 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return "", fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return "", fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
areAllPrimariesStarted = areAllPrimariesStarted && (state == "STARTED")
Expand Down Expand Up @@ -357,8 +426,15 @@ func getIndexShardStats(shards mapstr.M) (*shardStats, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
primaries++
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/index/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func createEsMuxer(esVersion, license string, ccrEnabled bool) *http.ServeMux {
w.Write(input)
}))

mux.Handle("/_cluster/state/metadata,routing_table", http.HandlerFunc(
mux.Handle("/_cluster/state/routing_table,metadata", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
input, _ := ioutil.ReadFile("./_meta/test/cluster_state.710.json")
w.Write(input)
Expand Down