Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions x-pack/metricbeat/module/autoops_es/cat_shards/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/elastic/beats/v7/x-pack/metricbeat/module/autoops_es/events"

"golang.org/x/exp/maps"

"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/metricbeat/mb"
Expand Down Expand Up @@ -80,7 +82,7 @@ func eventsMapping(m *elasticsearch.MetricSet, r mb.ReporterV2, info *utils.Clus

transactionID := utils.NewUUIDV4()

sendNodeShardsEvent(r, info, nodeShardCountToMapArray(nodeShards), transactionID)
sendNodeShardsEvent(r, info, maps.Values(nodeShards), transactionID)

indexMetadata, err := getResolvedIndices(m)

Expand All @@ -95,7 +97,7 @@ func eventsMapping(m *elasticsearch.MetricSet, r mb.ReporterV2, info *utils.Clus
return err
}

func sendNodeShardsEvent(r mb.ReporterV2, info *utils.ClusterInfo, nodeToShards []map[string]any, transactionId string) {
func sendNodeShardsEvent(r mb.ReporterV2, info *utils.ClusterInfo, nodeToShards []NodeShardCount, transactionId string) {
r.Event(events.CreateEvent(info, mapstr.M{"node_shards_count": nodeToShards}, transactionId))
}

Expand All @@ -109,7 +111,7 @@ func sendNodeIndexShardsEvent(r mb.ReporterV2, info *utils.ClusterInfo, nodeInde
for i := 0; i < size; i += nodeIndexShardsPerEvent {
group := nodeIndexShards[i:min(i+nodeIndexShardsPerEvent, size)]

groups = append(groups, mapstr.M{"node_index_shards": convertObjectArrayToMapArray(group)})
groups = append(groups, mapstr.M{"node_index_shards": group})
}

events.CreateAndReportEvents(r, info, groups, transactionId)
Expand Down
45 changes: 22 additions & 23 deletions x-pack/metricbeat/module/autoops_es/cat_shards/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ var (
func TestSendNodeShardsEvent(t *testing.T) {
reporter := &mbtest.CapturingReporterV2{}
info := auto_ops_testing.CreateClusterInfo("8.15.3")
nodeToShards := map[string]NodeShardCount{
"node1": {
nodeToShards := []NodeShardCount{
{
NodeId: "node1",
NodeName: "name1",
Shards: 100,
Expand All @@ -43,7 +43,7 @@ func TestSendNodeShardsEvent(t *testing.T) {
RelocatingPrimaryShards: 1,
RelocatingReplicaShards: 2,
},
"node2": {
{
NodeId: "node2",
NodeName: "name2",
Shards: 99,
Expand All @@ -59,7 +59,7 @@ func TestSendNodeShardsEvent(t *testing.T) {
}
transactionId := "xyz"

sendNodeShardsEvent(reporter, &info, nodeShardCountToMapArray(nodeToShards), transactionId)
sendNodeShardsEvent(reporter, &info, nodeToShards, transactionId)

require.Equal(t, 0, len(reporter.GetErrors()))
require.Equal(t, 1, len(reporter.GetEvents()))
Expand All @@ -68,7 +68,7 @@ func TestSendNodeShardsEvent(t *testing.T) {

auto_ops_testing.CheckEventWithTransactionId(t, event, info, transactionId)

require.ElementsMatch(t, maps.Values(nodeToShards), mapArrayToType[NodeShardCount](auto_ops_testing.GetObjectValue(event.MetricSetFields, "node_shards_count").([]map[string]any)))
require.ElementsMatch(t, nodeToShards, auto_ops_testing.GetObjectValue(event.MetricSetFields, "node_shards_count"))
}

func TestSendNodeIndexShardsEventInBatch(t *testing.T) {
Expand All @@ -86,7 +86,7 @@ func TestSendNodeIndexShardsEventInBatch(t *testing.T) {

auto_ops_testing.CheckEventWithTransactionId(t, event, info, transactionId)

require.ElementsMatch(t, convertObjectArrayToMapArray(nodeIndexShards), auto_ops_testing.GetObjectValue(event.MetricSetFields, "node_index_shards"))
require.ElementsMatch(t, nodeIndexShards, auto_ops_testing.GetObjectValue(event.MetricSetFields, "node_index_shards"))
}

func TestSendNodeIndexShardsEvent(t *testing.T) {
Expand All @@ -106,16 +106,16 @@ func TestSendNodeIndexShardsEvent(t *testing.T) {

auto_ops_testing.CheckAllEventsUseSameTransactionId(t, events)

eventData := []map[string]any{}
eventData := []NodeIndexShards{}

for _, event := range events {
auto_ops_testing.CheckEventWithTransactionId(t, event, info, transactionId)

array := auto_ops_testing.GetObjectValue(event.MetricSetFields, "node_index_shards")
eventData = append(eventData, array.([]map[string]any)...)
eventData = append(eventData, array.([]NodeIndexShards)...)
}

require.ElementsMatch(t, convertObjectArrayToMapArray(nodeIndexShards), eventData)
require.ElementsMatch(t, nodeIndexShards, eventData)
}

func expectValidParsedData(t *testing.T, data metricset.FetcherData[[]JSONShard]) {
Expand All @@ -131,12 +131,12 @@ func expectValidParsedData(t *testing.T, data metricset.FetcherData[[]JSONShard]
nodeShardsCountEvents := auto_ops_testing.GetEventsWithField(t, events, "node_shards_count")

require.Equal(t, 1, len(nodeShardsCountEvents))
require.Equal(t, 2, len(auto_ops_testing.GetObjectValue(nodeShardsCountEvents[0].MetricSetFields, "node_shards_count").([]map[string]any)))
require.Equal(t, 2, len(auto_ops_testing.GetObjectValue(nodeShardsCountEvents[0].MetricSetFields, "node_shards_count").([]NodeShardCount)))

nodeIndexShardsEvents := auto_ops_testing.GetEventsWithField(t, events, "node_index_shards")

require.Equal(t, 1, len(nodeIndexShardsEvents))
require.LessOrEqual(t, 2, len(auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]map[string]any)))
require.LessOrEqual(t, 2, len(auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]NodeIndexShards)))
}

func expectValidParsedDetailedShards(t *testing.T, data metricset.FetcherData[[]JSONShard]) {
Expand All @@ -147,7 +147,7 @@ func expectValidParsedDetailedShards(t *testing.T, data metricset.FetcherData[[]
require.Equal(t, 2, len(events))

nodeShardCountsEvents := auto_ops_testing.GetEventsWithField(t, events, "node_shards_count")
nodeShardCounts := mapArrayToType[NodeShardCount](auto_ops_testing.GetObjectValue(nodeShardCountsEvents[0].MetricSetFields, "node_shards_count").([]map[string]any))
nodeShardCounts := auto_ops_testing.GetObjectValue(nodeShardCountsEvents[0].MetricSetFields, "node_shards_count").([]NodeShardCount)

require.Equal(t, 1, len(nodeShardCountsEvents))
require.Equal(t, 2, len(nodeShardCounts))
Expand All @@ -167,18 +167,17 @@ func expectValidParsedDetailedShards(t *testing.T, data metricset.FetcherData[[]
require.EqualValues(t, 0, node2.RelocatingReplicaShards)

nodeIndexShardsEvents := auto_ops_testing.GetEventsWithField(t, events, "node_index_shards")
nodeIndexShards := mapArrayToType[NodeIndexShards](auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]map[string]any))
nodeIndexShards := auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]NodeIndexShards)

require.Equal(t, 1, len(nodeIndexShardsEvents))
require.LessOrEqual(t, 14, len(nodeIndexShards))

myIndexNode2 := nodeIndexShards[slices.IndexFunc(nodeIndexShards, func(node NodeIndexShards) bool { return node.IndexNode == "my-index-node_id-node2" })]

switch data.Version {
case "7.17.0":
if data.Version == "7.17.0" {
require.Equal(t, 14, len(nodeIndexShards))
require.EqualValues(t, 14, myIndexNode2.TotalFractions)
case "8.15.3":
} else if data.Version == "8.15.3" {
require.Equal(t, 35, len(nodeIndexShards))
require.EqualValues(t, 35, myIndexNode2.TotalFractions)
}
Expand Down Expand Up @@ -236,7 +235,7 @@ func expectValidParsedDetailedShardsWithCache(t *testing.T, data metricset.Fetch
expectValidParsedDetailedShards(t, data)

nodeIndexShardsEvents := auto_ops_testing.GetEventsWithField(t, data.Reporter.GetEvents(), "node_index_shards")
nodeIndexShards := mapArrayToType[NodeIndexShards](auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]map[string]any))
nodeIndexShards := auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]NodeIndexShards)
myIndexNode2 := nodeIndexShards[slices.IndexFunc(nodeIndexShards, func(node NodeIndexShards) bool { return node.IndexNode == "my-index-node_id-node2" })]

require.NotNil(t, myIndexNode2.TimestampDiff)
Expand Down Expand Up @@ -264,14 +263,14 @@ func expectValidParsedWithoutResolvedIndexDataWithoutElasticSearchError(t *testi
nodeShardsCountEvents := auto_ops_testing.GetEventsWithField(t, events, "node_shards_count")

require.Equal(t, 1, len(nodeShardsCountEvents))
require.Equal(t, 2, len(auto_ops_testing.GetObjectValue(nodeShardsCountEvents[0].MetricSetFields, "node_shards_count").([]map[string]any)))
require.Equal(t, 2, len(auto_ops_testing.GetObjectValue(nodeShardsCountEvents[0].MetricSetFields, "node_shards_count").([]NodeShardCount)))

nodeIndexShardsEvents := auto_ops_testing.GetEventsWithField(t, events, "node_index_shards")

require.Equal(t, 1, len(nodeIndexShardsEvents))
require.LessOrEqual(t, 2, len(auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]map[string]any)))
require.LessOrEqual(t, 2, len(auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]NodeIndexShards)))

nodeIndexShards := mapArrayToType[NodeIndexShards](auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]map[string]any))
nodeIndexShards := auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]NodeIndexShards)
myIndexNode2 := nodeIndexShards[slices.IndexFunc(nodeIndexShards, func(node NodeIndexShards) bool { return node.IndexNode == "my-index-node_id-node2" })]

require.Nil(t, myIndexNode2.IndexType)
Expand All @@ -294,14 +293,14 @@ func expectValidParsedWithoutResolvedIndexDataWithElasticSearchError(t *testing.
nodeShardsCountEvents := auto_ops_testing.GetEventsWithField(t, events, "node_shards_count")

require.Equal(t, 1, len(nodeShardsCountEvents))
require.Equal(t, 2, len(auto_ops_testing.GetObjectValue(nodeShardsCountEvents[0].MetricSetFields, "node_shards_count").([]map[string]any)))
require.Equal(t, 2, len(auto_ops_testing.GetObjectValue(nodeShardsCountEvents[0].MetricSetFields, "node_shards_count").([]NodeShardCount)))

nodeIndexShardsEvents := auto_ops_testing.GetEventsWithField(t, events, "node_index_shards")

require.Equal(t, 1, len(nodeIndexShardsEvents))
require.LessOrEqual(t, 2, len(auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]map[string]any)))
require.LessOrEqual(t, 2, len(auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]NodeIndexShards)))

