Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added Cisco Meraki module {pull}40836[40836]
- Added Palo Alto Networks module {pull}40686[40686]
- Restore docker.network.in.* and docker.network.out.* fields in docker module {pull}40968[40968]
- Add `id` field to all the vSphere metricsets. {pull}41097[41097]
- Bump aerospike-client-go to version v7.7.1 and add support for basic auth in Aerospike module {pull}41233[41233]
- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41289[41289]
- Add support for region/zone for Vertex AI service in GCP module {pull}41551[41551]
- Add support for location label as an optional configuration parameter in GCP metrics metricset. {issue}41550[41550] {pull}41626[41626]
- Added `tier_preference`, `creation_date` and `version` fields to the `elasticsearch.index` metricset. {pull}41944[41944]

*Metricbeat*

Expand Down
21 changes: 21 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32130,6 +32130,27 @@ type: keyword

--

*`elasticsearch.index.tier_preference`*::
+
--
type: keyword

--

*`elasticsearch.index.creation_date`*::
+
--
type: date

--

*`elasticsearch.index.version`*::
+
--
type: keyword

--

*`elasticsearch.index.name`*::
+
--
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/cluster_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func eventMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.I
}

clusterStateMetrics := []string{"version", "master_node", "nodes", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, []string{})
if err != nil {
return fmt.Errorf("failed to get cluster state from Elasticsearch: %w", err)
}
Expand Down
34 changes: 32 additions & 2 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,21 @@ func GetLicense(http *helper.HTTP, resetURI string) (*License, error) {
}

// GetClusterState returns cluster state information.
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (mapstr.M, error) {
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string, filterPaths []string) (mapstr.M, error) {
queryParams := []string{"local=true"}
clusterStateURI := "_cluster/state"
if len(metrics) > 0 {
clusterStateURI += "/" + strings.Join(metrics, ",")
}

content, err := fetchPath(http, resetURI, clusterStateURI, "local=true")
if len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, clusterStateURI, queryString)
if err != nil {
return nil, err
}
Expand All @@ -304,6 +312,28 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (maps
return clusterState, err
}

func GetIndexSettings(http *helper.HTTP, resetURI string, indexPattern string, filterPaths []string) (mapstr.M, error) {

queryParams := []string{"local=true", "expand_wildcards=hidden,all"}
indicesSettingsURI := indexPattern + "/_settings"

if len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, indicesSettingsURI, queryString)
if err != nil {
return nil, err
}

var indicesSettings map[string]interface{}
err = json.Unmarshal(content, &indicesSettings)
return indicesSettings, err
}

