Skip to content

Commit

Permalink
feat: Drain uses different tokenizer based on log format (#13384)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jul 4, 2024
1 parent 69b805d commit bc01e6f
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 103 deletions.
58 changes: 36 additions & 22 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func DefaultConfig() *Config {
}
}

func New(config *Config, metrics *Metrics) *Drain {
func New(config *Config, format string, metrics *Metrics) *Drain {
if config.LogClusterDepth < 3 {
panic("depth argument must be at least 3")
}
Expand All @@ -156,14 +156,24 @@ func New(config *Config, metrics *Metrics) *Drain {
if metrics != nil {
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }
}
var tokenizer LineTokenizer
switch format {
case FormatJSON:
tokenizer = newJSONTokenizer(config.ParamString)
case FormatLogfmt:
tokenizer = newLogfmtTokenizer(config.ParamString)
default:
tokenizer = newPunctuationTokenizer()
}

d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: newPunctuationTokenizer(),
tokenizer: tokenizer,
maxAllowedLineLength: 3000,
format: format,
}
return d
}
Expand All @@ -176,6 +186,9 @@ type Drain struct {
metrics *Metrics
tokenizer LineTokenizer
maxAllowedLineLength int
format string
tokens []string
state interface{}
}

func (d *Drain) Clusters() []*LogCluster {
Expand All @@ -190,8 +203,8 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
if len(content) > d.maxAllowedLineLength {
return nil
}
tokens, state := d.tokenizer.Tokenize(content)
return d.train(tokens, state, ts)
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
return d.train(d.tokens, d.state, ts)
}

func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
Expand All @@ -200,13 +213,16 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
}
if d.metrics != nil {
d.metrics.TokensPerLine.Observe(float64(len(tokens)))
d.metrics.StatePerLine.Observe(float64(len(state.([]int))))
if stateInts, ok := state.([]int); ok {
d.metrics.StatePerLine.Observe(float64(len(stateInts)))
}
}
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++
clusterID := d.clustersCounter
tokens, state = d.tokenizer.Clone(tokens, state)
matchCluster = &LogCluster{
Tokens: tokens,
TokenState: state,
Expand All @@ -222,8 +238,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
d.metrics.PatternsDetectedTotal.Inc()
}
} else {
newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
matchCluster.Tokens = newTemplateTokens
matchCluster.Tokens = d.createTemplate(tokens, matchCluster.Tokens)
matchCluster.append(model.TimeFromUnixNano(ts))
// Touch cluster to update its state in the cache.
d.idToCluster.Get(matchCluster.id)
Expand All @@ -232,12 +247,13 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
}

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens, state := d.tokenizer.Tokenize(content)
tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++
clusterID := d.clustersCounter
tokens, state = d.tokenizer.Clone(tokens, state)
matchCluster = &LogCluster{
Tokens: tokens,
TokenState: state,
Expand All @@ -246,8 +262,7 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample)
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
} else {
newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
matchCluster.Tokens = newTemplateTokens
matchCluster.Tokens = d.createTemplate(tokens, matchCluster.Tokens)
// Touch cluster to update its state in the cache.
d.idToCluster.Get(matchCluster.id)
}
Expand Down Expand Up @@ -277,7 +292,7 @@ func deduplicatePlaceholders(line string, placeholder string) string {
}
builder = append(builder, line[low:]...)

return unsafe.String(unsafe.SliceData(builder), len(builder))
return unsafeString(builder)
}

