diff --git a/.chloggen/elasticsearchexporter_64-bit-metric-grouping-hash.yaml b/.chloggen/elasticsearchexporter_64-bit-metric-grouping-hash.yaml new file mode 100644 index 0000000000000..d12de92af20c8 --- /dev/null +++ b/.chloggen/elasticsearchexporter_64-bit-metric-grouping-hash.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Increase metric grouping hash and _metric_names_hash from 32-bit to 64-bit to reduce collisions and the consequent risk of data loss. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41208] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 681ac88728a7b..674b11fa1b725 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -1290,19 +1290,19 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]}},"resource":{},"scope":{},"_metric_names_hash":"f7fdad9f"}`), + Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]}},"resource":{},"scope":{},"_metric_names_hash":"b23939f78dc5f649"}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}},"resource":{},"scope":{},"_metric_names_hash":"f7fdad9f"}`), + Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}},"resource":{},"scope":{},"_metric_names_hash":"b23939f78dc5f649"}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}`), - Document: 
[]byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{},"scope":{},"start_timestamp":"7200000.0","_metric_names_hash":"6e599000"}`), + Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{},"scope":{},"start_timestamp":"7200000.0","_metric_names_hash":"f4a8ac5e1b330ad6"}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"10800000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{},"scope":{},"start_timestamp":"10800000.0","_metric_names_hash":"45a9e3cb"}`), + Document: []byte(`{"@timestamp":"10800000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{},"scope":{},"start_timestamp":"10800000.0","_metric_names_hash":"2f30c89222c9d308"}`), }, } @@ -1371,7 +1371,7 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long","metrics.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{},"_metric_names_hash":"7dc58200"}`), + Document: []byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{},"_metric_names_hash":"e446964dc8337bbb"}`), }, } @@ -1421,11 +1421,11 @@ func 
TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.histogram.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"histogram.summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{},"_metric_names_hash":"acbaed6b"}`), + Document: []byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"histogram.summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{},"_metric_names_hash":"fcd1d6737d725996"}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.exphistogram.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"3600000.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"exphistogram.summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{},"_metric_names_hash":"29641c64"}`), + Document: []byte(`{"@timestamp":"3600000.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"exphistogram.summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{},"_metric_names_hash":"6a10ca190ae63c5"}`), }, } @@ -1464,7 +1464,7 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`), - Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{},"scope":{},"_metric_names_hash":"204c382a"}`), + Document: 
[]byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{},"scope":{},"_metric_names_hash":"9c732a69b35274fe"}`), }, } diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index c85d1924af8c4..4e385c89efd4f 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -4,6 +4,7 @@ go 1.23.0 require ( github.com/cespare/xxhash v1.1.0 + github.com/cespare/xxhash/v2 v2.3.0 github.com/elastic/go-docappender/v2 v2.10.0 github.com/elastic/go-elasticsearch/v8 v8.18.1 github.com/elastic/go-freelru v0.16.0 @@ -44,7 +45,6 @@ require ( require ( github.com/cenkalti/backoff/v5 v5.0.2 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cilium/ebpf v0.16.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect diff --git a/exporter/elasticsearchexporter/internal/metricgroup/hasher.go b/exporter/elasticsearchexporter/internal/metricgroup/hasher.go index 8b8169e5574c1..f9b785ff0cd17 100644 --- a/exporter/elasticsearchexporter/internal/metricgroup/hasher.go +++ b/exporter/elasticsearchexporter/internal/metricgroup/hasher.go @@ -5,8 +5,8 @@ package metricgroup // import "github.com/open-telemetry/opentelemetry-collector import ( "encoding/binary" - "hash/fnv" + "github.com/cespare/xxhash/v2" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -16,9 +16,9 @@ import ( // HashKey is a struct for comparing data point identity. 
type HashKey struct { - resourceHash uint32 - scopeHash uint32 - dpHash uint32 + resourceHash uint64 + scopeHash uint64 + dpHash uint64 } // DataPointHasher is an interface for hashing data points by their identity, @@ -64,64 +64,64 @@ func (h *ECSDataPointHasher) HashKey() HashKey { v.CopyTo(merged.PutEmpty(k)) } - hasher := fnv.New32a() + hasher := xxhash.New() timestampBuf := make([]byte, 8) binary.LittleEndian.PutUint64(timestampBuf, uint64(h.dp.Timestamp())) - hasher.Write(timestampBuf) + _, _ = hasher.Write(timestampBuf) mapHashSortedExcludeReservedAttrs(hasher, merged) return HashKey{ - dpHash: hasher.Sum32(), + dpHash: hasher.Sum64(), } } // OTelDataPointHasher computes a hash for each of resource, scope and data point on each Update call, // to avoid wasteful hashing and sorting on data point sharing the same resource and scope. type OTelDataPointHasher struct { - resourceHash uint32 - scopeHash uint32 - dpHash uint32 + resourceHash uint64 + scopeHash uint64 + dpHash uint64 } func (h *OTelDataPointHasher) UpdateResource(resource pcommon.Resource) { // We cannot use exp/metrics/identity here because some resource fields e.g. schema url // are not dimensions and should not be part of the hash. - hasher := fnv.New32a() + hasher := xxhash.New() // There is special handling to merge geo attributes during serialization, // but we can hash them as if they are separate now. mapHashSortedExcludeReservedAttrs(hasher, resource.Attributes(), elasticsearch.MappingHintsAttrKey) - h.resourceHash = hasher.Sum32() + h.resourceHash = hasher.Sum64() } func (h *OTelDataPointHasher) UpdateScope(scope pcommon.InstrumentationScope) { - hasher := fnv.New32a() - hasher.Write([]byte(scope.Name())) + hasher := xxhash.New() + _, _ = hasher.Write([]byte(scope.Name())) // There is special handling to merge geo attributes during serialization, // but we can hash them as if they are separate now. 
mapHashSortedExcludeReservedAttrs(hasher, scope.Attributes(), elasticsearch.MappingHintsAttrKey) - h.scopeHash = hasher.Sum32() + h.scopeHash = hasher.Sum64() } func (h *OTelDataPointHasher) UpdateDataPoint(dp datapoints.DataPoint) { - hasher := fnv.New32a() + hasher := xxhash.New() timestampBuf := make([]byte, 8) binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp())) - hasher.Write(timestampBuf) + _, _ = hasher.Write(timestampBuf) binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.StartTimestamp())) - hasher.Write(timestampBuf) + _, _ = hasher.Write(timestampBuf) - hasher.Write([]byte(dp.Metric().Unit())) + _, _ = hasher.Write([]byte(dp.Metric().Unit())) // There is special handling to merge geo attributes during serialization, // but we can hash them as if they are separate now. mapHashSortedExcludeReservedAttrs(hasher, dp.Attributes(), elasticsearch.MappingHintsAttrKey) - h.dpHash = hasher.Sum32() + h.dpHash = hasher.Sum64() } func (h *OTelDataPointHasher) HashKey() HashKey { diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics.go index ca53be045dc5f..83bbf627b396b 100644 --- a/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics.go +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics.go @@ -6,10 +6,10 @@ package otelserializer // import "github.com/open-telemetry/opentelemetry-collec import ( "bytes" "fmt" - "hash/fnv" "sort" "strconv" + "github.com/cespare/xxhash/v2" "github.com/elastic/go-structform" "github.com/elastic/go-structform/json" "go.opentelemetry.io/collector/pdata/pcommon" @@ -92,13 +92,13 @@ func serializeDataPoints(v *json.Visitor, dataPoints []datapoints.DataPoint, val writeUIntField(v, "_doc_count", docCount) } sort.Strings(metricNames) - hasher := fnv.New32a() + hasher := xxhash.New() for _, name := range metricNames { _, _ = hasher.Write([]byte(name)) } // workaround 
for https://github.com/elastic/elasticsearch/issues/99123 // should use a string field to benefit from run-length encoding - writeStringFieldSkipDefault(v, "_metric_names_hash", strconv.FormatUint(uint64(hasher.Sum32()), 16)) + writeStringFieldSkipDefault(v, "_metric_names_hash", strconv.FormatUint(hasher.Sum64(), 16)) return dynamicTemplates } diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics_test.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics_test.go index 1886d6abbc1f9..ef212d1cc3feb 100644 --- a/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics_test.go +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics_test.go @@ -58,6 +58,6 @@ func TestSerializeMetricsConflict(t *testing.T) { "metrics": map[string]any{ "foo": json.Number("42"), }, - "_metric_names_hash": "a9f37ed7", + "_metric_names_hash": "33bf00a859c4ba3f", }, result, eventAsJSON) }