diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cd521c0c70fa..a90fd7406f14 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}NNNN[NNNN] - Add `add_observer_metadata` processor. {pull}11394[11394] - Add `decode_csv_fields` processor. {pull}11753[11753] +- Add `convert` processor for converting data types of fields. {issue}8124[8124] {pull}11686[11686] *Auditbeat* diff --git a/libbeat/cmd/instance/imports.go b/libbeat/cmd/instance/imports.go index 413961edd472..0d1dac5513f4 100644 --- a/libbeat/cmd/instance/imports.go +++ b/libbeat/cmd/instance/imports.go @@ -32,6 +32,7 @@ import ( _ "github.com/elastic/beats/libbeat/processors/add_observer_metadata" _ "github.com/elastic/beats/libbeat/processors/add_process_metadata" _ "github.com/elastic/beats/libbeat/processors/communityid" + _ "github.com/elastic/beats/libbeat/processors/convert" _ "github.com/elastic/beats/libbeat/processors/dissect" _ "github.com/elastic/beats/libbeat/processors/dns" _ "github.com/elastic/beats/libbeat/publisher/includes" // Register publisher pipeline modules diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 7a682217cc58..045a3f41f9fa 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -209,6 +209,7 @@ The supported processors are: * <> * <> * <> + * <> ifdef::has_decode_csv_fields_processor[] * <> endif::[] @@ -907,6 +908,52 @@ silently continue without adding the target field. The processor also accepts an optional `seed` parameter that must be a 16-bit unsigned integer. This value gets incorporated into all generated hashes. +[[convert]] +=== Convert + +The `convert` processor converts a field in the event to a different type, such +as converting a string to an integer. + +The supported types include: `integer`, `long`, `float`, `double`, `string`, +`boolean`, and `ip`. + +The `ip` type is effectively an alias for `string`, but with an added validation +that the value is an IPv4 or IPv6 address. + +[source,yaml] +---- +processors: + - convert: + fields: + - {from: "src_ip", to: "source.ip", type: "ip"} + - {from: "src_port", to: "source.port", type: "integer"} + ignore_missing: true + fail_on_error: false +---- + +The `convert` processor has the following configuration settings: + +`fields`:: (Required) This is the list of fields to convert. At least one item +must be contained in the list. Each item in the list must have a `from` key that +specifies the source field. The `to` key is optional and specifies where to +assign the converted value. If `to` is omitted then the `from` field is updated +in-place. The `type` key specifies the data type to convert the value to. If +`type` is omitted then the processor copies or renames the field without any +type conversion. + +`ignore_missing`:: (Optional) If `true` the processor continues to the next +field when the `from` key is not found in the event. If false then the processor +returns an error and does not process the remaining fields. Default is `false`. + +`fail_on_error`:: (Optional) If false type conversion failures are ignored and +the processor continues to the next field. Default is `true`. + +`tag`:: (Optional) An identifier for this processor. Useful for debugging. + +`mode`:: (Optional) When both `from` and `to` are defined for a field then +`mode` controls whether to `copy` or `rename` the field when the type conversion +is successful. Default is `copy`. + [[drop-event]] === Drop events diff --git a/libbeat/processors/convert/config.go b/libbeat/processors/convert/config.go new file mode 100644 index 000000000000..53af8bc80ba7 --- /dev/null +++ b/libbeat/processors/convert/config.go @@ -0,0 +1,134 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package convert + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" +) + +func defaultConfig() config { + return config{ + IgnoreMissing: false, + FailOnError: true, + Mode: copyMode, + } +} + +type config struct { + Fields []field `config:"fields" validate:"required"` // List of fields to convert. + Tag string `config:"tag"` // Processor ID for debug and metrics. + IgnoreMissing bool `config:"ignore_missing"` // Skip field when From field is missing. + FailOnError bool `config:"fail_on_error"` // Ignore errors (missing fields / conversion failures). + Mode mode `config:"mode"` // Mode (copy vs rename). +} + +type field struct { + From string `config:"from" validate:"required"` + To string `config:"to"` + Type dataType `config:"type"` +} + +func (f field) Validate() error { + if f.To == "" && f.Type == unset { + return errors.New("each field must have a 'to' or a 'type'") + } + return nil +} + +func (f field) String() string { + return fmt.Sprintf("{from=%v, to=%v, type=%v}", f.From, f.To, f.Type) +} + +type dataType uint8 + +// List of dataTypes. +const ( + unset dataType = iota + Integer + Long + Float + Double + String + Boolean + IP +) + +var dataTypeNames = map[dataType]string{ + unset: "[unset]", + Integer: "integer", + Long: "long", + Float: "float", + Double: "double", + String: "string", + Boolean: "boolean", + IP: "ip", +} + +func (dt dataType) String() string { + return dataTypeNames[dt] +} + +func (dt dataType) MarshalText() ([]byte, error) { + return []byte(dt.String()), nil +} + +func (dt *dataType) Unpack(s string) error { + s = strings.ToLower(s) + for typ, name := range dataTypeNames { + if s == name { + *dt = typ + return nil + } + } + return errors.Errorf("invalid data type: %v", s) +} + +type mode uint8 + +// List of modes. +const ( + copyMode mode = iota + renameMode +) + +var modeNames = map[mode]string{ + copyMode: "copy", + renameMode: "rename", +} + +func (m mode) String() string { + return modeNames[m] +} + +func (m mode) MarshalText() ([]byte, error) { + return []byte(m.String()), nil +} + +func (m *mode) Unpack(s string) error { + s = strings.ToLower(s) + for md, name := range modeNames { + if s == name { + *m = md + return nil + } + } + return errors.Errorf("invalid mode: %v", s) +} diff --git a/libbeat/processors/convert/convert.go b/libbeat/processors/convert/convert.go new file mode 100644 index 000000000000..8e84d93de230 --- /dev/null +++ b/libbeat/processors/convert/convert.go @@ -0,0 +1,403 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package convert + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" +) + +const logName = "processor.convert" + +func init() { + processors.RegisterPlugin("convert", New) +} + +type processor struct { + config + log *logp.Logger + + converted []interface{} // Temporary storage for converted values. +} + +// New constructs a new convert processor. +func New(cfg *common.Config) (processors.Processor, error) { + c := defaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, errors.Wrap(err, "fail to unpack the convert processor configuration") + } + + return newConvert(c) +} + +func newConvert(c config) (*processor, error) { + log := logp.NewLogger(logName) + if c.Tag != "" { + log = log.With("instance_id", c.Tag) + } + + return &processor{config: c, log: log, converted: make([]interface{}, len(c.Fields))}, nil +} + +func (p *processor) String() string { + json, _ := json.Marshal(p.config) + return "convert=" + string(json) +} + +var ignoredFailure = struct{}{} + +func resetValues(s []interface{}) { + for i := range s { + s[i] = nil + } +} + +func (p *processor) Run(event *beat.Event) (*beat.Event, error) { + defer resetValues(p.converted) + + // Convert the fields and write the results to temporary storage. + if err := p.convertFields(event); err != nil { + return event, err + } + + // Backup original event. + saved := *event + if len(p.Fields) > 1 && p.FailOnError { + // Clone the fields to allow the processor to undo the operation on + // failure (like a transaction). If there is only one conversion then + // cloning is unnecessary because there are no previous changes to + // rollback (so avoid the expensive clone operation). + saved.Fields = event.Fields.Clone() + saved.Meta = event.Meta.Clone() + } + + // Update the event with the converted values. + if err := p.writeToEvent(event); err != nil { + return &saved, err + } + + return event, nil +} + +func (p *processor) convertFields(event *beat.Event) error { + // Write conversion results to temporary storage. + for i, conv := range p.Fields { + v, err := p.convertField(event, conv) + if err != nil { + if p.FailOnError { + return err + } + v = ignoredFailure + } + p.converted[i] = v + } + + return nil +} + +func (p *processor) convertField(event *beat.Event, conversion field) (interface{}, error) { + v, err := event.GetValue(conversion.From) + if err != nil { + if p.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return ignoredFailure, nil + } + return nil, newConvertError(conversion, err, p.Tag, "field [%v] is missing", conversion.From) + } + + if conversion.Type > unset { + t, err := transformType(conversion.Type, v) + if err != nil { + return nil, newConvertError(conversion, err, p.Tag, "unable to convert value [%v]", v) + } + v = t + } + + return v, nil +} + +func (p *processor) writeToEvent(event *beat.Event) error { + for i, conversion := range p.Fields { + v := p.converted[i] + if v == ignoredFailure { + continue + } + + if conversion.To != "" { + switch p.Mode { + case renameMode: + if _, err := event.PutValue(conversion.To, v); err != nil && p.FailOnError { + return newConvertError(conversion, err, p.Tag, "failed to put field [%v]", conversion.To) + } + event.Delete(conversion.From) + case copyMode: + if _, err := event.PutValue(conversion.To, cloneValue(v)); err != nil && p.FailOnError { + return newConvertError(conversion, err, p.Tag, "failed to put field [%v]", conversion.To) + } + } + } else { + // In-place conversion. + event.PutValue(conversion.From, v) + } + } + + return nil +} + +func transformType(typ dataType, value interface{}) (interface{}, error) { + switch typ { + case String: + return toString(value) + case Long: + return toLong(value) + case Integer: + return toInteger(value) + case Float: + return toFloat(value) + case Double: + return toDouble(value) + case Boolean: + return toBoolean(value) + case IP: + return toIP(value) + default: + return value, nil + } +} + +func toString(value interface{}) (string, error) { + switch v := value.(type) { + case nil: + return "", errors.New("invalid conversion of [null] to string") + case string: + return v, nil + default: + return fmt.Sprintf("%v", value), nil + } +} + +func toLong(value interface{}) (int64, error) { + switch v := value.(type) { + case string: + return strconv.ParseInt(v, 0, 64) + case int: + return int64(v), nil + case int8: + return int64(v), nil + case int16: + return int64(v), nil + case int32: + return int64(v), nil + case int64: + return v, nil + case uint: + return int64(v), nil + case uint8: + return int64(v), nil + case uint16: + return int64(v), nil + case uint32: + return int64(v), nil + case uint64: + return int64(v), nil + case float32: + return int64(v), nil + case float64: + return int64(v), nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to long", value) + } +} + +func toInteger(value interface{}) (int32, error) { + switch v := value.(type) { + case string: + i, err := strconv.ParseInt(v, 0, 32) + return int32(i), err + case int: + return int32(v), nil + case int8: + return int32(v), nil + case int16: + return int32(v), nil + case int32: + return v, nil + case int64: + return int32(v), nil + case uint: + return int32(v), nil + case uint8: + return int32(v), nil + case uint16: + return int32(v), nil + case uint32: + return int32(v), nil + case uint64: + return int32(v), nil + case float32: + return int32(v), nil + case float64: + return int32(v), nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to integer", value) + } +} + +func toFloat(value interface{}) (float32, error) { + switch v := value.(type) { + case string: + f, err := strconv.ParseFloat(v, 32) + return float32(f), err + case int: + return float32(v), nil + case int8: + return float32(v), nil + case int16: + return float32(v), nil + case int32: + return float32(v), nil + case int64: + return float32(v), nil + case uint: + return float32(v), nil + case uint8: + return float32(v), nil + case uint16: + return float32(v), nil + case uint32: + return float32(v), nil + case uint64: + return float32(v), nil + case float32: + return v, nil + case float64: + return float32(v), nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to float", value) + } +} + +func toDouble(value interface{}) (float64, error) { + switch v := value.(type) { + case string: + f, err := strconv.ParseFloat(v, 64) + return float64(f), err + case int: + return float64(v), nil + case int8: + return float64(v), nil + case int16: + return float64(v), nil + case int32: + return float64(v), nil + case int64: + return float64(v), nil + case uint: + return float64(v), nil + case uint8: + return float64(v), nil + case uint16: + return float64(v), nil + case uint32: + return float64(v), nil + case uint64: + return float64(v), nil + case float32: + return float64(v), nil + case float64: + return v, nil + default: + return 0, errors.Errorf("invalid conversion of [%T] to float", value) + } +} + +func toBoolean(value interface{}) (bool, error) { + switch v := value.(type) { + case string: + return strconv.ParseBool(v) + case bool: + return v, nil + default: + return false, errors.Errorf("invalid conversion of [%T] to boolean", value) + } +} + +func toIP(value interface{}) (string, error) { + switch v := value.(type) { + case string: + // This is validating that the value is an IP. + if net.ParseIP(v) != nil { + return v, nil + } + return "", errors.New("value is not a valid IP address") + default: + return "", errors.Errorf("invalid conversion of [%T] to IP", value) + } +} + +func newConvertError(conversion field, cause error, tag string, message string, params ...interface{}) error { + var buf strings.Builder + buf.WriteString("failed in processor.convert") + if tag != "" { + buf.WriteString(" with instance_id=") + buf.WriteString(tag) + } + buf.WriteString(": conversion of field [") + buf.WriteString(conversion.From) + buf.WriteString("] to type [") + buf.WriteString(conversion.Type.String()) + buf.WriteString("]") + if conversion.To != "" { + buf.WriteString(" with target field [") + buf.WriteString(conversion.To) + buf.WriteString("]") + } + buf.WriteString(" failed: ") + fmt.Fprintf(&buf, message, params...) + return errors.Wrapf(cause, buf.String()) +} + +// cloneValue returns a shallow copy of a map. All other types are passed +// through in the return. This should be used when making straight copies of +// maps without doing any type conversions. +func cloneValue(value interface{}) interface{} { + switch v := value.(type) { + case common.MapStr: + return v.Clone() + case map[string]interface{}: + return common.MapStr(v).Clone() + case []interface{}: + len := len(v) + newArr := make([]interface{}, len) + for idx, val := range v { + newArr[idx] = cloneValue(val) + } + return newArr + default: + return value + } +} diff --git a/libbeat/processors/convert/convert_test.go b/libbeat/processors/convert/convert_test.go new file mode 100644 index 000000000000..2469fe1848c9 --- /dev/null +++ b/libbeat/processors/convert/convert_test.go @@ -0,0 +1,407 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package convert + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestConvert(t *testing.T) { + t.Run("ignore_missing", func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: "src", To: "dst", Type: Integer}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + evt := &beat.Event{Fields: common.MapStr{}} + + // Defaults. + p.IgnoreMissing = false + p.FailOnError = true + _, err = p.Run(evt) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "field [src] is missing") + } + + p.IgnoreMissing = true + p.FailOnError = true + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + + p.IgnoreMissing = true + p.FailOnError = false + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + + p.IgnoreMissing = false + p.FailOnError = false + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("fail_on_error", func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: "source.address", To: "source.ip", Type: IP}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + evt := &beat.Event{Fields: common.MapStr{"source": common.MapStr{"address": "host.local"}}} + + _, err = p.Run(evt) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "unable to convert") + } + + p.FailOnError = false + _, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("mode", func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: "source.address", To: "source.ip", Type: IP}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + const loopback = "127.0.0.1" + fields := common.MapStr{"source": common.MapStr{"address": loopback}} + + t.Run("copy", func(t *testing.T) { + evt := &beat.Event{Fields: fields.Clone()} + evt, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + address, _ := evt.GetValue("source.address") + assert.Equal(t, loopback, address) + ip, _ := evt.GetValue("source.ip") + assert.Equal(t, loopback, ip) + }) + + t.Run("rename", func(t *testing.T) { + p.Mode = renameMode + + evt := &beat.Event{Fields: fields.Clone()} + evt, err = p.Run(evt) + if err != nil { + t.Fatal(err) + } + _, err := evt.GetValue("source.address") + assert.Error(t, err) + ip, _ := evt.GetValue("source.ip") + assert.Equal(t, loopback, ip) + }) + }) + + t.Run("string", func(t *testing.T) { + c := defaultConfig() + c.Tag = "convert_ip" + c.Fields = append(c.Fields, field{From: "source.address", To: "source.ip", Type: IP}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, `convert={"Fields":`+ + `[{"From":"source.address","To":"source.ip","Type":"ip"}],`+ + `"Tag":"convert_ip","IgnoreMissing":false,"FailOnError":true,"Mode":"copy"}`, + p.String()) + }) +} + +func TestConvertRun(t *testing.T) { + tests := map[string]struct { + config common.MapStr + input beat.Event + expected beat.Event + fail bool + errContains string + }{ + "missing field": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "port", "type": "integer"}, + {"from": "address", "to": "ip", "type": "ip"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + fail: true, + }, + "put error no clone": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "port", "to": "port.number", "type": "integer"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "port": "80", + }, + }, + fail: true, + }, + "put error with clone": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "id", "to": "event.id", "type": "integer"}, + {"from": "port", "to": "port.number", "type": "integer"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "id": "32", + "port": "80", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "id": "32", + "port": "80", + }, + }, + fail: true, + }, + "invalid conversion": { + config: common.MapStr{ + "fields": []common.MapStr{ + {"from": "address", "to": "ip", "type": "ip"}, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "address": "-", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "address": "-", + }, + }, + fail: true, + errContains: "unable to convert value [-]: value is not a valid IP address", + }, + } + + for title, tt := range tests { + t.Run(title, func(t *testing.T) { + processor, err := New(common.MustNewConfigFrom(tt.config)) + if err != nil { + t.Fatal(err) + } + result, err := processor.Run(&tt.input) + if tt.expected.Fields != nil { + assert.Equal(t, tt.expected.Fields.Flatten(), result.Fields.Flatten()) + assert.Equal(t, tt.expected.Meta.Flatten(), result.Meta.Flatten()) + assert.Equal(t, tt.expected.Timestamp, result.Timestamp) + } + if tt.fail { + assert.Error(t, err) + t.Log("got expected error", err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + return + } + assert.NoError(t, err) + }) + } +} + +type testCase struct { + Type dataType + In interface{} + Out interface{} + Err bool +} + +var testCases = []testCase{ + {String, nil, nil, true}, + {String, "x", "x", false}, + {String, 1, "1", false}, + {String, 1.1, "1.1", false}, + {String, true, "true", false}, + + {Long, nil, nil, true}, + {Long, "x", nil, true}, + {Long, true, nil, true}, + {Long, "1", int64(1), false}, + {Long, int(1), int64(1), false}, + {Long, int8(1), int64(1), false}, + {Long, int16(1), int64(1), false}, + {Long, int32(1), int64(1), false}, + {Long, int64(1), int64(1), false}, + {Long, uint(1), int64(1), false}, + {Long, uint8(1), int64(1), false}, + {Long, uint16(1), int64(1), false}, + {Long, uint32(1), int64(1), false}, + {Long, uint64(1), int64(1), false}, + {Long, float32(1), int64(1), false}, + {Long, float64(1), int64(1), false}, + + {Integer, nil, nil, true}, + {Integer, "x", nil, true}, + {Integer, true, nil, true}, + {Integer, "1", int32(1), false}, + {Integer, int(1), int32(1), false}, + {Integer, int8(1), int32(1), false}, + {Integer, int16(1), int32(1), false}, + {Integer, int32(1), int32(1), false}, + {Integer, int64(1), int32(1), false}, + {Integer, uint(1), int32(1), false}, + {Integer, uint8(1), int32(1), false}, + {Integer, uint16(1), int32(1), false}, + {Integer, uint32(1), int32(1), false}, + {Integer, uint64(1), int32(1), false}, + {Integer, float32(1), int32(1), false}, + {Integer, float64(1), int32(1), false}, + + {Float, nil, nil, true}, + {Float, "x", nil, true}, + {Float, true, nil, true}, + {Float, "1", float32(1), false}, + {Float, "1.1", float32(1.1), false}, + {Float, int(1), float32(1), false}, + {Float, int8(1), float32(1), false}, + {Float, int16(1), float32(1), false}, + {Float, int32(1), float32(1), false}, + {Float, int64(1), float32(1), false}, + {Float, uint(1), float32(1), false}, + {Float, uint8(1), float32(1), false}, + {Float, uint16(1), float32(1), false}, + {Float, uint32(1), float32(1), false}, + {Float, uint64(1), float32(1), false}, + {Float, float32(1), float32(1), false}, + {Float, float64(1), float32(1), false}, + + {Double, nil, nil, true}, + {Double, "x", nil, true}, + {Double, true, nil, true}, + {Double, "1", float64(1), false}, + {Double, "1.1", float64(1.1), false}, + {Double, int(1), float64(1), false}, + {Double, int8(1), float64(1), false}, + {Double, int16(1), float64(1), false}, + {Double, int32(1), float64(1), false}, + {Double, int64(1), float64(1), false}, + {Double, uint(1), float64(1), false}, + {Double, uint8(1), float64(1), false}, + {Double, uint16(1), float64(1), false}, + {Double, uint32(1), float64(1), false}, + {Double, uint64(1), float64(1), false}, + {Double, float32(1), float64(1), false}, + {Double, float64(1), float64(1), false}, + + {Boolean, nil, nil, true}, + {Boolean, "x", nil, true}, + {Boolean, 1, nil, true}, + {Boolean, 0, nil, true}, + {Boolean, "TrUe", nil, true}, + {Boolean, true, true, false}, + {Boolean, "1", true, false}, + {Boolean, "t", true, false}, + {Boolean, "T", true, false}, + {Boolean, "TRUE", true, false}, + {Boolean, "true", true, false}, + {Boolean, "True", true, false}, + {Boolean, false, false, false}, + {Boolean, "0", false, false}, + {Boolean, "f", false, false}, + {Boolean, "F", false, false}, + {Boolean, "FALSE", false, false}, + {Boolean, "false", false, false}, + {Boolean, "False", false, false}, + + {IP, nil, nil, true}, + {IP, "x", nil, true}, + {IP, "365.0.0.0", "365.0.0.0", true}, + {IP, "0.0.0.0", "0.0.0.0", false}, + {IP, "::1", "::1", false}, +} + +func TestDataTypes(t *testing.T) { + const key = "key" + + for _, tc := range testCases { + // Give the test a friendly name. + var prefix string + if tc.Err { + prefix = "cannot " + } + name := fmt.Sprintf("%v%T %v to %v", prefix, tc.In, tc.In, tc.Type) + + tc := tc + t.Run(name, func(t *testing.T) { + c := defaultConfig() + c.Fields = append(c.Fields, field{From: key, Type: tc.Type}) + + p, err := newConvert(c) + if err != nil { + t.Fatal(err) + } + + event, err := p.Run(&beat.Event{Fields: common.MapStr{key: tc.In}}) + if tc.Err { + assert.Error(t, err) + return + } else if err != nil { + t.Fatalf("%+v", err) + } + + v := event.Fields[key] + assert.Equal(t, tc.Out, v) + }) + } +} diff --git a/libbeat/processors/script/javascript/module/processor/processor.go b/libbeat/processors/script/javascript/module/processor/processor.go index 25c9cae62090..97b8af138308 100644 --- a/libbeat/processors/script/javascript/module/processor/processor.go +++ b/libbeat/processors/script/javascript/module/processor/processor.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/libbeat/processors/add_observer_metadata" "github.com/elastic/beats/libbeat/processors/add_process_metadata" "github.com/elastic/beats/libbeat/processors/communityid" + "github.com/elastic/beats/libbeat/processors/convert" "github.com/elastic/beats/libbeat/processors/decode_csv_fields" "github.com/elastic/beats/libbeat/processors/dissect" "github.com/elastic/beats/libbeat/processors/dns" @@ -50,6 +51,7 @@ var constructors = map[string]processors.Constructor{ "AddLocale": add_locale.New, "AddProcessMetadata": add_process_metadata.New, "CommunityID": communityid.New, + "Convert": convert.New, "CopyFields": actions.NewCopyFields, "DecodeCSVField": decode_csv_fields.NewDecodeCSVField, "DecodeJSONFields": actions.NewDecodeJSONFields,