func (d *Drain) PatternString(c *LogCluster) string {
Expand Down Expand Up @@ -313,13 +328,6 @@ func (d *Drain) Delete(cluster *LogCluster) {
d.idToCluster.cache.Remove(cluster.id)
}

// Match against an already existing cluster. Match shall be perfect (sim_th=1.0).
// New cluster will not be created as a result of this call, nor any cluster modifications.
func (d *Drain) Match(content string) *LogCluster {
	tokens, _ := d.tokenizer.Tokenize(content)
	return d.treeSearch(d.rootNode, tokens, 1.0, true)
}

func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster {
tokenCount := len(tokens)

Expand Down Expand Up @@ -511,12 +519,18 @@ func (d *Drain) createTemplate(tokens, matchClusterTokens []string) []string {
if len(tokens) != len(matchClusterTokens) {
panic("seq1 seq2 be of same length")
}
retVal := make([]string, len(matchClusterTokens))
copy(retVal, matchClusterTokens)
for i := range tokens {
if tokens[i] != matchClusterTokens[i] {
retVal[i] = d.config.ParamString
matchClusterTokens[i] = d.config.ParamString
}
}
return retVal
return matchClusterTokens
}

// unsafeString reinterprets s as a string without copying.
// The returned string aliases s's backing array, so the caller
// must not mutate s while the string is in use.
func unsafeString(s []byte) string {
	return unsafe.String(unsafe.SliceData(s), len(s))
}

// unsafeBytes reinterprets s as a byte slice without copying.
// The returned slice aliases the string's backing memory; since
// Go strings are immutable, the caller must never write to it.
func unsafeBytes(s string) []byte {
	return unsafe.Slice(unsafe.StringData(s), len(s))
}
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
line := scanner.Text()
lines = append(lines, line)
}
drain := New(DefaultConfig(), nil)
drain := New(DefaultConfig(), DetectLogFormat(lines[0]), nil)

b.ReportAllocs()
b.ResetTimer()
Expand Down
80 changes: 17 additions & 63 deletions pkg/pattern/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
format string
}{
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: `testdata/agent-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: `testdata/ingester-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
Expand All @@ -66,7 +66,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: `testdata/drone-json.txt`,
format: FormatJSON,
patterns: []string{
Expand All @@ -79,7 +79,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/distributor-logfmt.txt",
format: FormatLogfmt,
patterns: []string{
Expand All @@ -91,7 +91,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/journald.txt",
format: FormatUnknown,
patterns: []string{
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/kafka.txt",
format: FormatUnknown,
patterns: []string{
Expand All @@ -232,7 +232,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/kubernetes.txt",
format: FormatUnknown,
patterns: []string{
Expand Down Expand Up @@ -273,15 +273,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/vault.txt",
format: FormatUnknown,
patterns: []string{
`<_> [INFO] expiration: revoked lease: lease_id=<_>`,
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/calico.txt",
format: FormatUnknown,
patterns: []string{
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/grafana-ruler.txt",
format: FormatLogfmt,
patterns: []string{
Expand Down Expand Up @@ -426,6 +426,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
}

for _, tt := range tests {
tt := tt
t.Run(tt.inputFile, func(t *testing.T) {
file, err := os.Open(tt.inputFile)
require.NoError(t, err)
Expand Down Expand Up @@ -461,53 +462,6 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
}
}

func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
t.Parallel()
tests := []struct {
name string
drain *Drain
inputLines []string
}{
{
name: "should match each line against a pattern",
drain: New(DefaultConfig(), nil),
inputLines: []string{
"test test test test",
"test test test test",
"test test test test",
"test test test test",
},
},
{
name: "should also match newlines",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test test test test
`,
`test test test test
`,
`test test test test
`,
`test test test test
`,
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
for _, line := range tt.inputLines {
tt.drain.Train(line, 0)
}

for _, line := range tt.inputLines {
match := tt.drain.Match(line)
require.NotNil(t, match, `Line should match a cluster`)
}
})
}
}

func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) {
t.Parallel()
tests := []struct {
Expand All @@ -517,7 +471,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
}{
{
name: "should extract patterns that all lines match",
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputLines: []string{
"test 1 test test",
"test 2 test test",
Expand All @@ -527,7 +481,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line ends with newlines",
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputLines: []string{
`test 1 test test
`,
Expand All @@ -541,7 +495,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line ends with empty space",
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputLines: []string{
`test 1 test test `,
`test 2 test test `,
Expand All @@ -551,7 +505,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line starts with empty space",
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputLines: []string{
` test 1 test test`,
` test 2 test test`,
Expand All @@ -561,7 +515,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "Scheduler patterns are matchable",
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputLines: []string{
`ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
`ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
Expand Down Expand Up @@ -659,7 +613,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) {
}{
{
name: "should prune old branches",
drain: New(DefaultConfig(), nil),
drain: New(DefaultConfig(), "", nil),
inputLines: []string{
"test test test A",
"test test test B",
Expand Down
Loading

0 comments on commit bc01e6f

Please sign in to comment.