From 532aa042577947477f9378d5dd46843045907295 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 3 Apr 2019 19:49:22 -0400 Subject: [PATCH 01/10] Add convert processor The `convert` processor converts a field in the event to a different type, such as converting a string to an integer. For a full description of the processor's capabilities see the add documentation. Closes #8124 --- CHANGELOG.next.asciidoc | 1 + libbeat/cmd/instance/imports.go | 1 + libbeat/docs/processors-using.asciidoc | 47 +++ libbeat/processors/convert/config.go | 133 ++++++++ libbeat/processors/convert/convert.go | 299 ++++++++++++++++++ libbeat/processors/convert/convert_test.go | 269 ++++++++++++++++ .../javascript/module/processor/processor.go | 2 + 7 files changed, 752 insertions(+) create mode 100644 libbeat/processors/convert/config.go create mode 100644 libbeat/processors/convert/convert.go create mode 100644 libbeat/processors/convert/convert_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cd521c0c70fa..a90fd7406f14 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}NNNN[NNNN] - Add `add_observer_metadata` processor. {pull}11394[11394] - Add `decode_csv_fields` processor. {pull}11753[11753] +- Add `convert` processor for converting data types of fields. {issue}8124[8124] {pull}11686[11686] *Auditbeat* diff --git a/libbeat/cmd/instance/imports.go b/libbeat/cmd/instance/imports.go index 413961edd472..0d1dac5513f4 100644 --- a/libbeat/cmd/instance/imports.go +++ b/libbeat/cmd/instance/imports.go @@ -32,6 +32,7 @@ import ( _ "github.com/elastic/beats/libbeat/processors/add_observer_metadata" _ "github.com/elastic/beats/libbeat/processors/add_process_metadata" _ "github.com/elastic/beats/libbeat/processors/communityid" + _ "github.com/elastic/beats/libbeat/processors/convert" _ "github.com/elastic/beats/libbeat/processors/dissect" _ "github.com/elastic/beats/libbeat/processors/dns" _ "github.com/elastic/beats/libbeat/publisher/includes" // Register publisher pipeline modules diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 7a682217cc58..4e7a58459eaf 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -209,6 +209,7 @@ The supported processors are: * <> * <> * <> + * <> ifdef::has_decode_csv_fields_processor[] * <> endif::[] @@ -907,6 +908,52 @@ silently continue without adding the target field. The processor also accepts an optional `seed` parameter that must be a 16-bit unsigned integer. This value gets incorporated into all generated hashes. +[[convert]] +=== Convert + +The `convert` processor converts a field in the event to a different type, such +as converting a string to an integer. + +The supported types include: `integer`, `long`, `float`, `double`, `string`, +`boolean`, and `ip`. + +The `ip` type is effectively an alias for `string`, but with an added validation +that the value is an IPv4 or IPv6 address. + +[source,yaml] +---- +processors: + - convert: + fields: + - {from: "src_ip", to: "source.ip", type: "ip"} + - {from: "src_port", to: "source.port", type: "integer"} + ignore_missing: true + ignore_failure: true +---- + +The `convert` processor has the following configuration settings: + +`fields`:: (Required) This is the list of fields to convert. At least one item +must be contained in the list. Each item in the list must have a `from` key that +specifies the source field. The `to` key is optional and specifies where to +assign the converted value. If `to` is omitted then the `from` field is updated +in-place. The `type` key specifies the data type to convert the value to. If +`type` is omitted then the processor copies or renames the field without any +type conversion. + +`ignore_missing`:: (Optional) If `true` the processor continues to the next +field when the `from` key is not found in the event. If false then the processor +returns an error and does not process the remaining fields. Default is `false`. + +`ignore_failure`:: (Optional) If true type conversion failures are ignored and +the processor continues to the next field. + +`tag`:: (Optional) An identifier for this processor. Useful for debugging. + +`mode`:: (Optional) When both `from` and `to` are defined for a field then +`mode` controls whether to `copy` or `rename` the field when the type conversion +is successful. Default is `copy`. + [[drop-event]] === Drop events diff --git a/libbeat/processors/convert/config.go b/libbeat/processors/convert/config.go new file mode 100644 index 000000000000..2905914fa53d --- /dev/null +++ b/libbeat/processors/convert/config.go @@ -0,0 +1,133 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package convert + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" +) + +func defaultConfig() config { + return config{ + IgnoreMissing: false, + Mode: copyMode, + } +} + +type config struct { + Fields []field `config:"fields" validate:"required"` // List of fields to convert. + Tag string `config:"tag"` // Processor ID for debug and metrics. + IgnoreMissing bool `config:"ignore_missing"` // Skip field when From field is missing. + IgnoreFailure bool `config:"ignore_failure"` // Ignore conversion errors. + Mode mode `config:"mode"` // Mode (copy vs rename). +} + +type field struct { + From string `config:"from" validate:"required"` + To string `config:"to"` + Type dataType `config:"type"` +} + +func (f field) Validate() error { + if f.To == "" && f.Type == unset { + return errors.New("each field must have a 'to' or a 'type'") + } + return nil +} + +func (f field) String() string { + return fmt.Sprintf("{from=%v, to=%v, type=%v}", f.From, f.To, f.Type) +} + +type dataType uint8 + +// List of dataTypes. +const ( + unset dataType = iota + Integer + Long + Float + Double + String + Boolean + IP +) + +var dataTypeNames = map[dataType]string{ + unset: "[unset]", + Integer: "integer", + Long: "long", + Float: "float", + Double: "double", + String: "string", + Boolean: "boolean", + IP: "ip", +} + +func (dt dataType) String() string { + return dataTypeNames[dt] +} + +func (dt dataType) MarshalText() ([]byte, error) { + return []byte(dt.String()), nil +} + +func (dt *dataType) Unpack(s string) error { + s = strings.ToLower(s) + for typ, name := range dataTypeNames { + if s == name { + *dt = typ + return nil + } + } + return errors.Errorf("invalid data type: %v", s) +} + +type mode uint8 + +// List of modes. +const ( + copyMode mode = iota + renameMode +) + +var modeNames = map[mode]string{ + copyMode: "copy", + renameMode: "rename", +} + +func (m mode) String() string { + return modeNames[m] +} + +func (m mode) MarshalText() ([]byte, error) { + return []byte(m.String()), nil +} + +func (m *mode) Unpack(s string) error { + s = strings.ToLower(s) + for md, name := range modeNames { + if s == name { + *m = md + return nil + } + } + return errors.Errorf("invalid mode: %v", s) +} diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go new file mode 100644 index 000000000000..adfc5d710e70 --- /dev/null +++ b/libbeat/processors/convert/convert.go @@ -0,0 +1,299 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package convert + +import ( + "fmt" + "net" + "strconv" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" +) + +const logName = "processor.convert" + +func init() { + processors.RegisterPlugin("convert", New) +} + +type processor struct { + config + log *logp.Logger +} + +// New constructs a new convert processor. +func New(cfg *common.Config) (processors.Processor, error) { + c := defaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, errors.Wrap(err, "fail to unpack the convert processor configuration") + } + + return newConvert(c) +} + +func newConvert(c config) (*processor, error) { + log := logp.NewLogger(logName) + if c.Tag != "" { + log = log.With("instance_id", c.Tag) + } + + return &processor{config: c, log: log}, nil +} + +func (p *processor) String() string { + return fmt.Sprintf("convert=[fields=%v, ignore_failure=%v, ignore_missing=%v, instance_id=%v, mode=%v]", + p.Fields, p.IgnoreFailure, p.IgnoreMissing, p.Tag, p.Mode) +} + +func (p *processor) Run(event *beat.Event) (*beat.Event, error) { + for _, conv := range p.Fields { + v, _ := event.GetValue(conv.From) + if v == nil { + if !p.IgnoreMissing { + return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type)) + } + continue + } + + if conv.Type > unset { + t, err := p.transformType(conv.Type, v) + if err != nil { + if !p.IgnoreFailure { + return event, annotateError(p.Tag, errors.Wrapf(err, "unable to convert field [%v] value [%v] to [%v]", conv.From, v, conv.Type)) + } + continue + } + v = t + } + + if conv.To != "" { + event.PutValue(conv.To, v) + + if p.Mode == renameMode { + event.Delete(conv.From) + } + } else { + // In-place conversion. + event.PutValue(conv.From, v) + } + } + + return event, nil +} + +func (p *processor) transformType(typ dataType, value interface{}) (interface{}, error) { + switch typ { + case String: + return toString(value) + case Long: + return toLong(value) + case Integer: + return toInteger(value) + case Float: + return toFloat(value) + case Double: + return toDouble(value) + case Boolean: + return toBoolean(value) + case IP: + return toIP(value) + default: + return value, nil + } +} + +func toString(value interface{}) (string, error) { + switch v := value.(type) { + case string: + return v, nil + default: + return fmt.Sprintf("%v", value), nil + } +} + +func toLong(value interface{}) (int64, error) { + switch v := value.(type) { + case string: + return strconv.ParseInt(v, 0, 64) + case int: + return int64(v), nil + case int8: + return int64(v), nil + case int16: + return int64(v), nil + case int32: + return int64(v), nil + case int64: + return v, nil + case uint: + return int64(v), nil + case uint8: + return int64(v), nil + case uint16: + return int64(v), nil + case uint32: + return int64(v), nil + case uint64: + return int64(v), nil + case float32: + return int64(v), nil + case float64: + return int64(v), nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to long", value) + } +} + +func toInteger(value interface{}) (int32, error) { + switch v := value.(type) { + case string: + i, err := strconv.ParseInt(v, 0, 32) + return int32(i), err + case int: + return int32(v), nil + case int8: + return int32(v), nil + case int16: + return int32(v), nil + case int32: + return v, nil + case int64: + return int32(v), nil + case uint: + return int32(v), nil + case uint8: + return int32(v), nil + case uint16: + return int32(v), nil + case uint32: + return int32(v), nil + case uint64: + return int32(v), nil + case float32: + return int32(v), nil + case float64: + return int32(v), nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to integer", value) + } +} + +func toFloat(value interface{}) (float32, error) { + switch v := value.(type) { + case string: + f, err := strconv.ParseFloat(v, 32) + return float32(f), err + case int: + return float32(v), nil + case int8: + return float32(v), nil + case int16: + return float32(v), nil + case int32: + return float32(v), nil + case int64: + return float32(v), nil + case uint: + return float32(v), nil + case uint8: + return float32(v), nil + case uint16: + return float32(v), nil + case uint32: + return float32(v), nil + case uint64: + return float32(v), nil + case float32: + return v, nil + case float64: + return float32(v), nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to float", value) + } +} + +func toDouble(value interface{}) (float64, error) { + switch v := value.(type) { + case string: + f, err := strconv.ParseFloat(v, 64) + return float64(f), err + case int: + return float64(v), nil + case int8: + return float64(v), nil + case int16: + return float64(v), nil + case int32: + return float64(v), nil + case int64: + return float64(v), nil + case uint: + return float64(v), nil + case uint8: + return float64(v), nil + case uint16: + return float64(v), nil + case uint32: + return float64(v), nil + case uint64: + return float64(v), nil + case float32: + return float64(v), nil + case float64: + return v, nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to float", value) + } +} + +func toBoolean(value interface{}) (bool, error) { + switch v := value.(type) { + case string: + return strconv.ParseBool(v) + case bool: + return v, nil + default: + return false, errors.Errorf("invalid conversion of [%T] to boolean", value) + } +} + +func toIP(value interface{}) (string, error) { + switch v := value.(type) { + case string: + // This is validating that the value is an IP. + if net.ParseIP(v) != nil { + return v, nil + } + } + return "", errors.Errorf("invalid conversion of [%T] to IP", value) +} + +func annotateError(id string, err error) error { + if err == nil { + return nil + } + if id != "" { + return errors.Wrapf(err, "failed in processor.convert with instance_id=%v", id) + } + return errors.Wrap(err, "failed in processor.convert") +} diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go new file mode 100644 index 000000000000..6a76389decfd --- /dev/null +++ b/libbeat/processors/convert/convert_test.go @@ -0,0 +1,269 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package convert + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestConvert(t *testing.T) { + t.Run("ignore_missing", func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: "src", To: "dst", Type: Integer}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + evt := &beat.Event{Fields: common.MapStr{}} + + _, err = p.Run(evt) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "field [src] is missing") + } + + p.IgnoreMissing = true + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("ignore_failure", func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: "source.address", To: "source.ip", Type: IP}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + evt := &beat.Event{Fields: common.MapStr{"source": common.MapStr{"address": "host.local"}}} + + _, err = p.Run(evt) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "unable to convert") + } + + p.IgnoreFailure = true + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("mode", func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: "source.address", To: "source.ip", Type: IP}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + const loopback = "127.0.0.1" + fields := common.MapStr{"source": common.MapStr{"address": loopback}} + + t.Run("copy", func(t *testing.T) { + evt := &beat.Event{Fields: fields.Clone()} + evt, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + address, _ := evt.GetValue("source.address") + assert.Equal(t, loopback, address) + ip, _ := evt.GetValue("source.ip") + assert.Equal(t, loopback, ip) + }) + + t.Run("rename", func(t *testing.T) { + p.Mode = renameMode + + evt := &beat.Event{Fields: fields.Clone()} + evt, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + _, err := evt.GetValue("source.address") + assert.Error(t, err) + ip, _ := evt.GetValue("source.ip") + assert.Equal(t, loopback, ip) + }) + }) + + t.Run("string", func(t *testing.T) { + c := defaultConfig() + c.Tag = "convert_ip" + c.Fields = append(c.Fields, field{From: "source.address", To: "source.ip", Type: IP}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, "convert=[fields=[{from=source.address, to=source.ip, type=ip}], "+ + "ignore_failure=false, ignore_missing=false, instance_id=convert_ip, mode=copy]", + p.String()) + }) +} + +type testCase struct { + Type dataType + In interface{} + Out interface{} + Err bool +} + +var testCases = []testCase{ + {String, "x", "x", false}, + {String, 1, "1", false}, + {String, 1.1, "1.1", false}, + {String, true, "true", false}, + + {Long, "x", nil, true}, + {Long, true, nil, true}, + {Long, "1", int64(1), false}, + {Long, int(1), int64(1), false}, + {Long, int8(1), int64(1), false}, + {Long, int16(1), int64(1), false}, + {Long, int32(1), int64(1), false}, + {Long, int64(1), int64(1), false}, + {Long, uint(1), int64(1), false}, + {Long, uint8(1), int64(1), false}, + {Long, uint16(1), int64(1), false}, + {Long, uint32(1), int64(1), false}, + {Long, uint64(1), int64(1), false}, + {Long, float32(1), int64(1), false}, + {Long, float64(1), int64(1), false}, + + {Integer, "x", nil, true}, + {Integer, true, nil, true}, + {Integer, "1", int32(1), false}, + {Integer, int(1), int32(1), false}, + {Integer, int8(1), int32(1), false}, + {Integer, int16(1), int32(1), false}, + {Integer, int32(1), int32(1), false}, + {Integer, int64(1), int32(1), false}, + {Integer, uint(1), int32(1), false}, + {Integer, uint8(1), int32(1), false}, + {Integer, uint16(1), int32(1), false}, + {Integer, uint32(1), int32(1), false}, + {Integer, uint64(1), int32(1), false}, + {Integer, float32(1), int32(1), false}, + {Integer, float64(1), int32(1), false}, + + {Float, "x", nil, true}, + {Float, true, nil, true}, + {Float, "1", float32(1), false}, + {Float, "1.1", float32(1.1), false}, + {Float, int(1), float32(1), false}, + {Float, int8(1), float32(1), false}, + {Float, int16(1), float32(1), false}, + {Float, int32(1), float32(1), false}, + {Float, int64(1), float32(1), false}, + {Float, uint(1), float32(1), false}, + {Float, uint8(1), float32(1), false}, + {Float, uint16(1), float32(1), false}, + {Float, uint32(1), float32(1), false}, + {Float, uint64(1), float32(1), false}, + {Float, float32(1), float32(1), false}, + {Float, float64(1), float32(1), false}, + + {Double, "x", nil, true}, + {Double, true, nil, true}, + {Double, "1", float64(1), false}, + {Double, "1.1", float64(1.1), false}, + {Double, int(1), float64(1), false}, + {Double, int8(1), float64(1), false}, + {Double, int16(1), float64(1), false}, + {Double, int32(1), float64(1), false}, + {Double, int64(1), float64(1), false}, + {Double, uint(1), float64(1), false}, + {Double, uint8(1), float64(1), false}, + {Double, uint16(1), float64(1), false}, + {Double, uint32(1), float64(1), false}, + {Double, uint64(1), float64(1), false}, + {Double, float32(1), float64(1), false}, + {Double, float64(1), float64(1), false}, + + {Boolean, "x", nil, true}, + {Boolean, 1, nil, true}, + {Boolean, 0, nil, true}, + {Boolean, "TrUe", nil, true}, + {Boolean, true, true, false}, + {Boolean, "1", true, false}, + {Boolean, "t", true, false}, + {Boolean, "T", true, false}, + {Boolean, "TRUE", true, false}, + {Boolean, "true", true, false}, + {Boolean, "True", true, false}, + {Boolean, false, false, false}, + {Boolean, "0", false, false}, + {Boolean, "f", false, false}, + {Boolean, "F", false, false}, + {Boolean, "FALSE", false, false}, + {Boolean, "false", false, false}, + {Boolean, "False", false, false}, + + {IP, "x", nil, true}, + {IP, "365.0.0.0", "365.0.0.0", true}, + {IP, "0.0.0.0", "0.0.0.0", false}, + {IP, "::1", "::1", false}, +} + +func TestDataTypes(t *testing.T) { + const key = "key" + + for _, tc := range testCases { + // Give the test a friendly name. + var prefix string + if tc.Err { + prefix = "cannot " + } + name := fmt.Sprintf("%v%T %v to %v", prefix, tc.In, tc.In, tc.Type) + + tc := tc + t.Run(name, func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: key, Type: tc.Type}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + event, err := p.Run(&beat.Event{Fields: common.MapStr{key: tc.In}}) + if tc.Err { + assert.Error(t, err) + return + } else if err != nil { + t.Fatalf("%+v", err) + } + + v := event.Fields[key] + assert.Equal(t, tc.Out, v) + }) + } +} diff --git a/libbeat/processors/script/javascript/module/processor/processor.go b/libbeat/processors/script/javascript/module/processor/processor.go index 25c9cae62090..97b8af138308 100644 --- a/libbeat/processors/script/javascript/module/processor/processor.go +++ b/libbeat/processors/script/javascript/module/processor/processor.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/libbeat/processors/add_observer_metadata" "github.com/elastic/beats/libbeat/processors/add_process_metadata" "github.com/elastic/beats/libbeat/processors/communityid" + "github.com/elastic/beats/libbeat/processors/convert" "github.com/elastic/beats/libbeat/processors/decode_csv_fields" "github.com/elastic/beats/libbeat/processors/dissect" "github.com/elastic/beats/libbeat/processors/dns" @@ -50,6 +51,7 @@ var constructors = map[string]processors.Constructor{ "AddLocale": add_locale.New, "AddProcessMetadata": add_process_metadata.New, "CommunityID": communityid.New, + "Convert": convert.New, "CopyFields": actions.NewCopyFields, "DecodeCSVField": decode_csv_fields.NewDecodeCSVField, "DecodeJSONFields": actions.NewDecodeJSONFields, From 9d2db6c1ae1d525385358e9fc12e56760e135d12 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 9 Apr 2019 22:41:45 -0400 Subject: [PATCH 02/10] Document ignore_failure default value --- libbeat/docs/processors-using.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 4e7a58459eaf..a1dfda927743 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -946,7 +946,7 @@ field when the `from` key is not found in the event. If false then the processor returns an error and does not process the remaining fields. Default is `false`. `ignore_failure`:: (Optional) If true type conversion failures are ignored and -the processor continues to the next field. +the processor continues to the next field. Default is `false`. `tag`:: (Optional) An identifier for this processor. Useful for debugging. From 6996287a4a9fb3068fe76f8b1258e7d505b8d2f1 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 17 Apr 2019 18:07:21 -0400 Subject: [PATCH 03/10] clone when copying map fields --- libbeat/processors/convert/convert.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index adfc5d710e70..0880be8361bf 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -87,10 +87,12 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { } if conv.To != "" { - event.PutValue(conv.To, v) - - if p.Mode == renameMode { + switch p.Mode { + case renameMode: + event.PutValue(conv.To, v) event.Delete(conv.From) + case copyMode: + event.PutValue(conv.To, cloneValue(v)) } } else { // In-place conversion. @@ -297,3 +299,17 @@ func annotateError(id string, err error) error { } return errors.Wrap(err, "failed in processor.convert") } + +// cloneValue returns a shallow copy of a map. All other types are passed +// through in the return. This should be used when making straight copies of +// maps without doing any type conversions. +func cloneValue(value interface{}) interface{} { + switch v := value.(type) { + case common.MapStr: + return v.Clone() + case map[string]interface{}: + return common.MapStr(v).Clone() + default: + return value + } +} From d770dc7987057eaa1d647fb908c303e3cc80ad0e Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 26 Apr 2019 17:11:53 +0200 Subject: [PATCH 04/10] Clone event to allow for rollback on error --- libbeat/processors/convert/convert.go | 60 ++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index 0880be8361bf..5b78a17d6a6c 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -39,6 +39,8 @@ func init() { type processor struct { config log *logp.Logger + + converted []interface{} // Temporary storage for converted values. } // New constructs a new convert processor. @@ -57,7 +59,7 @@ func newConvert(c config) (*processor, error) { log = log.With("instance_id", c.Tag) } - return &processor{config: c, log: log}, nil + return &processor{config: c, log: log, converted: make([]interface{}, len(c.Fields))}, nil } func (p *processor) String() string { @@ -65,14 +67,26 @@ func (p *processor) String() string { p.Fields, p.IgnoreFailure, p.IgnoreMissing, p.Tag, p.Mode) } +var ignoredFailure = struct{}{} + +func resetValues(s []interface{}) { + for i := range s { + s[i] = nil + } +} + func (p *processor) Run(event *beat.Event) (*beat.Event, error) { - for _, conv := range p.Fields { - v, _ := event.GetValue(conv.From) - if v == nil { - if !p.IgnoreMissing { - return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type)) + defer resetValues(p.converted) + + // Validate the conversions. + for i, conv := range p.Fields { + v, err := event.GetValue(conv.From) + if err != nil { + if p.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + p.converted[i] = ignoredFailure + continue } - continue + return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type)) } if conv.Type > unset { @@ -81,18 +95,44 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { if !p.IgnoreFailure { return event, annotateError(p.Tag, errors.Wrapf(err, "unable to convert field [%v] value [%v] to [%v]", conv.From, v, conv.Type)) } + p.converted[i] = ignoredFailure continue } v = t } + p.converted[i] = v + } + + var clone = event.Fields + if len(p.Fields) > 1 && !p.IgnoreFailure { + // Clone the fields to allow the processor to undo the operation on + // failure (like a transaction). If there is only one conversion then + // cloning is unnecessary because there are no previous changes to + // rollback (so avoid the expensive clone operation). + clone = event.Fields.Clone() + } + + for i, conv := range p.Fields { + v := p.converted[i] + if v == ignoredFailure { + continue + } + if conv.To != "" { switch p.Mode { case renameMode: - event.PutValue(conv.To, v) - event.Delete(conv.From) + if _, err := event.PutValue(conv.To, v); err != nil { + event.Fields = clone + return event, errors.Wrap(err, "") + } else { + event.Delete(conv.From) + } case copyMode: - event.PutValue(conv.To, cloneValue(v)) + if _, err := event.PutValue(conv.To, cloneValue(v)); err != nil { + event.Fields = clone + return event, errors.Wrap(err, "") + } } } else { // In-place conversion. From 320eaac419f4bb27d5a2235799b40d6c447a1048 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 30 Apr 2019 23:27:16 -0400 Subject: [PATCH 05/10] Add tests --- libbeat/processors/convert/convert.go | 60 ++++++++------ libbeat/processors/convert/convert_test.go | 94 +++++++++++++++++++++- 2 files changed, 126 insertions(+), 28 deletions(-) diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index 5b78a17d6a6c..82cbf66dfcd5 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -18,6 +18,7 @@ package convert import ( + "encoding/json" "fmt" "net" "strconv" @@ -63,8 +64,8 @@ func newConvert(c config) (*processor, error) { } func (p *processor) String() string { - return fmt.Sprintf("convert=[fields=%v, ignore_failure=%v, ignore_missing=%v, instance_id=%v, mode=%v]", - p.Fields, p.IgnoreFailure, p.IgnoreMissing, p.Tag, p.Mode) + json, _ := json.Marshal(p.config) + return "convert=" + string(json) } var ignoredFailure = struct{}{} @@ -80,37 +81,31 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { // Validate the conversions. for i, conv := range p.Fields { - v, err := event.GetValue(conv.From) + v, err := convertField(event, conv.From, conv.Type) if err != nil { - if p.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { - p.converted[i] = ignoredFailure - continue - } - return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type)) - } - - if conv.Type > unset { - t, err := p.transformType(conv.Type, v) - if err != nil { + switch cause := errors.Cause(err); cause { + case common.ErrKeyNotFound: + if !p.IgnoreMissing { + return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type)) + } + default: if !p.IgnoreFailure { return event, annotateError(p.Tag, errors.Wrapf(err, "unable to convert field [%v] value [%v] to [%v]", conv.From, v, conv.Type)) } - p.converted[i] = ignoredFailure - continue } - v = t + p.converted[i] = ignoredFailure } - p.converted[i] = v } - var clone = event.Fields + saved := *event if len(p.Fields) > 1 && !p.IgnoreFailure { // Clone the fields to allow the processor to undo the operation on // failure (like a transaction). If there is only one conversion then // cloning is unnecessary because there are no previous changes to // rollback (so avoid the expensive clone operation). - clone = event.Fields.Clone() + saved.Fields = event.Fields.Clone() + saved.Meta = event.Meta.Clone() } for i, conv := range p.Fields { @@ -123,15 +118,12 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { switch p.Mode { case renameMode: if _, err := event.PutValue(conv.To, v); err != nil { - event.Fields = clone - return event, errors.Wrap(err, "") - } else { - event.Delete(conv.From) + return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To)) } + event.Delete(conv.From) case copyMode: if _, err := event.PutValue(conv.To, cloneValue(v)); err != nil { - event.Fields = clone - return event, errors.Wrap(err, "") + return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To)) } } } else { @@ -143,7 +135,23 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (p *processor) transformType(typ dataType, value interface{}) (interface{}, error) { +func convertField(event *beat.Event, from string, typ dataType) (interface{}, error) { + v, err := event.GetValue(from) + if err != nil { + return nil, err + } + + if typ > unset { + v, err = transformType(typ, v) + if err != nil { + return nil, err + } + } + + return v, nil +} + +func transformType(typ dataType, value interface{}) (interface{}, error) { switch typ { case String: return toString(value) diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go index 6a76389decfd..030820b4a282 100644 --- a/libbeat/processors/convert/convert_test.go +++ b/libbeat/processors/convert/convert_test.go @@ -123,12 +123,102 @@ func TestConvert(t *testing.T) { t.Fatal(err) } - assert.Equal(t, "convert=[fields=[{from=source.address, to=source.ip, type=ip}], "+ - "ignore_failure=false, ignore_missing=false, instance_id=convert_ip, mode=copy]", + assert.Equal(t, `convert={"Fields":`+ + `[{"From":"source.address","To":"source.ip","Type":"ip"}],`+ + `"Tag":"convert_ip","IgnoreMissing":false,"IgnoreFailure":false,"Mode":"copy"}`, p.String()) }) } +func TestConvertRun(t *testing.T) { + tests := map[string]struct { + config common.MapStr + input beat.Event + expected beat.Event + fail bool + }{ + "missing field": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "port", "type": "integer"}, + {"from": "address", "to": "ip", "type": "ip"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + fail: true, + }, + "put error no clone": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "port", "to": "port.number", "type": "integer"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + fail: true, + }, + "put error with clone": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "id", "to": "event.id", "type": "integer"}, + {"from": "port", "to": "port.number", "type": "integer"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "id": "32", + "port": "80", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "id": "32", + "port": "80", + }, + }, + fail: true, + }, + } + + for title, tt := range tests { + t.Run(title, func(t *testing.T) { + processor, err := New(common.MustNewConfigFrom(tt.config)) + if err != nil { + t.Fatal(err) + } + result, err := processor.Run(&tt.input) + if tt.expected.Fields != nil { + assert.Equal(t, tt.expected.Fields.Flatten(), result.Fields.Flatten()) + assert.Equal(t, tt.expected.Meta.Flatten(), result.Meta.Flatten()) + assert.Equal(t, tt.expected.Timestamp, result.Timestamp) + } + if tt.fail { + assert.Error(t, err) + t.Log("got expected error", err) + return + } + assert.NoError(t, err) + }) + } +} + type testCase struct { Type dataType In interface{} From cf08088fb7462a5fd51d97b05345fc233cab1e9b Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 1 May 2019 18:35:46 -0400 Subject: [PATCH 06/10] Add missing continue statement --- libbeat/processors/convert/convert.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index 82cbf66dfcd5..edd8f58ae9df 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -94,6 +94,7 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { } } p.converted[i] = ignoredFailure + continue } p.converted[i] = v } @@ -174,10 +175,12 @@ func transformType(typ dataType, value interface{}) (interface{}, error) { func toString(value interface{}) (string, error) { switch v := value.(type) { - case string: - return v, nil default: return fmt.Sprintf("%v", value), nil + case string: + return v, nil + case nil: + return "", errors.New("invalid conversion of [null] to string") } } From 71123417e40e53bc62893068b0d693ba80cef9cd Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 2 May 2019 16:54:53 -0400 Subject: [PATCH 07/10] Change ignore_failure to fail_on_error to align with beat conventions --- libbeat/docs/processors-using.asciidoc | 6 +++--- libbeat/processors/convert/config.go | 3 ++- libbeat/processors/convert/convert.go | 18 +++++++++--------- libbeat/processors/convert/convert_test.go | 20 +++++++++++++++++--- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index a1dfda927743..045a3f41f9fa 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -928,7 +928,7 @@ processors: - {from: "src_ip", to: "source.ip", type: "ip"} - {from: "src_port", to: "source.port", type: "integer"} ignore_missing: true - ignore_failure: true + fail_on_error: false ---- The `convert` processor has the following configuration settings: @@ -945,8 +945,8 @@ type conversion. field when the `from` key is not found in the event. If false then the processor returns an error and does not process the remaining fields. Default is `false`. -`ignore_failure`:: (Optional) If true type conversion failures are ignored and -the processor continues to the next field. Default is `false`. +`fail_on_error`:: (Optional) If false type conversion failures are ignored and +the processor continues to the next field. Default is `true`. `tag`:: (Optional) An identifier for this processor. Useful for debugging. diff --git a/libbeat/processors/convert/config.go b/libbeat/processors/convert/config.go index 2905914fa53d..53af8bc80ba7 100644 --- a/libbeat/processors/convert/config.go +++ b/libbeat/processors/convert/config.go @@ -27,6 +27,7 @@ import ( func defaultConfig() config { return config{ IgnoreMissing: false, + FailOnError: true, Mode: copyMode, } } @@ -35,7 +36,7 @@ type config struct { Fields []field `config:"fields" validate:"required"` // List of fields to convert. Tag string `config:"tag"` // Processor ID for debug and metrics. IgnoreMissing bool `config:"ignore_missing"` // Skip field when From field is missing. - IgnoreFailure bool `config:"ignore_failure"` // Ignore conversion errors. + FailOnError bool `config:"fail_on_error"` // Ignore errors (missing fields / conversion failures). Mode mode `config:"mode"` // Mode (copy vs rename). } diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index edd8f58ae9df..02e3c7e32596 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -85,11 +85,11 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { if err != nil { switch cause := errors.Cause(err); cause { case common.ErrKeyNotFound: - if !p.IgnoreMissing { + if !p.IgnoreMissing && p.FailOnError { return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type)) } default: - if !p.IgnoreFailure { + if p.FailOnError { return event, annotateError(p.Tag, errors.Wrapf(err, "unable to convert field [%v] value [%v] to [%v]", conv.From, v, conv.Type)) } } @@ -100,7 +100,7 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { } saved := *event - if len(p.Fields) > 1 && !p.IgnoreFailure { + if len(p.Fields) > 1 && p.FailOnError { // Clone the fields to allow the processor to undo the operation on // failure (like a transaction). If there is only one conversion then // cloning is unnecessary because there are no previous changes to @@ -118,12 +118,12 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { if conv.To != "" { switch p.Mode { case renameMode: - if _, err := event.PutValue(conv.To, v); err != nil { + if _, err := event.PutValue(conv.To, v); err != nil && p.FailOnError { return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To)) } event.Delete(conv.From) case copyMode: - if _, err := event.PutValue(conv.To, cloneValue(v)); err != nil { + if _, err := event.PutValue(conv.To, cloneValue(v)); err != nil && p.FailOnError { return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To)) } } @@ -175,12 +175,12 @@ func transformType(typ dataType, value interface{}) (interface{}, error) { func toString(value interface{}) (string, error) { switch v := value.(type) { - default: - return fmt.Sprintf("%v", value), nil - case string: - return v, nil case nil: return "", errors.New("invalid conversion of [null] to string") + case string: + return v, nil + default: + return fmt.Sprintf("%v", value), nil } } diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go index 030820b4a282..dde9d42af89b 100644 --- a/libbeat/processors/convert/convert_test.go +++ b/libbeat/processors/convert/convert_test.go @@ -49,9 +49,16 @@ func TestConvert(t *testing.T) { if err != nil { t.Fatal(err) } + + p.IgnoreMissing = false + p.FailOnError = false + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } }) - t.Run("ignore_failure", func(t *testing.T) { + t.Run("fail_on_error", func(t *testing.T) { c := defaultConfig() c.Fields = append(c.Fields, field{From: "source.address", To: "source.ip", Type: IP}) @@ -67,7 +74,7 @@ func TestConvert(t *testing.T) { assert.Contains(t, err.Error(), "unable to convert") } - p.IgnoreFailure = true + p.FailOnError = false _, err = p.Run(evt) if err != nil { t.Fatal(err) @@ -125,7 +132,7 @@ func TestConvert(t *testing.T) { assert.Equal(t, `convert={"Fields":`+ `[{"From":"source.address","To":"source.ip","Type":"ip"}],`+ - `"Tag":"convert_ip","IgnoreMissing":false,"IgnoreFailure":false,"Mode":"copy"}`, + `"Tag":"convert_ip","IgnoreMissing":false,"FailOnError":true,"Mode":"copy"}`, p.String()) }) } @@ -227,11 +234,13 @@ type testCase struct { } var testCases = []testCase{ + {String, nil, nil, true}, {String, "x", "x", false}, {String, 1, "1", false}, {String, 1.1, "1.1", false}, {String, true, "true", false}, + {Long, nil, nil, true}, {Long, "x", nil, true}, {Long, true, nil, true}, {Long, "1", int64(1), false}, @@ -248,6 +257,7 @@ var testCases = []testCase{ {Long, float32(1), int64(1), false}, {Long, float64(1), int64(1), false}, + {Integer, nil, nil, true}, {Integer, "x", nil, true}, {Integer, true, nil, true}, {Integer, "1", int32(1), false}, @@ -264,6 +274,7 @@ var testCases = []testCase{ {Integer, float32(1), int32(1), false}, {Integer, float64(1), int32(1), false}, + {Float, nil, nil, true}, {Float, "x", nil, true}, {Float, true, nil, true}, {Float, "1", float32(1), false}, @@ -281,6 +292,7 @@ var testCases = []testCase{ {Float, float32(1), float32(1), false}, {Float, float64(1), float32(1), false}, + {Double, nil, nil, true}, {Double, "x", nil, true}, {Double, true, nil, true}, {Double, "1", float64(1), false}, @@ -298,6 +310,7 @@ var testCases = []testCase{ {Double, float32(1), float64(1), false}, {Double, float64(1), float64(1), false}, + {Boolean, nil, nil, true}, {Boolean, "x", nil, true}, {Boolean, 1, nil, true}, {Boolean, 0, nil, true}, @@ -317,6 +330,7 @@ var testCases = []testCase{ {Boolean, "false", false, false}, {Boolean, "False", false, false}, + {IP, nil, nil, true}, {IP, "x", nil, true}, {IP, "365.0.0.0", "365.0.0.0", true}, {IP, "0.0.0.0", "0.0.0.0", false}, From 15663e29635ea55df0d96c6a60ca4fb7a0d27447 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 2 May 2019 17:19:23 -0400 Subject: [PATCH 08/10] Add more tests for ignore_missing + fail_on_error --- libbeat/processors/convert/convert_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go index dde9d42af89b..1b27ac611d98 100644 --- a/libbeat/processors/convert/convert_test.go +++ b/libbeat/processors/convert/convert_test.go @@ -39,12 +39,23 @@ func TestConvert(t *testing.T) { evt := &beat.Event{Fields: common.MapStr{}} + // Defaults. + p.IgnoreMissing = false + p.FailOnError = true _, err = p.Run(evt) if assert.Error(t, err) { assert.Contains(t, err.Error(), "field [src] is missing") } p.IgnoreMissing = true + p.FailOnError = true + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + + p.IgnoreMissing = true + p.FailOnError = false _, err = p.Run(evt) if err != nil { t.Fatal(err) From d34714776f576bc458743b78bc8e9cce081bbded Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 3 May 2019 11:49:02 -0400 Subject: [PATCH 09/10] Clone []interface{} too --- libbeat/processors/convert/convert.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index 02e3c7e32596..e2bfbef9504d 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -360,6 +360,13 @@ func cloneValue(value interface{}) interface{} { return v.Clone() case map[string]interface{}: return common.MapStr(v).Clone() + case []interface{}: + len := len(v) + newArr := make([]interface{}, len) + for idx, val := range v { + newArr[idx] = cloneValue(val) + } + return newArr default: return value } From 40a02f9d60f32f298c61f0a2868882f85f8061f3 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 7 May 2019 17:18:43 -0400 Subject: [PATCH 10/10] Enhance conversion error messages --- libbeat/processors/convert/convert.go | 128 +++++++++++++-------- libbeat/processors/convert/convert_test.go | 31 ++++- 2 files changed, 106 insertions(+), 53 deletions(-) diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go index e2bfbef9504d..8e84d93de230 100644 --- a/libbeat/processors/convert/convert.go +++ b/libbeat/processors/convert/convert.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "strconv" + "strings" "github.com/pkg/errors" @@ -79,26 +80,12 @@ func resetValues(s []interface{}) { func (p *processor) Run(event *beat.Event) (*beat.Event, error) { defer resetValues(p.converted) - // Validate the conversions. - for i, conv := range p.Fields { - v, err := convertField(event, conv.From, conv.Type) - if err != nil { - switch cause := errors.Cause(err); cause { - case common.ErrKeyNotFound: - if !p.IgnoreMissing && p.FailOnError { - return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type)) - } - default: - if p.FailOnError { - return event, annotateError(p.Tag, errors.Wrapf(err, "unable to convert field [%v] value [%v] to [%v]", conv.From, v, conv.Type)) - } - } - p.converted[i] = ignoredFailure - continue - } - p.converted[i] = v + // Convert the fields and write the results to temporary storage. + if err := p.convertFields(event); err != nil { + return event, err } + // Backup original event. saved := *event if len(p.Fields) > 1 && p.FailOnError { // Clone the fields to allow the processor to undo the operation on @@ -109,47 +96,76 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { saved.Meta = event.Meta.Clone() } + // Update the event with the converted values. + if err := p.writeToEvent(event); err != nil { + return &saved, err + } + + return event, nil +} + +func (p *processor) convertFields(event *beat.Event) error { + // Write conversion results to temporary storage. for i, conv := range p.Fields { + v, err := p.convertField(event, conv) + if err != nil { + if p.FailOnError { + return err + } + v = ignoredFailure + } + p.converted[i] = v + } + + return nil +} + +func (p *processor) convertField(event *beat.Event, conversion field) (interface{}, error) { + v, err := event.GetValue(conversion.From) + if err != nil { + if p.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return ignoredFailure, nil + } + return nil, newConvertError(conversion, err, p.Tag, "field [%v] is missing", conversion.From) + } + + if conversion.Type > unset { + t, err := transformType(conversion.Type, v) + if err != nil { + return nil, newConvertError(conversion, err, p.Tag, "unable to convert value [%v]", v) + } + v = t + } + + return v, nil +} + +func (p *processor) writeToEvent(event *beat.Event) error { + for i, conversion := range p.Fields { v := p.converted[i] if v == ignoredFailure { continue } - if conv.To != "" { + if conversion.To != "" { switch p.Mode { case renameMode: - if _, err := event.PutValue(conv.To, v); err != nil && p.FailOnError { - return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To)) + if _, err := event.PutValue(conversion.To, v); err != nil && p.FailOnError { + return newConvertError(conversion, err, p.Tag, "failed to put field [%v]", conversion.To) } - event.Delete(conv.From) + event.Delete(conversion.From) case copyMode: - if _, err := event.PutValue(conv.To, cloneValue(v)); err != nil && p.FailOnError { - return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To)) + if _, err := event.PutValue(conversion.To, cloneValue(v)); err != nil && p.FailOnError { + return newConvertError(conversion, err, p.Tag, "failed to put field [%v]", conversion.To) } } } else { // In-place conversion. - event.PutValue(conv.From, v) - } - } - - return event, nil -} - -func convertField(event *beat.Event, from string, typ dataType) (interface{}, error) { - v, err := event.GetValue(from) - if err != nil { - return nil, err - } - - if typ > unset { - v, err = transformType(typ, v) - if err != nil { - return nil, err + event.PutValue(conversion.From, v) } } - return v, nil + return nil } func transformType(typ dataType, value interface{}) (interface{}, error) { @@ -337,18 +353,32 @@ func toIP(value interface{}) (string, error) { if net.ParseIP(v) != nil { return v, nil } + return "", errors.New("value is not a valid IP address") + default: + return "", errors.Errorf("invalid conversion of [%T] to IP", value) } - return "", errors.Errorf("invalid conversion of [%T] to IP", value) } -func annotateError(id string, err error) error { - if err == nil { - return nil +func newConvertError(conversion field, cause error, tag string, message string, params ...interface{}) error { + var buf strings.Builder + buf.WriteString("failed in processor.convert") + if tag != "" { + buf.WriteString(" with instance_id=") + buf.WriteString(tag) } - if id != "" { - return errors.Wrapf(err, "failed in processor.convert with instance_id=%v", id) + buf.WriteString(": conversion of field [") + buf.WriteString(conversion.From) + buf.WriteString("] to type [") + buf.WriteString(conversion.Type.String()) + buf.WriteString("]") + if conversion.To != "" { + buf.WriteString(" with target field [") + buf.WriteString(conversion.To) + buf.WriteString("]") } - return errors.Wrap(err, "failed in processor.convert") + buf.WriteString(" failed: ") + fmt.Fprintf(&buf, message, params...) + return errors.Wrapf(cause, buf.String()) } // cloneValue returns a shallow copy of a map. All other types are passed diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go index 1b27ac611d98..2469fe1848c9 100644 --- a/libbeat/processors/convert/convert_test.go +++ b/libbeat/processors/convert/convert_test.go @@ -150,10 +150,11 @@ func TestConvert(t *testing.T) { func TestConvertRun(t *testing.T) { tests := map[string]struct { - config common.MapStr - input beat.Event - expected beat.Event - fail bool + config common.MapStr + input beat.Event + expected beat.Event + fail bool + errContains string }{ "missing field": { config: common.MapStr{ @@ -213,6 +214,25 @@ func TestConvertRun(t *testing.T) { }, fail: true, }, + "invalid conversion": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "address", "to": "ip", "type": "ip"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "address": "-", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "address": "-", + }, + }, + fail: true, + errContains: "unable to convert value [-]: value is not a valid IP address", + }, } for title, tt := range tests { @@ -230,6 +250,9 @@ func TestConvertRun(t *testing.T) { if tt.fail { assert.Error(t, err) t.Log("got expected error", err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } return } assert.NoError(t, err)