-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathopmon_plugin.go
119 lines (105 loc) · 4.55 KB
/
opmon_plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"github.com/globalsign/mgo/bson"
"github.com/olivere/elastic"
"github.com/rwynn/monstache/monstachemap"
"fmt"
"regexp"
"strings"
"time"
)
// Plugin-wide settings.
const (
	// debug gates the diagnostic fmt.Println output of the plugin functions.
	debug = false

	// indexPrefix is the leading part of every index name built by the plugin.
	indexPrefix = "opmon-"
)
// mapDatabasePattern matches the source databases handled by Map and captures
// the part of the database name that follows the "query_db_" prefix. It is
// compiled once at package initialization instead of on every Map call.
var mapDatabasePattern = regexp.MustCompile("^query_db_(.+)$")

// Map is the mapper plugin entry point. It adds the flattened opmon values
// needed for Kibana to the document and selects the target index.
//
// Documents from a database whose name does not match "query_db_<instance>"
// are dropped. For all other documents the "client" section is used as the
// data source; if it is absent (or not a map) the "producer" section is used
// instead. The following top-level fields are written to the document:
// requestInTs, consumerMember, consumerSystem, producerMember, producerSystem,
// service, serviceCode, succeeded, messageUserId, clientSecurityServerAddress,
// serviceSecurityServerAddress and valid (false when serviceXRoadInstance is
// nil in the chosen section).
//
// The returned index name is indexPrefix + lower-cased instance + "-" +
// "YYYY-MM", where the month comes from requestInTs interpreted as
// milliseconds since the Unix epoch, in UTC.
//
// An error is returned, and the document is left unmodified, when neither
// section is present or when requestInTs is not an int64.
func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
	match := mapDatabasePattern.FindStringSubmatch(input.Database)
	if len(match) != 2 {
		if debug {
			fmt.Println("plugin#Map: not processing documents from database with unknown pattern", input.Database)
		}
		return &monstachemap.MapperPluginOutput{Drop: true}, nil
	}
	instance := strings.ToLower(match[1])

	doc := input.Document
	data, ok := doc["client"].(map[string]interface{})
	if !ok {
		// Fall back to the producer section; without either one there is
		// nothing to map, so report it instead of panicking.
		data, ok = doc["producer"].(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("plugin#Map: document from database %q has neither a client nor a producer section", input.Database)
		}
	}

	// Validate the timestamp before touching the document so that a failed
	// mapping never leaves it half-populated.
	requestInTs, ok := data["requestInTs"].(int64)
	if !ok {
		return nil, fmt.Errorf("plugin#Map: requestInTs has type %T, want int64", data["requestInTs"])
	}

	// Missing or non-string values become empty strings.
	clientXRoadInstance, _ := data["clientXRoadInstance"].(string)
	clientMemberClass, _ := data["clientMemberClass"].(string)
	clientMemberCode, _ := data["clientMemberCode"].(string)
	clientSubsystemCode, _ := data["clientSubsystemCode"].(string)
	serviceXRoadInstance, _ := data["serviceXRoadInstance"].(string)
	serviceMemberClass, _ := data["serviceMemberClass"].(string)
	serviceMemberCode, _ := data["serviceMemberCode"].(string)
	serviceSubsystemCode, _ := data["serviceSubsystemCode"].(string)
	serviceCode, _ := data["serviceCode"].(string)
	serviceVersion, _ := data["serviceVersion"].(string)

	// Each identifier extends the previous one by one more path segment.
	consumerMember := clientXRoadInstance + "/" + clientMemberClass + "/" + clientMemberCode
	consumerSystem := consumerMember + "/" + clientSubsystemCode
	producerMember := serviceXRoadInstance + "/" + serviceMemberClass + "/" + serviceMemberCode
	producerSystem := producerMember + "/" + serviceSubsystemCode
	service := producerSystem + "/" + serviceCode + "/" + serviceVersion

	doc["requestInTs"] = data["requestInTs"]
	doc["consumerMember"] = consumerMember
	doc["consumerSystem"] = consumerSystem
	doc["producerMember"] = producerMember
	doc["producerSystem"] = producerSystem
	doc["service"] = service
	doc["serviceCode"] = serviceCode
	doc["succeeded"] = data["succeeded"]
	doc["messageUserId"] = data["messageUserId"]
	doc["clientSecurityServerAddress"] = data["clientSecurityServerAddress"]
	doc["serviceSecurityServerAddress"] = data["serviceSecurityServerAddress"]
	doc["valid"] = data["serviceXRoadInstance"] != nil

	// requestInTs is in milliseconds; time.Unix takes nanoseconds.
	timeSuffix := time.Unix(0, requestInTs*int64(time.Millisecond)).UTC().Format("2006-01")

	return &monstachemap.MapperPluginOutput{
		Document: doc,
		Index:    indexPrefix + instance + "-" + timeSuffix,
	}, nil
}
// processDatabasePattern matches the source databases handled by Process and
// captures the part of the database name that follows the "query_db_" prefix.
// It is compiled once at package initialization instead of on every call.
var processDatabasePattern = regexp.MustCompile("^query_db_(.+)$")

