Skip to content

Commit 3b11529

Browse files
committed
Passthru cluster stats as well
1 parent 12fc997 commit 3b11529

File tree

1 file changed

+7
-138
lines changed

1 file changed

+7
-138
lines changed

metricbeat/module/elasticsearch/cluster_stats/data_xpack.go

Lines changed: 7 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -29,125 +29,9 @@ import (
2929
"github.com/elastic/beats/metricbeat/module/elasticsearch"
3030

3131
"github.com/elastic/beats/libbeat/common"
32-
s "github.com/elastic/beats/libbeat/common/schema"
33-
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
3432
"github.com/elastic/beats/metricbeat/mb"
3533
)
3634

37-
var (
38-
clusterStatsSchema = s.Schema{
39-
"cluster_uuid": c.Str("cluster_uuid"),
40-
"timestamp": c.Int("timestamp"),
41-
"status": c.Str("status"),
42-
"indices": c.Dict("indices", s.Schema{
43-
"count": c.Int("count"),
44-
"shards": c.Dict("shards", s.Schema{
45-
"total": c.Int("total"),
46-
"primaries": c.Int("primaries"),
47-
"replication": c.Int("replication"),
48-
"index": c.Dict("index", s.Schema{
49-
"shards": c.Dict("shards", s.Schema{
50-
"min": c.Int("min"),
51-
"max": c.Int("max"),
52-
"avg": c.Int("avg"),
53-
}),
54-
"primaries": c.Dict("primaries", s.Schema{
55-
"min": c.Int("min"),
56-
"max": c.Int("max"),
57-
"avg": c.Int("avg"),
58-
}),
59-
"replication": c.Dict("replication", s.Schema{
60-
"min": c.Int("min"),
61-
"max": c.Int("max"),
62-
"avg": c.Int("avg"),
63-
}),
64-
}),
65-
}),
66-
"docs": c.Dict("docs", s.Schema{
67-
"count": c.Int("count"),
68-
"deleted": c.Int("deleted"),
69-
}),
70-
"store": c.Dict("store", s.Schema{
71-
"size_in_bytes": c.Int("size_in_bytes"),
72-
}),
73-
"fielddata": c.Dict("fielddata", s.Schema{
74-
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
75-
"evictions": c.Int("evictions"),
76-
}),
77-
"query_cache": c.Dict("query_cache", s.Schema{
78-
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
79-
"total_count": c.Int("total_count"),
80-
"hit_count": c.Int("hit_count"),
81-
"miss_count": c.Int("miss_count"),
82-
"cache_size": c.Int("cache_size"),
83-
"cache_count": c.Int("cache_count"),
84-
"evictions": c.Int("evictions"),
85-
}),
86-
"completion": c.Dict("completion", s.Schema{
87-
"size_in_bytes": c.Int("size_in_bytes"),
88-
}),
89-
"segments": c.Dict("segments", s.Schema{
90-
"count": c.Int("count"),
91-
"memory_in_bytes": c.Int("memory_in_bytes"),
92-
"terms_memory_in_bytes": c.Int("terms_memory_in_bytes"),
93-
"stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"),
94-
"term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"),
95-
"norms_memory_in_bytes": c.Int("norms_memory_in_bytes"),
96-
"points_memory_in_bytes": c.Int("points_memory_in_bytes"),
97-
"doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"),
98-
99-
"index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"),
100-
"version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"),
101-
"fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"),
102-
"max_unsafe_auto_id_timestamp": c.Int("max_unsafe_auto_id_timestamp"),
103-
}),
104-
}),
105-
"nodes": c.Dict("nodes", s.Schema{
106-
"count": c.Dict("count", s.Schema{
107-
"total": c.Int("total"),
108-
"data": c.Int("data"),
109-
"coordinating_only": c.Int("coordinating_only"),
110-
"master": c.Int("master"),
111-
"ingest": c.Int("ingest"),
112-
}),
113-
"os": c.Dict("os", s.Schema{
114-
"available_processors": c.Int("available_processors"),
115-
"allocated_processors": c.Int("allocated_processors"),
116-
"mem": c.Dict("mem", s.Schema{
117-
"total_in_bytes": c.Int("total_in_bytes"),
118-
"free_in_bytes": c.Int("free_in_bytes"),
119-
"used_in_bytes": c.Int("used_in_bytes"),
120-
"free_percent": c.Int("free_percent"),
121-
"used_percent": c.Int("used_percent"),
122-
}),
123-
}),
124-
"process": c.Dict("process", s.Schema{
125-
"cpu": c.Dict("cpu", s.Schema{
126-
"percent": c.Int("percent"),
127-
}),
128-
"open_file_descriptors": c.Dict("open_file_descriptors", s.Schema{
129-
"min": c.Int("min"),
130-
"max": c.Int("max"),
131-
"avg": c.Int("avg"),
132-
}),
133-
}),
134-
"jvm": c.Dict("jvm", s.Schema{
135-
"max_uptime_in_millis": c.Int("max_uptime_in_millis"),
136-
"mem": c.Dict("mem", s.Schema{
137-
"heap_used_in_bytes": c.Int("heap_used_in_bytes"),
138-
"heap_max_in_bytes": c.Int("heap_max_in_bytes"),
139-
}),
140-
"threads": c.Int("threads"),
141-
}),
142-
"fs": c.Dict("fs", s.Schema{
143-
"total_in_bytes": c.Int("total_in_bytes"),
144-
"free_in_bytes": c.Int("free_in_bytes"),
145-
"available_in_bytes": c.Int("available_in_bytes"),
146-
}),
147-
}),
148-
}
149-
)
150-
15135
func passthruField(fieldPath string, sourceData, targetData common.MapStr) error {
15236
fieldValue, err := sourceData.GetValue(fieldPath)
15337
if err != nil {
@@ -274,30 +158,15 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
274158
return err
275159
}
276160

277-
clusterStats, err := clusterStatsSchema.Apply(data)
278-
if err != nil {
279-
return err
280-
}
281-
282-
dataMS := common.MapStr(data)
161+
clusterStats := common.MapStr(data)
283162

284-
passthruFields := []string{
285-
"indices.segments.file_sizes", // object with dynamic keys
286-
"nodes.versions", // array of strings
287-
"nodes.os.names", // array of objects
288-
"nodes.jvm.versions", // array of objects
289-
"nodes.plugins", // array of objects
290-
"nodes.network_types", // object with dynamic keys
291-
}
292-
for _, fieldPath := range passthruFields {
293-
if err = passthruField(fieldPath, dataMS, clusterStats); err != nil {
294-
return err
295-
}
163+
value, err := clusterStats.GetValue("cluster_name")
164+
if err != nil {
165+
return elastic.MakeErrorForMissingField("cluster_name", elastic.Elasticsearch)
296166
}
297-
298-
clusterName, ok := data["cluster_name"].(string)
167+
clusterName, ok := value.(string)
299168
if !ok {
300-
return elastic.MakeErrorForMissingField("cluster_name", elastic.Elasticsearch)
169+
return fmt.Errorf("cluster name is not a string")
301170
}
302171

303172
info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI())
@@ -315,7 +184,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
315184
return err
316185
}
317186

318-
if err = passthruField("status", dataMS, clusterState); err != nil {
187+
if err = passthruField("status", clusterStats, clusterState); err != nil {
319188
return err
320189
}
321190

0 commit comments

Comments
 (0)