Skip to content

Commit

Permalink
Allow named filters to let multiple rollups hang off one instance (#755)
Browse files Browse the repository at this point in the history
  • Loading branch information
i3149 authored Sep 30, 2024
1 parent 400064f commit 8371210
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 4 deletions.
24 changes: 22 additions & 2 deletions pkg/cat/kkc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,28 @@ func NewKTranslate(config *ktranslate.Config, log logger.ContextL, registry go_m
if err != nil {
return nil, err
}
kc.filters = filters
kc.doFilter = len(filters) > 0

fullSet := []filter.FilterWrapper{}
for _, filter := range filters {
if filter.GetName() == "" { // No name means a global application.
fullSet = append(fullSet, filter)
continue
}

found := false
for _, roll := range rolls { // If the name matches, only use this filter for this rollup.
if filter.GetName() == roll.GetName() {
roll.SetFilter(filter)
found = true
}
}
if !found {
log.Warnf("Skipping named filter %s, no matching rollup found.", filter.GetName())
}
}

kc.filters = fullSet
kc.doFilter = len(fullSet) > 0

// Grab the custom data directly from a file.
if config.MappingFile != "" {
Expand Down
10 changes: 10 additions & 0 deletions pkg/filter/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ type AddrFilter struct {
cf func(map[string]interface{}) bool
dimension []string
value *net.IPNet
name string
}

func newAddrFilter(log logger.Underlying, fd FilterDef) (*AddrFilter, error) {
sf := &AddrFilter{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "addrFilter"}, log),
dimension: strings.Split(fd.Dimension, "."),
name: fd.Name,
}

_, val, err := net.ParseCIDR(fd.Value)
Expand All @@ -45,12 +47,20 @@ func newAddrFilter(log logger.Underlying, fd FilterDef) (*AddrFilter, error) {

func (f *AddrFilter) Filter(in *kt.JCHF) bool {
mapr := in.ToMap()
return f.FilterMap(mapr)
}

func (f *AddrFilter) FilterMap(mapr map[string]interface{}) bool {
if !f.cf(mapr) {
return false
}
return true
}

func (f *AddrFilter) GetName() string {
return f.name
}

func (f *AddrFilter) addrEquals(chf map[string]interface{}) bool {
if dd, ok := chf[f.dimension[0]]; ok {
switch dim := dd.(type) {
Expand Down
26 changes: 25 additions & 1 deletion pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type FilterType string

type Filter interface {
Filter(*kt.JCHF) bool
FilterMap(map[string]interface{}) bool
GetName() string
}

type FilterWrapper []Filter
Expand All @@ -62,12 +64,13 @@ type FilterDef struct {
Operator Operator
Value string
FType FilterType
Name string
}

type FilterDefWrapper []FilterDef

func (f *FilterDef) String() string {
return fmt.Sprintf("%s Filter: %s %s %s", f.FType, f.Dimension, f.Operator, f.Value)
return fmt.Sprintf("%s Filter: %s %s %s name=%s", f.FType, f.Dimension, f.Operator, f.Value, f.Name)
}

func (f FilterDefWrapper) String() string {
Expand Down Expand Up @@ -105,6 +108,10 @@ func (i *FilterDefs) Set(value string) error {
}
pts = append([]string{string(ftype)}, pts...)
}
name := "" // default to a global filter, so no name.
if len(pts) == 5 {
name = pts[4]
}
ptn := make([]string, len(pts))
for i, p := range pts {
ptn[i] = strings.TrimSpace(p)
Expand All @@ -114,6 +121,7 @@ func (i *FilterDefs) Set(value string) error {
Dimension: ptn[1],
Operator: Operator(ptn[2]),
Value: ptn[3],
Name: name,
})
}
*i = append(*i, inner)
Expand Down Expand Up @@ -173,3 +181,19 @@ func (fs FilterWrapper) Filter(chf *kt.JCHF) bool {
}
return false
}

func (fs FilterWrapper) FilterMap(chf map[string]interface{}) bool {
for _, f := range fs {
if f.FilterMap(chf) {
return true
}
}
return false
}

func (fs FilterWrapper) GetName() string {
if len(fs) > 0 {
return fs[0].GetName()
}
return ""
}
10 changes: 10 additions & 0 deletions pkg/filter/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ type IntFilter struct {
cf func(map[string]interface{}) bool
dimension []string
value int64
name string
}

func newIntFilter(log logger.Underlying, fd FilterDef) (*IntFilter, error) {
sf := &IntFilter{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "intFilter"}, log),
dimension: strings.Split(fd.Dimension, "."),
name: fd.Name,
}

val, err := strconv.Atoi(fd.Value)
Expand All @@ -47,12 +49,20 @@ func newIntFilter(log logger.Underlying, fd FilterDef) (*IntFilter, error) {

func (f *IntFilter) Filter(in *kt.JCHF) bool {
mapr := in.ToMap()
return f.FilterMap(mapr)
}

func (f *IntFilter) FilterMap(mapr map[string]interface{}) bool {
if !f.cf(mapr) {
return false
}
return true
}

func (f *IntFilter) GetName() string {
return f.name
}

func (f *IntFilter) intEquals(chf map[string]interface{}) bool {
if dd, ok := chf[f.dimension[0]]; ok {
switch dim := dd.(type) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/filter/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ type StringFilter struct {
cf func(map[string]interface{}) bool
dimension []string
value string
name string
}

func newStringFilter(log logger.Underlying, fd FilterDef) (*StringFilter, error) {
sf := &StringFilter{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "stringFilter"}, log),
dimension: strings.Split(fd.Dimension, "."),
value: fd.Value,
name: fd.Name,
}

switch fd.Operator {
Expand All @@ -39,12 +41,20 @@ func newStringFilter(log logger.Underlying, fd FilterDef) (*StringFilter, error)

func (f *StringFilter) Filter(in *kt.JCHF) bool {
mapr := in.ToMap()
return f.FilterMap(mapr)
}

func (f *StringFilter) FilterMap(mapr map[string]interface{}) bool {
if !f.cf(mapr) {
return false
}
return true
}

func (f *StringFilter) GetName() string {
return f.name
}

func (f *StringFilter) stringEquals(chf map[string]interface{}) bool {
if dd, ok := chf[f.dimension[0]]; ok {
switch dim := dd.(type) {
Expand Down
43 changes: 43 additions & 0 deletions pkg/rollup/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func init() {
type Roller interface {
Add([]map[string]interface{})
Export() []Rollup
SetFilter(filter.FilterWrapper)
GetName() string
}

type Rollup struct {
Expand All @@ -82,6 +84,16 @@ type RollupDef struct {
Name string
}

/**
-filters "string,custom_str.dst_network_bndry,==,external,sum_external"
-rollups s_sum,sum_external,in_bytes,custom_str.src_subscriber_id
-rollups s_sum,sum_external,in_bytes,custom_str.output_provider
-filters "string,custom_str.src_subscriber_id,!=,'',sum_all_sub_id"
-rollups s_sum,sum_all_sub_id,in_bytes,custom_str.output_provider
*/

func (r *RollupDef) String() string {
return fmt.Sprintf("Name: %s, Method: %s, Adjust Sample Rate: %v, Metric: %v, Dimensions: %v", r.Name, r.Method, r.Sample, r.Metrics, r.Dimensions)
}
Expand Down Expand Up @@ -174,6 +186,8 @@ type rollupBase struct {
dtime time.Time
name string
primaryDim int
filters []filter.FilterWrapper
hasFilters bool
}

func (r *rollupBase) init(rd RollupDef) error {
Expand Down Expand Up @@ -289,3 +303,32 @@ func combo(dims []string, multiDims [][]string) []string {
}
return ret
}

func (r *rollupBase) filter(in []map[string]interface{}) []map[string]interface{} {
res := make([]map[string]interface{}, 0, len(in))
for _, flow := range in {
keep := true
for _, f := range r.filters {
if !f.FilterMap(flow) {
keep = false
break
}
}
if keep {
res = append(res, flow)
}
}
return res
}

func (r *rollupBase) SetFilter(fw filter.FilterWrapper) {
if r.filters == nil {
r.filters = []filter.FilterWrapper{}
r.hasFilters = true
}
r.filters = append(r.filters, fw)
}

func (r *rollupBase) GetName() string {
return r.name
}
103 changes: 102 additions & 1 deletion pkg/rollup/rollup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/kentik/ktranslate"
"github.com/kentik/ktranslate/pkg/eggs/logger"
lt "github.com/kentik/ktranslate/pkg/eggs/logger/testing"
"github.com/kentik/ktranslate/pkg/filter"
"github.com/kentik/ktranslate/pkg/kt"

"github.com/stretchr/testify/assert"
Expand All @@ -16,7 +17,6 @@ import (
func TestRollup(t *testing.T) {
l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
assert := assert.New(t)
// filters are type,dimension,operator,value
rolls := []ktranslate.RollupConfig{
ktranslate.RollupConfig{
JoinKey: "^",
Expand Down Expand Up @@ -134,6 +134,107 @@ func TestRollup(t *testing.T) {
}
}

func TestRollupFilter(t *testing.T) {
l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
assert := assert.New(t)
rolls := []ktranslate.RollupConfig{
ktranslate.RollupConfig{
JoinKey: "^",
TopK: 1,
Formats: []string{"s_sum,name_one,in_bytes,foo"},
KeepUndefined: true,
},
}

inputs := [][]map[string]interface{}{
[]map[string]interface{}{
map[string]interface{}{
"in_bytes": int64(5),
"foo": "aaa",
"filter": int64(1),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
map[string]interface{}{
"in_bytes": int64(15),
"foo": "aaa",
"filter": int64(1),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
map[string]interface{}{
"in_bytes": int64(20),
"foo": "aaa",
"filter": int64(2),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
map[string]interface{}{
"in_bytes": int64(2),
"foo": "aaa",
"filter": int64(2),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
},
}

filters := [][]string{
[]string{
"int,filter,==,1,name_one",
},
}

outputs := []map[string]interface{}{
map[string]interface{}{
"metric": 20,
"dimensions": []string{"aaa"},
},
}

for i, roll := range rolls {
rd, err := GetRollups(l, &roll)
assert.NoError(err)
assert.Equal(len(roll.Formats), len(rd))

fs, err := filter.GetFilters(l, filters[i])
assert.NoError(err)

fullSet := []filter.FilterWrapper{}
for _, filter := range fs {
if filter.GetName() == "" { // No name means a global application.
fullSet = append(fullSet, filter)
continue
}

found := false
for _, ri := range rd {
if filter.GetName() == ri.GetName() {
ri.SetFilter(filter)
found = true
}
}
if !found {
t.Errorf("No name match for filter %v", filter.GetName())
}
}
assert.Equal(0, len(fullSet))

for _, ri := range rd {
ri.Add(inputs[i])
time.Sleep(50 * time.Microsecond)
res := ri.Export()

assert.Equal(roll.TopK, len(res), i)
assert.Equal(outputs[i]["metric"].(int), int(res[0].Metric), res)
dims := strings.Split(res[0].Dimension, res[0].KeyJoin)
for j, dim := range dims {
assert.Equal(outputs[i]["dimensions"].([]string)[j], dim, res)
}
}
}
}

func BenchmarkRollups(b *testing.B) {
l := lt.NewBenchContextL(logger.NilContext, b).GetLogger().GetUnderlyingLogger()
assert := assert.New(b)
Expand Down
Loading

0 comments on commit 8371210

Please sign in to comment.