// Process is the process plugin entry point. It looks for documents that
// migrate between monthly indexes and queues the deletion of the old copy.
//
// Only update operations ("u") on documents that carry both a "client" and a
// "producer" section are considered. Map picks the client section over the
// producer section when both exist, so when the two requestInTs values
// (milliseconds since the Unix epoch) fall into different UTC months, a bulk
// delete request for the document id is added for the index named after the
// producer's month.
//
// Documents from a database whose name does not match "query_db_<instance>"
// are ignored. An error is returned when either requestInTs is not an int64,
// when _id is not a bson.ObjectId, or when no bulk processor is available.
func Process(input *monstachemap.ProcessPluginInput) (err error) {
	// Processing only update operations.
	if input.Operation != "u" {
		return nil
	}

	doc := input.Document
	client, okC := doc["client"].(map[string]interface{})
	producer, okP := doc["producer"].(map[string]interface{})
	if !okC || !okP {
		return nil
	}

	clientTs, ok := client["requestInTs"].(int64)
	if !ok {
		return fmt.Errorf("plugin#Process: client requestInTs has type %T, want int64", client["requestInTs"])
	}
	producerTs, ok := producer["requestInTs"].(int64)
	if !ok {
		return fmt.Errorf("plugin#Process: producer requestInTs has type %T, want int64", producer["requestInTs"])
	}

	// requestInTs is in milliseconds; time.Unix takes nanoseconds.
	timeSuffixClient := time.Unix(0, clientTs*int64(time.Millisecond)).UTC().Format("2006-01")
	timeSuffixProducer := time.Unix(0, producerTs*int64(time.Millisecond)).UTC().Format("2006-01")
	if timeSuffixClient == timeSuffixProducer {
		// No migration of document needed.
		return nil
	}

	match := processDatabasePattern.FindStringSubmatch(input.Database)
	if len(match) != 2 {
		if debug {
			fmt.Println("plugin#Process: not processing documents from database with unknown pattern", input.Database)
		}
		return nil
	}
	instance := strings.ToLower(match[1])

	objectID, ok := doc["_id"].(bson.ObjectId)
	if !ok {
		return fmt.Errorf("plugin#Process: _id has type %T, want bson.ObjectId", doc["_id"])
	}
	id := objectID.Hex()
	staleIndex := indexPrefix + instance + "-" + timeSuffixProducer

	if debug {
		fmt.Println("plugin#Process: Deleting _id=" + id + " from index=" + staleIndex)
	}

	bulk := input.ElasticBulkProcessor
	if bulk == nil {
		return fmt.Errorf("plugin#Process: no bulk processor available to delete _id=%s from index=%s", id, staleIndex)
	}

	req := elastic.NewBulkDeleteRequest()
	req.Id(id)
	req.Index(staleIndex)
	req.Type("_doc")
	bulk.Add(req)
	return nil
}