From e0dc30681223e63d841249583ceeab2c3bcc9e9b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 Nov 2019 12:07:29 -0800 Subject: [PATCH 01/50] WIP: Flake ID processor --- libbeat/processors/flake_id/generator.go | 75 ++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 libbeat/processors/flake_id/generator.go diff --git a/libbeat/processors/flake_id/generator.go b/libbeat/processors/flake_id/generator.go new file mode 100644 index 000000000000..4461d1646732 --- /dev/null +++ b/libbeat/processors/flake_id/generator.go @@ -0,0 +1,75 @@ +package flake_id + +import ( + "crypto/rand" + "net" +) + +// Golang port of https://github.com/elastic/elasticsearch/commit/9c1ac95ba8e593c90b4681f2a554b12ff677cf89 + +type id []byte + +const MAC_ADDR_LEN = 6 + +func (i *id) Base64() string { + return "TODO" +} + +func getSecureMungedAddress() ([]byte, error) { + addr, err := getMacAddress() + if err != nil { + return nil, err + } + + if !isValidAddress(addr) { + addr = constructDummyMulticastAddress() + } + + munged := make([]byte, MAC_ADDR_LEN) + _, err = rand.Read(munged) + if err != nil { + return nil, err + } + + for i := 0; i < MAC_ADDR_LEN; i++ { + munged[i] ^= addr[i] + } + + return munged, nil +} + +func getMacAddress() ([]byte, error) { + interfaces, err := net.Interfaces() + if err != nil { + return nil, err + + } + for _, i := range interfaces { + if i.Flags != net.FlagLoopback { + // Pick the first valid non loopback address we find + addr := i.HardwareAddr + if isValidAddress(addr) { + return addr, nil + } + } + } + + // Could not find a mac address + return nil, nil +} + +func isValidAddress(addr []byte) bool { + if addr == nil || len(addr) != 6 { + return false + } + + for _, b := range addr { + if b != 0x00 { + return true // If any of the bytes are non zero assume a good address + } + } +} + +func constructDummyMulticastAddress() []byte { + +} From 4269333027145e4dd4741c915b2dc6f05b617667 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 12:42:40 -0800 Subject: [PATCH 02/50] Fleshing out implementation of generator --- libbeat/processors/flake_id/generator.go | 137 ++++++++++++++--------- libbeat/processors/flake_id/mac.go | 99 ++++++++++++++++ 2 files changed, 185 insertions(+), 51 deletions(-) create mode 100644 libbeat/processors/flake_id/mac.go diff --git a/libbeat/processors/flake_id/generator.go b/libbeat/processors/flake_id/generator.go index 4461d1646732..4171d6059a09 100644 --- a/libbeat/processors/flake_id/generator.go +++ b/libbeat/processors/flake_id/generator.go @@ -1,75 +1,110 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package flake_id import ( - "crypto/rand" - "net" + "encoding/base64" + "math/rand" + "sync" + "time" ) -// Golang port of https://github.com/elastic/elasticsearch/commit/9c1ac95ba8e593c90b4681f2a554b12ff677cf89 - -type id []byte - -const MAC_ADDR_LEN = 6 +// Golang port of https://github.com/elastic/elasticsearch/blob/a666fb22664284d8e2114841ebb58ea4e1924691/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java -func (i *id) Base64() string { - return "TODO" -} +var ( + sequenceNumber uint32 + lastTimestamp *time.Time + mac []byte + mu sync.Mutex +) -func getSecureMungedAddress() ([]byte, error) { - addr, err := getMacAddress() +func init() { + m, err := getSecureMungedMACAddress() if err != nil { - return nil, err + panic(err) } + mac = m + sequenceNumber = rand.Uint32() +} - if !isValidAddress(addr) { - addr = constructDummyMulticastAddress() - } +func getBase64UUID() string { + mu.Lock() + defer mu.Unlock() - munged := make([]byte, MAC_ADDR_LEN) - _, err = rand.Read(munged) - if err != nil { - return nil, err - } + sequenceNumber += 1 - for i := 0; i < MAC_ADDR_LEN; i++ { - munged[i] ^= addr[i] + // We only use bottom 3 bytes for the sequence number. + s := sequenceNumber & 0xffffff + + timestamp := getTimestamp() + if s == 0 { + // Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards. + timestamp.Add(1 * time.Millisecond) } + lastTimestamp = timestamp - return munged, nil -} + t := timestamp.UnixNano() / 1000 // timestamp in ms-since-epoch -func getMacAddress() ([]byte, error) { - interfaces, err := net.Interfaces() - if err != nil { - return nil, err + uuidBytes := make([]byte, 15) - } - for _, i := range interfaces { - if i.Flags != net.FlagLoopback { - // Pick the first valid non loopback address we find - addr := i.HardwareAddr - if isValidAddress(addr) { - return addr, nil - } - } - } + //// We have auto-generated ids, which are usually used for append-only workloads. + //// So we try to optimize the order of bytes for indexing speed (by having quite + //// unique bytes close to the beginning of the ids so that sorting is fast) and + //// compression (by making sure we share common prefixes between enough ids). + + // We use the sequence number rather than the timestamp because the distribution of + // the timestamp depends too much on the ingestion rate, so it is less reliable. + uuidBytes[0] = byte(s) // copy lowest-order byte from sequence number + uuidBytes[1] = byte(s >> 16) // copy 3rd lowest-order byte from sequence number + + // Now we start focusing on compression and put bytes that should not change too often. + uuidBytes[2] = byte(t >> 16) // 3rd lowest-order byte from timestamp; changes every ~65 secs + uuidBytes[3] = byte(t >> 24) // 4th lowest-order byte from timestamp; changes every ~4.5h + uuidBytes[4] = byte(t >> 32) // 5th lowest-order byte from timestamp; changes every ~50 days + uuidBytes[5] = byte(t >> 40) // 6th lowest-order byte from timestamp; changes every 35 years + + // Copy mac address bytes (6 bytes) + uuidBytes = append(uuidBytes, mac...) - // Could not find a mac address - return nil, nil + // Finally we put the remaining bytes, which will likely not be compressed at all. + uuidBytes[12] = byte(t >> 8) // 2nd lowest-order byte from timestamp + uuidBytes[13] = byte(s >> 8) // 2nd lowest-order byte from sequence number + uuidBytes[14] = byte(t) + + // See also: more detailed explanation of byte choices at + // https://github.com/elastic/elasticsearch/blob/a666fb22664284d8e2114841ebb58ea4e1924691/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java#L80-L95 + + return base64.RawURLEncoding.EncodeToString(uuidBytes) } -func isValidAddress(addr []byte) bool { - if addr == nil || len(addr) != 6 { - return false - } +func getTimestamp() *time.Time { + // Don't let timestamp go backwards, at least "on our watch" (while this process is running). We are still vulnerable if we are + // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of + // collision. + now := time.Now() - for _, b := range addr { - if b != 0x00 { - return true // If any of the bytes are non zero assume a good address - } + if lastTimestamp == nil { + return &now } -} -func constructDummyMulticastAddress() []byte { + if lastTimestamp.After(now) { + return lastTimestamp + } + return &now } diff --git a/libbeat/processors/flake_id/mac.go b/libbeat/processors/flake_id/mac.go new file mode 100644 index 000000000000..d612b3c45cb1 --- /dev/null +++ b/libbeat/processors/flake_id/mac.go @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package flake_id + +import ( + "crypto/rand" + "net" +) + +// Golang port of https://github.com/elastic/elasticsearch/blob/a666fb22664284d8e2114841ebb58ea4e1924691/server/src/main/java/org/elasticsearch/common/MacAddressProvider.java + +type id []byte + +const MacAddrLen = 6 + +func getSecureMungedMACAddress() ([]byte, error) { + addr, err := getMacAddress() + if err != nil { + return nil, err + } + + if !isValidAddress(addr) { + addr, err = constructDummyMulticastAddress() + if err != nil { + return nil, err + } + } + + munged := make([]byte, MacAddrLen) + _, err = rand.Read(munged) + if err != nil { + return nil, err + } + + for i := 0; i < MacAddrLen; i++ { + munged[i] ^= addr[i] + } + + return munged, nil +} + +func getMacAddress() ([]byte, error) { + interfaces, err := net.Interfaces() + if err != nil { + return nil, err + + } + for _, i := range interfaces { + if i.Flags != net.FlagLoopback { + // Pick the first valid non loopback address we find + addr := i.HardwareAddr + if isValidAddress(addr) { + return addr, nil + } + } + } + + // Could not find a mac address + return nil, nil +} + +func isValidAddress(addr []byte) bool { + if addr == nil || len(addr) != 6 { + return false + } + + for _, b := range addr { + if b != 0x00 { + return true // If any of the bytes are non zero assume a good address + } + } +} + +func constructDummyMulticastAddress() ([]byte, error) { + dummy := make([]byte, MacAddrLen) + _, err := rand.Read(dummy) + if err != nil { + return nil, err + } + + // Set the broadcast bit to indicate this is not a _real_ mac address + dummy[0] |= byte(0x01) + return dummy, nil +} From bfa27b1ee886ccabb2916d801f08827da87bffd7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 12:55:15 -0800 Subject: [PATCH 03/50] Rename package --- .../processors/{flake_id => elasticsearch_id}/generator.go | 4 ++-- libbeat/processors/{flake_id => elasticsearch_id}/mac.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename libbeat/processors/{flake_id => elasticsearch_id}/generator.go (97%) rename libbeat/processors/{flake_id => elasticsearch_id}/mac.go (98%) diff --git a/libbeat/processors/flake_id/generator.go b/libbeat/processors/elasticsearch_id/generator.go similarity index 97% rename from libbeat/processors/flake_id/generator.go rename to libbeat/processors/elasticsearch_id/generator.go index 4171d6059a09..5e258afa109a 100644 --- a/libbeat/processors/flake_id/generator.go +++ b/libbeat/processors/elasticsearch_id/generator.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package flake_id +package elasticsearch_id import ( "encoding/base64" @@ -68,7 +68,7 @@ func getBase64UUID() string { //// compression (by making sure we share common prefixes between enough ids). // We use the sequence number rather than the timestamp because the distribution of - // the timestamp depends too much on the ingestion rate, so it is less reliable. + // the timestamp depends too much on the indexing rate, so it is less reliable. uuidBytes[0] = byte(s) // copy lowest-order byte from sequence number uuidBytes[1] = byte(s >> 16) // copy 3rd lowest-order byte from sequence number diff --git a/libbeat/processors/flake_id/mac.go b/libbeat/processors/elasticsearch_id/mac.go similarity index 98% rename from libbeat/processors/flake_id/mac.go rename to libbeat/processors/elasticsearch_id/mac.go index d612b3c45cb1..cebe0e7afe5e 100644 --- a/libbeat/processors/flake_id/mac.go +++ b/libbeat/processors/elasticsearch_id/mac.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package flake_id +package elasticsearch_id import ( "crypto/rand" From 98a891d8fdb24ac121f3efa0912e08d8cc76e648 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 12:56:44 -0800 Subject: [PATCH 04/50] Unexport const --- libbeat/processors/elasticsearch_id/mac.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/processors/elasticsearch_id/mac.go b/libbeat/processors/elasticsearch_id/mac.go index cebe0e7afe5e..7ba03b81d178 100644 --- a/libbeat/processors/elasticsearch_id/mac.go +++ b/libbeat/processors/elasticsearch_id/mac.go @@ -26,7 +26,7 @@ import ( type id []byte -const MacAddrLen = 6 +const addrLen = 6 func getSecureMungedMACAddress() ([]byte, error) { addr, err := getMacAddress() @@ -41,13 +41,13 @@ func getSecureMungedMACAddress() ([]byte, error) { } } - munged := make([]byte, MacAddrLen) + munged := make([]byte, addrLen) _, err = rand.Read(munged) if err != nil { return nil, err } - for i := 0; i < MacAddrLen; i++ { + for i := 0; i < addrLen; i++ { munged[i] ^= addr[i] } @@ -87,7 +87,7 @@ func isValidAddress(addr []byte) bool { } func constructDummyMulticastAddress() ([]byte, error) { - dummy := make([]byte, MacAddrLen) + dummy := make([]byte, addrLen) _, err := rand.Read(dummy) if err != nil { return nil, err From 1133141eba10c7dc252d141420516c9b22a070b5 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 12:57:15 -0800 Subject: [PATCH 05/50] Use increment operator --- libbeat/processors/elasticsearch_id/generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/elasticsearch_id/generator.go b/libbeat/processors/elasticsearch_id/generator.go index 5e258afa109a..806232be21cd 100644 --- a/libbeat/processors/elasticsearch_id/generator.go +++ b/libbeat/processors/elasticsearch_id/generator.go @@ -46,7 +46,7 @@ func getBase64UUID() string { mu.Lock() defer mu.Unlock() - sequenceNumber += 1 + sequenceNumber++ // We only use bottom 3 bytes for the sequence number. s := sequenceNumber & 0xffffff From f35064bdc3b1dd60c13237b763cc257d43bf62f7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 16:59:48 -0800 Subject: [PATCH 06/50] Adding processor scaffolding --- libbeat/processors/elasticsearch_id/config.go | 29 +++++++++ .../elasticsearch_id/elasticsearch_id.go | 65 +++++++++++++++++++ libbeat/processors/elasticsearch_id/errors.go | 41 ++++++++++++ 3 files changed, 135 insertions(+) create mode 100644 libbeat/processors/elasticsearch_id/config.go create mode 100644 libbeat/processors/elasticsearch_id/elasticsearch_id.go create mode 100644 libbeat/processors/elasticsearch_id/errors.go diff --git a/libbeat/processors/elasticsearch_id/config.go b/libbeat/processors/elasticsearch_id/config.go new file mode 100644 index 000000000000..66cf6bfaf19d --- /dev/null +++ b/libbeat/processors/elasticsearch_id/config.go @@ -0,0 +1,29 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch_id + +// Config for Elasticsearch ID processor. +type Config struct { + TargetField string `config:"target_field"` // Target field for the ID +} + +func defaultConfig() Config { + return Config{ + TargetField: "metadata.id", + } +} diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id.go b/libbeat/processors/elasticsearch_id/elasticsearch_id.go new file mode 100644 index 000000000000..b1a151fe4631 --- /dev/null +++ b/libbeat/processors/elasticsearch_id/elasticsearch_id.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch_id + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" +) + +func init() { + processors.RegisterPlugin("elasticsearch_id", New) + jsprocessor.RegisterPlugin("ElasticsearchID", New) +} + +const processorName = "elasticsearch_id" + +type elasticsearchID struct { + config Config +} + +// New constructs a new Elasticsearch ID processor. +func New(cfg *common.Config) (processors.Processor, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, makeErrConfigUnpack(err) + } + + p := &elasticsearchID{ + config, + } + + return p, nil +} + +// Run enriches the given event with an ID +func (p *elasticsearchID) Run(event *beat.Event) (*beat.Event, error) { + id := getBase64UUID() + if _, err := event.PutValue(p.config.TargetField, id); err != nil { + return nil, makeErrComputeID(err) + } + + return event, nil +} + +func (p *elasticsearchID) String() string { + return fmt.Sprintf("%v=[target_field=[%v]]", processorName, p.config.TargetField) +} diff --git a/libbeat/processors/elasticsearch_id/errors.go b/libbeat/processors/elasticsearch_id/errors.go new file mode 100644 index 000000000000..9f38094d8e57 --- /dev/null +++ b/libbeat/processors/elasticsearch_id/errors.go @@ -0,0 +1,41 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch_id + +import ( + "fmt" +) + +type ( + errConfigUnpack struct{ cause error } + errComputeID struct{ cause error } +) + +func makeErrConfigUnpack(cause error) errConfigUnpack { + return errConfigUnpack{cause} +} +func (e errConfigUnpack) Error() string { + return fmt.Sprintf("failed to unpack %v processor configuration: %v", processorName, e.cause) +} + +func makeErrComputeID(cause error) errComputeID { + return errComputeID{cause} +} +func (e errComputeID) Error() string { + return fmt.Sprintf("failed to compute ID: %v", e.cause) +} From f6ed19c2058d0a9db70fca7081827b1cecea6a0a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 17:14:20 -0800 Subject: [PATCH 07/50] Fixing default field --- libbeat/processors/elasticsearch_id/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/elasticsearch_id/config.go b/libbeat/processors/elasticsearch_id/config.go index 66cf6bfaf19d..e04d2fbc86a9 100644 --- a/libbeat/processors/elasticsearch_id/config.go +++ b/libbeat/processors/elasticsearch_id/config.go @@ -24,6 +24,6 @@ type Config struct { func defaultConfig() Config { return Config{ - TargetField: "metadata.id", + TargetField: "@metadata.id", } } From e2d85feaca748984a121fb1dbfb26901ff90418a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 17:15:23 -0800 Subject: [PATCH 08/50] Adding CHANGELOG entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6af30ecd9f6e..8a61352fdd1d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -360,6 +360,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822] - Make use of consumer_lag in Kafka dashboard {pull}14863[14863] - Refactor kubernetes autodiscover to enable different resource based discovery {pull}14738[14738] +- Add `elasticsearch_id` processor. {pull}14524[14524] *Auditbeat* From 477fe7f1956e4035e460c9a6bf0611301a179613 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 17:25:56 -0800 Subject: [PATCH 09/50] Fixing compile errors --- libbeat/processors/elasticsearch_id/elasticsearch_id.go | 1 + libbeat/processors/elasticsearch_id/elasticsearch_id_test.go | 1 + libbeat/processors/elasticsearch_id/mac.go | 2 ++ 3 files changed, 4 insertions(+) create mode 100644 libbeat/processors/elasticsearch_id/elasticsearch_id_test.go diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id.go b/libbeat/processors/elasticsearch_id/elasticsearch_id.go index b1a151fe4631..b3b0539da180 100644 --- a/libbeat/processors/elasticsearch_id/elasticsearch_id.go +++ b/libbeat/processors/elasticsearch_id/elasticsearch_id.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" + jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor" ) func init() { diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go new file mode 100644 index 000000000000..92754afb3e5b --- /dev/null +++ b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go @@ -0,0 +1 @@ +package elasticsearch_id diff --git a/libbeat/processors/elasticsearch_id/mac.go b/libbeat/processors/elasticsearch_id/mac.go index 7ba03b81d178..3d9715c2c648 100644 --- a/libbeat/processors/elasticsearch_id/mac.go +++ b/libbeat/processors/elasticsearch_id/mac.go @@ -84,6 +84,8 @@ func isValidAddress(addr []byte) bool { return true // If any of the bytes are non zero assume a good address } } + + return false } func constructDummyMulticastAddress() ([]byte, error) { From 998a85ff28abb0802a24327cb2aca4beb7adb135 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 17:31:21 -0800 Subject: [PATCH 10/50] WIP: unit tests --- .../elasticsearch_id/elasticsearch_id_test.go | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go index 92754afb3e5b..8707f9597f40 100644 --- a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go +++ b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go @@ -1 +1,42 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package elasticsearch_id + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" + + "github.com/elastic/beats/libbeat/beat" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultTargetField(t *testing.T) { + p, err := New(common.MustNewConfigFrom(nil)) + assert.NoError(t, err) + + testEvent := &beat.Event{} + + newEvent, err := p.Run(testEvent) + assert.NoError(t, err) + + v, err := newEvent.GetValue("@metadata.id") + assert.NoError(t, err) + assert.NotEmpty(t, v) +} From e4a2af69494046069c067d7221a952e7cde9f87d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 17:47:25 -0800 Subject: [PATCH 11/50] Fixing byte copy --- libbeat/processors/elasticsearch_id/generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/elasticsearch_id/generator.go b/libbeat/processors/elasticsearch_id/generator.go index 806232be21cd..dec5cfa4e60e 100644 --- a/libbeat/processors/elasticsearch_id/generator.go +++ b/libbeat/processors/elasticsearch_id/generator.go @@ -79,7 +79,7 @@ func getBase64UUID() string { uuidBytes[5] = byte(t >> 40) // 6th lowest-order byte from timestamp; changes every 35 years // Copy mac address bytes (6 bytes) - uuidBytes = append(uuidBytes, mac...) + copy(uuidBytes[6:6+addrLen], mac) // Finally we put the remaining bytes, which will likely not be compressed at all. uuidBytes[12] = byte(t >> 8) // 2nd lowest-order byte from timestamp From 229d57d06a7d3173f0a0673ddb515c5d72f86c8e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 17:47:33 -0800 Subject: [PATCH 12/50] Fixing up tests --- .../elasticsearch_id/elasticsearch_id_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go index 8707f9597f40..d67125dbb060 100644 --- a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go +++ b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go @@ -18,6 +18,7 @@ package elasticsearch_id import ( + "encoding/base64" "testing" "github.com/elastic/beats/libbeat/common" @@ -39,4 +40,13 @@ func TestDefaultTargetField(t *testing.T) { v, err := newEvent.GetValue("@metadata.id") assert.NoError(t, err) assert.NotEmpty(t, v) + + assertLen(t, v) +} + +func assertLen(t *testing.T, value interface{}) { + assert.IsType(t, "", value) + decoded, err := base64.RawURLEncoding.DecodeString(value.(string)) + assert.NoError(t, err) + assert.Len(t, decoded, 15) } From c9a7bae89d5eae93f7a77f8cd9dc96339828b58e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 17:53:04 -0800 Subject: [PATCH 13/50] Adding test TODOs --- .../elasticsearch_id/elasticsearch_id_test.go | 2 ++ .../elasticsearch_id/generator_test.go | 21 +++++++++++++++++ .../processors/elasticsearch_id/mac_test.go | 23 +++++++++++++++++++ 3 files changed, 46 insertions(+) create mode 100644 libbeat/processors/elasticsearch_id/generator_test.go create mode 100644 libbeat/processors/elasticsearch_id/mac_test.go diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go index d67125dbb060..6fd51021bc85 100644 --- a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go +++ b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go @@ -44,6 +44,8 @@ func TestDefaultTargetField(t *testing.T) { assertLen(t, v) } +// TODO: non default + func assertLen(t *testing.T, value interface{}) { assert.IsType(t, "", value) decoded, err := base64.RawURLEncoding.DecodeString(value.(string)) diff --git a/libbeat/processors/elasticsearch_id/generator_test.go b/libbeat/processors/elasticsearch_id/generator_test.go new file mode 100644 index 000000000000..d53b8b6ffafe --- /dev/null +++ b/libbeat/processors/elasticsearch_id/generator_test.go @@ -0,0 +1,21 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch_id + +// TODO: test consecutive +// TODO: benchmarking diff --git a/libbeat/processors/elasticsearch_id/mac_test.go b/libbeat/processors/elasticsearch_id/mac_test.go new file mode 100644 index 000000000000..ad3b33ff2bed --- /dev/null +++ b/libbeat/processors/elasticsearch_id/mac_test.go @@ -0,0 +1,23 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elasticsearch_id + +// TODO: test isValidAddress (length, not all zeros) +// TODO: test constructDummyMulticastAddress (length, multicast bit) +// TODO: test getSecureMungedMACAddress (length) +// TODO: getMacAddress (not loopback, length) From 4cc8c56e34b5d3a471fa0f9acfafe94b47e93b53 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 18:03:37 -0800 Subject: [PATCH 14/50] Adding non-default target field unit test --- .../elasticsearch_id/elasticsearch_id_test.go | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go index 6fd51021bc85..790161f6724f 100644 --- a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go +++ b/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go @@ -39,15 +39,37 @@ func TestDefaultTargetField(t *testing.T) { v, err := newEvent.GetValue("@metadata.id") assert.NoError(t, err) - assert.NotEmpty(t, v) - assertLen(t, v) } -// TODO: non default +func TestNonDefaultTargetField(t *testing.T) { + cfg := common.MustNewConfigFrom(common.MapStr{ + "target_field": "foo", + }) + p, err := New(cfg) + assert.NoError(t, err) + + testEvent := &beat.Event{ + Fields: common.MapStr{}, + } + + newEvent, err := p.Run(testEvent) + assert.NoError(t, err) + + v, err := newEvent.GetValue("foo") + assert.NoError(t, err) + assertLen(t, v) + + v, err = newEvent.GetValue("@metadata.id") + assert.NoError(t, err) + assert.Nil(t, v) +} func assertLen(t *testing.T, value interface{}) { assert.IsType(t, "", value) + + assert.NotEmpty(t, value) + decoded, err := base64.RawURLEncoding.DecodeString(value.(string)) assert.NoError(t, err) assert.Len(t, decoded, 15) From a6b36460bb193f3ba0d507d63beea821f93dfdb7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 18:05:15 -0800 Subject: [PATCH 15/50] Adding one more test TODO --- libbeat/processors/elasticsearch_id/generator_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/processors/elasticsearch_id/generator_test.go b/libbeat/processors/elasticsearch_id/generator_test.go index d53b8b6ffafe..72367967a7e7 100644 --- a/libbeat/processors/elasticsearch_id/generator_test.go +++ b/libbeat/processors/elasticsearch_id/generator_test.go @@ -17,5 +17,6 @@ package elasticsearch_id +// TODO: bytes 7-12 are mac address // TODO: test consecutive // TODO: benchmarking From 950cd5beb14d90b3b8a1c56298241cd11e22bc38 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 18:09:52 -0800 Subject: [PATCH 16/50] Adding TODO for post-benchmarking --- libbeat/processors/elasticsearch_id/generator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/processors/elasticsearch_id/generator.go b/libbeat/processors/elasticsearch_id/generator.go index dec5cfa4e60e..810c260c0b46 100644 --- a/libbeat/processors/elasticsearch_id/generator.go +++ b/libbeat/processors/elasticsearch_id/generator.go @@ -43,6 +43,7 @@ func init() { } func getBase64UUID() string { + // TODO: consider moving only critical section into IIFE? Check after benchmarking. mu.Lock() defer mu.Unlock() From 02cdbf6e957cd2eaddbd4bf6bf46d906ddcf3d81 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 18:45:34 -0800 Subject: [PATCH 17/50] Introduce type --- .../{elasticsearch_id => uuid}/config.go | 4 ++- .../{elasticsearch_id => uuid}/errors.go | 10 +++++- .../generator/elasticsearch}/generator.go | 4 +-- .../elasticsearch}/generator_test.go | 3 +- .../generator/elasticsearch}/mac.go | 2 +- .../generator/elasticsearch}/mac_test.go | 2 +- libbeat/processors/uuid/generator/errors.go | 33 +++++++++++++++++++ .../processors/uuid/generator/generator.go | 31 +++++++++++++++++ .../uuid/generator/generator_test.go | 20 +++++++++++ .../elasticsearch_id.go => uuid/uuid.go} | 27 +++++++++------ .../uuid_test.go} | 19 +++-------- 11 files changed, 123 insertions(+), 32 deletions(-) rename libbeat/processors/{elasticsearch_id => uuid}/config.go (90%) rename libbeat/processors/{elasticsearch_id => uuid}/errors.go (85%) rename libbeat/processors/{elasticsearch_id => uuid/generator/elasticsearch}/generator.go (98%) rename libbeat/processors/{elasticsearch_id => uuid/generator/elasticsearch}/generator_test.go (93%) rename libbeat/processors/{elasticsearch_id => uuid/generator/elasticsearch}/mac.go (98%) rename libbeat/processors/{elasticsearch_id => uuid/generator/elasticsearch}/mac_test.go (97%) create mode 100644 libbeat/processors/uuid/generator/errors.go create mode 100644 libbeat/processors/uuid/generator/generator.go create mode 100644 libbeat/processors/uuid/generator/generator_test.go rename libbeat/processors/{elasticsearch_id/elasticsearch_id.go => uuid/uuid.go} (76%) rename libbeat/processors/{elasticsearch_id/elasticsearch_id_test.go => uuid/uuid_test.go} (83%) diff --git a/libbeat/processors/elasticsearch_id/config.go b/libbeat/processors/uuid/config.go similarity index 90% rename from libbeat/processors/elasticsearch_id/config.go rename to libbeat/processors/uuid/config.go index e04d2fbc86a9..19fe06c4ae01 100644 --- a/libbeat/processors/elasticsearch_id/config.go +++ b/libbeat/processors/uuid/config.go @@ -15,15 +15,17 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package uuid // Config for Elasticsearch ID processor. type Config struct { TargetField string `config:"target_field"` // Target field for the ID + Type string `config:"type"` // Type of ID } func defaultConfig() Config { return Config{ TargetField: "@metadata.id", + Type: "elasticsearch", } } diff --git a/libbeat/processors/elasticsearch_id/errors.go b/libbeat/processors/uuid/errors.go similarity index 85% rename from libbeat/processors/elasticsearch_id/errors.go rename to libbeat/processors/uuid/errors.go index 9f38094d8e57..7ab5b0de9b78 100644 --- a/libbeat/processors/elasticsearch_id/errors.go +++ b/libbeat/processors/uuid/errors.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package uuid import ( "fmt" @@ -24,6 +24,7 @@ import ( type ( errConfigUnpack struct{ cause error } errComputeID struct{ cause error } + errUnknownType struct{ typ string } ) func makeErrConfigUnpack(cause error) errConfigUnpack { @@ -39,3 +40,10 @@ func makeErrComputeID(cause error) errComputeID { func (e errComputeID) Error() string { return fmt.Sprintf("failed to compute ID: %v", e.cause) } + +func makeErrUnknownType(typ string) errUnknownType { + return errUnknownType{typ} +} +func (e errUnknownType) Error() string { + return fmt.Sprintf("invalid type [%s]", e.typ) +} diff --git a/libbeat/processors/elasticsearch_id/generator.go b/libbeat/processors/uuid/generator/elasticsearch/generator.go similarity index 98% rename from libbeat/processors/elasticsearch_id/generator.go rename to libbeat/processors/uuid/generator/elasticsearch/generator.go index 810c260c0b46..9c487e930507 100644 --- a/libbeat/processors/elasticsearch_id/generator.go +++ b/libbeat/processors/uuid/generator/elasticsearch/generator.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package elasticsearch import ( "encoding/base64" @@ -42,7 +42,7 @@ func init() { sequenceNumber = rand.Uint32() } -func getBase64UUID() string { +func GetBase64UUID() string { // TODO: consider moving only critical section into IIFE? Check after benchmarking. mu.Lock() defer mu.Unlock() diff --git a/libbeat/processors/elasticsearch_id/generator_test.go b/libbeat/processors/uuid/generator/elasticsearch/generator_test.go similarity index 93% rename from libbeat/processors/elasticsearch_id/generator_test.go rename to libbeat/processors/uuid/generator/elasticsearch/generator_test.go index 72367967a7e7..b9afdbda5fca 100644 --- a/libbeat/processors/elasticsearch_id/generator_test.go +++ b/libbeat/processors/uuid/generator/elasticsearch/generator_test.go @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package elasticsearch +// TODO: check decoded length == 15 bytes // TODO: bytes 7-12 are mac address // TODO: test consecutive // TODO: benchmarking diff --git a/libbeat/processors/elasticsearch_id/mac.go b/libbeat/processors/uuid/generator/elasticsearch/mac.go similarity index 98% rename from libbeat/processors/elasticsearch_id/mac.go rename to libbeat/processors/uuid/generator/elasticsearch/mac.go index 3d9715c2c648..8398d4e4c2fc 100644 --- a/libbeat/processors/elasticsearch_id/mac.go +++ b/libbeat/processors/uuid/generator/elasticsearch/mac.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package elasticsearch import ( "crypto/rand" diff --git a/libbeat/processors/elasticsearch_id/mac_test.go b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go similarity index 97% rename from libbeat/processors/elasticsearch_id/mac_test.go rename to libbeat/processors/uuid/generator/elasticsearch/mac_test.go index ad3b33ff2bed..8c7e902dc0a1 100644 --- a/libbeat/processors/elasticsearch_id/mac_test.go +++ b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package elasticsearch // TODO: test isValidAddress (length, not all zeros) // TODO: test constructDummyMulticastAddress (length, multicast bit) diff --git a/libbeat/processors/uuid/generator/errors.go b/libbeat/processors/uuid/generator/errors.go new file mode 100644 index 000000000000..dda17c32e617 --- /dev/null +++ b/libbeat/processors/uuid/generator/errors.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package generator + +import ( + "fmt" +) + +type ( + errUnknownType struct{ typ string } +) + +func makeErrUnknownType(typ string) errUnknownType { + return errUnknownType{typ} +} +func (e errUnknownType) Error() string { + return fmt.Sprintf("invalid type [%s]", e.typ) +} diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/uuid/generator/generator.go new file mode 100644 index 000000000000..048cd83b2eb5 --- /dev/null +++ b/libbeat/processors/uuid/generator/generator.go @@ -0,0 +1,31 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package generator + +import "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch" + +type generatorFn func() string + +func Factory(typ string) (generatorFn, error) { + switch typ { + case "elasticsearch": + return elasticsearch.GetBase64UUID, nil + default: + return nil, makeErrUnknownType(typ) + } +} diff --git a/libbeat/processors/uuid/generator/generator_test.go b/libbeat/processors/uuid/generator/generator_test.go new file mode 100644 index 000000000000..27ef319fa787 --- /dev/null +++ b/libbeat/processors/uuid/generator/generator_test.go @@ -0,0 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package generator + +// TODO: test unknown type diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id.go b/libbeat/processors/uuid/uuid.go similarity index 76% rename from libbeat/processors/elasticsearch_id/elasticsearch_id.go rename to libbeat/processors/uuid/uuid.go index b3b0539da180..621175927ec6 100644 --- a/libbeat/processors/elasticsearch_id/elasticsearch_id.go +++ b/libbeat/processors/uuid/uuid.go @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package uuid import ( "fmt" + "github.com/elastic/beats/libbeat/processors/uuid/generator" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" @@ -27,24 +29,24 @@ import ( ) func init() { - processors.RegisterPlugin("elasticsearch_id", New) - jsprocessor.RegisterPlugin("ElasticsearchID", New) + processors.RegisterPlugin("uuid", New) + jsprocessor.RegisterPlugin("UUID", New) } -const processorName = "elasticsearch_id" +const processorName = "uuid" -type elasticsearchID struct { +type uuid struct { config Config } -// New constructs a new Elasticsearch ID processor. +// New constructs a new UUID processor. func New(cfg *common.Config) (processors.Processor, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, makeErrConfigUnpack(err) } - p := &elasticsearchID{ + p := &uuid{ config, } @@ -52,8 +54,13 @@ func New(cfg *common.Config) (processors.Processor, error) { } // Run enriches the given event with an ID -func (p *elasticsearchID) Run(event *beat.Event) (*beat.Event, error) { - id := getBase64UUID() +func (p *uuid) Run(event *beat.Event) (*beat.Event, error) { + idFn, err := generator.Factory(p.config.Type) + if err != nil { + return nil, makeErrComputeID(err) + } + + id := idFn() if _, err := event.PutValue(p.config.TargetField, id); err != nil { return nil, makeErrComputeID(err) } @@ -61,6 +68,6 @@ func (p *elasticsearchID) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (p *elasticsearchID) String() string { +func (p *uuid) String() string { return fmt.Sprintf("%v=[target_field=[%v]]", processorName, p.config.TargetField) } diff --git a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go b/libbeat/processors/uuid/uuid_test.go similarity index 83% rename from libbeat/processors/elasticsearch_id/elasticsearch_id_test.go rename to libbeat/processors/uuid/uuid_test.go index 790161f6724f..9c2e9ff27ded 100644 --- a/libbeat/processors/elasticsearch_id/elasticsearch_id_test.go +++ b/libbeat/processors/uuid/uuid_test.go @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch_id +package uuid import ( - "encoding/base64" "testing" "github.com/elastic/beats/libbeat/common" @@ -39,7 +38,7 @@ func TestDefaultTargetField(t *testing.T) { v, err := newEvent.GetValue("@metadata.id") assert.NoError(t, err) - assertLen(t, v) + assert.NotEmpty(t, v) } func TestNonDefaultTargetField(t *testing.T) { @@ -58,19 +57,9 @@ func TestNonDefaultTargetField(t *testing.T) { v, err := newEvent.GetValue("foo") assert.NoError(t, err) - assertLen(t, v) + assert.NotEmpty(t, v) v, err = newEvent.GetValue("@metadata.id") assert.NoError(t, err) - assert.Nil(t, v) -} - -func assertLen(t *testing.T, value interface{}) { - assert.IsType(t, "", value) - - assert.NotEmpty(t, value) - - decoded, err := base64.RawURLEncoding.DecodeString(value.(string)) - assert.NoError(t, err) - assert.Len(t, decoded, 15) + assert.Empty(t, v) } From 6881da2871e9afd976d751d17ddccecdf126de54 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 23:03:20 -0800 Subject: [PATCH 18/50] Adding unit test for factory --- .../uuid/generator/generator_test.go | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/uuid/generator/generator_test.go b/libbeat/processors/uuid/generator/generator_test.go index 27ef319fa787..64294cbadaff 100644 --- a/libbeat/processors/uuid/generator/generator_test.go +++ b/libbeat/processors/uuid/generator/generator_test.go @@ -17,4 +17,47 @@ package generator -// TODO: test unknown type +import ( + "reflect" + "runtime" + "testing" + + "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch" + + "github.com/stretchr/testify/assert" +) + +func TestFactory(t *testing.T) { + tests := map[string]struct { + expectedGeneratorFn generatorFn + expectedErr error + }{ + "elasticsearch": { + elasticsearch.GetBase64UUID, + nil, + }, + "foobar": { + nil, + makeErrUnknownType("foobar"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + typ := name + fn, err := Factory(typ) + if test.expectedGeneratorFn != nil { + fnName := getGeneratorFuncName(fn) + expectedFnName := getGeneratorFuncName(test.expectedGeneratorFn) + assert.Equal(t, fnName, expectedFnName) + } + if test.expectedErr != nil { + assert.EqualError(t, err, test.expectedErr.Error()) + } + }) + } +} + +func getGeneratorFuncName(fn generatorFn) string { + return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() +} From b3af9e81079e54b50f0c76703c0a5cd7b30e69a2 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 14 Nov 2019 23:17:23 -0800 Subject: [PATCH 19/50] Adding unit test for mac --- .../uuid/generator/elasticsearch/mac_test.go | 42 ++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go index 8c7e902dc0a1..eaa6943295bd 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go +++ b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go @@ -17,7 +17,47 @@ package elasticsearch -// TODO: test isValidAddress (length, not all zeros) +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsValidAddress(t *testing.T) { + tests := map[string]struct { + addr []byte + expected bool + }{ + "nil": { + nil, + false, + }, + "too_short": { + []byte{0xde, 0xad, 0xbe, 0xef}, + false, + }, + "too_long": { + []byte{0xbe, 0xa7, 0x5a, 0x43, 0xda, 0xbe, 0x57}, + false, + }, + "all_zeros": { + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + false, + }, + "good": { + []byte{0xbe, 0xa7, 0x5a, 0x43, 0x90, 0x0d}, + true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + v := isValidAddress(test.addr) + assert.Equal(t, test.expected, v) + }) + } +} + // TODO: test constructDummyMulticastAddress (length, multicast bit) // TODO: test getSecureMungedMACAddress (length) // TODO: getMacAddress (not loopback, length) From ae02eb2d47da4ddeb34421a342ab85c678f3294a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 15 Nov 2019 08:23:08 -0800 Subject: [PATCH 20/50] Adding unit test for mac --- .../uuid/generator/elasticsearch/mac_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go index eaa6943295bd..8a3a9923e63f 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go +++ b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go @@ -58,6 +58,14 @@ func TestIsValidAddress(t *testing.T) { } } -// TODO: test constructDummyMulticastAddress (length, multicast bit) +func TestConstructDummyMulticastAddress(t *testing.T) { + addr, err := constructDummyMulticastAddress() + assert.NoError(t, err) + assert.Len(t, addr, addrLen) + + firstOctet := addr[0] + assert.EqualValues(t, 0x01, firstOctet&0x01) +} + // TODO: test getSecureMungedMACAddress (length) // TODO: getMacAddress (not loopback, length) From b60e88c29fe23be3eef4852118189810abfcfee2 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 15 Nov 2019 10:00:34 -0800 Subject: [PATCH 21/50] Fleshing out remaining mac unit tests --- .../uuid/generator/elasticsearch/mac_test.go | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go index 8a3a9923e63f..f9629143378b 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go +++ b/libbeat/processors/uuid/generator/elasticsearch/mac_test.go @@ -18,6 +18,7 @@ package elasticsearch import ( + "net" "testing" "github.com/stretchr/testify/assert" @@ -67,5 +68,33 @@ func TestConstructDummyMulticastAddress(t *testing.T) { assert.EqualValues(t, 0x01, firstOctet&0x01) } -// TODO: test getSecureMungedMACAddress (length) -// TODO: getMacAddress (not loopback, length) +func TestSecureMungedMACAddress(t *testing.T) { + addr, err := getSecureMungedMACAddress() + assert.NoError(t, err) + assert.Len(t, addr, addrLen) +} + +func TestGetMacAddress(t *testing.T) { + addr, err := getMacAddress() + assert.NoError(t, err) + assert.Len(t, addr, addrLen) + + getLoopbackAddrs := func() [][]byte { + var loAddrs [][]byte + + interfaces, err := net.Interfaces() + assert.NoError(t, err) + + for _, i := range interfaces { + if i.Flags == net.FlagLoopback { + loAddrs = append(loAddrs, i.HardwareAddr) + } + } + + return loAddrs + } + + for _, loAddr := range getLoopbackAddrs() { + assert.NotEqual(t, loAddr, addr) + } +} From 881c040df273385ebe2a0c46327c90c80aa0aeff Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 15 Nov 2019 11:02:53 -0800 Subject: [PATCH 22/50] Adding tests for ES ID generator --- .../generator/elasticsearch/generator_test.go | 92 ++++++++++++++++++- 1 file changed, 88 insertions(+), 4 deletions(-) diff --git a/libbeat/processors/uuid/generator/elasticsearch/generator_test.go b/libbeat/processors/uuid/generator/elasticsearch/generator_test.go index b9afdbda5fca..5b1fac929cdd 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/generator_test.go +++ b/libbeat/processors/uuid/generator/elasticsearch/generator_test.go @@ -17,7 +17,91 @@ package elasticsearch -// TODO: check decoded length == 15 bytes -// TODO: bytes 7-12 are mac address -// TODO: test consecutive -// TODO: benchmarking +import ( + "encoding/base64" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetBase64UUIDLen(t *testing.T) { + id := GetBase64UUID() + + // Check that decoded ID is 15 bytes long + decodedBytes, err := base64.RawURLEncoding.DecodeString(id) + assert.NoError(t, err) + assert.Len(t, decodedBytes, 15) +} + +func TestGetBase64UUIDBytes(t *testing.T) { + id := GetBase64UUID() + + // Check that bytes 7-12 are secure munged mac address + decodedBytes, err := base64.RawURLEncoding.DecodeString(id) + assert.NoError(t, err) + assert.Equal(t, mac, decodedBytes[6:6+addrLen]) +} + +func TestGetBase64UUIDConsecutiveOrdering(t *testing.T) { + prevID := GetBase64UUID() + for i := 0; i < 10000; i++ { + decodedPrevID, err := base64.RawURLEncoding.DecodeString(prevID) + assert.NoError(t, err) + + currID := GetBase64UUID() + decodedCurrID, err := base64.RawURLEncoding.DecodeString(currID) + assert.NoError(t, err) + + // Check if current ID is greater than previous ID (accounting for + // wrap around of first byte). + if decodedCurrID[0] == 0x00 { // first byte wrapped around + // Check that previous ID's first byte was max possible byte value (0xff) + assert.EqualValues(t, decodedPrevID[0], 0xff) + + // Check that rest of current ID (after first byte) is greater than rest of + // previous ID (after first byte) + assert.True(t, isGreaterThan(decodedCurrID[1:], decodedPrevID[1:])) + } else { + // Check that current ID's first byte is exactly 1 more than previous ID's + // first byte + assert.Equal(t, decodedPrevID[0]+1, decodedCurrID[0]) + + // Check that entire current ID is greater than entire previous ID + assert.True(t, isGreaterThan(decodedCurrID, decodedPrevID)) + } + + prevID = currID + } +} + +func BenchmarkGetBase64UUID(b *testing.B) { + for n := 0; n < b.N; n++ { + GetBase64UUID() + } +} + +func isGreaterThan(b1, b2 []byte) bool { + if len(b1) > len(b2) { + return true + } + + if len(b2) < len(b1) { + return false + } + + if len(b1) == 0 { + return false + } + + // Lengths are equal and at least 1, compare values + + if b1[0] < b2[0] { + return false + } + + if b1[0] > b2[0] { + return true + } + + return isGreaterThan(b1[1:], b2[1:]) +} From f405381470976a2d99b9af9716a133d0645d1113 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 15 Nov 2019 11:08:02 -0800 Subject: [PATCH 23/50] Remove TODO after experimenting with IIFE (perf was worse) --- libbeat/processors/uuid/generator/elasticsearch/generator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/processors/uuid/generator/elasticsearch/generator.go b/libbeat/processors/uuid/generator/elasticsearch/generator.go index 9c487e930507..e16bd6aeb1ae 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/generator.go +++ b/libbeat/processors/uuid/generator/elasticsearch/generator.go @@ -43,7 +43,6 @@ func init() { } func GetBase64UUID() string { - // TODO: consider moving only critical section into IIFE? Check after benchmarking. mu.Lock() defer mu.Unlock() From 7245204415f49c5a37c8b7bb4f13a729f8342f93 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 15 Nov 2019 11:33:57 -0800 Subject: [PATCH 24/50] Moving doc --- libbeat/processors/uuid/docs/uuid.asciidoc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 libbeat/processors/uuid/docs/uuid.asciidoc diff --git a/libbeat/processors/uuid/docs/uuid.asciidoc b/libbeat/processors/uuid/docs/uuid.asciidoc new file mode 100644 index 000000000000..3768e0ccf3cc --- /dev/null +++ b/libbeat/processors/uuid/docs/uuid.asciidoc @@ -0,0 +1,18 @@ +[[uuid]] +=== Generate UUID for an event + +The `uuid` processor generates a random but roughly ordered UUID for an event. + +[source,yaml] +----------------------------------------------------- +processors: + - elasticsearch_id: ~ +----------------------------------------------------- + +The following settings are supported: + +`target_field`:: (Optional) Field in which the generated ID should be stored. Default is `@metadata.id`. + +`type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default. +The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating +its document IDs. From 3f9652a8932a8d98b9390555a760a6a1d007d099 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 15 Nov 2019 11:43:02 -0800 Subject: [PATCH 25/50] Adding UUID processor to list in docs --- libbeat/docs/processors-list.asciidoc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libbeat/docs/processors-list.asciidoc b/libbeat/docs/processors-list.asciidoc index b2e9daec7dfc..7116da9ecdbb 100644 --- a/libbeat/docs/processors-list.asciidoc +++ b/libbeat/docs/processors-list.asciidoc @@ -92,6 +92,9 @@ endif::[] ifndef::no_truncate_fields_processor[] * <> endif::[] +ifndef::no_uuid_processor[] +* <> +endif::[] //# end::processors-list[] //# tag::processors-include[] @@ -185,6 +188,9 @@ endif::[] ifndef::no_truncate_fields_processor[] include::{libbeat-processors-dir}/actions/docs/truncate_fields.asciidoc[] endif::[] +ifndef::no_uuid_processor[] +include::{libbeat-processors-dir}/uuid/docs/uuid.asciidoc[] +endif::[] //# end::processors-include[] From 63fe346e20238459f5586d090e16949a5f4d6561 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 20 Nov 2019 04:23:55 -0800 Subject: [PATCH 26/50] Apply suggestions from docs code review Co-Authored-By: DeDe Morton --- libbeat/docs/processors-list.asciidoc | 2 +- libbeat/processors/uuid/docs/uuid.asciidoc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/docs/processors-list.asciidoc b/libbeat/docs/processors-list.asciidoc index 7116da9ecdbb..49ab57c5e720 100644 --- a/libbeat/docs/processors-list.asciidoc +++ b/libbeat/docs/processors-list.asciidoc @@ -93,7 +93,7 @@ ifndef::no_truncate_fields_processor[] * <> endif::[] ifndef::no_uuid_processor[] -* <> +* <> endif::[] //# end::processors-list[] diff --git a/libbeat/processors/uuid/docs/uuid.asciidoc b/libbeat/processors/uuid/docs/uuid.asciidoc index 3768e0ccf3cc..b24cd68da079 100644 --- a/libbeat/processors/uuid/docs/uuid.asciidoc +++ b/libbeat/processors/uuid/docs/uuid.asciidoc @@ -11,8 +11,8 @@ processors: The following settings are supported: -`target_field`:: (Optional) Field in which the generated ID should be stored. Default is `@metadata.id`. +`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`. `type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default. The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating -its document IDs. +document IDs. From 63a96fced1fbc697a739b578542b37ffde32ff1b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 21 Nov 2019 06:16:15 -0800 Subject: [PATCH 27/50] Adding godoc --- libbeat/processors/uuid/generator/elasticsearch/generator.go | 5 +++-- libbeat/processors/uuid/generator/elasticsearch/mac.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/libbeat/processors/uuid/generator/elasticsearch/generator.go b/libbeat/processors/uuid/generator/elasticsearch/generator.go index e16bd6aeb1ae..23c47788c857 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/generator.go +++ b/libbeat/processors/uuid/generator/elasticsearch/generator.go @@ -24,8 +24,6 @@ import ( "time" ) -// Golang port of https://github.com/elastic/elasticsearch/blob/a666fb22664284d8e2114841ebb58ea4e1924691/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java - var ( sequenceNumber uint32 lastTimestamp *time.Time @@ -42,6 +40,9 @@ func init() { sequenceNumber = rand.Uint32() } +// GetBase64UUID returns a base64-encoded, randomly-generated, but roughly ordered (over time), unique +// ID. The algorithm used to generate the ID is the same as used by Elasticsearch. +// See https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java func GetBase64UUID() string { mu.Lock() defer mu.Unlock() diff --git a/libbeat/processors/uuid/generator/elasticsearch/mac.go b/libbeat/processors/uuid/generator/elasticsearch/mac.go index 8398d4e4c2fc..96946edf1cdf 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/mac.go +++ b/libbeat/processors/uuid/generator/elasticsearch/mac.go @@ -22,7 +22,7 @@ import ( "net" ) -// Golang port of https://github.com/elastic/elasticsearch/blob/a666fb22664284d8e2114841ebb58ea4e1924691/server/src/main/java/org/elasticsearch/common/MacAddressProvider.java +// Golang port of https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/MacAddressProvider.java type id []byte From fc0e6bb46993b3b173071c4d7db12430caebef8c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 21 Nov 2019 06:17:07 -0800 Subject: [PATCH 28/50] Rename generator function type --- libbeat/processors/uuid/generator/generator.go | 4 ++-- libbeat/processors/uuid/generator/generator_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/uuid/generator/generator.go index 048cd83b2eb5..b7e2c04b0e4b 100644 --- a/libbeat/processors/uuid/generator/generator.go +++ b/libbeat/processors/uuid/generator/generator.go @@ -19,9 +19,9 @@ package generator import "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch" -type generatorFn func() string +type Fn func() string -func Factory(typ string) (generatorFn, error) { +func Factory(typ string) (Fn, error) { switch typ { case "elasticsearch": return elasticsearch.GetBase64UUID, nil diff --git a/libbeat/processors/uuid/generator/generator_test.go b/libbeat/processors/uuid/generator/generator_test.go index 64294cbadaff..80c18c4dcce5 100644 --- a/libbeat/processors/uuid/generator/generator_test.go +++ b/libbeat/processors/uuid/generator/generator_test.go @@ -29,7 +29,7 @@ import ( func TestFactory(t *testing.T) { tests := map[string]struct { - expectedGeneratorFn generatorFn + expectedGeneratorFn Fn expectedErr error }{ "elasticsearch": { @@ -58,6 +58,6 @@ func TestFactory(t *testing.T) { } } -func getGeneratorFuncName(fn generatorFn) string { +func getGeneratorFuncName(fn Fn) string { return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() } From 539188dbad78f38500fe778c345da6d91da20d5a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 21 Nov 2019 06:18:05 -0800 Subject: [PATCH 29/50] Exporting and adding godoc --- libbeat/processors/uuid/generator/generator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/uuid/generator/generator.go index b7e2c04b0e4b..028bb0df5222 100644 --- a/libbeat/processors/uuid/generator/generator.go +++ b/libbeat/processors/uuid/generator/generator.go @@ -19,6 +19,7 @@ package generator import "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch" +// Fn represents a random, unique ID generator function. type Fn func() string func Factory(typ string) (Fn, error) { From ef1e22cb3572b8f74258dcc685ecc4e778675486 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 21 Nov 2019 06:20:29 -0800 Subject: [PATCH 30/50] Adding godoc --- libbeat/processors/uuid/generator/generator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/uuid/generator/generator.go index 028bb0df5222..fd4c5c87e7b6 100644 --- a/libbeat/processors/uuid/generator/generator.go +++ b/libbeat/processors/uuid/generator/generator.go @@ -22,6 +22,8 @@ import "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch // Fn represents a random, unique ID generator function. type Fn func() string +// Factory takes as input the type of ID to generate and returns the generator +// function for that ID type. func Factory(typ string) (Fn, error) { switch typ { case "elasticsearch": From 848c35e26ec8fa6e9fb9f80eb46f916f76fa662c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 21 Nov 2019 06:25:48 -0800 Subject: [PATCH 31/50] Updating godoc --- libbeat/processors/uuid/generator/generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/uuid/generator/generator.go index fd4c5c87e7b6..3863f467aed6 100644 --- a/libbeat/processors/uuid/generator/generator.go +++ b/libbeat/processors/uuid/generator/generator.go @@ -19,7 +19,7 @@ package generator import "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch" -// Fn represents a random, unique ID generator function. +// Fn represents an ID generator function. type Fn func() string // Factory takes as input the type of ID to generate and returns the generator From 6d604d9454fe558207c4c5a9f0cd5756687ec2c8 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Nov 2019 12:30:17 -0800 Subject: [PATCH 32/50] Adding Unwrap error methods --- libbeat/processors/uuid/errors.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libbeat/processors/uuid/errors.go b/libbeat/processors/uuid/errors.go index 7ab5b0de9b78..58a90fd182ec 100644 --- a/libbeat/processors/uuid/errors.go +++ b/libbeat/processors/uuid/errors.go @@ -33,6 +33,9 @@ func makeErrConfigUnpack(cause error) errConfigUnpack { func (e errConfigUnpack) Error() string { return fmt.Sprintf("failed to unpack %v processor configuration: %v", processorName, e.cause) } +func (e errConfigUnpack) Unwrap() error { + return e.cause +} func makeErrComputeID(cause error) errComputeID { return errComputeID{cause} @@ -40,6 +43,9 @@ func makeErrComputeID(cause error) errComputeID { func (e errComputeID) Error() string { return fmt.Sprintf("failed to compute ID: %v", e.cause) } +func (e errComputeID) Unwrap() error { + return e.cause +} func makeErrUnknownType(typ string) errUnknownType { return errUnknownType{typ} From c89d3c49af86485f2e30dbeabf22b3bff6dd3a74 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 04:30:02 -0800 Subject: [PATCH 33/50] Moving ES ID generator into generators package + singleton construction --- .../generator.go => es_generator.go} | 25 +++++++++++++------ ...generator_test.go => es_generator_test.go} | 25 +++++++++++-------- .../processors/uuid/generator/generator.go | 12 +++++---- .../uuid/generator/{elasticsearch => }/mac.go | 2 +- .../generator/{elasticsearch => }/mac_test.go | 2 +- 5 files changed, 41 insertions(+), 25 deletions(-) rename libbeat/processors/uuid/generator/{elasticsearch/generator.go => es_generator.go} (87%) rename libbeat/processors/uuid/generator/{elasticsearch/generator_test.go => es_generator_test.go} (87%) rename libbeat/processors/uuid/generator/{elasticsearch => }/mac.go (99%) rename libbeat/processors/uuid/generator/{elasticsearch => }/mac_test.go (99%) diff --git a/libbeat/processors/uuid/generator/elasticsearch/generator.go b/libbeat/processors/uuid/generator/es_generator.go similarity index 87% rename from libbeat/processors/uuid/generator/elasticsearch/generator.go rename to libbeat/processors/uuid/generator/es_generator.go index 23c47788c857..fda179a242c9 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/generator.go +++ b/libbeat/processors/uuid/generator/es_generator.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch +package generator import ( "encoding/base64" @@ -24,9 +24,18 @@ import ( "time" ) +type esTimeBasedUUIDGenerator struct{} + +// Singleton instance and constructor returning it +var _esTimeBasedUUIDGenerator IDGenerator = (*esTimeBasedUUIDGenerator)(nil) + +func ESTimeBasedUUIDGenerator() IDGenerator { + return _esTimeBasedUUIDGenerator +} + var ( sequenceNumber uint32 - lastTimestamp *time.Time + lastTimestamp time.Time mac []byte mu sync.Mutex ) @@ -40,10 +49,10 @@ func init() { sequenceNumber = rand.Uint32() } -// GetBase64UUID returns a base64-encoded, randomly-generated, but roughly ordered (over time), unique +// NextID returns a base64-encoded, randomly-generated, but roughly ordered (over time), unique // ID. The algorithm used to generate the ID is the same as used by Elasticsearch. // See https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java -func GetBase64UUID() string { +func (_ *esTimeBasedUUIDGenerator) NextID() string { mu.Lock() defer mu.Unlock() @@ -93,19 +102,19 @@ func GetBase64UUID() string { return base64.RawURLEncoding.EncodeToString(uuidBytes) } -func getTimestamp() *time.Time { +func getTimestamp() time.Time { // Don't let timestamp go backwards, at least "on our watch" (while this process is running). We are still vulnerable if we are // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of // collision. now := time.Now() - if lastTimestamp == nil { - return &now + if lastTimestamp.IsZero() { + return now } if lastTimestamp.After(now) { return lastTimestamp } - return &now + return now } diff --git a/libbeat/processors/uuid/generator/elasticsearch/generator_test.go b/libbeat/processors/uuid/generator/es_generator_test.go similarity index 87% rename from libbeat/processors/uuid/generator/elasticsearch/generator_test.go rename to libbeat/processors/uuid/generator/es_generator_test.go index 5b1fac929cdd..fe7e71198301 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/generator_test.go +++ b/libbeat/processors/uuid/generator/es_generator_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch +package generator import ( "encoding/base64" @@ -24,8 +24,9 @@ import ( "github.com/stretchr/testify/assert" ) -func TestGetBase64UUIDLen(t *testing.T) { - id := GetBase64UUID() +func TestIDLen(t *testing.T) { + g := ESTimeBasedUUIDGenerator() + id := g.NextID() // Check that decoded ID is 15 bytes long decodedBytes, err := base64.RawURLEncoding.DecodeString(id) @@ -33,8 +34,9 @@ func TestGetBase64UUIDLen(t *testing.T) { assert.Len(t, decodedBytes, 15) } -func TestGetBase64UUIDBytes(t *testing.T) { - id := GetBase64UUID() +func TestIDDBytes(t *testing.T) { + g := ESTimeBasedUUIDGenerator() + id := g.NextID() // Check that bytes 7-12 are secure munged mac address decodedBytes, err := base64.RawURLEncoding.DecodeString(id) @@ -42,13 +44,15 @@ func TestGetBase64UUIDBytes(t *testing.T) { assert.Equal(t, mac, decodedBytes[6:6+addrLen]) } -func TestGetBase64UUIDConsecutiveOrdering(t *testing.T) { - prevID := GetBase64UUID() +func TestIDConsecutiveOrdering(t *testing.T) { + g := ESTimeBasedUUIDGenerator() + + prevID := g.NextID() for i := 0; i < 10000; i++ { decodedPrevID, err := base64.RawURLEncoding.DecodeString(prevID) assert.NoError(t, err) - currID := GetBase64UUID() + currID := g.NextID() decodedCurrID, err := base64.RawURLEncoding.DecodeString(currID) assert.NoError(t, err) @@ -74,9 +78,10 @@ func TestGetBase64UUIDConsecutiveOrdering(t *testing.T) { } } -func BenchmarkGetBase64UUID(b *testing.B) { +func BenchmarkID(b *testing.B) { + g := ESTimeBasedUUIDGenerator() for n := 0; n < b.N; n++ { - GetBase64UUID() + g.NextID() } } diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/uuid/generator/generator.go index 3863f467aed6..5fa4524d8ba3 100644 --- a/libbeat/processors/uuid/generator/generator.go +++ b/libbeat/processors/uuid/generator/generator.go @@ -17,18 +17,20 @@ package generator -import "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch" - // Fn represents an ID generator function. type Fn func() string -// Factory takes as input the type of ID to generate and returns the generator -// function for that ID type. +// Factory takes as input the type of ID to generate and returns the constructor +// for the generator of that ID type. func Factory(typ string) (Fn, error) { switch typ { case "elasticsearch": - return elasticsearch.GetBase64UUID, nil + return ESTimeBasedUUIDGenerator, nil default: return nil, makeErrUnknownType(typ) } } + +type IDGenerator interface { + NextID() string +} diff --git a/libbeat/processors/uuid/generator/elasticsearch/mac.go b/libbeat/processors/uuid/generator/mac.go similarity index 99% rename from libbeat/processors/uuid/generator/elasticsearch/mac.go rename to libbeat/processors/uuid/generator/mac.go index 96946edf1cdf..54237d7fa480 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/mac.go +++ b/libbeat/processors/uuid/generator/mac.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch +package generator import ( "crypto/rand" diff --git a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go b/libbeat/processors/uuid/generator/mac_test.go similarity index 99% rename from libbeat/processors/uuid/generator/elasticsearch/mac_test.go rename to libbeat/processors/uuid/generator/mac_test.go index f9629143378b..ec24439bab6e 100644 --- a/libbeat/processors/uuid/generator/elasticsearch/mac_test.go +++ b/libbeat/processors/uuid/generator/mac_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package elasticsearch +package generator import ( "net" From 1e62ad792fdd90bbe5671d325a6b359fba01b9ca Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 04:41:12 -0800 Subject: [PATCH 34/50] Addressing Hound feedback --- libbeat/processors/uuid/generator/es_generator.go | 5 +++-- libbeat/processors/uuid/generator/generator.go | 11 ++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/libbeat/processors/uuid/generator/es_generator.go b/libbeat/processors/uuid/generator/es_generator.go index fda179a242c9..fc3a52431b4e 100644 --- a/libbeat/processors/uuid/generator/es_generator.go +++ b/libbeat/processors/uuid/generator/es_generator.go @@ -26,9 +26,10 @@ import ( type esTimeBasedUUIDGenerator struct{} -// Singleton instance and constructor returning it +// Singleton instance var _esTimeBasedUUIDGenerator IDGenerator = (*esTimeBasedUUIDGenerator)(nil) +// ESTimeBasedUUIDGenerator returns the singleton instance for this generator func ESTimeBasedUUIDGenerator() IDGenerator { return _esTimeBasedUUIDGenerator } @@ -52,7 +53,7 @@ func init() { // NextID returns a base64-encoded, randomly-generated, but roughly ordered (over time), unique // ID. The algorithm used to generate the ID is the same as used by Elasticsearch. // See https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java -func (_ *esTimeBasedUUIDGenerator) NextID() string { +func (*esTimeBasedUUIDGenerator) NextID() string { mu.Lock() defer mu.Unlock() diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/uuid/generator/generator.go index 5fa4524d8ba3..96ceab0b59db 100644 --- a/libbeat/processors/uuid/generator/generator.go +++ b/libbeat/processors/uuid/generator/generator.go @@ -17,7 +17,12 @@ package generator -// Fn represents an ID generator function. +// IDGenerator implementors know how to generate and return a new ID +type IDGenerator interface { + NextID() string +} + +// Fn represents an ID generator constructor function. type Fn func() string // Factory takes as input the type of ID to generate and returns the constructor @@ -30,7 +35,3 @@ func Factory(typ string) (Fn, error) { return nil, makeErrUnknownType(typ) } } - -type IDGenerator interface { - NextID() string -} From fb6e89d66a6cbeb4a20e050eff62597de83fb372 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 04:59:57 -0800 Subject: [PATCH 35/50] Renaming processor to `add_id` --- libbeat/docs/processors-list.asciidoc | 12 ++++++------ .../processors/{uuid/uuid.go => add_id/add_id.go} | 2 +- .../{uuid/uuid_test.go => add_id/add_id_test.go} | 2 +- libbeat/processors/{uuid => add_id}/config.go | 4 ++-- .../uuid.asciidoc => add_id/docs/add_id.asciidoc} | 8 ++++---- libbeat/processors/{uuid => add_id}/errors.go | 2 +- .../processors/{uuid => add_id}/generator/errors.go | 0 .../{uuid => add_id}/generator/es_generator.go | 0 .../{uuid => add_id}/generator/es_generator_test.go | 0 .../{uuid => add_id}/generator/generator.go | 0 .../{uuid => add_id}/generator/generator_test.go | 0 libbeat/processors/{uuid => add_id}/generator/mac.go | 0 .../{uuid => add_id}/generator/mac_test.go | 0 13 files changed, 15 insertions(+), 15 deletions(-) rename libbeat/processors/{uuid/uuid.go => add_id/add_id.go} (99%) rename libbeat/processors/{uuid/uuid_test.go => add_id/add_id_test.go} (99%) rename libbeat/processors/{uuid => add_id}/config.go (95%) rename libbeat/processors/{uuid/docs/uuid.asciidoc => add_id/docs/add_id.asciidoc} (78%) rename libbeat/processors/{uuid => add_id}/errors.go (99%) rename libbeat/processors/{uuid => add_id}/generator/errors.go (100%) rename libbeat/processors/{uuid => add_id}/generator/es_generator.go (100%) rename libbeat/processors/{uuid => add_id}/generator/es_generator_test.go (100%) rename libbeat/processors/{uuid => add_id}/generator/generator.go (100%) rename libbeat/processors/{uuid => add_id}/generator/generator_test.go (100%) rename libbeat/processors/{uuid => add_id}/generator/mac.go (100%) rename libbeat/processors/{uuid => add_id}/generator/mac_test.go (100%) diff --git a/libbeat/docs/processors-list.asciidoc b/libbeat/docs/processors-list.asciidoc index 49ab57c5e720..a9a4356377ae 100644 --- a/libbeat/docs/processors-list.asciidoc +++ b/libbeat/docs/processors-list.asciidoc @@ -14,6 +14,9 @@ endif::[] ifndef::no_add_host_metadata_processor[] * <> endif::[] +ifndef::no_add_id_processor[] +* <> +endif::[] ifndef::no_add_kubernetes_metadata_processor[] * <> endif::[] @@ -92,9 +95,6 @@ endif::[] ifndef::no_truncate_fields_processor[] * <> endif::[] -ifndef::no_uuid_processor[] -* <> -endif::[] //# end::processors-list[] //# tag::processors-include[] @@ -110,6 +110,9 @@ endif::[] ifndef::no_add_host_metadata_processor[] include::{libbeat-processors-dir}/add_host_metadata/docs/add_host_metadata.asciidoc[] endif::[] +ifndef::no_add_id[] +include::{libbeat-processors-dir}/add_id/docs/add_id.asciidoc[] +endif::[] ifndef::no_add_kubernetes_metadata_processor[] include::{libbeat-processors-dir}/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc[] endif::[] @@ -188,9 +191,6 @@ endif::[] ifndef::no_truncate_fields_processor[] include::{libbeat-processors-dir}/actions/docs/truncate_fields.asciidoc[] endif::[] -ifndef::no_uuid_processor[] -include::{libbeat-processors-dir}/uuid/docs/uuid.asciidoc[] -endif::[] //# end::processors-include[] diff --git a/libbeat/processors/uuid/uuid.go b/libbeat/processors/add_id/add_id.go similarity index 99% rename from libbeat/processors/uuid/uuid.go rename to libbeat/processors/add_id/add_id.go index 621175927ec6..d03b4b2b1ecc 100644 --- a/libbeat/processors/uuid/uuid.go +++ b/libbeat/processors/add_id/add_id.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package uuid +package add_id import ( "fmt" diff --git a/libbeat/processors/uuid/uuid_test.go b/libbeat/processors/add_id/add_id_test.go similarity index 99% rename from libbeat/processors/uuid/uuid_test.go rename to libbeat/processors/add_id/add_id_test.go index 9c2e9ff27ded..a7bd9baa8c5a 100644 --- a/libbeat/processors/uuid/uuid_test.go +++ b/libbeat/processors/add_id/add_id_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package uuid +package add_id import ( "testing" diff --git a/libbeat/processors/uuid/config.go b/libbeat/processors/add_id/config.go similarity index 95% rename from libbeat/processors/uuid/config.go rename to libbeat/processors/add_id/config.go index 19fe06c4ae01..c0f7bcacab4d 100644 --- a/libbeat/processors/uuid/config.go +++ b/libbeat/processors/add_id/config.go @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -package uuid +package add_id -// Config for Elasticsearch ID processor. +// Config for Add ID processor. type Config struct { TargetField string `config:"target_field"` // Target field for the ID Type string `config:"type"` // Type of ID diff --git a/libbeat/processors/uuid/docs/uuid.asciidoc b/libbeat/processors/add_id/docs/add_id.asciidoc similarity index 78% rename from libbeat/processors/uuid/docs/uuid.asciidoc rename to libbeat/processors/add_id/docs/add_id.asciidoc index b24cd68da079..0bc56dca1698 100644 --- a/libbeat/processors/uuid/docs/uuid.asciidoc +++ b/libbeat/processors/add_id/docs/add_id.asciidoc @@ -1,12 +1,12 @@ -[[uuid]] -=== Generate UUID for an event +[[add_id]] +=== Generate an ID for an event -The `uuid` processor generates a random but roughly ordered UUID for an event. +The `add_id` processor generates a unique ID for an event. [source,yaml] ----------------------------------------------------- processors: - - elasticsearch_id: ~ + - add_id: ~ ----------------------------------------------------- The following settings are supported: diff --git a/libbeat/processors/uuid/errors.go b/libbeat/processors/add_id/errors.go similarity index 99% rename from libbeat/processors/uuid/errors.go rename to libbeat/processors/add_id/errors.go index 58a90fd182ec..59fc45494bd6 100644 --- a/libbeat/processors/uuid/errors.go +++ b/libbeat/processors/add_id/errors.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package uuid +package add_id import ( "fmt" diff --git a/libbeat/processors/uuid/generator/errors.go b/libbeat/processors/add_id/generator/errors.go similarity index 100% rename from libbeat/processors/uuid/generator/errors.go rename to libbeat/processors/add_id/generator/errors.go diff --git a/libbeat/processors/uuid/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go similarity index 100% rename from libbeat/processors/uuid/generator/es_generator.go rename to libbeat/processors/add_id/generator/es_generator.go diff --git a/libbeat/processors/uuid/generator/es_generator_test.go b/libbeat/processors/add_id/generator/es_generator_test.go similarity index 100% rename from libbeat/processors/uuid/generator/es_generator_test.go rename to libbeat/processors/add_id/generator/es_generator_test.go diff --git a/libbeat/processors/uuid/generator/generator.go b/libbeat/processors/add_id/generator/generator.go similarity index 100% rename from libbeat/processors/uuid/generator/generator.go rename to libbeat/processors/add_id/generator/generator.go diff --git a/libbeat/processors/uuid/generator/generator_test.go b/libbeat/processors/add_id/generator/generator_test.go similarity index 100% rename from libbeat/processors/uuid/generator/generator_test.go rename to libbeat/processors/add_id/generator/generator_test.go diff --git a/libbeat/processors/uuid/generator/mac.go b/libbeat/processors/add_id/generator/mac.go similarity index 100% rename from libbeat/processors/uuid/generator/mac.go rename to libbeat/processors/add_id/generator/mac.go diff --git a/libbeat/processors/uuid/generator/mac_test.go b/libbeat/processors/add_id/generator/mac_test.go similarity index 100% rename from libbeat/processors/uuid/generator/mac_test.go rename to libbeat/processors/add_id/generator/mac_test.go From e00cb7b9379eb357271b34210a6a56a70c33af20 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 05:06:25 -0800 Subject: [PATCH 36/50] Updating processor name in CHANGELOG entry --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8a61352fdd1d..377457841149 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -360,7 +360,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822] - Make use of consumer_lag in Kafka dashboard {pull}14863[14863] - Refactor kubernetes autodiscover to enable different resource based discovery {pull}14738[14738] -- Add `elasticsearch_id` processor. {pull}14524[14524] +- Add `add_id` processor. {pull}14524[14524] *Auditbeat* From 439b56f6329d92b8302116381dde946953a2e74f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 05:18:54 -0800 Subject: [PATCH 37/50] More refactoring updates --- libbeat/processors/add_id/add_id.go | 18 +++++++++--------- .../add_id/generator/generator_test.go | 10 ++++------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/libbeat/processors/add_id/add_id.go b/libbeat/processors/add_id/add_id.go index d03b4b2b1ecc..98fb578b15af 100644 --- a/libbeat/processors/add_id/add_id.go +++ b/libbeat/processors/add_id/add_id.go @@ -20,7 +20,7 @@ package add_id import ( "fmt" - "github.com/elastic/beats/libbeat/processors/uuid/generator" + "github.com/elastic/beats/libbeat/processors/add_id/generator" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -29,24 +29,24 @@ import ( ) func init() { - processors.RegisterPlugin("uuid", New) - jsprocessor.RegisterPlugin("UUID", New) + processors.RegisterPlugin("add_id", New) + jsprocessor.RegisterPlugin("AddID", New) } -const processorName = "uuid" +const processorName = "add_id" -type uuid struct { +type addID struct { config Config } -// New constructs a new UUID processor. +// New constructs a new Add ID processor. func New(cfg *common.Config) (processors.Processor, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, makeErrConfigUnpack(err) } - p := &uuid{ + p := &addID{ config, } @@ -54,7 +54,7 @@ func New(cfg *common.Config) (processors.Processor, error) { } // Run enriches the given event with an ID -func (p *uuid) Run(event *beat.Event) (*beat.Event, error) { +func (p *addID) Run(event *beat.Event) (*beat.Event, error) { idFn, err := generator.Factory(p.config.Type) if err != nil { return nil, makeErrComputeID(err) @@ -68,6 +68,6 @@ func (p *uuid) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (p *uuid) String() string { +func (p *addID) String() string { return fmt.Sprintf("%v=[target_field=[%v]]", processorName, p.config.TargetField) } diff --git a/libbeat/processors/add_id/generator/generator_test.go b/libbeat/processors/add_id/generator/generator_test.go index 80c18c4dcce5..a3302f88a731 100644 --- a/libbeat/processors/add_id/generator/generator_test.go +++ b/libbeat/processors/add_id/generator/generator_test.go @@ -22,18 +22,16 @@ import ( "runtime" "testing" - "github.com/elastic/beats/libbeat/processors/uuid/generator/elasticsearch" - "github.com/stretchr/testify/assert" ) func TestFactory(t *testing.T) { tests := map[string]struct { - expectedGeneratorFn Fn + expectedIDGenerator func() IDGenerator expectedErr error }{ "elasticsearch": { - elasticsearch.GetBase64UUID, + ESTimeBasedUUIDGenerator, nil, }, "foobar": { @@ -46,9 +44,9 @@ func TestFactory(t *testing.T) { t.Run(name, func(t *testing.T) { typ := name fn, err := Factory(typ) - if test.expectedGeneratorFn != nil { + if test.expectedIDGenerator != nil { fnName := getGeneratorFuncName(fn) - expectedFnName := getGeneratorFuncName(test.expectedGeneratorFn) + expectedFnName := getGeneratorFuncName(test.expectedIDGenerator) assert.Equal(t, fnName, expectedFnName) } if test.expectedErr != nil { From 92637a5d7919fce129f3b41b1365cf8a8837eaf7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 05:30:38 -0800 Subject: [PATCH 38/50] Fixing more vet errors --- libbeat/processors/add_id/generator/generator.go | 2 +- .../processors/add_id/generator/generator_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/libbeat/processors/add_id/generator/generator.go b/libbeat/processors/add_id/generator/generator.go index 96ceab0b59db..a117e29c6659 100644 --- a/libbeat/processors/add_id/generator/generator.go +++ b/libbeat/processors/add_id/generator/generator.go @@ -27,7 +27,7 @@ type Fn func() string // Factory takes as input the type of ID to generate and returns the constructor // for the generator of that ID type. -func Factory(typ string) (Fn, error) { +func Factory(typ string) (func() IDGenerator, error) { switch typ { case "elasticsearch": return ESTimeBasedUUIDGenerator, nil diff --git a/libbeat/processors/add_id/generator/generator_test.go b/libbeat/processors/add_id/generator/generator_test.go index a3302f88a731..a3c31b2df4b1 100644 --- a/libbeat/processors/add_id/generator/generator_test.go +++ b/libbeat/processors/add_id/generator/generator_test.go @@ -43,11 +43,11 @@ func TestFactory(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { typ := name - fn, err := Factory(typ) + ctor, err := Factory(typ) if test.expectedIDGenerator != nil { - fnName := getGeneratorFuncName(fn) - expectedFnName := getGeneratorFuncName(test.expectedIDGenerator) - assert.Equal(t, fnName, expectedFnName) + ctorName := getGeneratorFuncName(ctor) + expectedCtorName := getGeneratorFuncName(test.expectedIDGenerator) + assert.Equal(t, ctorName, expectedCtorName) } if test.expectedErr != nil { assert.EqualError(t, err, test.expectedErr.Error()) @@ -56,6 +56,6 @@ func TestFactory(t *testing.T) { } } -func getGeneratorFuncName(fn Fn) string { - return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() +func getGeneratorFuncName(ctor func() IDGenerator) string { + return runtime.FuncForPC(reflect.ValueOf(ctor).Pointer()).Name() } From b7fe6fbb96792949e5d35e05da4a8c8b14f4874f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 05:36:51 -0800 Subject: [PATCH 39/50] Unexport config struct as it's only used within this package --- libbeat/processors/add_id/add_id.go | 2 +- libbeat/processors/add_id/config.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/processors/add_id/add_id.go b/libbeat/processors/add_id/add_id.go index 98fb578b15af..1a40e38c9040 100644 --- a/libbeat/processors/add_id/add_id.go +++ b/libbeat/processors/add_id/add_id.go @@ -36,7 +36,7 @@ func init() { const processorName = "add_id" type addID struct { - config Config + config config } // New constructs a new Add ID processor. diff --git a/libbeat/processors/add_id/config.go b/libbeat/processors/add_id/config.go index c0f7bcacab4d..8bd025d532e1 100644 --- a/libbeat/processors/add_id/config.go +++ b/libbeat/processors/add_id/config.go @@ -17,14 +17,14 @@ package add_id -// Config for Add ID processor. -type Config struct { +// configuration for Add ID processor. +type config struct { TargetField string `config:"target_field"` // Target field for the ID Type string `config:"type"` // Type of ID } -func defaultConfig() Config { - return Config{ +func defaultConfig() config { + return config{ TargetField: "@metadata.id", Type: "elasticsearch", } From d45e591d28ee65a15ab214126921ca2ad719ec9d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 05:45:13 -0800 Subject: [PATCH 40/50] Fixing doc anchor --- libbeat/processors/add_id/docs/add_id.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/add_id/docs/add_id.asciidoc b/libbeat/processors/add_id/docs/add_id.asciidoc index 0bc56dca1698..64d475669a0b 100644 --- a/libbeat/processors/add_id/docs/add_id.asciidoc +++ b/libbeat/processors/add_id/docs/add_id.asciidoc @@ -1,4 +1,4 @@ -[[add_id]] +[[add-id]] === Generate an ID for an event The `add_id` processor generates a unique ID for an event. From 0a764622d9eb7b71811a05ebed7a973816bed75f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 09:07:21 -0800 Subject: [PATCH 41/50] Moving generator construction to processor constructor; simplifying factory --- libbeat/processors/add_id/add_id.go | 13 +++++++----- .../processors/add_id/generator/generator.go | 11 ++++------ .../add_id/generator/generator_test.go | 20 ++++++------------- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/libbeat/processors/add_id/add_id.go b/libbeat/processors/add_id/add_id.go index 1a40e38c9040..0df0a4e8af6a 100644 --- a/libbeat/processors/add_id/add_id.go +++ b/libbeat/processors/add_id/add_id.go @@ -37,6 +37,7 @@ const processorName = "add_id" type addID struct { config config + gen generator.IDGenerator } // New constructs a new Add ID processor. @@ -46,8 +47,14 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, makeErrConfigUnpack(err) } + gen, err := generator.Factory(p.config.Type) + if err != nil { + return nil, makeErrComputeID(err) + } + p := &addID{ config, + gen, } return p, nil @@ -55,12 +62,8 @@ func New(cfg *common.Config) (processors.Processor, error) { // Run enriches the given event with an ID func (p *addID) Run(event *beat.Event) (*beat.Event, error) { - idFn, err := generator.Factory(p.config.Type) - if err != nil { - return nil, makeErrComputeID(err) - } + id := p.gen.NextID() - id := idFn() if _, err := event.PutValue(p.config.TargetField, id); err != nil { return nil, makeErrComputeID(err) } diff --git a/libbeat/processors/add_id/generator/generator.go b/libbeat/processors/add_id/generator/generator.go index a117e29c6659..84c36bb48b18 100644 --- a/libbeat/processors/add_id/generator/generator.go +++ b/libbeat/processors/add_id/generator/generator.go @@ -22,15 +22,12 @@ type IDGenerator interface { NextID() string } -// Fn represents an ID generator constructor function. -type Fn func() string - -// Factory takes as input the type of ID to generate and returns the constructor -// for the generator of that ID type. -func Factory(typ string) (func() IDGenerator, error) { +// Factory takes as input the type of ID to generate and returns the +// generator of that ID type. +func Factory(typ string) (IDGenerator, error) { switch typ { case "elasticsearch": - return ESTimeBasedUUIDGenerator, nil + return ESTimeBasedUUIDGenerator(), nil default: return nil, makeErrUnknownType(typ) } diff --git a/libbeat/processors/add_id/generator/generator_test.go b/libbeat/processors/add_id/generator/generator_test.go index a3c31b2df4b1..f78c4ee6038d 100644 --- a/libbeat/processors/add_id/generator/generator_test.go +++ b/libbeat/processors/add_id/generator/generator_test.go @@ -18,8 +18,6 @@ package generator import ( - "reflect" - "runtime" "testing" "github.com/stretchr/testify/assert" @@ -27,11 +25,11 @@ import ( func TestFactory(t *testing.T) { tests := map[string]struct { - expectedIDGenerator func() IDGenerator - expectedErr error + expectedGen IDGenerator + expectedErr error }{ "elasticsearch": { - ESTimeBasedUUIDGenerator, + ESTimeBasedUUIDGenerator(), nil, }, "foobar": { @@ -43,11 +41,9 @@ func TestFactory(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { typ := name - ctor, err := Factory(typ) - if test.expectedIDGenerator != nil { - ctorName := getGeneratorFuncName(ctor) - expectedCtorName := getGeneratorFuncName(test.expectedIDGenerator) - assert.Equal(t, ctorName, expectedCtorName) + gen, err := Factory(typ) + if test.expectedGen != nil { + assert.Equal(t, test.expectedGen, gen) } if test.expectedErr != nil { assert.EqualError(t, err, test.expectedErr.Error()) @@ -55,7 +51,3 @@ func TestFactory(t *testing.T) { }) } } - -func getGeneratorFuncName(ctor func() IDGenerator) string { - return runtime.FuncForPC(reflect.ValueOf(ctor).Pointer()).Name() -} From 1b44e755c0cc6124c26f4d03b3ad6e36bd4d2819 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 5 Dec 2019 10:37:09 -0800 Subject: [PATCH 42/50] Fixing compile error --- libbeat/processors/add_id/add_id.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/add_id/add_id.go b/libbeat/processors/add_id/add_id.go index 0df0a4e8af6a..5df081460c33 100644 --- a/libbeat/processors/add_id/add_id.go +++ b/libbeat/processors/add_id/add_id.go @@ -47,7 +47,7 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, makeErrConfigUnpack(err) } - gen, err := generator.Factory(p.config.Type) + gen, err := generator.Factory(config.Type) if err != nil { return nil, makeErrComputeID(err) } From aa8739bfa9cb64596ba6336dcc978bdc105d4c7d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 6 Dec 2019 06:01:07 -0800 Subject: [PATCH 43/50] Validate ID generator type in config --- libbeat/processors/add_id/config.go | 13 ++++++++++ .../processors/add_id/generator/generator.go | 26 ++++++++++++++----- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/libbeat/processors/add_id/config.go b/libbeat/processors/add_id/config.go index 8bd025d532e1..40b4d305de64 100644 --- a/libbeat/processors/add_id/config.go +++ b/libbeat/processors/add_id/config.go @@ -17,6 +17,10 @@ package add_id +import ( + "github.com/elastic/beats/libbeat/processors/add_id/generator" +) + // configuration for Add ID processor. type config struct { TargetField string `config:"target_field"` // Target field for the ID @@ -29,3 +33,12 @@ func defaultConfig() config { Type: "elasticsearch", } } + +func (c *config) Validate() error { + // Validate type of ID generator + if !generator.Exists(c.Type) { + return makeErrUnknownType(c.Type) + } + + return nil +} diff --git a/libbeat/processors/add_id/generator/generator.go b/libbeat/processors/add_id/generator/generator.go index 84c36bb48b18..ca94ecd64815 100644 --- a/libbeat/processors/add_id/generator/generator.go +++ b/libbeat/processors/add_id/generator/generator.go @@ -17,6 +17,12 @@ package generator +import "strings" + +var generators = map[string]IDGenerator{ + "elasticsearch": ESTimeBasedUUIDGenerator(), +} + // IDGenerator implementors know how to generate and return a new ID type IDGenerator interface { NextID() string @@ -24,11 +30,19 @@ type IDGenerator interface { // Factory takes as input the type of ID to generate and returns the // generator of that ID type. -func Factory(typ string) (IDGenerator, error) { - switch typ { - case "elasticsearch": - return ESTimeBasedUUIDGenerator(), nil - default: - return nil, makeErrUnknownType(typ) +func Factory(val string) (IDGenerator, error) { + typ := strings.ToLower(val) + g, found := generators[typ] + if !found { + return nil, makeErrUnknownType(val) } + + return g, nil +} + +// Exists returns whether the given type of ID generator exists. +func Exists(val string) bool { + typ := strings.ToLower(val) + _, found := generators[typ] + return found } From 7c794034d23d9524db10c46e2d26a847d4226d2d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 6 Dec 2019 08:00:05 -0800 Subject: [PATCH 44/50] Finer-grained locking to reduce mutex contention --- .../add_id/generator/es_generator.go | 67 ++++++++++--------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go index fc3a52431b4e..28ddebed2ff3 100644 --- a/libbeat/processors/add_id/generator/es_generator.go +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -54,6 +54,14 @@ func init() { // ID. The algorithm used to generate the ID is the same as used by Elasticsearch. // See https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java func (*esTimeBasedUUIDGenerator) NextID() string { + ts, seq := nextIDData() + var uuidBytes [15]byte + + packID(uuidBytes[:], ts, seq) + return base64.RawURLEncoding.EncodeToString(uuidBytes[:]) +} + +func nextIDData() (int64, uint32) { mu.Lock() defer mu.Unlock() @@ -70,9 +78,27 @@ func (*esTimeBasedUUIDGenerator) NextID() string { lastTimestamp = timestamp t := timestamp.UnixNano() / 1000 // timestamp in ms-since-epoch + return t, s +} + +func getTimestamp() time.Time { + // Don't let timestamp go backwards, at least "on our watch" (while this process is running). We are still vulnerable if we are + // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of + // collision. + now := time.Now() - uuidBytes := make([]byte, 15) + if lastTimestamp.IsZero() { + return now + } + + if lastTimestamp.After(now) { + return lastTimestamp + } + + return now +} +func packID(buf []byte, ts int64, seq uint32) { //// We have auto-generated ids, which are usually used for append-only workloads. //// So we try to optimize the order of bytes for indexing speed (by having quite //// unique bytes close to the beginning of the ids so that sorting is fast) and @@ -80,42 +106,23 @@ func (*esTimeBasedUUIDGenerator) NextID() string { // We use the sequence number rather than the timestamp because the distribution of // the timestamp depends too much on the indexing rate, so it is less reliable. - uuidBytes[0] = byte(s) // copy lowest-order byte from sequence number - uuidBytes[1] = byte(s >> 16) // copy 3rd lowest-order byte from sequence number + buf[0] = byte(seq) // copy lowest-order byte from sequence number + buf[1] = byte(seq >> 16) // copy 3rd lowest-order byte from sequence number // Now we start focusing on compression and put bytes that should not change too often. - uuidBytes[2] = byte(t >> 16) // 3rd lowest-order byte from timestamp; changes every ~65 secs - uuidBytes[3] = byte(t >> 24) // 4th lowest-order byte from timestamp; changes every ~4.5h - uuidBytes[4] = byte(t >> 32) // 5th lowest-order byte from timestamp; changes every ~50 days - uuidBytes[5] = byte(t >> 40) // 6th lowest-order byte from timestamp; changes every 35 years + buf[2] = byte(ts >> 16) // 3rd lowest-order byte from timestamp; changes every ~65 secs + buf[3] = byte(ts >> 24) // 4th lowest-order byte from timestamp; changes every ~4.5h + buf[4] = byte(ts >> 32) // 5th lowest-order byte from timestamp; changes every ~50 days + buf[5] = byte(ts >> 40) // 6th lowest-order byte from timestamp; changes every 35 years // Copy mac address bytes (6 bytes) - copy(uuidBytes[6:6+addrLen], mac) + copy(buf[6:6+addrLen], mac) // Finally we put the remaining bytes, which will likely not be compressed at all. - uuidBytes[12] = byte(t >> 8) // 2nd lowest-order byte from timestamp - uuidBytes[13] = byte(s >> 8) // 2nd lowest-order byte from sequence number - uuidBytes[14] = byte(t) + buf[12] = byte(ts >> 8) // 2nd lowest-order byte from timestamp + buf[13] = byte(seq >> 8) // 2nd lowest-order byte from sequence number + buf[14] = byte(ts) // See also: more detailed explanation of byte choices at // https://github.com/elastic/elasticsearch/blob/a666fb22664284d8e2114841ebb58ea4e1924691/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java#L80-L95 - - return base64.RawURLEncoding.EncodeToString(uuidBytes) -} - -func getTimestamp() time.Time { - // Don't let timestamp go backwards, at least "on our watch" (while this process is running). We are still vulnerable if we are - // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of - // collision. - now := time.Now() - - if lastTimestamp.IsZero() { - return now - } - - if lastTimestamp.After(now) { - return lastTimestamp - } - - return now } From 99c01b7bee8abadabaab068ea404512d3c954554 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 9 Dec 2019 11:27:23 -0800 Subject: [PATCH 45/50] Initialize package global variables that depend on randomness, later --- .../add_id/generator/es_generator.go | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go index 28ddebed2ff3..a626b13dc589 100644 --- a/libbeat/processors/add_id/generator/es_generator.go +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -37,23 +37,19 @@ func ESTimeBasedUUIDGenerator() IDGenerator { var ( sequenceNumber uint32 lastTimestamp time.Time + once sync.Once mac []byte mu sync.Mutex ) -func init() { - m, err := getSecureMungedMACAddress() - if err != nil { - panic(err) - } - mac = m - sequenceNumber = rand.Uint32() -} - // NextID returns a base64-encoded, randomly-generated, but roughly ordered (over time), unique // ID. The algorithm used to generate the ID is the same as used by Elasticsearch. // See https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java func (*esTimeBasedUUIDGenerator) NextID() string { + // Initialize sequence number and mac address. We do this here instead of doing it in a package-level + // init function to give the runtime time to generate enough entropy for randomization. + initOnce() + ts, seq := nextIDData() var uuidBytes [15]byte @@ -61,6 +57,17 @@ func (*esTimeBasedUUIDGenerator) NextID() string { return base64.RawURLEncoding.EncodeToString(uuidBytes[:]) } +func initOnce() { + once.Do(func() { + sequenceNumber = rand.Uint32() + m, err := getSecureMungedMACAddress() + if err != nil { + panic(err) + } + mac = m + }) +} + func nextIDData() (int64, uint32) { mu.Lock() defer mu.Unlock() From e4f9fe9b8e75e3b723a5d1074aa4b9a460ddde85 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 9 Dec 2019 12:49:26 -0800 Subject: [PATCH 46/50] Compute last timestamp while accounting for system time going backwards --- .../add_id/generator/es_generator.go | 47 ++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go index a626b13dc589..53f1a7d4314b 100644 --- a/libbeat/processors/add_id/generator/es_generator.go +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -36,7 +36,8 @@ func ESTimeBasedUUIDGenerator() IDGenerator { var ( sequenceNumber uint32 - lastTimestamp time.Time + lastTimestamp uint64 + delta uint64 once sync.Once mac []byte mu sync.Mutex @@ -68,7 +69,7 @@ func initOnce() { }) } -func nextIDData() (int64, uint32) { +func nextIDData() (uint64, uint32) { mu.Lock() defer mu.Unlock() @@ -77,35 +78,39 @@ func nextIDData() (int64, uint32) { // We only use bottom 3 bytes for the sequence number. s := sequenceNumber & 0xffffff - timestamp := getTimestamp() - if s == 0 { - // Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards. - timestamp.Add(1 * time.Millisecond) - } - lastTimestamp = timestamp - - t := timestamp.UnixNano() / 1000 // timestamp in ms-since-epoch + t := timestamp() return t, s } -func getTimestamp() time.Time { - // Don't let timestamp go backwards, at least "on our watch" (while this process is running). We are still vulnerable if we are - // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of - // collision. - now := time.Now() +// timestamp returns the next timestamp to use, while accounting for system time going +// backwards (e.g. due to a DST change). +func timestamp() uint64 { + now := uint64(time.Now().UnixNano() / 1000) + if lastTimestamp == 0 { + // Last timestamp has not been previously initialized + lastTimestamp = now + return lastTimestamp + } - if lastTimestamp.IsZero() { - return now + // Normally now should be later than lastTimestamp, but if the system time went backwards + // (e.g. due to DST change), we should compute a delta to account for this change, so we always return + // a value that's greater than the last call to this function. + if now < lastTimestamp { + delta = lastTimestamp - now + 1 + lastTimestamp = now + return now + delta } - if lastTimestamp.After(now) { - return lastTimestamp + // If the system time was reset, reset delta as well + if now-lastTimestamp >= delta { + delta = 0 } - return now + lastTimestamp = now + return lastTimestamp } -func packID(buf []byte, ts int64, seq uint32) { +func packID(buf []byte, ts uint64, seq uint32) { //// We have auto-generated ids, which are usually used for append-only workloads. //// So we try to optimize the order of bytes for indexing speed (by having quite //// unique bytes close to the beginning of the ids so that sorting is fast) and From f2faf1448bf18aa0c2eb664faa543e13b7082ee5 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 10 Dec 2019 10:18:29 -0800 Subject: [PATCH 47/50] Simpler and testable timestamp() function --- .../add_id/generator/es_generator.go | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go index 53f1a7d4314b..243ecbae5109 100644 --- a/libbeat/processors/add_id/generator/es_generator.go +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -78,36 +78,28 @@ func nextIDData() (uint64, uint32) { // We only use bottom 3 bytes for the sequence number. s := sequenceNumber & 0xffffff - t := timestamp() - return t, s -} - -// timestamp returns the next timestamp to use, while accounting for system time going -// backwards (e.g. due to a DST change). -func timestamp() uint64 { now := uint64(time.Now().UnixNano() / 1000) - if lastTimestamp == 0 { - // Last timestamp has not been previously initialized - lastTimestamp = now - return lastTimestamp - } + lastTimestamp = timestamp(now, lastTimestamp) + return lastTimestamp, s +} - // Normally now should be later than lastTimestamp, but if the system time went backwards - // (e.g. due to DST change), we should compute a delta to account for this change, so we always return - // a value that's greater than the last call to this function. - if now < lastTimestamp { - delta = lastTimestamp - now + 1 - lastTimestamp = now - return now + delta +// timestamp returns a monotonically-increasing timestamp (in ms) to use, +// while accounting for system time going backwards (e.g. due to a DST change). +func timestamp(now, lastTS uint64) uint64 { + // Last timestamp has not been previously initialized. + if lastTS == 0 { + return now } - // If the system time was reset, reset delta as well - if now-lastTimestamp >= delta { - delta = 0 + // Normally now should be later than lastTimestamp. If that's the case, we can simply + // return now as the new timestamp. + if now > lastTS { + return now } - lastTimestamp = now - return lastTimestamp + // At this point, we know the system clock has gone backwards. So we increment the + // lastTimestamp by 1 (ms) and return it. + return lastTS + 1 } func packID(buf []byte, ts uint64, seq uint32) { From 51f8eed6e33028304e5af55dc2e0693876ea1738 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 10 Dec 2019 12:23:33 -0800 Subject: [PATCH 48/50] Adding unit test for timestamp function --- .../add_id/generator/es_generator.go | 14 +++++----- .../add_id/generator/es_generator_test.go | 27 +++++++++++++++++++ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go index 243ecbae5109..ef51c1af7f80 100644 --- a/libbeat/processors/add_id/generator/es_generator.go +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -84,17 +84,17 @@ func nextIDData() (uint64, uint32) { } // timestamp returns a monotonically-increasing timestamp (in ms) to use, -// while accounting for system time going backwards (e.g. due to a DST change). -func timestamp(now, lastTS uint64) uint64 { +// while accounting for system clock going backwards (e.g. due to a DST change). +func timestamp(clockTS, lastTS uint64) uint64 { // Last timestamp has not been previously initialized. if lastTS == 0 { - return now + return clockTS } - // Normally now should be later than lastTimestamp. If that's the case, we can simply - // return now as the new timestamp. - if now > lastTS { - return now + // Normally clockTimestamp should be later than lastTimestamp. If that's the case, we can simply + // return clockTimestamp as the new timestamp. + if clockTS > lastTS { + return clockTS } // At this point, we know the system clock has gone backwards. So we increment the diff --git a/libbeat/processors/add_id/generator/es_generator_test.go b/libbeat/processors/add_id/generator/es_generator_test.go index fe7e71198301..5a3eb520e076 100644 --- a/libbeat/processors/add_id/generator/es_generator_test.go +++ b/libbeat/processors/add_id/generator/es_generator_test.go @@ -20,6 +20,7 @@ package generator import ( "encoding/base64" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -78,6 +79,32 @@ func TestIDConsecutiveOrdering(t *testing.T) { } } +func TestMonotonicTimestamp(t *testing.T) { + now := uint64(time.Now().UnixNano() / 1000) + tests := map[string]struct { + clockTimestamp uint64 + lastTimestamp uint64 + }{ + "uninitialized": { + clockTimestamp: now, + }, + "clock_normal": { + clockTimestamp: now, + lastTimestamp: now - 1, + }, + "clock_went_backwards": { + clockTimestamp: now - 1, + lastTimestamp: now, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.True(t, timestamp(test.clockTimestamp, test.lastTimestamp) > test.lastTimestamp) + }) + } +} + func BenchmarkID(b *testing.B) { g := ESTimeBasedUUIDGenerator() for n := 0; n < b.N; n++ { From 72b97c5f52b8c300ce84a7179f330fa618ac6997 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 10 Dec 2019 13:06:09 -0800 Subject: [PATCH 49/50] Re-implementing ES timestamp algorithm --- .../add_id/generator/es_generator.go | 38 ++++++++++--------- .../add_id/generator/es_generator_test.go | 21 ++++++++-- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go index ef51c1af7f80..74474a0eeca3 100644 --- a/libbeat/processors/add_id/generator/es_generator.go +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -35,7 +35,7 @@ func ESTimeBasedUUIDGenerator() IDGenerator { } var ( - sequenceNumber uint32 + sequenceNumber uint64 lastTimestamp uint64 delta uint64 once sync.Once @@ -60,7 +60,7 @@ func (*esTimeBasedUUIDGenerator) NextID() string { func initOnce() { once.Do(func() { - sequenceNumber = rand.Uint32() + sequenceNumber = rand.Uint64() m, err := getSecureMungedMACAddress() if err != nil { panic(err) @@ -69,7 +69,7 @@ func initOnce() { }) } -func nextIDData() (uint64, uint32) { +func nextIDData() (uint64, uint64) { mu.Lock() defer mu.Unlock() @@ -78,31 +78,35 @@ func nextIDData() (uint64, uint32) { // We only use bottom 3 bytes for the sequence number. s := sequenceNumber & 0xffffff - now := uint64(time.Now().UnixNano() / 1000) - lastTimestamp = timestamp(now, lastTimestamp) + lastTimestamp = timestamp(nowMS(), lastTimestamp, s) return lastTimestamp, s } // timestamp returns a monotonically-increasing timestamp (in ms) to use, // while accounting for system clock going backwards (e.g. due to a DST change). -func timestamp(clockTS, lastTS uint64) uint64 { - // Last timestamp has not been previously initialized. - if lastTS == 0 { - return clockTS +func timestamp(clockTS, lastTS uint64, seq uint64) uint64 { + // Don't let timestamp go backwards, at least "on our watch" (while this process is running). We are still vulnerable if we are + // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of + // collision. + newTS := lastTS + if clockTS > lastTS { + newTS = clockTS } - // Normally clockTimestamp should be later than lastTimestamp. If that's the case, we can simply - // return clockTimestamp as the new timestamp. - if clockTS > lastTS { - return clockTS + // Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards. + if seq == 0 { + newTS++ } - // At this point, we know the system clock has gone backwards. So we increment the - // lastTimestamp by 1 (ms) and return it. - return lastTS + 1 + return newTS +} + +func nowMS() uint64 { + now := time.Now() + return uint64((now.Unix() * 1000) + (int64(now.Nanosecond()) / 1000000)) } -func packID(buf []byte, ts uint64, seq uint32) { +func packID(buf []byte, ts uint64, seq uint64) { //// We have auto-generated ids, which are usually used for append-only workloads. //// So we try to optimize the order of bytes for indexing speed (by having quite //// unique bytes close to the beginning of the ids so that sorting is fast) and diff --git a/libbeat/processors/add_id/generator/es_generator_test.go b/libbeat/processors/add_id/generator/es_generator_test.go index 5a3eb520e076..d8801c8ea1db 100644 --- a/libbeat/processors/add_id/generator/es_generator_test.go +++ b/libbeat/processors/add_id/generator/es_generator_test.go @@ -20,7 +20,6 @@ package generator import ( "encoding/base64" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -80,27 +79,41 @@ func TestIDConsecutiveOrdering(t *testing.T) { } func TestMonotonicTimestamp(t *testing.T) { - now := uint64(time.Now().UnixNano() / 1000) + now := nowMS() tests := map[string]struct { clockTimestamp uint64 lastTimestamp uint64 + sequenceNumber uint64 }{ - "uninitialized": { + "uninitialized_timestamp": { clockTimestamp: now, + sequenceNumber: 17, }, "clock_normal": { clockTimestamp: now, lastTimestamp: now - 1, + sequenceNumber: 17, + }, + "clock_normal_seq_wraparound": { + clockTimestamp: now, + lastTimestamp: now - 1, + sequenceNumber: 0, }, "clock_went_backwards": { clockTimestamp: now - 1, lastTimestamp: now, + sequenceNumber: 17, + }, + "clock_went_backwards_seq_wraparound": { + clockTimestamp: now - 1, + lastTimestamp: now, + sequenceNumber: 0, }, } for name, test := range tests { t.Run(name, func(t *testing.T) { - assert.True(t, timestamp(test.clockTimestamp, test.lastTimestamp) > test.lastTimestamp) + assert.True(t, timestamp(test.clockTimestamp, test.lastTimestamp, test.sequenceNumber) >= test.lastTimestamp) }) } } From 5e8420c67b67a48ef47b354387b63d04d46b6ed8 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 10 Dec 2019 13:06:56 -0800 Subject: [PATCH 50/50] Removing unused variable --- libbeat/processors/add_id/generator/es_generator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go index 74474a0eeca3..249b4af412a9 100644 --- a/libbeat/processors/add_id/generator/es_generator.go +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -37,7 +37,6 @@ func ESTimeBasedUUIDGenerator() IDGenerator { var ( sequenceNumber uint64 lastTimestamp uint64 - delta uint64 once sync.Once mac []byte mu sync.Mutex