nodeIndexShards := mapArrayToType[NodeIndexShards](auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]map[string]any))
nodeIndexShards := auto_ops_testing.GetObjectValue(nodeIndexShardsEvents[0].MetricSetFields, "node_index_shards").([]NodeIndexShards)
myIndexNode2 := nodeIndexShards[slices.IndexFunc(nodeIndexShards, func(node NodeIndexShards) bool { return node.IndexNode == "my-index-node_id-node2" })]

require.Nil(t, myIndexNode2.IndexType)
Expand Down
22 changes: 0 additions & 22 deletions x-pack/metricbeat/module/autoops_es/cat_shards/index_shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,6 @@ func appendNodeShards(nodeShards map[string]NodeShardCount, shard *Shard) {
nodeShards[shard.node_id] = node
}

func nodeShardCountToMapArray(nodeShards map[string]NodeShardCount) []map[string]any {
mapArray := make([]map[string]any, 0, len(nodeShards))

for _, nodeShard := range nodeShards {
mapArray = append(mapArray, map[string]any{
"node_id": nodeShard.NodeId,
"node_name": nodeShard.NodeName,
"shards_count": nodeShard.Shards,
"primary_shards": nodeShard.PrimaryShards,
"replica_shards": nodeShard.ReplicaShards,
"initializing_shards": nodeShard.InitializingShards,
"initializing_primary_shards": nodeShard.InitializingPrimaryShards,
"initializing_replica_shards": nodeShard.InitializingReplicaShards,
"relocating_shards": nodeShard.RelocatingShards,
"relocating_primary_shards": nodeShard.RelocatingPrimaryShards,
"relocating_replica_shards": nodeShard.RelocatingReplicaShards,
})
}

return mapArray
}

