From d693032c2fc3e6590bb6167c642a30d32b2c91d2 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 21 Dec 2020 14:25:04 +0300 Subject: [PATCH] Allow add modifiers on service level --- emitter.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/emitter.go b/emitter.go index 574aa0de..bc666611 100644 --- a/emitter.go +++ b/emitter.go @@ -79,17 +79,22 @@ func (e *Emitter) Close() { // CopyMulty copies from 1 reader to multiple writers func (e *Emitter) CopyMulty(src PluginReader, writers ...PluginWriter) error { wIndex := 0 - modifier := NewHTTPModifier(&Settings.ModifierConfig) + globalModifier := NewHTTPModifier(&Settings.ModifierConfig) filteredRequests := make(map[string]int64) filteredRequestsLastCleanTime := time.Now().UnixNano() filteredCount := 0 // Optimisatio to not check service ID on each write var outServices [][]byte + serviceModifiers := make(map[string]*HTTPModifier) for _, p := range writers { outServices = append(outServices, []byte(reflect.ValueOf(p).Elem().FieldByName("Service").String())) } + for s, cfg := range Settings.Services { + serviceModifiers[s] = NewHTTPModifier(&cfg.ModifierConfig) + } + for { msg, err := src.PluginRead() if err != nil { @@ -112,16 +117,29 @@ func (e *Emitter) CopyMulty(src PluginReader, writers ...PluginWriter) error { if Settings.Verbose >= 3 { Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src) } - if modifier != nil { + if globalModifier != nil { Debug(3, "[EMITTER] modifier:", requestID, "from:", src) if isRequestPayload(msg.Meta) { - msg.Data = modifier.Rewrite(msg.Data) + msg.Data = globalModifier.Rewrite(msg.Data) // If modifier tells to skip request if len(msg.Data) == 0 { filteredRequests[requestID] = time.Now().UnixNano() filteredCount++ continue } + + if len(meta) > 4 && len(meta[4]) > 0 { + if m, found := serviceModifiers[byteutils.SliceToString(meta[4])]; found { + msg.Data = m.Rewrite(msg.Data) + // If modifier tells to skip request + if len(msg.Data) == 0 { + filteredRequests[requestID] = time.Now().UnixNano() + filteredCount++ + continue + } + } + } + Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src) } else {