diff --git a/pkg/engine/internal/executor/expressions.go b/pkg/engine/internal/executor/expressions.go index 460c5dbe6352c..57fb59dc6d893 100644 --- a/pkg/engine/internal/executor/expressions.go +++ b/pkg/engine/internal/executor/expressions.go @@ -17,7 +17,6 @@ func newExpressionEvaluator() expressionEvaluator { } func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) (arrow.Array, error) { - switch expr := expr.(type) { case *physical.LiteralExpr: diff --git a/pkg/engine/internal/executor/expressions_test.go b/pkg/engine/internal/executor/expressions_test.go index b3b5b35ce057b..e212da5d44517 100644 --- a/pkg/engine/internal/executor/expressions_test.go +++ b/pkg/engine/internal/executor/expressions_test.go @@ -551,7 +551,7 @@ func TestEvaluateUnaryCastExpression(t *testing.T) { arr, ok := colVec.(*array.Struct) require.True(t, ok) - require.Equal(t, 3, arr.NumField()) //value, error, errorDetails + require.Equal(t, 3, arr.NumField()) // value, error, errorDetails // Convert struct to rows for comparison actual, err := structToRows(arr) @@ -590,16 +590,35 @@ func TestEvaluateUnaryCastExpression(t *testing.T) { }) } -func TestEvaluateParseExpression_Logfmt(t *testing.T) { - var ( - colMsg = "utf8.builtin.message" - ) +func structToRows(structArr *array.Struct) (arrowtest.Rows, error) { + // Get the struct type to extract schema information + structType := structArr.DataType().(*arrow.StructType) + + // Create schema from struct fields + schema := arrow.NewSchema(structType.Fields(), nil) + + // Extract field arrays from the struct + columns := make([]arrow.Array, structArr.NumField()) + for i := 0; i < structArr.NumField(); i++ { + columns[i] = structArr.Field(i) + } + + // Create and return the record + record := array.NewRecord(schema, columns, int64(structArr.Len())) + defer record.Release() + return arrowtest.RecordRows(record) +} + +func TestLogfmtParser(t *testing.T) { + colMsg := "utf8.builtin.message" for _, tt := range []struct { name string schema *arrow.Schema input arrowtest.Rows requestedKeys []string + strict bool + keepEmpty bool expectedFields int expectedOutput arrowtest.Rows }{ @@ -614,6 +633,8 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { {colMsg: "level=debug status=201"}, }, requestedKeys: []string{"level", "status"}, + strict: false, + keepEmpty: false, expectedFields: 2, // 2 columns: level, status expectedOutput: arrowtest.Rows{ { @@ -641,6 +662,8 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { {colMsg: "level=info"}, }, requestedKeys: []string{"level"}, + strict: false, + keepEmpty: false, expectedFields: 1, // 1 column: level expectedOutput: arrowtest.Rows{ { @@ -665,6 +688,8 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { {colMsg: "level=\"unclosed status=500"}, // Unclosed quote error }, requestedKeys: []string{"level", "status"}, + strict: false, + keepEmpty: false, expectedFields: 4, // 4 columns: level, status, __error__, __error_details__ expectedOutput: arrowtest.Rows{ { @@ -698,7 +723,9 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { {colMsg: "level=error status=500 method=POST duration=123ms"}, }, requestedKeys: nil, // nil means extract all keys - expectedFields: 5, // 5 columns: code, duration, level, method, status + strict: false, + keepEmpty: false, + expectedFields: 5, // 5 columns: code, duration, level, method, status expectedOutput: arrowtest.Rows{ { "utf8.parsed.code": nil, @@ -735,7 +762,9 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { {colMsg: "level=debug method=POST"}, // Valid line }, requestedKeys: nil, // nil means extract all keys - expectedFields: 5, // 5 columns: level, method, status, __error__, __error_details__ + strict: false, + keepEmpty: false, + expectedFields: 5, // 5 columns: level, method, status, __error__, __error_details__ expectedOutput: arrowtest.Rows{ { "utf8.parsed.level": "info", @@ -767,6 +796,350 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { }, }, }, + { + name: "strict mode stops on first error", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status=200 method=GET"}, // Valid line + {colMsg: "level==error code=500"}, // Double equals error + {colMsg: "level=debug method=POST"}, // Valid line + }, + requestedKeys: nil, + strict: true, + keepEmpty: false, + expectedFields: 5, // level, method, status, __error__, __error_details__ + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "info", + "utf8.parsed.method": "GET", + "utf8.parsed.status": "200", + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + { + "utf8.parsed.level": nil, + "utf8.parsed.method": nil, + "utf8.parsed.status": nil, + "utf8.generated.__error__": "LogfmtParserErr", + "utf8.generated.__error_details__": "logfmt syntax error at pos 7 : unexpected '='", + }, + { + "utf8.parsed.level": "debug", + "utf8.parsed.method": "POST", + "utf8.parsed.status": nil, + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + }, + }, + { + name: "non-strict mode continues parsing on error", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status=200 method=GET"}, // Valid line + {colMsg: "code=500 level= error=true"}, // All fields parsed successfully (no double equals) + {colMsg: "level=debug method=POST"}, // Valid line + }, + requestedKeys: nil, + strict: false, + keepEmpty: false, + expectedFields: 5, // code, error, level, method, status (no errors, so no error columns) + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.code": nil, + "utf8.parsed.error": nil, + "utf8.parsed.level": "info", + "utf8.parsed.method": "GET", + "utf8.parsed.status": "200", + }, + { + "utf8.parsed.code": "500", + "utf8.parsed.error": "true", + "utf8.parsed.level": nil, + "utf8.parsed.method": nil, + "utf8.parsed.status": nil, + }, + { + "utf8.parsed.code": nil, + "utf8.parsed.error": nil, + "utf8.parsed.level": "debug", + "utf8.parsed.method": "POST", + "utf8.parsed.status": nil, + }, + }, + }, + { + name: "keepEmpty mode retains empty values", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status= method=GET"}, + {colMsg: "level= status=200"}, + }, + requestedKeys: nil, + strict: false, + keepEmpty: true, + expectedFields: 3, // level, method, status + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "info", + "utf8.parsed.method": "GET", + "utf8.parsed.status": "", + }, + { + "utf8.parsed.level": "", + "utf8.parsed.method": nil, + "utf8.parsed.status": "200", + }, + }, + }, + { + name: "without keepEmpty mode skips empty values", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status= method=GET"}, + {colMsg: "level= status=200"}, + }, + requestedKeys: nil, + strict: false, + keepEmpty: false, + expectedFields: 3, // level, method, status + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "info", + "utf8.parsed.method": "GET", + "utf8.parsed.status": nil, + }, + { + "utf8.parsed.level": nil, + "utf8.parsed.method": nil, + "utf8.parsed.status": "200", + }, + }, + }, + { + name: "strict and keepEmpty both enabled", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level= status=200"}, + {colMsg: "level==error method=POST"}, + }, + requestedKeys: nil, + strict: true, + keepEmpty: true, + expectedFields: 4, // level, status, __error__, __error_details__ + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "", + "utf8.parsed.status": "200", + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + { + "utf8.parsed.level": nil, + "utf8.parsed.status": nil, + "utf8.generated.__error__": "LogfmtParserErr", + "utf8.generated.__error_details__": "logfmt syntax error at pos 7 : unexpected '='", + }, + }, + }, + { + name: "strict mode with requested keys stops on error", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status=200 method=GET"}, + {colMsg: "level==error status=500"}, // Error in requested key + {colMsg: "level=debug status=201"}, + }, + requestedKeys: []string{"level", "status"}, + strict: true, + keepEmpty: false, + expectedFields: 4, // level, status, __error__, __error_details__ + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "info", + "utf8.parsed.status": "200", + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + { + "utf8.parsed.level": nil, + "utf8.parsed.status": nil, + "utf8.generated.__error__": "LogfmtParserErr", + "utf8.generated.__error_details__": "logfmt syntax error at pos 7 : unexpected '='", + }, + { + "utf8.parsed.level": "debug", + "utf8.parsed.status": "201", + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + }, + }, + { + name: "keepEmpty with requested keys retains empty values", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status= method=GET"}, + {colMsg: "level= status=200 method=POST"}, + {colMsg: "level=debug status=404"}, + }, + requestedKeys: []string{"level", "status"}, + strict: false, + keepEmpty: true, + expectedFields: 2, // level, status (no errors) + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "info", + "utf8.parsed.status": "", + }, + { + "utf8.parsed.level": "", + "utf8.parsed.status": "200", + }, + { + "utf8.parsed.level": "debug", + "utf8.parsed.status": "404", + }, + }, + }, + { + name: "without keepEmpty and requested keys skips empty values", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status= method=GET"}, + {colMsg: "level= status=200 method=POST"}, + {colMsg: "level=debug status=404"}, + }, + requestedKeys: []string{"level", "status"}, + strict: false, + keepEmpty: false, + expectedFields: 2, // level, status (no errors) + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "info", + "utf8.parsed.status": nil, + }, + { + "utf8.parsed.level": nil, + "utf8.parsed.status": "200", + }, + { + "utf8.parsed.level": "debug", + "utf8.parsed.status": "404", + }, + }, + }, + { + name: "strict and keepEmpty with requested keys", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level= status=200 method=GET"}, + {colMsg: "level=info status= method=POST"}, + {colMsg: "level==error status=500"}, + }, + requestedKeys: []string{"level", "status"}, + strict: true, + keepEmpty: true, + expectedFields: 4, // level, status, __error__, __error_details__ + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "", + "utf8.parsed.status": "200", + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + { + "utf8.parsed.level": "info", + "utf8.parsed.status": "", + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + { + "utf8.parsed.level": nil, + "utf8.parsed.status": nil, + "utf8.generated.__error__": "LogfmtParserErr", + "utf8.generated.__error_details__": "logfmt syntax error at pos 7 : unexpected '='", + }, + }, + }, + { + name: "strict mode does not ignore errors in non-requested keys", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status=200 bad==value"}, // Error in non-requested key still causes error + {colMsg: "level=warn status=404"}, + }, + strict: true, + keepEmpty: false, + requestedKeys: []string{"level", "status"}, + expectedFields: 4, // level, status, __error__, __error_details__ + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": nil, + "utf8.parsed.status": nil, + "utf8.generated.__error__": "LogfmtParserErr", + "utf8.generated.__error_details__": "logfmt syntax error at pos 27 : unexpected '='", + }, + { + "utf8.parsed.level": "warn", + "utf8.parsed.status": "404", + "utf8.generated.__error__": nil, + "utf8.generated.__error_details__": nil, + }, + }, + }, + { + name: "keepEmpty only affects present keys with empty values", + schema: arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN("utf8.builtin.message", true), + }, nil), + input: arrowtest.Rows{ + {colMsg: "level=info status="}, + {colMsg: "level= method=GET"}, + {colMsg: "status=200"}, + }, + strict: false, + keepEmpty: true, + requestedKeys: []string{"level", "status", "method"}, + expectedFields: 3, // level, status, method + expectedOutput: arrowtest.Rows{ + { + "utf8.parsed.level": "info", + "utf8.parsed.status": "", + "utf8.parsed.method": nil, + }, + { + "utf8.parsed.level": "", + "utf8.parsed.status": nil, + "utf8.parsed.method": "GET", + }, + { + "utf8.parsed.level": nil, + "utf8.parsed.status": "200", + "utf8.parsed.method": nil, + }, + }, + }, } { t.Run(tt.name, func(t *testing.T) { expr := &physical.VariadicExpr{ @@ -778,6 +1151,12 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { &physical.LiteralExpr{ Literal: types.NewLiteral(tt.requestedKeys), }, + &physical.LiteralExpr{ + Literal: types.NewLiteral(tt.strict), + }, + &physical.LiteralExpr{ + Literal: types.NewLiteral(tt.keepEmpty), + }, }, } e := newExpressionEvaluator() @@ -792,7 +1171,7 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { require.True(t, ok) defer arr.Release() - require.Equal(t, tt.expectedFields, arr.NumField()) //value, error, errorDetails + require.Equal(t, tt.expectedFields, arr.NumField()) // Convert record to rows for comparison actual, err := structToRows(arr) @@ -802,25 +1181,6 @@ func TestEvaluateParseExpression_Logfmt(t *testing.T) { } } -func structToRows(structArr *array.Struct) (arrowtest.Rows, error) { - // Get the struct type to extract schema information - structType := structArr.DataType().(*arrow.StructType) - - // Create schema from struct fields - schema := arrow.NewSchema(structType.Fields(), nil) - - // Extract field arrays from the struct - columns := make([]arrow.Array, structArr.NumField()) - for i := 0; i < structArr.NumField(); i++ { - columns[i] = structArr.Field(i) - } - - // Create and return the record - record := array.NewRecord(schema, columns, int64(structArr.Len())) - defer record.Release() - return arrowtest.RecordRows(record) -} - func TestEvaluateParseExpression_JSON(t *testing.T) { var ( colTS = "timestamp_ns.builtin.timestamp" @@ -1174,6 +1534,12 @@ func TestEvaluateParseExpression_JSON(t *testing.T) { &physical.LiteralExpr{ Literal: types.NewLiteral(tt.requestedKeys), }, + &physical.LiteralExpr{ + Literal: types.NewLiteral(false), + }, + &physical.LiteralExpr{ + Literal: types.NewLiteral(false), + }, }, } e := newExpressionEvaluator() @@ -1187,7 +1553,7 @@ func TestEvaluateParseExpression_JSON(t *testing.T) { arr, ok := col.(*array.Struct) require.True(t, ok) - require.Equal(t, tt.expectedFields, arr.NumField()) //value, error, errorDetails + require.Equal(t, tt.expectedFields, arr.NumField()) // value, error, errorDetails // Convert record to rows for comparison actual, err := structToRows(arr) @@ -1196,50 +1562,3 @@ func TestEvaluateParseExpression_JSON(t *testing.T) { }) } } - -func TestEvaluateParseExpression_HandleMissingRequestedKeys(t *testing.T) { - colMsg := "utf8.builtin.message" - alloc := memory.DefaultAllocator - - expr := &physical.VariadicExpr{ - Op: types.VariadicOpParseJSON, - Expressions: []physical.Expression{ - &physical.ColumnExpr{ - Ref: semconv.ColumnIdentMessage.ColumnRef(), - }, - }, - } - e := newExpressionEvaluator() - - input := arrowtest.Rows{ - {colMsg: `{"level": "error", "status": "500"}`}, - {colMsg: `{"level": "info", "status": "200"}`}, - {colMsg: `{"level": "debug", "status": "201"}`}, - } - - schema := arrow.NewSchema([]arrow.Field{ - semconv.FieldFromFQN("utf8.builtin.message", true), - }, nil) - - record := input.Record(alloc, schema) - col, err := e.eval(expr, record) - require.NoError(t, err) - id := col.DataType().ID() - require.Equal(t, arrow.STRUCT, id) - - arr, ok := col.(*array.Struct) - require.True(t, ok) - - require.Equal(t, 2, arr.NumField()) //level, status - - // Convert record to rows for comparison - actual, err := structToRows(arr) - require.NoError(t, err) - - expectedOutput := arrowtest.Rows{ - {"utf8.parsed.level": "error", "utf8.parsed.status": "500"}, - {"utf8.parsed.level": "info", "utf8.parsed.status": "200"}, - {"utf8.parsed.level": "debug", "utf8.parsed.status": "201"}, - } - require.Equal(t, expectedOutput, actual) -} diff --git a/pkg/engine/internal/executor/parse.go b/pkg/engine/internal/executor/parse.go index 94f86d68b0663..e054c5569496f 100644 --- a/pkg/engine/internal/executor/parse.go +++ b/pkg/engine/internal/executor/parse.go @@ -16,7 +16,7 @@ import ( func parseFn(op types.VariadicOp) VariadicFunction { return VariadicFunctionFunc(func(args ...arrow.Array) (arrow.Array, error) { - sourceCol, requestedKeys, err := extractParseFnParameters(args) + sourceCol, requestedKeys, strict, keepEmpty, err := extractParseFnParameters(args) if err != nil { panic(err) } @@ -25,7 +25,7 @@ func parseFn(op types.VariadicOp) VariadicFunction { var parsedColumns []arrow.Array switch op { case types.VariadicOpParseLogfmt: - headers, parsedColumns = buildLogfmtColumns(sourceCol, requestedKeys) + headers, parsedColumns = buildLogfmtColumns(sourceCol, requestedKeys, strict, keepEmpty) case types.VariadicOpParseJSON: headers, parsedColumns = buildJSONColumns(sourceCol, requestedKeys) default: @@ -51,58 +51,75 @@ func parseFn(op types.VariadicOp) VariadicFunction { }) } -func extractParseFnParameters(args []arrow.Array) (*array.String, []string, error) { +func extractParseFnParameters(args []arrow.Array) (*array.String, []string, bool, bool, error) { // Valid signatures: - //parse(sourceColVec) - //parse(sourceColVec, requestedKeys) - if len(args) < 1 || len(args) > 2 { - return nil, nil, fmt.Errorf("parse function expected 1 or 2 arguments, got %d", len(args)) + // parse(sourceColVec, requestedKeys, strict, keepEmpty) + if len(args) != 4 { + return nil, nil, false, false, fmt.Errorf("parse function expected 4 arguments, got %d", len(args)) } - var sourceColArr, requestedKeysArr arrow.Array + var sourceColArr, requestedKeysArr, strictArr, keepEmptyArr arrow.Array sourceColArr = args[0] - if len(args) == 2 { - requestedKeysArr = args[1] - } + requestedKeysArr = args[1] + strictArr = args[2] + keepEmptyArr = args[3] if sourceColArr == nil { - return nil, nil, fmt.Errorf("parse function arguments did not include a source ColumnVector to parse") + return nil, nil, false, false, fmt.Errorf("parse function arguments did not include a source ColumnVector to parse") } sourceCol, ok := sourceColArr.(*array.String) if !ok { - return nil, nil, fmt.Errorf("parse can only operate on string column types, got %T", sourceColArr) + return nil, nil, false, false, fmt.Errorf("parse can only operate on string column types, got %T", sourceColArr) } var requestedKeys []string + var strict, keepEmpty bool - // Rquested keys will be the same for all rows, so we only need the first one + // Requested keys will be the same for all rows, so we only need the first one reqKeysIdx := 0 - if requestedKeysArr == nil || requestedKeysArr.IsNull(reqKeysIdx) { - return sourceCol, requestedKeys, nil + if requestedKeysArr != nil && !requestedKeysArr.IsNull(reqKeysIdx) { + reqKeysList, ok := requestedKeysArr.(*array.List) + if !ok { + return nil, nil, false, false, fmt.Errorf("requested keys must be a list of string arrays, got %T", requestedKeysArr) + } + + firstRow, ok := util.ArrayListValue(reqKeysList, reqKeysIdx).([]string) + if !ok { + return nil, nil, false, false, fmt.Errorf("requested keys must be a list of string arrays, got a list of %T", firstRow) + } + requestedKeys = append(requestedKeys, firstRow...) } - reqKeysList, ok := requestedKeysArr.(*array.List) - if !ok { - return nil, nil, fmt.Errorf("requested keys must be a list of string arrays, got %T", requestedKeysArr) + // Extract strict flag (boolean scalar array) + if strictArr != nil && strictArr.Len() > 0 { + boolArr, ok := strictArr.(*array.Boolean) + if !ok { + return nil, nil, false, false, fmt.Errorf("strict flag must be a boolean, got %T", strictArr) + } + strict = boolArr.Value(0) } - firstRow, ok := util.ArrayListValue(reqKeysList, reqKeysIdx).([]string) - if !ok { - return nil, nil, fmt.Errorf("requested keys must be a list of string arrays, got a list of %T", firstRow) + // Extract keepEmpty flag (boolean scalar array) + if keepEmptyArr != nil && keepEmptyArr.Len() > 0 { + boolArr, ok := keepEmptyArr.(*array.Boolean) + if !ok { + return nil, nil, false, false, fmt.Errorf("keepEmpty flag must be a boolean, got %T", keepEmptyArr) + } + keepEmpty = boolArr.Value(0) } - requestedKeys = append(requestedKeys, firstRow...) - return sourceCol, requestedKeys, nil + + return sourceCol, requestedKeys, strict, keepEmpty, nil } // parseFunc represents a function that parses a single line and returns key-value pairs -type parseFunc func(line string, requestedKeys []string) (map[string]string, error) +type parseFunc func(line string) (map[string]string, error) // buildColumns builds Arrow columns from input lines using the provided parser // Returns the column headers, the Arrow columns, and any error -func buildColumns(input *array.String, requestedKeys []string, parseFunc parseFunc, errorType string) ([]string, []arrow.Array) { +func buildColumns(input *array.String, _ []string, parseFunc parseFunc, errorType string) ([]string, []arrow.Array) { columnBuilders := make(map[string]*array.StringBuilder) - columnOrder := parseLines(input, requestedKeys, columnBuilders, parseFunc, errorType) + columnOrder := parseLines(input, columnBuilders, parseFunc, errorType) // Build final arrays columns := make([]arrow.Array, 0, len(columnOrder)) @@ -118,14 +135,14 @@ func buildColumns(input *array.String, requestedKeys []string, parseFunc parseFu } // parseLines discovers columns dynamically as lines are parsed -func parseLines(input *array.String, requestedKeys []string, columnBuilders map[string]*array.StringBuilder, parseFunc parseFunc, errorType string) []string { +func parseLines(input *array.String, columnBuilders map[string]*array.StringBuilder, parseFunc parseFunc, errorType string) []string { columnOrder := []string{} var errorBuilder, errorDetailsBuilder *array.StringBuilder hasErrorColumns := false for i := 0; i < input.Len(); i++ { line := input.Value(i) - parsed, err := parseFunc(line, requestedKeys) + parsed, err := parseFunc(line) // Handle error columns if err != nil { diff --git a/pkg/engine/internal/executor/parse_json.go b/pkg/engine/internal/executor/parse_json.go index 508da25997a3f..9f10ab90b97e9 100644 --- a/pkg/engine/internal/executor/parse_json.go +++ b/pkg/engine/internal/executor/parse_json.go @@ -32,7 +32,10 @@ var ( ) func buildJSONColumns(input *array.String, requestedKeys []string) ([]string, []arrow.Array) { - return buildColumns(input, requestedKeys, parseJSONLine, types.JSONParserErrorType) + parseFunc := func(line string) (map[string]string, error) { + return parseJSONLine(line, requestedKeys) + } + return buildColumns(input, requestedKeys, parseFunc, types.JSONParserErrorType) } // parseJSONLine parses a single JSON line and extracts key-value pairs diff --git a/pkg/engine/internal/executor/parse_logfmt.go b/pkg/engine/internal/executor/parse_logfmt.go index a81726c4c433d..dc67f287e695c 100644 --- a/pkg/engine/internal/executor/parse_logfmt.go +++ b/pkg/engine/internal/executor/parse_logfmt.go @@ -8,14 +8,19 @@ import ( "github.com/grafana/loki/v3/pkg/logql/log/logfmt" ) -func buildLogfmtColumns(input *array.String, requestedKeys []string) ([]string, []arrow.Array) { - return buildColumns(input, requestedKeys, tokenizeLogfmt, types.LogfmtParserErrorType) +func buildLogfmtColumns(input *array.String, requestedKeys []string, strict bool, keepEmpty bool) ([]string, []arrow.Array) { + parseFunc := func(line string) (map[string]string, error) { + return tokenizeLogfmt(line, requestedKeys, strict, keepEmpty) + } + return buildColumns(input, requestedKeys, parseFunc, types.LogfmtParserErrorType) } // tokenizeLogfmt parses logfmt input using the standard decoder // Returns a map of key-value pairs with first-wins semantics for duplicates // If requestedKeys is provided, the result will be filtered to only include those keys -func tokenizeLogfmt(input string, requestedKeys []string) (map[string]string, error) { +// If strict is true, parsing stops on the first error +// If keepEmpty is true, empty values are retained in the result +func tokenizeLogfmt(input string, requestedKeys []string, strict bool, keepEmpty bool) (map[string]string, error) { result := make(map[string]string) var requestedKeyLookup map[string]struct{} @@ -36,8 +41,8 @@ func tokenizeLogfmt(input string, requestedKeys []string) (map[string]string, er } val := decoder.Value() - if len(val) == 0 { - // TODO: retain empty values if --keep-empty is set + if !keepEmpty && len(val) == 0 { + // Skip empty values unless keepEmpty is set continue } @@ -51,6 +56,11 @@ func tokenizeLogfmt(input string, requestedKeys []string) (map[string]string, er // Check for parsing errors if err := decoder.Err(); err != nil { + if strict { + // In strict mode, return the error immediately + return nil, err + } + // In non-strict mode, return partial results with the error return result, err } diff --git a/pkg/engine/internal/planner/logical/builder.go b/pkg/engine/internal/planner/logical/builder.go index 9ed5839ae88d4..6f8fbf7a21737 100644 --- a/pkg/engine/internal/planner/logical/builder.go +++ b/pkg/engine/internal/planner/logical/builder.go @@ -40,7 +40,7 @@ func (b *Builder) Limit(skip uint32, fetch uint32) *Builder { } // Parse applies a [Parse] operation to the Builder. -func (b *Builder) Parse(op types.VariadicOp) *Builder { +func (b *Builder) Parse(op types.VariadicOp, strict bool, keepEmpty bool) *Builder { val := &FunctionOp{ Op: op, Values: []Value{ @@ -48,6 +48,10 @@ func (b *Builder) Parse(op types.VariadicOp) *Builder { &ColumnRef{ Ref: semconv.ColumnIdentMessage.ColumnRef(), }, + // nil for requested keys (to be filled in by projection pushdown optimizer) + NewLiteral([]string{}), + NewLiteral(strict), + NewLiteral(keepEmpty), }, } return b.ProjectExpand(val) diff --git a/pkg/engine/internal/planner/logical/planner.go b/pkg/engine/internal/planner/logical/planner.go index eea821e163f5b..69ec417f3d889 100644 --- a/pkg/engine/internal/planner/logical/planner.go +++ b/pkg/engine/internal/planner/logical/planner.go @@ -14,8 +14,10 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" ) -var errUnimplemented = errors.New("query contains unimplemented features") -var unimplementedFeature = func(s string) error { return fmt.Errorf("%w: %s", errUnimplemented, s) } +var ( + errUnimplemented = errors.New("query contains unimplemented features") + unimplementedFeature = func(s string) error { return fmt.Errorf("%w: %s", errUnimplemented, s) } +) // BuildPlan converts a LogQL query represented as [logql.Params] into a logical [Plan]. // It may return an error as second argument in case the traversal of the AST of the query fails. @@ -66,6 +68,8 @@ func buildPlanForLogQuery( predicates []Value postParsePredicates []Value hasLogfmtParser bool + logfmtStrict bool + logfmtKeepEmpty bool hasJSONParser bool ) @@ -84,17 +88,9 @@ func buildPlanForLogQuery( // which would lead to multiple predicates of the same expression. return false // do not traverse children case *syntax.LogfmtParserExpr: - // TODO: support --strict and --keep-empty - if e.Strict { - err = errUnimplemented - return false - } - if e.KeepEmpty { - err = errUnimplemented - return false - } - hasLogfmtParser = true + logfmtStrict = e.Strict + logfmtKeepEmpty = e.KeepEmpty return true // continue traversing to find label filters case *syntax.LineParserExpr: switch e.Op { @@ -190,10 +186,11 @@ func buildPlanForLogQuery( // for example, the query `{app="foo"} | json | line_format "{{.nested_json}}" | json ` is valid, and will need // multiple parse stages. We will handle this in a future PR. if hasLogfmtParser { - builder = builder.Parse(types.VariadicOpParseLogfmt) + builder = builder.Parse(types.VariadicOpParseLogfmt, logfmtStrict, logfmtKeepEmpty) } if hasJSONParser { - builder = builder.Parse(types.VariadicOpParseJSON) + // JSON has no parameters + builder = builder.Parse(types.VariadicOpParseJSON, false, false) } for _, value := range postParsePredicates { builder = builder.Select(value) @@ -265,11 +262,11 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) ( rangeAggType = types.RangeAggregationTypeCount case syntax.OpRangeTypeSum: rangeAggType = types.RangeAggregationTypeSum - //case syntax.OpRangeTypeMax: + // case syntax.OpRangeTypeMax: // rangeAggType = types.RangeAggregationTypeMax - //case syntax.OpRangeTypeMin: + // case syntax.OpRangeTypeMin: // rangeAggType = types.RangeAggregationTypeMin - //case syntax.OpRangeTypeBytesRate: + // case syntax.OpRangeTypeBytesRate: // rangeAggType = types.RangeAggregationTypeBytes // bytes_rate is implemented as bytes_over_time/$interval case syntax.OpRangeTypeRate: if e.Left.Unwrap != nil { @@ -286,7 +283,7 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) ( ) switch e.Operation { - //case syntax.OpRangeTypeBytesRate: + // case syntax.OpRangeTypeBytesRate: // // bytes_rate is implemented as bytes_over_time/$interval // builder = builder.BinOpRight(types.BinaryOpDiv, &Literal{ // Literal: NewLiteral(rangeInterval.Seconds()), @@ -445,11 +442,11 @@ func convertVectorAggregationType(op string) types.VectorAggregationType { switch op { case syntax.OpTypeSum: return types.VectorAggregationTypeSum - //case syntax.OpTypeCount: + // case syntax.OpTypeCount: // return types.VectorAggregationTypeCount - //case syntax.OpTypeMax: + // case syntax.OpTypeMax: // return types.VectorAggregationTypeMax - //case syntax.OpTypeMin: + // case syntax.OpTypeMin: // return types.VectorAggregationTypeMin default: return types.VectorAggregationTypeInvalid diff --git a/pkg/engine/internal/planner/logical/planner_test.go b/pkg/engine/internal/planner/logical/planner_test.go index 6d6baf7dd595e..afccf6940d022 100644 --- a/pkg/engine/internal/planner/logical/planner_test.go +++ b/pkg/engine/internal/planner/logical/planner_test.go @@ -437,7 +437,7 @@ func TestPlannerCreatesProjectionWithParseOperation(t *testing.T) { %4 = SELECT %2 [predicate=%3] %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] -%7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message)] +%7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] %8 = EQ ambiguous.level "error" %9 = SELECT %7 [predicate=%8] %10 = RANGE_AGGREGATION %9 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] @@ -468,7 +468,7 @@ RETURN %12 %4 = SELECT %2 [predicate=%3] %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] -%7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message)] +%7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] %8 = EQ ambiguous.level "error" %9 = SELECT %7 [predicate=%8] %10 = SORT %9 [column=builtin.timestamp, asc=false, nulls_first=false] @@ -499,7 +499,7 @@ RETURN %12 %4 = SELECT %2 [predicate=%3] %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] -%7 = PROJECT %6 [mode=*E, expr=PARSE_JSON(builtin.message)] +%7 = PROJECT %6 [mode=*E, expr=PARSE_JSON(builtin.message, [], false, false)] %8 = EQ ambiguous.level "error" %9 = SELECT %7 [predicate=%8] %10 = RANGE_AGGREGATION %9 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] @@ -529,7 +529,7 @@ RETURN %12 %4 = SELECT %2 [predicate=%3] %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] -%7 = PROJECT %6 [mode=*E, expr=PARSE_JSON(builtin.message)] +%7 = PROJECT %6 [mode=*E, expr=PARSE_JSON(builtin.message, [], false, false)] %8 = EQ ambiguous.level "error" %9 = SELECT %7 [predicate=%8] %10 = SORT %9 [column=builtin.timestamp, asc=false, nulls_first=false] @@ -568,7 +568,7 @@ RETURN %12 %8 = SELECT %6 [predicate=%7] %9 = SELECT %8 [predicate=%2] %10 = SELECT %9 [predicate=%3] -%11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message)] +%11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] %12 = EQ ambiguous.level "debug" %13 = SELECT %11 [predicate=%12] %14 = SORT %13 [column=builtin.timestamp, asc=false, nulls_first=false] @@ -605,7 +605,7 @@ RETURN %16 %8 = SELECT %6 [predicate=%7] %9 = SELECT %8 [predicate=%2] %10 = SELECT %9 [predicate=%3] -%11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message)] +%11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] %12 = EQ ambiguous.level "debug" %13 = SELECT %11 [predicate=%12] %14 = RANGE_AGGREGATION %13 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] diff --git a/pkg/engine/internal/planner/physical/optimizer.go b/pkg/engine/internal/planner/physical/optimizer.go index a738be40f7715..7093877f85da0 100644 --- a/pkg/engine/internal/planner/physical/optimizer.go +++ b/pkg/engine/internal/planner/physical/optimizer.go @@ -380,16 +380,22 @@ func (r *projectionPushdown) handleParse(expr *VariadicExpr, projections []Colum panic(err) } - existingKeys, ok := exprs.requestedKeysExpr.Literal.(types.StringListLiteral) - if !ok { - panic(fmt.Errorf("expected requested keys to be a list of strings, got %T", exprs.requestedKeysExpr.Literal)) - } - requestedKeys := make(map[string]bool) - for _, k := range existingKeys { - requestedKeys[k] = true + + // Handle both null and string list literals for requested keys + switch keys := exprs.requestedKeysExpr.Literal.(type) { + case types.StringListLiteral: + for _, k := range keys { + requestedKeys[k] = true + } + case types.NullLiteral: + // Start with empty set + default: + panic(fmt.Errorf("expected requested keys to be a list of strings or null, got %T", exprs.requestedKeysExpr.Literal)) } + initialKeyCount := len(requestedKeys) + for _, p := range ambiguousProjections { colExpr, ok := p.(*ColumnExpr) if !ok { @@ -402,7 +408,7 @@ func (r *projectionPushdown) handleParse(expr *VariadicExpr, projections []Colum } } - changed := len(requestedKeys) > len(existingKeys) + changed := len(requestedKeys) > initialKeyCount if changed { // Convert back to sorted slice newKeys := slices.Collect(maps.Keys(requestedKeys)) @@ -419,20 +425,20 @@ func (r *projectionPushdown) handleParse(expr *VariadicExpr, projections []Colum type parseExprs struct { sourceColumnExpr *ColumnExpr requestedKeysExpr *LiteralExpr + strictExpr *LiteralExpr + keepEmptyExpr *LiteralExpr } // Unpack unpacks the given expressions into valid expressions for parse. // Valid expressions for parse are ones that will evaluate into valid arguments for a [parseFn]. // The valid signatures for a [parseFn] are: -// parseFn(sourceCol [arrow.Array]) -// parseFn(sourceCol [arrow.Array], requestedKeys [arrow.Array]). +// parseFn(sourceCol [arrow.Array], requestedKeys [arrow.Array], strict [arrow.Array], keepEmpty [arrow.Array]). // // Therefore the valid exprssions are (order matters): -// [sourceColExpr *ColumnExpr] -> parseFn(sourceColVec arrow.Array) -// [sourceColExpr *ColumnExpr, requestedKeysExpr *LiteralExpr] -> parseFn(sourceColVec arrow.Array, requestedKeys arrow.Array) +// [sourceColExpr *ColumnExpr, requestedKeysExpr *LiteralExpr, strictExpr *LiteralExpr, keepEmptyExpr *LiteralExpr] -> parseFn(sourceColVec arrow.Array, requestedKeys arrow.Array, strict arrow.Array, keepEmpty arrow.Array) func (a *parseExprs) Unpack(exprs []Expression) error { - if len(exprs) < 1 || len(exprs) > 2 { - return fmt.Errorf("expected to unpack 1 or 2 expressions, got %d", len(exprs)) + if len(exprs) != 4 { + return fmt.Errorf("expected to unpack 4 expressions, got %d", len(exprs)) } var ok bool @@ -441,13 +447,17 @@ func (a *parseExprs) Unpack(exprs []Expression) error { return fmt.Errorf("expected source column to be a column expression, got %T", exprs[0]) } - if len(exprs) == 2 { - a.requestedKeysExpr, ok = exprs[1].(*LiteralExpr) - if !ok { - return fmt.Errorf("expected requested keys to be a literal expression, got %T", exprs[1]) - } - } else { - a.requestedKeysExpr = &LiteralExpr{Literal: types.NewLiteral([]string{})} + a.requestedKeysExpr, ok = exprs[1].(*LiteralExpr) + if !ok { + return fmt.Errorf("expected requested keys to be a literal expression, got %T", exprs[1]) + } + a.strictExpr, ok = exprs[2].(*LiteralExpr) + if !ok { + return fmt.Errorf("expected strict to be a literal expression, got %T", exprs[2]) + } + a.keepEmptyExpr, ok = exprs[3].(*LiteralExpr) + if !ok { + return fmt.Errorf("expected keepEmpty to be a literal expression, got %T", exprs[3]) } return nil @@ -456,15 +466,18 @@ func (a *parseExprs) Unpack(exprs []Expression) error { // Pack packs parse specific expressions back into generic expressions. // It will resues [dst] if has enough capacity, otherwise it will allocate a new slice. func (a *parseExprs) Pack(dst []Expression) []Expression { - if cap(dst) >= 2 { - dst = dst[:2] + if cap(dst) >= 4 { + dst = dst[:4] + clear(dst[4:]) } else { - dst = make([]Expression, 2) + dst = make([]Expression, 4) } // order matters dst[0] = a.sourceColumnExpr dst[1] = a.requestedKeysExpr + dst[2] = a.strictExpr + dst[3] = a.keepEmptyExpr return dst } diff --git a/pkg/engine/internal/planner/physical/optimizer_test.go b/pkg/engine/internal/planner/physical/optimizer_test.go index a7222ace49b97..6cab391e0a5bb 100644 --- a/pkg/engine/internal/planner/physical/optimizer_test.go +++ b/pkg/engine/internal/planner/physical/optimizer_test.go @@ -589,7 +589,7 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { }) // Add parse but no filters requiring parsed fields - builder = builder.Parse(types.VariadicOpParseLogfmt) + builder = builder.Parse(types.VariadicOpParseLogfmt, false, false) return builder.Value() }, }, @@ -609,7 +609,7 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { }) // Don't set RequestedKeys here - optimization should determine them - builder = builder.Parse(types.VariadicOpParseLogfmt) + builder = builder.Parse(types.VariadicOpParseLogfmt, false, false) // Add filter with ambiguous column filterExpr := &logical.BinOp{ @@ -636,7 +636,7 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { Shard: logical.NewShard(0, 1), }) - builder = builder.Parse(types.VariadicOpParseLogfmt) + builder = builder.Parse(types.VariadicOpParseLogfmt, false, false) // Add filter on label column (should be skipped) labelFilter := &logical.BinOp{ @@ -680,7 +680,7 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { Shard: logical.NewShard(0, 1), }) - builder = builder.Parse(types.VariadicOpParseLogfmt) + builder = builder.Parse(types.VariadicOpParseLogfmt, false, false) // Range aggregation with PartitionBy builder = builder.RangeAggregation( @@ -715,7 +715,7 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { }) // Don't set RequestedKeys here - optimization should determine them - builder = builder.Parse(types.VariadicOpParseLogfmt) + builder = builder.Parse(types.VariadicOpParseLogfmt, false, false) // Add filter with ambiguous column filterExpr := &logical.BinOp{ @@ -764,7 +764,7 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { }) // Add parse without specifying RequestedKeys - builder = builder.Parse(types.VariadicOpParseLogfmt) + builder = builder.Parse(types.VariadicOpParseLogfmt, false, false) // Add filter on ambiguous column filterExpr := &logical.BinOp{ @@ -831,9 +831,10 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { for _, expr := range projectionNode.Expressions { switch expr := expr.(type) { case *VariadicExpr: - for _, e := range expr.Expressions { - switch e := e.(type) { - case *LiteralExpr: + // Parse expressions: [sourceCol, requestedKeys, strict, keepEmpty] + // We want the requestedKeys (index 1) + if len(expr.Expressions) >= 2 { + if e, ok := expr.Expressions[1].(*LiteralExpr); ok { requestedKeys = e } } @@ -841,7 +842,17 @@ func TestProjectionPushdown_PushesRequestedKeysToParseOperations(t *testing.T) { } if len(tt.expectedParseKeysRequested) == 0 { - require.Nil(t, requestedKeys, "Projection should have no requested keys") + // When no keys are requested, we expect either nil or a NullLiteral or an empty list + if requestedKeys != nil { + switch lit := requestedKeys.Literal.(type) { + case types.NullLiteral: + // OK - null literal + case types.StringListLiteral: + require.Empty(t, lit.Value(), "Projection should have no requested keys") + default: + t.Fatalf("Unexpected literal type: %T", requestedKeys.Literal) + } + } } else { require.NotNil(t, requestedKeys, "Projection should have requested keys") actual := requestedKeys.Literal.(types.StringListLiteral) diff --git a/pkg/engine/internal/planner/physical/planner_test.go b/pkg/engine/internal/planner/physical/planner_test.go index 4389bb6129409..9fa945e45e834 100644 --- a/pkg/engine/internal/planner/physical/planner_test.go +++ b/pkg/engine/internal/planner/physical/planner_test.go @@ -270,7 +270,7 @@ func TestPlanner_Convert_WithParse(t *testing.T) { Shard: logical.NewShard(0, 1), }, ).Parse( - types.VariadicOpParseLogfmt, + types.VariadicOpParseLogfmt, false, false, ).Select( &logical.BinOp{ Left: logical.NewColumnRef("level", types.ColumnTypeAmbiguous), @@ -317,7 +317,7 @@ func TestPlanner_Convert_WithParse(t *testing.T) { require.Equal(t, types.VariadicOpParseLogfmt, expr.Op) funcArgs := expr.Expressions - require.Len(t, funcArgs, 1) + require.Len(t, funcArgs, 4) sourcCol, ok := funcArgs[0].(*ColumnExpr) require.True(t, ok) @@ -329,7 +329,7 @@ func TestPlanner_Convert_WithParse(t *testing.T) { require.NoError(t, err) funcArgs = expr.Expressions - require.Len(t, funcArgs, 1) + require.Len(t, funcArgs, 4) sourcCol, ok = funcArgs[0].(*ColumnExpr) require.True(t, ok) @@ -353,7 +353,7 @@ func TestPlanner_Convert_WithParse(t *testing.T) { Shard: logical.NewShard(0, 1), }, ).Parse( - types.VariadicOpParseLogfmt, + types.VariadicOpParseLogfmt, false, false, ).Select( &logical.BinOp{ Left: logical.NewColumnRef("level", types.ColumnTypeAmbiguous), @@ -413,7 +413,7 @@ func TestPlanner_Convert_WithParse(t *testing.T) { require.Equal(t, types.VariadicOpParseLogfmt, expr.Op) funcArgs := expr.Expressions - require.Len(t, funcArgs, 1) + require.Len(t, funcArgs, 4) sourcCol, ok := funcArgs[0].(*ColumnExpr) require.True(t, ok) @@ -425,7 +425,7 @@ func TestPlanner_Convert_WithParse(t *testing.T) { require.NoError(t, err) funcArgs = expr.Expressions - require.Len(t, funcArgs, 2) + require.Len(t, funcArgs, 4) sourcCol, ok = funcArgs[0].(*ColumnExpr) require.True(t, ok) diff --git a/pkg/engine/internal/planner/planner_test.go b/pkg/engine/internal/planner/planner_test.go index 5107ad4532417..97d49aec8d872 100644 --- a/pkg/engine/internal/planner/planner_test.go +++ b/pkg/engine/internal/planner/planner_test.go @@ -187,7 +187,7 @@ Limit offset=0 limit=1000 └── TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 └── Filter predicate[0]=EQ(ambiguous.level, "error") └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message)) + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) └── Compat src=metadata dst=metadata collision=label └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) predicate[2]=MATCH_STR(builtin.message, "bar") ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() @@ -204,7 +204,7 @@ Limit offset=0 limit=1000 └── TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 └── Projection all=true drop=(ambiguous.service_name, ambiguous.__error__) └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message)) + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) └── Compat src=metadata dst=metadata collision=label └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() @@ -225,7 +225,7 @@ VectorAggregation operation=sum group_by=(ambiguous.bar) └── Projection all=true expand=(CAST_DURATION(ambiguous.request_duration)) └── Filter predicate[0]=NEQ(ambiguous.request_duration, "") └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar, request_duration])) + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar, request_duration], false, false)) └── Compat src=metadata dst=metadata collision=label └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, ambiguous.request_duration, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() @@ -241,9 +241,9 @@ VectorAggregation operation=sum └── Parallelize └── Projection all=true drop=(ambiguous.__error__, ambiguous.__error_details__) └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_JSON(builtin.message, [])) + └── Projection all=true expand=(PARSE_JSON(builtin.message, [], false, false)) └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [])) + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) └── Filter predicate[0]=EQ(ambiguous.detected_level, "error") └── Compat src=metadata dst=metadata collision=label └── ScanSet num_targets=2 projections=(ambiguous.detected_level, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) @@ -275,7 +275,7 @@ Limit offset=0 limit=1000 └── Parallelize └── TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message)) + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) └── Compat src=metadata dst=metadata collision=label └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() @@ -290,7 +290,7 @@ VectorAggregation operation=sum group_by=(ambiguous.bar) └── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s partition_by=(ambiguous.bar) └── Parallelize └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar])) + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar], false, false)) └── Compat src=metadata dst=metadata collision=label └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() diff --git a/pkg/engine/internal/types/literal.go b/pkg/engine/internal/types/literal.go index acca5e0a65de1..a919a331ea91d 100644 --- a/pkg/engine/internal/types/literal.go +++ b/pkg/engine/internal/types/literal.go @@ -12,8 +12,7 @@ import ( "github.com/grafana/loki/v3/pkg/engine/internal/util" ) -type NullLiteral struct { -} +type NullLiteral struct{} // String implements Literal. func (n NullLiteral) String() string {