func indexShardsToNodeIndexShards(nodeIndexShardsMap map[string]NodeIndexShards, index string, shards []Shard) {
status := GREEN
indexStatus := &status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,86 +615,6 @@ func TestIndexShardsToNodeIndexShardsGreenIndex(t *testing.T) {
require.Equal(t, STARTED, assignedShard4.State)
}

func TestNodeShardCountToMapArray(t *testing.T) {
// Create a map with multiple node shard counts
nodeShards := map[string]NodeShardCount{
"node1": {
NodeId: "node1",
NodeName: "node-1",
Shards: 10,
PrimaryShards: 5,
ReplicaShards: 5,
InitializingShards: 2,
InitializingPrimaryShards: 1,
InitializingReplicaShards: 1,
RelocatingShards: 3,
RelocatingPrimaryShards: 1,
RelocatingReplicaShards: 2,
},
"node2": {
NodeId: "node2",
NodeName: "node-2",
Shards: 8,
PrimaryShards: 4,
ReplicaShards: 4,
InitializingShards: 1,
InitializingPrimaryShards: 1,
InitializingReplicaShards: 0,
RelocatingShards: 0,
RelocatingPrimaryShards: 0,
RelocatingReplicaShards: 0,
},
}

// Call the function
mapArray := nodeShardCountToMapArray(nodeShards)

// Verify the results
require.Equal(t, 2, len(mapArray))

// Check that we have entries for both nodes
foundNode1 := false
foundNode2 := false

for _, nodeMap := range mapArray {
if nodeId, ok := nodeMap["node_id"].(string); ok {
switch nodeId {
case "node1":
foundNode1 = true
require.Equal(t, "node-1", nodeMap["node_name"])
require.EqualValues(t, 10, nodeMap["shards_count"])
require.EqualValues(t, 5, nodeMap["primary_shards"])
require.EqualValues(t, 5, nodeMap["replica_shards"])
require.EqualValues(t, 2, nodeMap["initializing_shards"])
require.EqualValues(t, 1, nodeMap["initializing_primary_shards"])
require.EqualValues(t, 1, nodeMap["initializing_replica_shards"])
require.EqualValues(t, 3, nodeMap["relocating_shards"])
require.EqualValues(t, 1, nodeMap["relocating_primary_shards"])
require.EqualValues(t, 2, nodeMap["relocating_replica_shards"])
case "node2":
foundNode2 = true
require.Equal(t, "node-2", nodeMap["node_name"])
require.EqualValues(t, 8, nodeMap["shards_count"])
require.EqualValues(t, 4, nodeMap["primary_shards"])
require.EqualValues(t, 4, nodeMap["replica_shards"])
require.EqualValues(t, 1, nodeMap["initializing_shards"])
require.EqualValues(t, 1, nodeMap["initializing_primary_shards"])
require.EqualValues(t, 0, nodeMap["initializing_replica_shards"])
require.EqualValues(t, 0, nodeMap["relocating_shards"])
require.EqualValues(t, 0, nodeMap["relocating_primary_shards"])
require.EqualValues(t, 0, nodeMap["relocating_replica_shards"])
default:
require.Fail(t, "Unexpected node in map array", nodeId)
}
} else {
require.Fail(t, "Unexpected node in map array", nodeId)
}
}

require.True(t, foundNode1, "Node1 should be in the map array")
require.True(t, foundNode2, "Node2 should be in the map array")
}

func TestIndexShardsToNodeIndexShardsRelocating(t *testing.T) {
docs := int64(100001)
segments := int64(10)
Expand Down
27 changes: 0 additions & 27 deletions x-pack/metricbeat/module/autoops_es/cat_shards/struct_to_map.go

This file was deleted.

Loading