Skip to content

Commit

Permalink
Allow add modifiers on service level
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Dec 22, 2020
1 parent f287efd commit d693032
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,22 @@ func (e *Emitter) Close() {
// CopyMulty copies from 1 reader to multiple writers
func (e *Emitter) CopyMulty(src PluginReader, writers ...PluginWriter) error {
wIndex := 0
modifier := NewHTTPModifier(&Settings.ModifierConfig)
globalModifier := NewHTTPModifier(&Settings.ModifierConfig)
filteredRequests := make(map[string]int64)
filteredRequestsLastCleanTime := time.Now().UnixNano()
filteredCount := 0

// Optimisatio to not check service ID on each write
var outServices [][]byte
serviceModifiers := make(map[string]*HTTPModifier)
for _, p := range writers {
outServices = append(outServices, []byte(reflect.ValueOf(p).Elem().FieldByName("Service").String()))
}

for s, cfg := range Settings.Services {
serviceModifiers[s] = NewHTTPModifier(&cfg.ModifierConfig)
}

for {
msg, err := src.PluginRead()
if err != nil {
Expand All @@ -112,16 +117,29 @@ func (e *Emitter) CopyMulty(src PluginReader, writers ...PluginWriter) error {
if Settings.Verbose >= 3 {
Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
}
if modifier != nil {
if globalModifier != nil {
Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
if isRequestPayload(msg.Meta) {
msg.Data = modifier.Rewrite(msg.Data)
msg.Data = globalModifier.Rewrite(msg.Data)
// If modifier tells to skip request
if len(msg.Data) == 0 {
filteredRequests[requestID] = time.Now().UnixNano()
filteredCount++
continue
}

if len(meta) > 4 && len(meta[4]) > 0 {
if m, found := serviceModifiers[byteutils.SliceToString(meta[4])]; found {
msg.Data = m.Rewrite(msg.Data)
// If modifier tells to skip request
if len(msg.Data) == 0 {
filteredRequests[requestID] = time.Now().UnixNano()
filteredCount++
continue
}
}
}

Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)

} else {
Expand Down

0 comments on commit d693032

Please sign in to comment.