Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow named filters to let multiple rollups hang off one instance #755

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions pkg/cat/kkc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,28 @@ func NewKTranslate(config *ktranslate.Config, log logger.ContextL, registry go_m
if err != nil {
return nil, err
}
kc.filters = filters
kc.doFilter = len(filters) > 0

fullSet := []filter.FilterWrapper{}
for _, filter := range filters {
if filter.GetName() == "" { // No name means a global application.
fullSet = append(fullSet, filter)
continue
}

found := false
for _, roll := range rolls { // If the name matches, only use this filter for this rollup.
if filter.GetName() == roll.GetName() {
roll.SetFilter(filter)
found = true
}
}
if !found {
log.Warnf("Skipping named filter %s, no matching rollup found.", filter.GetName())
}
}

kc.filters = fullSet
kc.doFilter = len(fullSet) > 0

// Grab the custom data directly from a file.
if config.MappingFile != "" {
Expand Down
10 changes: 10 additions & 0 deletions pkg/filter/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ type AddrFilter struct {
cf func(map[string]interface{}) bool
dimension []string
value *net.IPNet
name string
}

func newAddrFilter(log logger.Underlying, fd FilterDef) (*AddrFilter, error) {
sf := &AddrFilter{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "addrFilter"}, log),
dimension: strings.Split(fd.Dimension, "."),
name: fd.Name,
}

_, val, err := net.ParseCIDR(fd.Value)
Expand All @@ -45,12 +47,20 @@ func newAddrFilter(log logger.Underlying, fd FilterDef) (*AddrFilter, error) {

func (f *AddrFilter) Filter(in *kt.JCHF) bool {
mapr := in.ToMap()
return f.FilterMap(mapr)
}

func (f *AddrFilter) FilterMap(mapr map[string]interface{}) bool {
if !f.cf(mapr) {
return false
}
return true
}

func (f *AddrFilter) GetName() string {
return f.name
}

func (f *AddrFilter) addrEquals(chf map[string]interface{}) bool {
if dd, ok := chf[f.dimension[0]]; ok {
switch dim := dd.(type) {
Expand Down
26 changes: 25 additions & 1 deletion pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type FilterType string

type Filter interface {
Filter(*kt.JCHF) bool
FilterMap(map[string]interface{}) bool
GetName() string
}

type FilterWrapper []Filter
Expand All @@ -62,12 +64,13 @@ type FilterDef struct {
Operator Operator
Value string
FType FilterType
Name string
}

type FilterDefWrapper []FilterDef

func (f *FilterDef) String() string {
return fmt.Sprintf("%s Filter: %s %s %s", f.FType, f.Dimension, f.Operator, f.Value)
return fmt.Sprintf("%s Filter: %s %s %s name=%s", f.FType, f.Dimension, f.Operator, f.Value, f.Name)
}

func (f FilterDefWrapper) String() string {
Expand Down Expand Up @@ -105,6 +108,10 @@ func (i *FilterDefs) Set(value string) error {
}
pts = append([]string{string(ftype)}, pts...)
}
name := "" // default to a global filter, so no name.
if len(pts) == 5 {
name = pts[4]
}
ptn := make([]string, len(pts))
for i, p := range pts {
ptn[i] = strings.TrimSpace(p)
Expand All @@ -114,6 +121,7 @@ func (i *FilterDefs) Set(value string) error {
Dimension: ptn[1],
Operator: Operator(ptn[2]),
Value: ptn[3],
Name: name,
})
}
*i = append(*i, inner)
Expand Down Expand Up @@ -173,3 +181,19 @@ func (fs FilterWrapper) Filter(chf *kt.JCHF) bool {
}
return false
}

func (fs FilterWrapper) FilterMap(chf map[string]interface{}) bool {
for _, f := range fs {
if f.FilterMap(chf) {
return true
}
}
return false
}

func (fs FilterWrapper) GetName() string {
if len(fs) > 0 {
return fs[0].GetName()
}
return ""
}
10 changes: 10 additions & 0 deletions pkg/filter/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ type IntFilter struct {
cf func(map[string]interface{}) bool
dimension []string
value int64
name string
}

func newIntFilter(log logger.Underlying, fd FilterDef) (*IntFilter, error) {
sf := &IntFilter{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "intFilter"}, log),
dimension: strings.Split(fd.Dimension, "."),
name: fd.Name,
}

val, err := strconv.Atoi(fd.Value)
Expand All @@ -47,12 +49,20 @@ func newIntFilter(log logger.Underlying, fd FilterDef) (*IntFilter, error) {

func (f *IntFilter) Filter(in *kt.JCHF) bool {
mapr := in.ToMap()
return f.FilterMap(mapr)
}

func (f *IntFilter) FilterMap(mapr map[string]interface{}) bool {
if !f.cf(mapr) {
return false
}
return true
}

func (f *IntFilter) GetName() string {
return f.name
}

func (f *IntFilter) intEquals(chf map[string]interface{}) bool {
if dd, ok := chf[f.dimension[0]]; ok {
switch dim := dd.(type) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/filter/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ type StringFilter struct {
cf func(map[string]interface{}) bool
dimension []string
value string
name string
}

func newStringFilter(log logger.Underlying, fd FilterDef) (*StringFilter, error) {
sf := &StringFilter{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "stringFilter"}, log),
dimension: strings.Split(fd.Dimension, "."),
value: fd.Value,
name: fd.Name,
}

switch fd.Operator {
Expand All @@ -39,12 +41,20 @@ func newStringFilter(log logger.Underlying, fd FilterDef) (*StringFilter, error)

func (f *StringFilter) Filter(in *kt.JCHF) bool {
mapr := in.ToMap()
return f.FilterMap(mapr)
}

func (f *StringFilter) FilterMap(mapr map[string]interface{}) bool {
if !f.cf(mapr) {
return false
}
return true
}

func (f *StringFilter) GetName() string {
return f.name
}

func (f *StringFilter) stringEquals(chf map[string]interface{}) bool {
if dd, ok := chf[f.dimension[0]]; ok {
switch dim := dd.(type) {
Expand Down
43 changes: 43 additions & 0 deletions pkg/rollup/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func init() {
type Roller interface {
Add([]map[string]interface{})
Export() []Rollup
SetFilter(filter.FilterWrapper)
GetName() string
}

type Rollup struct {
Expand All @@ -82,6 +84,16 @@ type RollupDef struct {
Name string
}

/**
-filters "string,custom_str.dst_network_bndry,==,external,sum_external"
-rollups s_sum,sum_external,in_bytes,custom_str.src_subscriber_id
-rollups s_sum,sum_external,in_bytes,custom_str.output_provider
-filters "string,custom_str.src_subscriber_id,!=,'',sum_all_sub_id"
-rollups s_sum,sum_all_sub_id,in_bytes,custom_str.output_provider
*/

func (r *RollupDef) String() string {
return fmt.Sprintf("Name: %s, Method: %s, Adjust Sample Rate: %v, Metric: %v, Dimensions: %v", r.Name, r.Method, r.Sample, r.Metrics, r.Dimensions)
}
Expand Down Expand Up @@ -174,6 +186,8 @@ type rollupBase struct {
dtime time.Time
name string
primaryDim int
filters []filter.FilterWrapper
hasFilters bool
}

func (r *rollupBase) init(rd RollupDef) error {
Expand Down Expand Up @@ -289,3 +303,32 @@ func combo(dims []string, multiDims [][]string) []string {
}
return ret
}

func (r *rollupBase) filter(in []map[string]interface{}) []map[string]interface{} {
res := make([]map[string]interface{}, 0, len(in))
for _, flow := range in {
keep := true
for _, f := range r.filters {
if !f.FilterMap(flow) {
keep = false
break
}
}
if keep {
res = append(res, flow)
}
}
return res
}

func (r *rollupBase) SetFilter(fw filter.FilterWrapper) {
if r.filters == nil {
r.filters = []filter.FilterWrapper{}
r.hasFilters = true
}
r.filters = append(r.filters, fw)
}

func (r *rollupBase) GetName() string {
return r.name
}
103 changes: 102 additions & 1 deletion pkg/rollup/rollup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/kentik/ktranslate"
"github.com/kentik/ktranslate/pkg/eggs/logger"
lt "github.com/kentik/ktranslate/pkg/eggs/logger/testing"
"github.com/kentik/ktranslate/pkg/filter"
"github.com/kentik/ktranslate/pkg/kt"

"github.com/stretchr/testify/assert"
Expand All @@ -16,7 +17,6 @@ import (
func TestRollup(t *testing.T) {
l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
assert := assert.New(t)
// filters are type,dimension,operator,value
rolls := []ktranslate.RollupConfig{
ktranslate.RollupConfig{
JoinKey: "^",
Expand Down Expand Up @@ -134,6 +134,107 @@ func TestRollup(t *testing.T) {
}
}

func TestRollupFilter(t *testing.T) {
l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
assert := assert.New(t)
rolls := []ktranslate.RollupConfig{
ktranslate.RollupConfig{
JoinKey: "^",
TopK: 1,
Formats: []string{"s_sum,name_one,in_bytes,foo"},
KeepUndefined: true,
},
}

inputs := [][]map[string]interface{}{
[]map[string]interface{}{
map[string]interface{}{
"in_bytes": int64(5),
"foo": "aaa",
"filter": int64(1),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
map[string]interface{}{
"in_bytes": int64(15),
"foo": "aaa",
"filter": int64(1),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
map[string]interface{}{
"in_bytes": int64(20),
"foo": "aaa",
"filter": int64(2),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
map[string]interface{}{
"in_bytes": int64(2),
"foo": "aaa",
"filter": int64(2),
"sample_rate": int64(1),
"provider": kt.Provider("pp"),
},
},
}

filters := [][]string{
[]string{
"int,filter,==,1,name_one",
},
}

outputs := []map[string]interface{}{
map[string]interface{}{
"metric": 20,
"dimensions": []string{"aaa"},
},
}

for i, roll := range rolls {
rd, err := GetRollups(l, &roll)
assert.NoError(err)
assert.Equal(len(roll.Formats), len(rd))

fs, err := filter.GetFilters(l, filters[i])
assert.NoError(err)

fullSet := []filter.FilterWrapper{}
for _, filter := range fs {
if filter.GetName() == "" { // No name means a global application.
fullSet = append(fullSet, filter)
continue
}

found := false
for _, ri := range rd {
if filter.GetName() == ri.GetName() {
ri.SetFilter(filter)
found = true
}
}
if !found {
t.Errorf("No name match for filter %v", filter.GetName())
}
}
assert.Equal(0, len(fullSet))

for _, ri := range rd {
ri.Add(inputs[i])
time.Sleep(50 * time.Microsecond)
res := ri.Export()

assert.Equal(roll.TopK, len(res), i)
assert.Equal(outputs[i]["metric"].(int), int(res[0].Metric), res)
dims := strings.Split(res[0].Dimension, res[0].KeyJoin)
for j, dim := range dims {
assert.Equal(outputs[i]["dimensions"].([]string)[j], dim, res)
}
}
}
}

func BenchmarkRollups(b *testing.B) {
l := lt.NewBenchContextL(logger.NilContext, b).GetLogger().GetUnderlyingLogger()
assert := assert.New(b)
Expand Down
Loading
Loading