Skip to content

Commit

Permalink
[cassandra] Allow turning off tags/logs indexing (#1915)
Browse files Browse the repository at this point in the history
* Added options to enable/disable indexing of tags by type

Signed-off-by: Joe Elliott <[email protected]>

* Added drop all filter

Signed-off-by: Joe Elliott <[email protected]>

* Added drop filters

Signed-off-by: Joe Elliott <[email protected]>

* Removed enable

Signed-off-by: Joe Elliott <[email protected]>

* Fixed issue in chained tag filter

Signed-off-by: Joe Elliott <[email protected]>

* make fmt

Signed-off-by: Joe Elliott <[email protected]>

* Removed NewCompositeFilter

Signed-off-by: Joe Elliott <[email protected]>

* Removed uber copyright

Signed-off-by: Joe Elliott <[email protected]>

* Changed TagFilterDropAll to use pointer receivers

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored and yurishkuro committed Nov 11, 2019
1 parent 0313d0d commit b6086ec
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 101 deletions.
24 changes: 18 additions & 6 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,30 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
}

func writerOptions(opts *Options) ([]cSpanStore.Option, error) {
var tagFilters []dbmodel.TagFilter

// drop all tag filters
if opts.DisableTagsIndex || opts.DisableProcessTagsIndex || opts.DisableLogsIndex {
tagFilters = append(tagFilters, dbmodel.NewTagFilterDropAll(opts.DisableTagsIndex, opts.DisableProcessTagsIndex, opts.DisableLogsIndex))
}

// black/white list tag filters
tagIndexBlacklist := opts.TagIndexBlacklist()
tagIndexWhitelist := opts.TagIndexWhitelist()

if len(tagIndexBlacklist) > 0 && len(tagIndexWhitelist) > 0 {
return nil, errors.New("only one of TagIndexBlacklist and TagIndexWhitelist can be specified")
}

var options []cSpanStore.Option
if len(tagIndexBlacklist) > 0 {
options = append(options, cSpanStore.TagFilter(dbmodel.NewBlacklistFilter(tagIndexBlacklist)))
tagFilters = append(tagFilters, dbmodel.NewBlacklistFilter(tagIndexBlacklist))
} else if len(tagIndexWhitelist) > 0 {
options = append(options, cSpanStore.TagFilter(dbmodel.NewWhitelistFilter(tagIndexWhitelist)))
tagFilters = append(tagFilters, dbmodel.NewWhitelistFilter(tagIndexWhitelist))
}

if len(tagFilters) == 0 {
return nil, nil
} else if len(tagFilters) == 1 {
return []cSpanStore.Option{cSpanStore.TagFilter(tagFilters[0])}, nil
}
return options, nil

return []cSpanStore.Option{cSpanStore.TagFilter(dbmodel.NewChainedTagFilter(tagFilters...))}, nil
}
24 changes: 20 additions & 4 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) {
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--cassandra-archive.enabled=true",
"--cassandra.enable-dependencies-v2=true",
"--cassandra.tag-index-whitelist=a,b,c",
"--cassandra.tag-index-blacklist=a,b,c"})
"--cassandra.index.tag-whitelist=a,b,c",
"--cassandra.index.tag-blacklist=a,b,c"})
f.InitFromViper(v)

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
Expand Down Expand Up @@ -143,15 +143,31 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) {
func TestWriterOptions(t *testing.T) {
opts := NewOptions("cassandra")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{"--cassandra.tag-index-whitelist=a,b,c"})
command.ParseFlags([]string{"--cassandra.index.tag-whitelist=a,b,c"})
opts.InitFromViper(v)

options, _ := writerOptions(opts)
assert.Len(t, options, 1)

opts = NewOptions("cassandra")
v, command = config.Viperize(opts.AddFlags)
command.ParseFlags([]string{"--cassandra.tag-index-blacklist=a,b,c"})
command.ParseFlags([]string{"--cassandra.index.tag-blacklist=a,b,c"})
opts.InitFromViper(v)

options, _ = writerOptions(opts)
assert.Len(t, options, 1)

opts = NewOptions("cassandra")
v, command = config.Viperize(opts.AddFlags)
command.ParseFlags([]string{"--cassandra.index.tags=false"})
opts.InitFromViper(v)

options, _ = writerOptions(opts)
assert.Len(t, options, 1)

opts = NewOptions("cassandra")
v, command = config.Viperize(opts.AddFlags)
command.ParseFlags([]string{"--cassandra.index.tags=false", "--cassandra.index.tag-blacklist=a,b,c"})
opts.InitFromViper(v)

options, _ = writerOptions(opts)
Expand Down
44 changes: 32 additions & 12 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,25 @@ const (

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
suffixTagIndexBlacklist = ".tag-index-blacklist"
suffixTagIndexWhitelist = ".tag-index-whitelist"
suffixIndexTagsBlacklist = ".index.tag-blacklist"
suffixIndexTagsWhitelist = ".index.tag-whitelist"
suffixIndexLogs = ".index.logs"
suffixIndexTags = ".index.tags"
suffixIndexProcessTags = ".index.process-tags"
)

// Options contains various type of Cassandra configs and provides the ability
// to bind them to command line flag and apply overlays, so that some configurations
// (e.g. archive) may be underspecified and infer the rest of its parameters from primary.
type Options struct {
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
tagIndexBlacklist string
tagIndexWhitelist string
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
tagIndexBlacklist string
tagIndexWhitelist string
DisableLogsIndex bool
DisableTagsIndex bool
DisableProcessTagsIndex bool
}

// the Servers field in config.Configuration is a list, which we cannot represent with flags.
Expand Down Expand Up @@ -121,14 +127,25 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
opt.SpanStoreWriteCacheTTL,
"The duration to wait before rewriting an existing service or operation name")
flagSet.String(
opt.primary.namespace+suffixTagIndexBlacklist,
opt.primary.namespace+suffixIndexTagsBlacklist,
opt.tagIndexBlacklist,
"The comma-separated list of span tags to blacklist from being indexed. All other tags will be indexed. Mutually exclusive with the whitelist option.")
flagSet.String(
opt.primary.namespace+suffixTagIndexWhitelist,
opt.primary.namespace+suffixIndexTagsWhitelist,
opt.tagIndexWhitelist,
"The comma-separated list of span tags to whitelist for being indexed. All other tags will not be indexed. Mutually exclusive with the blacklist option.")