// GetClusterSettingsWithDefaults returns cluster settings.
func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (mapstr.M, error) {
return GetClusterSettings(http, resetURI, true, filterPaths)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/fields.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@
}
},
"status": "green",
"tier_preference": "data_content",
"creation_date": 1731657995821,
"version": "8505000",
"hidden": true,
"shards": {
"total": 1,
Expand Down
6 changes: 6 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
type: keyword
- name: status
type: keyword
- name: tier_preference
type: keyword
- name: creation_date
type: date
- name: version
type: keyword
- name: name
type: keyword
description: >
Expand Down
105 changes: 97 additions & 8 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package index
import (
"encoding/json"
"fmt"
"strconv"

"github.com/joeshaw/multierror"

Expand All @@ -40,9 +41,12 @@ type Index struct {
Primaries primaries `json:"primaries"`
Total total `json:"total"`

Index string `json:"index"`
Status string `json:"status"`
Shards shardStats `json:"shards"`
Index string `json:"index"`
Status string `json:"status"`
TierPreference string `json:"tier_preference"`
CreationDate int `json:"creation_date"`
Version string `json:"version"`
Shards shardStats `json:"shards"`
}

type primaries struct {
Expand Down Expand Up @@ -180,11 +184,19 @@ type bulkStats struct {

func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.Info, content []byte, isXpack bool) error {
clusterStateMetrics := []string{"routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterStateFilterPaths := []string{"routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, clusterStateFilterPaths)
if err != nil {
return fmt.Errorf("failure retrieving cluster state from Elasticsearch: %w", err)
}

indicesSettingsPattern := "*,.*"
indicesSettingsFilterPaths := []string{"*.settings.index.creation_date", "*.settings.index.**._tier_preference", "*.settings.index.version.created"}
indicesSettings, err := elasticsearch.GetIndexSettings(httpClient, httpClient.GetURI(), indicesSettingsPattern, indicesSettingsFilterPaths)
if err != nil {
return fmt.Errorf("failure retrieving index settings from Elasticsearch: %w", err)
}

var indicesStats stats
if err := parseAPIResponse(content, &indicesStats); err != nil {
return fmt.Errorf("failure parsing Indices Stats Elasticsearch API response: %w", err)
Expand All @@ -204,6 +216,12 @@ func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.
continue
}

err = addIndexSettings(&idx, indicesSettings)
if err != nil {
errs = append(errs, fmt.Errorf("failure adding index settings: %w", err))
continue
}

event.ModuleFields.Put("cluster.id", info.ClusterID)
event.ModuleFields.Put("cluster.name", info.ClusterName)

Expand Down Expand Up @@ -271,6 +289,63 @@ func addClusterStateFields(idx *Index, clusterState mapstr.M) error {
return nil
}

func addIndexSettings(idx *Index, indicesSettings mapstr.M) error {

// Recover the index settings for our specific index
indexSettingsValue, err := indicesSettings.GetValue(idx.Index)
if err != nil {
return fmt.Errorf("failed to get index settings for index %s: %w", idx.Index, err)
}

indexSettings, ok := indexSettingsValue.(map[string]interface{})
if !ok {
return fmt.Errorf("index settings is not a map for index: %s", idx.Index)
}

indexCreationDate, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.creation_date")
if err != nil {
return fmt.Errorf("failed to get index creation date: %w", err)
}

idx.CreationDate, err = strconv.Atoi(indexCreationDate)
if err != nil {
return fmt.Errorf("failed to convert index creation date to int: %w", err)
}

indexTierPreference, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.require._tier_preference")
if err != nil {
indexTierPreference, err = getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.include._tier_preference")
if err != nil {
return fmt.Errorf("failed to get index tier preference: %w", err)
}
}

idx.TierPreference = indexTierPreference

indexVersion, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.version.created")
if err != nil {
return fmt.Errorf("failed to get index version: %w", err)
}

idx.Version = indexVersion

return nil
}

func getIndexSettingForIndex(indexSettings mapstr.M, index, settingKey string) (string, error) {
fieldKey := "settings." + settingKey
value, err := indexSettings.GetValue(fieldKey)
if err != nil {
return "", fmt.Errorf("'"+fieldKey+"': %w", err)
}

setting, ok := value.(string)
if !ok {
return "", elastic.MakeErrorForMissingField(fieldKey, elastic.Elasticsearch)
}
return setting, nil
}

func getClusterStateMetricForIndex(clusterState mapstr.M, index, metricKey string) (mapstr.M, error) {
fieldKey := metricKey + ".indices." + index
value, err := clusterState.GetValue(fieldKey)
Expand Down Expand Up @@ -308,8 +383,15 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return "", fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return "", fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
areAllPrimariesStarted = areAllPrimariesStarted && (state == "STARTED")
Expand Down Expand Up @@ -357,8 +439,15 @@ func getIndexShardStats(shards mapstr.M) (*shardStats, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
primaries++
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_index_management(self):
assert len(es.cat.templates(name='metricbeat-*', h='name')) > 0

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@pytest.mark.timeout(8*60, func_only=True)
@pytest.mark.timeout(8 * 60, func_only=True)
def test_dashboards(self):
"""
Test that the dashboards can be loaded with `setup --dashboards`
Expand Down