diff --git a/pkg/cat/kkc.go b/pkg/cat/kkc.go
index 715e774b..aff25ef3 100644
--- a/pkg/cat/kkc.go
+++ b/pkg/cat/kkc.go
@@ -116,8 +116,28 @@ func NewKTranslate(config *ktranslate.Config, log logger.ContextL, registry go_m
 	if err != nil {
 		return nil, err
 	}
-	kc.filters = filters
-	kc.doFilter = len(filters) > 0
+
+	fullSet := []filter.FilterWrapper{}
+	for _, fw := range filters { // fw, not filter: avoid shadowing the filter package.
+		if fw.GetName() == "" { // No name means a global application.
+			fullSet = append(fullSet, fw)
+			continue
+		}
+
+		found := false
+		for _, roll := range rolls { // A named filter is attached to every rollup sharing its name.
+			if fw.GetName() == roll.GetName() {
+				roll.SetFilter(fw)
+				found = true
+			}
+		}
+		if !found {
+			log.Warnf("Skipping named filter %s, no matching rollup found.", fw.GetName())
+		}
+	}
+
+	kc.filters = fullSet
+	kc.doFilter = len(fullSet) > 0
 
 	// Grab the custom data directly from a file.
 	if config.MappingFile != "" {
diff --git a/pkg/filter/addr.go b/pkg/filter/addr.go
index bb99098b..401a21a7 100644
--- a/pkg/filter/addr.go
+++ b/pkg/filter/addr.go
@@ -15,12 +15,14 @@ type AddrFilter struct {
 	cf        func(map[string]interface{}) bool
 	dimension []string
 	value     *net.IPNet
+	name      string // Rollup this filter is bound to; "" applies it globally.
 }
 
 func newAddrFilter(log logger.Underlying, fd FilterDef) (*AddrFilter, error) {
 	sf := &AddrFilter{
 		ContextL:  logger.NewContextLFromUnderlying(logger.SContext{S: "addrFilter"}, log),
 		dimension: strings.Split(fd.Dimension, "."),
+		name:      fd.Name,
 	}
 
 	_, val, err := net.ParseCIDR(fd.Value)
@@ -45,12 +47,22 @@ func newAddrFilter(log logger.Underlying, fd FilterDef) (*AddrFilter, error) {
 
 func (f *AddrFilter) Filter(in *kt.JCHF) bool {
 	mapr := in.ToMap()
+	return f.FilterMap(mapr)
+}
+
+// FilterMap reports whether the flow map passes this filter's comparison.
+func (f *AddrFilter) FilterMap(mapr map[string]interface{}) bool {
 	if !f.cf(mapr) {
 		return false
 	}
 	return true
 }
 
+// GetName returns the rollup name this filter is bound to; "" means it applies globally.
+func (f *AddrFilter) GetName() string {
+	return f.name
+}
+
 func (f *AddrFilter) addrEquals(chf map[string]interface{}) bool {
 	if dd, ok := chf[f.dimension[0]]; ok {
 		switch dim := dd.(type) {
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
index cc3ba82b..f1fd2f10 100644
--- a/pkg/filter/filter.go
+++ b/pkg/filter/filter.go
@@ -53,6 +53,8 @@ type FilterType string
 
 type Filter interface {
 	Filter(*kt.JCHF) bool
+	FilterMap(map[string]interface{}) bool // Like Filter, but takes the flow already converted to a map.
+	GetName() string                       // Rollup name the filter is bound to; "" means global.
 }
 
 type FilterWrapper []Filter
@@ -62,12 +64,13 @@ type FilterDef struct {
 	Operator  Operator
 	Value     string
 	FType     FilterType
+	Name      string // Optional rollup name; empty for a global filter.
 }
 
 type FilterDefWrapper []FilterDef
 
 func (f *FilterDef) String() string {
-	return fmt.Sprintf("%s Filter: %s %s %s", f.FType, f.Dimension, f.Operator, f.Value)
+	return fmt.Sprintf("%s Filter: %s %s %s name=%s", f.FType, f.Dimension, f.Operator, f.Value, f.Name)
 }
 
 func (f FilterDefWrapper) String() string {
@@ -105,6 +108,10 @@ func (i *FilterDefs) Set(value string) error {
 			}
 			pts = append([]string{string(ftype)}, pts...)
 		}
+		name := "" // Optional 5th field binds the filter to a rollup; default is global, so no name.
+		if len(pts) == 5 {
+			name = strings.TrimSpace(pts[4]) // Trim like the other fields so "x, name" still matches "name".
+		}
 		ptn := make([]string, len(pts))
 		for i, p := range pts {
 			ptn[i] = strings.TrimSpace(p)
@@ -114,6 +121,7 @@ func (i *FilterDefs) Set(value string) error {
 			Dimension: ptn[1],
 			Operator:  Operator(ptn[2]),
 			Value:     ptn[3],
+			Name:      name,
 		})
 	}
 	*i = append(*i, inner)
@@ -173,3 +181,22 @@ func (fs FilterWrapper) Filter(chf *kt.JCHF) bool {
 	}
 	return false
 }
+
+// FilterMap reports whether any filter in the set matches the flow map chf.
+func (fs FilterWrapper) FilterMap(chf map[string]interface{}) bool {
+	for _, f := range fs {
+		if f.FilterMap(chf) {
+			return true
+		}
+	}
+	return false
+}
+
+// GetName returns the name of the first filter in the set, or "" for an empty set.
+// Names of any later filters in the set are not consulted.
+func (fs FilterWrapper) GetName() string {
+	if len(fs) > 0 {
+		return fs[0].GetName()
+	}
+	return ""
+}
diff --git a/pkg/filter/int.go b/pkg/filter/int.go
index b070f333..03138f6c 100644
--- a/pkg/filter/int.go
+++ b/pkg/filter/int.go
@@ -15,12 +15,14 @@ type IntFilter struct {
 	cf        func(map[string]interface{}) bool
 	dimension []string
 	value     int64
+	name      string // Rollup this filter is bound to; "" applies it globally.
 }
 
 func newIntFilter(log logger.Underlying, fd FilterDef) (*IntFilter, error) {
 	sf := &IntFilter{
 		ContextL:  logger.NewContextLFromUnderlying(logger.SContext{S: "intFilter"}, log),
 		dimension: strings.Split(fd.Dimension, "."),
+		name:      fd.Name,
 	}
 
 	val, err := strconv.Atoi(fd.Value)
@@ -47,12 +49,22 @@ func newIntFilter(log logger.Underlying, fd FilterDef) (*IntFilter, error) {
 
 func (f *IntFilter) Filter(in *kt.JCHF) bool {
 	mapr := in.ToMap()
+	return f.FilterMap(mapr)
+}
+
+// FilterMap reports whether the flow map passes this filter's comparison.
+func (f *IntFilter) FilterMap(mapr map[string]interface{}) bool {
 	if !f.cf(mapr) {
 		return false
 	}
 	return true
 }
 
+// GetName returns the rollup name this filter is bound to; "" means it applies globally.
+func (f *IntFilter) GetName() string {
+	return f.name
+}
+
 func (f *IntFilter) intEquals(chf map[string]interface{}) bool {
 	if dd, ok := chf[f.dimension[0]]; ok {
 		switch dim := dd.(type) {
diff --git a/pkg/filter/string.go b/pkg/filter/string.go
index 08597459..f57b39e6 100644
--- a/pkg/filter/string.go
+++ b/pkg/filter/string.go
@@ -14,6 +14,7 @@ type StringFilter struct {
 	cf        func(map[string]interface{}) bool
 	dimension []string
 	value     string
+	name      string // Rollup this filter is bound to; "" applies it globally.
 }
 
 func newStringFilter(log logger.Underlying, fd FilterDef) (*StringFilter, error) {
@@ -21,6 +22,7 @@ func newStringFilter(log logger.Underlying, fd FilterDef) (*StringFilter, error)
 		ContextL:  logger.NewContextLFromUnderlying(logger.SContext{S: "stringFilter"}, log),
 		dimension: strings.Split(fd.Dimension, "."),
 		value:     fd.Value,
+		name:      fd.Name,
 	}
 
 	switch fd.Operator {
@@ -39,12 +41,22 @@ func newStringFilter(log logger.Underlying, fd FilterDef) (*StringFilter, error)
 
 func (f *StringFilter) Filter(in *kt.JCHF) bool {
 	mapr := in.ToMap()
+	return f.FilterMap(mapr)
+}
+
+// FilterMap reports whether the flow map passes this filter's comparison.
+func (f *StringFilter) FilterMap(mapr map[string]interface{}) bool {
 	if !f.cf(mapr) {
 		return false
 	}
 	return true
 }
 
+// GetName returns the rollup name this filter is bound to; "" means it applies globally.
+func (f *StringFilter) GetName() string {
+	return f.name
+}
+
 func (f *StringFilter) stringEquals(chf map[string]interface{}) bool {
 	if dd, ok := chf[f.dimension[0]]; ok {
 		switch dim := dd.(type) {
diff --git a/pkg/rollup/rollup.go b/pkg/rollup/rollup.go
index 6c92dcab..f623b4c5 100644
--- a/pkg/rollup/rollup.go
+++ b/pkg/rollup/rollup.go
@@ -56,6 +56,8 @@ func init() {
 type Roller interface {
 	Add([]map[string]interface{})
 	Export() []Rollup
+	SetFilter(filter.FilterWrapper) // Attaches a filter set that flows must pass to be rolled up.
+	GetName() string                // Rollup name, matched against named filters.
 }
 
 type Rollup struct {
@@ -82,6 +84,16 @@ type RollupDef struct {
 	Name       string
 }
 
+/**
+ Example: a filter whose optional 5th field names a rollup is applied only to rollups with that name.
+ -filters "string,custom_str.dst_network_bndry,==,external,sum_external"
+ -rollups s_sum,sum_external,in_bytes,custom_str.src_subscriber_id
+ -rollups s_sum,sum_external,in_bytes,custom_str.output_provider
+
+ -filters "string,custom_str.src_subscriber_id,!=,'',sum_all_sub_id"
+ -rollups s_sum,sum_all_sub_id,in_bytes,custom_str.output_provider
+*/
+
 func (r *RollupDef) String() string {
 	return fmt.Sprintf("Name: %s, Method: %s, Adjust Sample Rate: %v, Metric: %v, Dimensions: %v", r.Name, r.Method, r.Sample, r.Metrics, r.Dimensions)
 }
@@ -174,6 +186,8 @@ type rollupBase struct {
 	dtime      time.Time
 	name       string
 	primaryDim int
+	filters    []filter.FilterWrapper // Filter sets a flow must all pass before it is rolled up.
+	hasFilters bool                   // True once SetFilter has been called.
 }
 
 func (r *rollupBase) init(rd RollupDef) error {
@@ -289,3 +303,34 @@ func combo(dims []string, multiDims [][]string) []string {
 	}
 	return ret
 }
+
+// filter returns the flows from in that pass every filter set attached to this rollup.
+// A new slice is returned; in itself is not modified.
+func (r *rollupBase) filter(in []map[string]interface{}) []map[string]interface{} {
+	res := make([]map[string]interface{}, 0, len(in))
+	for _, flow := range in {
+		keep := true
+		for _, f := range r.filters {
+			if !f.FilterMap(flow) {
+				keep = false
+				break
+			}
+		}
+		if keep {
+			res = append(res, flow)
+		}
+	}
+	return res
+}
+
+// SetFilter attaches fw to this rollup and sets hasFilters, which the Add methods check.
+// It may be called more than once; a flow must then pass every attached set.
+func (r *rollupBase) SetFilter(fw filter.FilterWrapper) {
+	r.filters = append(r.filters, fw) // append handles the initial nil slice.
+	r.hasFilters = true
+}
+
+// GetName returns the name of this rollup.
+func (r *rollupBase) GetName() string {
+	return r.name
+}
diff --git a/pkg/rollup/rollup_test.go b/pkg/rollup/rollup_test.go
index b0370f38..fa942b3e 100644
--- a/pkg/rollup/rollup_test.go
+++ b/pkg/rollup/rollup_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/kentik/ktranslate"
 	"github.com/kentik/ktranslate/pkg/eggs/logger"
 	lt "github.com/kentik/ktranslate/pkg/eggs/logger/testing"
+	"github.com/kentik/ktranslate/pkg/filter"
 	"github.com/kentik/ktranslate/pkg/kt"
 
 	"github.com/stretchr/testify/assert"
@@ -16,7 +17,6 @@ import (
 func TestRollup(t *testing.T) {
 	l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
 	assert := assert.New(t)
-	// filters are type,dimension,operator,value
 	rolls := []ktranslate.RollupConfig{
 		ktranslate.RollupConfig{
 			JoinKey:       "^",
@@ -134,6 +134,109 @@ func TestRollup(t *testing.T) {
 	}
 }
 
+// TestRollupFilter checks that a filter named after a rollup is attached to that rollup,
+// so that only flows matching the filter (filter == 1: 5 + 15) contribute to the exported sum.
+func TestRollupFilter(t *testing.T) {
+	l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
+	assert := assert.New(t)
+	rolls := []ktranslate.RollupConfig{
+		ktranslate.RollupConfig{
+			JoinKey:       "^",
+			TopK:          1,
+			Formats:       []string{"s_sum,name_one,in_bytes,foo"},
+			KeepUndefined: true,
+		},
+	}
+
+	inputs := [][]map[string]interface{}{
+		[]map[string]interface{}{
+			map[string]interface{}{
+				"in_bytes":    int64(5),
+				"foo":         "aaa",
+				"filter":      int64(1),
+				"sample_rate": int64(1),
+				"provider":    kt.Provider("pp"),
+			},
+			map[string]interface{}{
+				"in_bytes":    int64(15),
+				"foo":         "aaa",
+				"filter":      int64(1),
+				"sample_rate": int64(1),
+				"provider":    kt.Provider("pp"),
+			},
+			map[string]interface{}{
+				"in_bytes":    int64(20),
+				"foo":         "aaa",
+				"filter":      int64(2),
+				"sample_rate": int64(1),
+				"provider":    kt.Provider("pp"),
+			},
+			map[string]interface{}{
+				"in_bytes":    int64(2),
+				"foo":         "aaa",
+				"filter":      int64(2),
+				"sample_rate": int64(1),
+				"provider":    kt.Provider("pp"),
+			},
+		},
+	}
+
+	filters := [][]string{
+		[]string{
+			"int,filter,==,1,name_one",
+		},
+	}
+
+	outputs := []map[string]interface{}{
+		map[string]interface{}{
+			"metric":     20,
+			"dimensions": []string{"aaa"},
+		},
+	}
+
+	for i, roll := range rolls {
+		rd, err := GetRollups(l, &roll)
+		assert.NoError(err)
+		assert.Equal(len(roll.Formats), len(rd))
+
+		fs, err := filter.GetFilters(l, filters[i])
+		assert.NoError(err)
+
+		fullSet := []filter.FilterWrapper{}
+		for _, fw := range fs { // fw, not filter: avoid shadowing the filter package.
+			if fw.GetName() == "" { // No name means a global application.
+				fullSet = append(fullSet, fw)
+				continue
+			}
+
+			found := false
+			for _, ri := range rd {
+				if fw.GetName() == ri.GetName() {
+					ri.SetFilter(fw)
+					found = true
+				}
+			}
+			if !found {
+				t.Errorf("No name match for filter %v", fw.GetName())
+			}
+		}
+		assert.Equal(0, len(fullSet))
+
+		for _, ri := range rd {
+			ri.Add(inputs[i])
+			time.Sleep(50 * time.Microsecond)
+			res := ri.Export()
+
+			assert.Equal(roll.TopK, len(res), i)
+			assert.Equal(outputs[i]["metric"].(int), int(res[0].Metric), res)
+			dims := strings.Split(res[0].Dimension, res[0].KeyJoin)
+			for j, dim := range dims {
+				assert.Equal(outputs[i]["dimensions"].([]string)[j], dim, res)
+			}
+		}
+	}
+}
+
 func BenchmarkRollups(b *testing.B) {
 	l := lt.NewBenchContextL(logger.NilContext, b).GetLogger().GetUnderlyingLogger()
 	assert := assert.New(b)
diff --git a/pkg/rollup/stats.go b/pkg/rollup/stats.go
index 7c620a04..2852b27f 100644
--- a/pkg/rollup/stats.go
+++ b/pkg/rollup/stats.go
@@ -150,6 +150,10 @@ func (r *StatsRollup) addSum(in []map[string]interface{}) {
 }
 
 func (r *StatsRollup) Add(in []map[string]interface{}) {
+	if r.hasFilters { // Drop flows rejected by this rollup's named filters before aggregating.
+		in = r.filter(in)
+	}
+
 	if r.isSum { // this is a fast path for pure additive rollups.
 		r.addSum(in)
 		return
diff --git a/pkg/rollup/unique.go b/pkg/rollup/unique.go
index b0e788d5..814c1057 100644
--- a/pkg/rollup/unique.go
+++ b/pkg/rollup/unique.go
@@ -56,6 +56,10 @@ func newUniqueRollup(log logger.Underlying, rd RollupDef, cfg *ktranslate.Rollup
 }
 
 func (r *UniqueRollup) Add(in []map[string]interface{}) {
+	if r.hasFilters { // Drop flows rejected by this rollup's named filters before aggregating.
+		in = r.filter(in)
+	}
+
 	uniques := map[string]gohll.HLL{}
 	count := map[string]uint64{}
 	prov := map[string]kt.Provider{}