flagSet.Bool(
opt.primary.namespace+suffixIndexLogs,
!opt.DisableLogsIndex,
"Controls log field indexing. Set to false to disable.")
flagSet.Bool(
opt.primary.namespace+suffixIndexTags,
!opt.DisableTagsIndex,
"Controls tag indexing. Set to false to disable.")
flagSet.Bool(
opt.primary.namespace+suffixIndexProcessTags,
!opt.DisableProcessTagsIndex,
"Controls process tag indexing. Set to false to disable.")
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down Expand Up @@ -235,8 +252,11 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
cfg.initFromViper(v)
}
opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
opt.tagIndexBlacklist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixTagIndexBlacklist))
opt.tagIndexWhitelist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixTagIndexWhitelist))
opt.tagIndexBlacklist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixIndexTagsBlacklist))
opt.tagIndexWhitelist = stripWhiteSpace(v.GetString(opt.primary.namespace + suffixIndexTagsWhitelist))
opt.DisableTagsIndex = !v.GetBool(opt.primary.namespace + suffixIndexTags)
opt.DisableLogsIndex = !v.GetBool(opt.primary.namespace + suffixIndexLogs)
opt.DisableProcessTagsIndex = !v.GetBool(opt.primary.namespace + suffixIndexProcessTags)
}

func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
Expand Down
9 changes: 7 additions & 2 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.consistency=ONE",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
"--cas.tag-index-blacklist=blerg, blarg,blorg ",
"--cas.tag-index-whitelist=flerg, flarg,florg ",
"--cas.index.tag-blacklist=blerg, blarg,blorg ",
"--cas.index.tag-whitelist=flerg, flarg,florg ",
"--cas.index.tags=true",
"--cas.index.process-tags=false",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
Expand All @@ -78,6 +80,9 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, false, primary.EnableDependenciesV2)
assert.Equal(t, []string{"blerg", "blarg", "blorg"}, opts.TagIndexBlacklist())
assert.Equal(t, []string{"flerg", "flarg", "florg"}, opts.TagIndexWhitelist())
assert.Equal(t, false, opts.DisableTagsIndex)
assert.Equal(t, true, opts.DisableProcessTagsIndex)
assert.Equal(t, false, opts.DisableLogsIndex)

aux := opts.Get("cas-aux")
require.NotNil(t, aux)
Expand Down
35 changes: 0 additions & 35 deletions plugin/storage/cassandra/spanstore/dbmodel/log_fields_filter.go

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion plugin/storage/cassandra/spanstore/dbmodel/tag_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (tf ChainedTagFilter) FilterTags(span *model.Span, tags model.KeyValues) mo
// FilterLogFields calls each FilterLogFields
func (tf ChainedTagFilter) FilterLogFields(span *model.Span, logFields model.KeyValues) model.KeyValues {
for _, f := range tf {
logFields = f.FilterProcessTags(span, logFields)
logFields = f.FilterLogFields(span, logFields)
}
return logFields
}
Expand Down
59 changes: 59 additions & 0 deletions plugin/storage/cassandra/spanstore/dbmodel/tag_filter_drop_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import (
"github.com/jaegertracing/jaeger/model"
)

// TagFilterDropAll filters all fields of a given type.
type TagFilterDropAll struct {
dropTags bool
dropProcessTags bool
dropLogs bool
}

// NewTagFilterDropAll return a filter that filters all of the specified type
func NewTagFilterDropAll(dropTags bool, dropProcessTags bool, dropLogs bool) *TagFilterDropAll {
return &TagFilterDropAll{
dropTags: dropTags,
dropProcessTags: dropProcessTags,
dropLogs: dropLogs,
}
}

// FilterProcessTags implements TagFilter
func (f *TagFilterDropAll) FilterProcessTags(span *model.Span, processTags model.KeyValues) model.KeyValues {
if f.dropProcessTags {
return model.KeyValues{}
}
return processTags
}

// FilterTags implements TagFilter
func (f *TagFilterDropAll) FilterTags(span *model.Span, tags model.KeyValues) model.KeyValues {
if f.dropTags {
return model.KeyValues{}
}
return tags
}

// FilterLogFields implements TagFilter
func (f *TagFilterDropAll) FilterLogFields(span *model.Span, logFields model.KeyValues) model.KeyValues {
if f.dropLogs {
return model.KeyValues{}
}
return logFields
}
Loading

0 comments on commit b6086ec

Please sign in to comment.