diff --git a/docs/changelog/143381.yaml b/docs/changelog/143381.yaml new file mode 100644 index 0000000000000..13e2a3f36f85f --- /dev/null +++ b/docs/changelog/143381.yaml @@ -0,0 +1,6 @@ +area: Downsampling +issues: + - 136178 +pr: 143381 +summary: Aggregate counter downsampling preserves resets +type: enhancement diff --git a/x-pack/plugin/downsample/qa/mixed-cluster/build.gradle b/x-pack/plugin/downsample/qa/mixed-cluster/build.gradle index 3e6c4b6bf0aa4..7114981142e39 100644 --- a/x-pack/plugin/downsample/qa/mixed-cluster/build.gradle +++ b/x-pack/plugin/downsample/qa/mixed-cluster/build.gradle @@ -20,7 +20,7 @@ dependencies { restResources { restApi { - include '_common', 'bulk', 'cluster', 'indices', 'search', 'ingest.put_pipeline', 'ingest.delete_pipeline' + include '_common', 'bulk', 'cluster', 'indices', 'search', 'ingest.put_pipeline', 'ingest.delete_pipeline', 'capabilities' } } diff --git a/x-pack/plugin/downsample/qa/mixed-cluster/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml b/x-pack/plugin/downsample/qa/mixed-cluster/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml index bec90a0925391..bf7f0dc78782c 100644 --- a/x-pack/plugin/downsample/qa/mixed-cluster/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml +++ b/x-pack/plugin/downsample/qa/mixed-cluster/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml @@ -83,9 +83,12 @@ setup: --- "Downsample index": - requires: - cluster_features: ["gte_v8.10.0"] - reason: "Downsampling executed using persistent task framework from version 8.10" - test_runner_features: allowed_warnings + capabilities: + - method: POST + path: /{index}/_downsample/{target_index} + capabilities: [ "downsampling.store_reset_counters" ] + test_runner_features: [ capabilities, allowed_warnings ] + reason: Storing counter resets when downsampling was added in 9.4 - do: allowed_warnings: @@ -105,30 +108,72 @@ setup: body: sort: [ "_tsid", "@timestamp" ] - - length: { hits.hits: 4 } - - match: { hits.hits.0._source._doc_count: 2 } - # Verify dimensions & time - - match: { hits.hits.0._source.metricset: pod } - - match: { hits.hits.0._source.k8s.pod.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - length: { hits.hits: 7 } + + # Downsampled doc + - match: { hits.hits.0._source._doc_count: 1 } - match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z } + # Dimensions + - match: { hits.hits.0._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.0._source.metricset: pod } + # Metrics + - match: { hits.hits.0._source.k8s\.pod\.multi-counter: 7 } + - match: { hits.hits.0._source.k8s\.pod\.scaled-counter: 7.0 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.min: 100 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.max: 102 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.sum: 607 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.value_count: 6 } + - match: { hits.hits.0._source.k8s\.pod\.scaled-gauge.min: 100.0 } + - match: { hits.hits.0._source.k8s\.pod\.scaled-gauge.max: 101.0 } + - match: { hits.hits.0._source.k8s\.pod\.scaled-gauge.sum: 201.0 } + - match: { hits.hits.0._source.k8s\.pod\.scaled-gauge.value_count: 2 } + - match: { hits.hits.0._source.k8s\.pod\.network\.tx.min: 1434521831 } + - match: { hits.hits.0._source.k8s\.pod\.network\.tx.max: 1434577921 } + - match: { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 } + # Labels + - match: { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.56" } + - match: { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:43:00.000Z" } + - match: { hits.hits.0._source.k8s\.pod\.number_of_containers: 1 } + - match: { hits.hits.0._source.k8s\.pod\.tags: ["backend", "test", "us-west2"] } + - match: { hits.hits.0._source.k8s\.pod\.values: [1, 1, 2] } + - is_false: hits.hits.0._source.k8s\.pod\.running + + # Doc with counter resets + - is_false: hits.hits.1._source._doc_count + - match: { hits.hits.1._source.@timestamp: 2021-04-28T18:50:23.142Z } + # Dimensions + - match: { hits.hits.1._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.1._source.metricset: pod } + # Metrics + - match: { hits.hits.1._source.k8s\.pod\.multi-counter: 0 } + - match: { hits.hits.1._source.k8s\.pod\.scaled-counter: 0.0 } + # Only dimensions and counters that have been reset are in this doc + - is_false: hits.hits.1._source.k8s\.pod\.multi-gauge + + # Next downsampled doc + - match: { hits.hits.2._source._doc_count: 1 } + - match: { hits.hits.2._source.@timestamp: 2021-04-28T19:00:00.000Z } + # Dimensions + - match: { hits.hits.2._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.2._source.metricset: pod } + # Metrics + - match: { hits.hits.2._source.k8s\.pod\.multi-counter: 1000 } + - match: { hits.hits.2._source.k8s\.pod\.scaled-counter: 1000.0 } + - match: { hits.hits.2._source.k8s\.pod\.multi-gauge.min: 95 } - # Verify metrics - - match: { hits.hits.0._source.k8s.pod.multi-gauge.min: 100 } - - match: { hits.hits.0._source.k8s.pod.multi-gauge.max: 102 } - - match: { hits.hits.0._source.k8s.pod.multi-gauge.sum: 607 } - - match: { hits.hits.0._source.k8s.pod.multi-gauge.value_count: 6 } - - match: { hits.hits.0._source.k8s.pod.multi-counter: 0 } - - match: { hits.hits.0._source.k8s.pod.network.tx.min: 1434521831 } - - match: { hits.hits.0._source.k8s.pod.network.tx.max: 1434577921 } - - match: { hits.hits.0._source.k8s.pod.network.tx.value_count: 2 } - - match: { hits.hits.0._source.k8s.pod.ip: "10.10.55.56" } - - match: { hits.hits.0._source.k8s.pod.created_at: "2021-04-28T19:43:00.000Z" } - - match: { hits.hits.0._source.k8s.pod.number_of_containers: 1 } - - match: { hits.hits.0._source.k8s.pod.tags: [ "backend", "test", "us-west2" ] } - - match: { hits.hits.0._source.k8s.pod.values: [ 1, 1, 2 ] } - - is_false: hits.hits.0._source.k8s.pod.running + # Doc with counter resets + - is_false: hits.hits.3._source._doc_count + - match: { hits.hits.3._source.@timestamp: 2021-04-28T19:51:03.142Z } + # Dimensions + - match: { hits.hits.3._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.3._source.metricset: pod } + # Metrics + - match: { hits.hits.3._source.k8s\.pod\.multi-counter: 76 } + # Only dimensions and counters that have been reset are in this doc + - is_false: hits.hits.3._source.k8s\.pod\.scaled-counter + - is_false: hits.hits.3._source.k8s\.pod\.multi-gauge - # Assert rollup index settings + # Assert downsample index settings - do: indices.get_settings: index: test-downsample @@ -138,7 +183,7 @@ setup: - match: { test-downsample.settings.index.time_series.start_time: 2021-04-28T00:00:00Z } - match: { test-downsample.settings.index.routing_path: [ "metricset", "k8s.pod.uid"] } - # Assert rollup index mapping + # Assert downsample index mapping - do: indices.get_mapping: index: test-downsample diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample-with-security/10_basic.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample-with-security/10_basic.yml index 87783a918c86c..8f1362f71cc24 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample-with-security/10_basic.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample-with-security/10_basic.yml @@ -293,9 +293,12 @@ setup: --- "Downsample index": - requires: - cluster_features: ["gte_v8.13.0"] - reason: _tsid hashing introduced in 8.13 - test_runner_features: allowed_warnings + capabilities: + - method: POST + path: /{index}/_downsample/{target_index} + capabilities: [ "downsampling.store_reset_counters" ] + test_runner_features: [ capabilities, allowed_warnings ] + reason: Storing counter resets when downsampling was added in 9.4 - do: allowed_warnings: @@ -315,26 +318,40 @@ setup: body: sort: [ "_tsid", "@timestamp" ] - - length: { hits.hits: 4 } - - match: { hits.hits.0._source._doc_count: 2 } - - match: { hits.hits.0._source.k8s.pod.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } - - match: { hits.hits.0._source.metricset: pod } - - match: { hits.hits.0._source.@timestamp: "2021-04-28T18:00:00.000Z" } - - match: { hits.hits.0._source.k8s.pod.multi-counter: 0 } - - match: { hits.hits.0._source.k8s.pod.multi-gauge.min: 100.0 } - - match: { hits.hits.0._source.k8s.pod.multi-gauge.max: 102.0 } - - match: { hits.hits.0._source.k8s.pod.multi-gauge.sum: 607.0 } - - match: { hits.hits.0._source.k8s.pod.multi-gauge.value_count: 6 } - - match: { hits.hits.0._source.k8s.pod.network.tx.min: 1434521831 } - - match: { hits.hits.0._source.k8s.pod.network.tx.max: 1434577921 } - - match: { hits.hits.0._source.k8s.pod.network.tx.value_count: 2 } - - match: { hits.hits.0._source.k8s.pod.ip: "10.10.55.56" } - - match: { hits.hits.0._source.k8s.pod.created_at: "2021-04-28T19:43:00.000Z" } - - match: { hits.hits.0._source.k8s.pod.number_of_containers: 1 } - - match: { hits.hits.0._source.k8s.pod.tags: [ "backend", "test", "us-west2" ] } - - match: { hits.hits.0._source.k8s.pod.values: [ 1, 1, 2 ] } + - length: { hits.hits: 7 } + + # Downsampled doc + - match: { hits.hits.0._source._doc_count: 1 } + - match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z } + # Dimensions + - match: { hits.hits.0._source.k8s.pod.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.0._source.metricset: pod } + # Metrics + - match: { hits.hits.0._source.k8s.pod.multi-counter: 7 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.min: 100 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.max: 102 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.sum: 607 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.value_count: 6 } + - match: { hits.hits.0._source.k8s.pod.network.tx.min: 1434521831 } + - match: { hits.hits.0._source.k8s.pod.network.tx.max: 1434577921 } + - match: { hits.hits.0._source.k8s.pod.network.tx.value_count: 2 } + # Labels + - match: { hits.hits.0._source.k8s.pod.ip: "10.10.55.56" } + - match: { hits.hits.0._source.k8s.pod.created_at: "2021-04-28T19:43:00.000Z" } + - match: { hits.hits.0._source.k8s.pod.number_of_containers: 1 } + - match: { hits.hits.0._source.k8s.pod.tags: ["backend", "test", "us-west2"] } + - match: { hits.hits.0._source.k8s.pod.values: [1, 1, 2] } - is_false: hits.hits.0._source.k8s.pod.running + # Doc with counter resets + - is_false: hits.hits.1._source._doc_count + - match: { hits.hits.1._source.@timestamp: 2021-04-28T18:50:23.142Z } + # Dimensions + - match: { hits.hits.1._source.k8s.pod.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.1._source.metricset: pod } + # Metrics + - match: { hits.hits.1._source.k8s.pod.multi-counter: 0 } + # Assert downsample index settings - do: indices.get_settings: @@ -369,12 +386,3 @@ setup: - do: indices.get: index: test - - # Assert downsample index has been force merged - - do: - indices.segments: - index: test-downsample - - - match: { _shards.total: 1} - - match: { indices.test-downsample.shards.0.0.num_committed_segments: 1} - - match: { indices.test-downsample.shards.0.0.num_search_segments: 1} diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml index d574c24d8b67c..8aac72e5fdaf9 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml @@ -1,7 +1,5 @@ setup: - requires: - cluster_features: ["gte_v8.5.0"] - reason: "rollup renamed to downsample in 8.5.0" test_runner_features: allowed_warnings - do: @@ -87,7 +85,7 @@ setup: - '{"index": {}}' - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.37", "multi-counter" : [1000, 1001, 1002], "scaled-counter": 1000.0, "multi-gauge": [99, 100, 110], "scaled-gauge": 99.0, "network": {"tx": 1434587694, "rx": 530604797}, "created_at": "2021-04-28T19:44:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [4, 5, 2]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.120", "multi-counter" : [76, 77, 78], "scaled-counter": 70.0, "multi-gauge": [95, 98, 100], "scaled-gauge": 95.0, "network": {"tx": 1434595272, "rx": 530605511}, "created_at": "2021-04-28T19:45:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [3, 2, 1]}}}' + - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.120", "multi-counter" : [76, 77, 78], "scaled-counter": 1070.0, "multi-gauge": [95, 98, 100], "scaled-gauge": 95.0, "network": {"tx": 1434595272, "rx": 530605511}, "created_at": "2021-04-28T19:45:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [3, 2, 1]}}}' - do: indices.put_settings: @@ -302,8 +300,12 @@ setup: --- "Downsample index": - requires: - cluster_features: ["gte_v8.13.0"] - reason: _tsid hashing introduced in 8.13 + capabilities: + - method: POST + path: /{index}/_downsample/{target_index} + capabilities: [ "downsampling.store_reset_counters" ] + test_runner_features: [ "capabilities" ] + reason: Storing counter resets when downsampling was added in 9.4 - do: allowed_warnings: @@ -323,14 +325,17 @@ setup: body: sort: [ "_tsid", "@timestamp" ] - - length: { hits.hits: 4 } - - match: { hits.hits.0._source._doc_count: 2 } - - match: { hits.hits.0._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } - - match: { hits.hits.0._source.metricset: pod } + - length: { hits.hits: 7 } + # Downsampled doc + - match: { hits.hits.0._source._doc_count: 1 } - match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z } - - match: { hits.hits.0._source.k8s\.pod\.multi-counter: 0 } - - match: { hits.hits.0._source.k8s\.pod\.scaled-counter: 0.00 } + # Dimensions + - match: { hits.hits.0._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.0._source.metricset: pod } + # Metrics + - match: { hits.hits.0._source.k8s\.pod\.multi-counter: 7 } + - match: { hits.hits.0._source.k8s\.pod\.scaled-counter: 7.0 } - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.min: 100 } - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.max: 102 } - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.sum: 607 } @@ -342,6 +347,7 @@ setup: - match: { hits.hits.0._source.k8s\.pod\.network\.tx.min: 1434521831 } - match: { hits.hits.0._source.k8s\.pod\.network\.tx.max: 1434577921 } - match: { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 } + # Labels - match: { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.56" } - match: { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:43:00.000Z" } - match: { hits.hits.0._source.k8s\.pod\.number_of_containers: 1 } @@ -349,6 +355,41 @@ setup: - match: { hits.hits.0._source.k8s\.pod\.values: [1, 1, 2] } - is_false: hits.hits.0._source.k8s\.pod\.running + # Doc with counter resets + - is_false: hits.hits.1._source._doc_count + - match: { hits.hits.1._source.@timestamp: 2021-04-28T18:50:23.142Z } + # Dimensions + - match: { hits.hits.1._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.1._source.metricset: pod } + # Metrics + - match: { hits.hits.1._source.k8s\.pod\.multi-counter: 0 } + - match: { hits.hits.1._source.k8s\.pod\.scaled-counter: 0.0 } + # Only dimensions and counters that have been reset are in this doc + - is_false: hits.hits.1._source.k8s\.pod\.multi-gauge + + # Next downsampled doc + - match: { hits.hits.2._source._doc_count: 1 } + - match: { hits.hits.2._source.@timestamp: 2021-04-28T19:00:00.000Z } + # Dimensions + - match: { hits.hits.2._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.2._source.metricset: pod } + # Metrics + - match: { hits.hits.2._source.k8s\.pod\.multi-counter: 1000 } + - match: { hits.hits.2._source.k8s\.pod\.scaled-counter: 1000.0 } + - match: { hits.hits.2._source.k8s\.pod\.multi-gauge.min: 95 } + + # Doc with counter resets + - is_false: hits.hits.3._source._doc_count + - match: { hits.hits.3._source.@timestamp: 2021-04-28T19:51:03.142Z } + # Dimensions + - match: { hits.hits.3._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.3._source.metricset: pod } + # Metrics + - match: { hits.hits.3._source.k8s\.pod\.multi-counter: 76 } + # Only dimensions and counters that have been reset are in this doc + - is_false: hits.hits.3._source.k8s\.pod\.scaled-counter + - is_false: hits.hits.3._source.k8s\.pod\.multi-gauge + # Assert downsample index settings - do: indices.get_settings: @@ -815,8 +856,12 @@ setup: --- "Downsample a downsampled index": - requires: - cluster_features: ["gte_v8.13.0"] - reason: _tsid hashing introduced in 8.13 + capabilities: + - method: POST + path: /{index}/_downsample/{target_index} + capabilities: [ "downsampling.store_reset_counters" ] + test_runner_features: [ "capabilities" ] + reason: Storing counter resets when downsampling was added in 9.4 - do: allowed_warnings: @@ -868,12 +913,12 @@ setup: body: sort: [ "_tsid", "@timestamp" ] - - length: { hits.hits: 3 } - - match: { hits.hits.0._source._doc_count: 4 } + - length: { hits.hits: 7 } + - match: { hits.hits.0._source._doc_count: 1 } - match: { hits.hits.0._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } - match: { hits.hits.0._source.metricset: pod } - match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z } - - match: { hits.hits.0._source.k8s\.pod\.multi-counter: 76 } + - match: { hits.hits.0._source.k8s\.pod\.multi-counter: 7 } - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.min: 95.0 } - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.max: 110.0 } - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.sum: 1209.0 } @@ -887,15 +932,34 @@ setup: - match: { hits.hits.0._source.k8s\.pod\.tags: [ "backend", "test", "us-west1" ] } - match: { hits.hits.0._source.k8s\.pod\.values: [ 1, 2, 3 ] } - - match: { hits.hits.1._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.1._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } - match: { hits.hits.1._source.metricset: pod } - - match: { hits.hits.1._source.@timestamp: 2021-04-28T18:00:00.000Z } - - match: { hits.hits.1._source._doc_count: 2 } + - match: { hits.hits.1._source.@timestamp: 2021-04-28T18:50:23.142Z } + - match: { hits.hits.1._source.k8s\.pod\.multi-counter: 0 } - - match: { hits.hits.2._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.2._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } - match: { hits.hits.2._source.metricset: pod } - - match: { hits.hits.2._source.@timestamp: 2021-04-28T20:00:00.000Z } - - match: { hits.hits.2._source._doc_count: 2 } + - match: { hits.hits.2._source.@timestamp: 2021-04-28T19:00:00.000Z } + - match: { hits.hits.2._source.k8s\.pod\.multi-counter: 1000 } + + - match: { hits.hits.3._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.3._source.metricset: pod } + - match: { hits.hits.3._source.@timestamp: 2021-04-28T19:51:03.142Z } + - match: { hits.hits.3._source.k8s\.pod\.multi-counter: 76 } + + - match: { hits.hits.4._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.4._source.metricset: pod } + - match: { hits.hits.4._source.@timestamp: 2021-04-28T18:00:00.000Z } + - match: { hits.hits.4._source._doc_count: 1 } + + - match: { hits.hits.5._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.5._source.metricset: pod } + - match: { hits.hits.5._source.@timestamp: 2021-04-28T18:50:24.467Z } + + - match: { hits.hits.6._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.6._source.metricset: pod } + - match: { hits.hits.6._source.@timestamp: 2021-04-28T20:00:00.000Z } + - match: { hits.hits.6._source._doc_count: 2 } - do: allowed_warnings: diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml index 10ab2fcc5d128..433ed53eccec7 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml @@ -1,12 +1,12 @@ --- "Runtime fields accessing metric fields in downsample target index": - requires: - reason: Using average as the value of a downsampled gauge was added in 8.14 - test_runner_features: [close_to, allowed_warnings, capabilities] capabilities: - method: POST - path: /_search - capabilities: [ aggregate_metric_double_defaults_to_average ] + path: /{index}/_downsample/{target_index} + capabilities: [ "downsampling.store_reset_counters" ] + test_runner_features: [ capabilities, close_to, allowed_warnings] + reason: Storing counter resets when downsampling was added in 9.4 - do: indices.create: @@ -145,23 +145,23 @@ - length: { hits.hits: 4 } - - close_to: { hits.hits.0.fields.received_kb.0: { value: 518142.3935, error: 0.0001 } } - - close_to: { hits.hits.0.fields.sent_kb.0: { value: 1400935.4472, error: 0.0001 } } + - close_to: { hits.hits.0.fields.received_kb.0: { value: 518139.8417, error: 0.0001 } } + - close_to: { hits.hits.0.fields.sent_kb.0: { value: 1400900.2255, error: 0.0001 } } - close_to: { hits.hits.0.fields.tx_kb.0: { value: 1400927.6132, error: 0.0001 } } - close_to: { hits.hits.0.fields.rx_kb.0: { value: 518151.9951, error: 0.0001 } } - - close_to: { hits.hits.1.fields.received_kb.0: { value: 518186.5039, error: 0.0001 } } - - close_to: { hits.hits.1.fields.sent_kb.0: { value: 1400988.2822, error: 0.0001 } } + - close_to: { hits.hits.1.fields.received_kb.0: { value: 518164.1699, error: 0.0001 } } + - close_to: { hits.hits.1.fields.sent_kb.0: { value: 1400966.6992, error: 0.0001 } } - close_to: { hits.hits.1.fields.tx_kb.0: { value: 1400968.2451, error: 0.0001 } } - close_to: { hits.hits.1.fields.rx_kb.0: { value: 518169.0957, error: 0.0001 } } - - close_to: { hits.hits.2.fields.received_kb.0: { value: 783343.5488, error: 0.0001 } } - - close_to: { hits.hits.2.fields.sent_kb.0: { value: 1954908.8779, error: 0.0001 } } + - close_to: { hits.hits.2.fields.received_kb.0: { value: 783333.7832, error: 0.0001 } } + - close_to: { hits.hits.2.fields.sent_kb.0: { value: 1954901.0654, error: 0.0001 } } - close_to: { hits.hits.2.fields.tx_kb.0: { value: 1956541.3305, error: 0.0001 } } - close_to: { hits.hits.2.fields.rx_kb.0: { value: 783014.5332, error: 0.0001 } } - - close_to: { hits.hits.3.fields.received_kb.0: { value: 783377.7343, error: 0.0001 } } - - close_to: { hits.hits.3.fields.sent_kb.0: { value: 1955339.7343, error: 0.0001 } } + - close_to: { hits.hits.3.fields.received_kb.0: { value: 783372.8505, error: 0.0001 } } + - close_to: { hits.hits.3.fields.sent_kb.0: { value: 1955096.3671, error: 0.0001 } } - close_to: { hits.hits.3.fields.tx_kb.0: { value: 1962470.67333, error: 0.0001 } } - close_to: { hits.hits.3.fields.rx_kb.0: { value: 784190.9179, error: 0.0001 } } diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleRateIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleRateIT.java new file mode 100644 index 0000000000000..19644682fd5aa --- /dev/null +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleRateIT.java @@ -0,0 +1,298 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.downsample.DownsampleAction; +import org.elasticsearch.action.downsample.DownsampleConfig; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; +import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThan; + +public class DownsampleRateIT extends DownsamplingIntegTestCase { + + private static final String INDEX_NAME = "metrics"; + private static final String DOWNSAMPLED_INDEX_NAME = "metrics-downsampled"; + private static final String MAPPING = """ + { + "properties": { + "@timestamp": { + "type": "date" + }, + "metricset": { + "type": "keyword", + "time_series_dimension": true + }, + "counter": { + "type": "long", + "time_series_metric": "counter" + } + } + } + """; + public static final String START_TIME = "2021-04-29T00:00:00Z"; + public static final String END_TIME = "2021-04-29T23:59:59Z"; + + public void testTimeSeriesAggregateRate() { + runTest( + List.of( + new DocumentSpec("2021-04-29T17:01:00.000Z", 1), + new DocumentSpec("2021-04-29T17:03:12.470Z", 2), + new DocumentSpec("2021-04-29T17:10:12.470Z", 5), + new DocumentSpec("2021-04-29T17:22:22.470Z", 6), + new DocumentSpec("2021-04-29T17:24:22.470Z", 10), + new DocumentSpec("2021-04-29T17:29:22.470Z", 11), + new DocumentSpec("2021-04-29T17:32:22.470Z", 12), + new DocumentSpec("2021-04-29T17:39:22.470Z", 13) + ), + "30m", + 0.003 + ); + } + + public void testTimeSeriesAggregateRate_SingleReset() { + runTest( + List.of( + new DocumentSpec("2021-04-29T17:02:12.470Z", 1), + new DocumentSpec("2021-04-29T17:03:12.470Z", 2), + new DocumentSpec("2021-04-29T17:10:12.470Z", 5), + new DocumentSpec("2021-04-29T17:19:12.470Z", 8), + new DocumentSpec("2021-04-29T17:20:22.470Z", 3), + new DocumentSpec("2021-04-29T17:22:22.470Z", 6), + new DocumentSpec("2021-04-29T17:24:22.470Z", 10), + new DocumentSpec("2021-04-29T17:29:22.470Z", 11), + new DocumentSpec("2021-04-29T17:32:22.470Z", 12), + new DocumentSpec("2021-04-29T17:39:22.470Z", 13) + ), + "30m", + 0.003 + ); + } + + public void testTimeSeriesQueryingSingleLargeReset() { + runTest( + List.of( + new DocumentSpec("2021-04-29T17:02:12.470Z", 1000), + new DocumentSpec("2021-04-29T17:03:12.470Z", 1003), + new DocumentSpec("2021-04-29T17:10:12.470Z", 1010), + new DocumentSpec("2021-04-29T17:19:12.470Z", 1040), + new DocumentSpec("2021-04-29T17:20:22.470Z", 1060), + new DocumentSpec("2021-04-29T17:22:22.470Z", 20), + new DocumentSpec("2021-04-29T17:24:22.470Z", 30), + new DocumentSpec("2021-04-29T17:29:22.470Z", 40), + new DocumentSpec("2021-04-29T17:32:22.470Z", 70), + new DocumentSpec("2021-04-29T17:39:22.470Z", 80) + ), + "30m", + 0.003 + ); + } + + public void testTimeSeriesQuerying_MultipleResets() { + runTest( + List.of( + new DocumentSpec("2021-04-29T17:02:12.470Z", 1000), + new DocumentSpec("2021-04-29T17:03:12.470Z", 1003), + new DocumentSpec("2021-04-29T17:05:12.470Z", 1010), + new DocumentSpec("2021-04-29T17:06:12.470Z", 1040), + new DocumentSpec("2021-04-29T17:07:22.470Z", 1060), + new DocumentSpec("2021-04-29T17:08:22.470Z", 20), + new DocumentSpec("2021-04-29T17:10:22.470Z", 30), + new DocumentSpec("2021-04-29T17:11:22.470Z", 40), + new DocumentSpec("2021-04-29T17:12:22.470Z", 70), + new DocumentSpec("2021-04-29T17:22:22.470Z", 80), + new DocumentSpec("2021-04-29T17:23:22.470Z", 20), + new DocumentSpec("2021-04-29T17:24:22.470Z", 10), + new DocumentSpec("2021-04-29T17:25:22.470Z", 20), + new DocumentSpec("2021-04-29T17:26:22.470Z", 40), + new DocumentSpec("2021-04-29T17:27:22.470Z", 60), + new DocumentSpec("2021-04-29T17:28:22.470Z", 5), + new DocumentSpec("2021-04-29T17:29:22.470Z", 10), + new DocumentSpec("2021-04-29T17:59:22.470Z", 20) + ), + "30m", + 0.003 + ); + } + + public void testTimeSeriesQuerying_RandomDocuments() { + long startTime = Instant.parse(START_TIME).toEpochMilli(); + long endTime = Instant.parse(END_TIME).toEpochMilli(); + int counter = 0; + long currentTime = startTime; + List documentSpecs = new ArrayList<>(); + while (currentTime < endTime) { + if (randomInt(9) > 0) { + counter = randomInt(100); + } else { + counter += randomInt(100); + } + documentSpecs.add(new DocumentSpec(randomFrom("pod-1", "pod-2", "pod-3"), DATE_FORMATTER.formatMillis(currentTime), counter)); + currentTime += randomLongBetween(5, 30) * 1000; + } + // We use higher rate epsilon because there is a bigger fluctuation due to the random data + runTest(documentSpecs, "1h", 0.1); + } + + private void runTest(List documentSpecs, String interval, double rateEpsilon) { + createIndex(); + indexDocuments(documentSpecs); + DownsampleConfig downsampleConfig = new DownsampleConfig( + new DateHistogramInterval(interval), + DownsampleConfig.SamplingMethod.AGGREGATE + ); + downsample(downsampleConfig); + + try (var baseline = queryRate(INDEX_NAME); var contender = queryRate(DOWNSAMPLED_INDEX_NAME)) { + compareResults(baseline, contender, rateEpsilon); + } + } + + private void compareResults(EsqlQueryResponse baseline, EsqlQueryResponse contender, double rateEpsilon) { + assertResultColumns(baseline); + assertResultColumns(contender); + List baselineRows = convertToSortedList(baseline); + List contenderRows = convertToSortedList(contender); + for (int i = 0; i < baselineRows.size(); i++) { + RateResult baselineRow = baselineRows.get(i); + RateResult contenderRow = contenderRows.get(i); + // We need these two assertions to correctly identify the rate + assertThat(contenderRow.timeseries, equalTo(baselineRow.timeseries)); + assertThat(contenderRow.timestamp, equalTo(baselineRow.timestamp)); + assertEquals(baselineRow.rate, contenderRow.rate, rateEpsilon); + } + } + + // We need to convert the result to a list and sort it by timeseries first and then by timestamp + // to compare the results row by row + private static List convertToSortedList(EsqlQueryResponse result) { + var rows = new ArrayList((int) result.getRowCount()); + for (Iterable objects : result.rows()) { + var row = objects.iterator(); + while (row.hasNext()) { + var rate = (double) row.next(); + var timeseries = (String) row.next(); + var timestamp = (String) row.next(); + rows.add(new RateResult(timeseries, timestamp, rate)); + } + } + rows.sort(Comparator.comparing(RateResult::timeseries).thenComparing(RateResult::timestamp)); + return rows; + } + + private void assertResultColumns(EsqlQueryResponse response) { + var columns = response.columns(); + assertThat(columns, hasSize(3)); + assertThat( + response.columns(), + equalTo( + List.of( + new ColumnInfoImpl("rate", "double", null), + new ColumnInfoImpl("_timeseries", "keyword", null), + new ColumnInfoImpl("time_bucket", "date", null) + ) + ) + ); + } + + private EsqlQueryResponse queryRate(String indexName) { + String command = "TS " + indexName + " | STATS rate=RATE(counter) BY time_bucket = TBUCKET(1 hour) | SORT time_bucket"; + return client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(command)).actionGet(30, TimeUnit.SECONDS); + } + + private static void createIndex() { + CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME); + request.settings( + Settings.builder() + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.getName()) + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "metricset") + .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), START_TIME) + .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), END_TIME) + ); + request.mapping(MAPPING); + assertAcked(client().admin().indices().create(request)); + } + + private void indexDocuments(List documentSpecs) { + AtomicInteger i = new AtomicInteger(); + Supplier nextDoc = () -> { + try { + assertThat(i.get(), lessThan(documentSpecs.size())); + var docSpec = documentSpecs.get(i.getAndIncrement()); + return XContentFactory.jsonBuilder() + .startObject() + .field("@timestamp", docSpec.timestamp) + .field("metricset", docSpec.dimension) + .field("counter", docSpec.counter) + .endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + bulkIndex(INDEX_NAME, nextDoc, documentSpecs.size()); + } + + private void downsample(DownsampleConfig downsampleConfig) { + // Set the source index to read-only state + assertAcked( + indicesAdmin().prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build()) + ); + + assertAcked( + client().execute( + DownsampleAction.INSTANCE, + new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, INDEX_NAME, DOWNSAMPLED_INDEX_NAME, TIMEOUT, downsampleConfig) + ) + ); + + // Wait for downsampling to complete + SubscribableListener listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + final var indexMetadata = clusterState.metadata().getProject().index(DOWNSAMPLED_INDEX_NAME); + if (indexMetadata == null) { + return false; + } + var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings()); + return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS; + }); + safeAwait(listener); + } + + record DocumentSpec(String dimension, String timestamp, int counter) { + DocumentSpec(String timestamp, int counter) { + this("pod", timestamp, counter); + } + } + + record RateResult(String timeseries, String timestamp, double rate) {} +} diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java index 97eb564d38342..9b0088cc4b7d2 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java @@ -78,7 +78,8 @@ static List> create( SearchExecutionContext context, String[] fields, Map multiFieldSources, - DownsampleConfig.SamplingMethod samplingMethod + DownsampleConfig.SamplingMethod samplingMethod, + DownsamplerCountPerValueType fieldCounts ) { List> downsamplers = new ArrayList<>(); for (String field : fields) { @@ -87,7 +88,7 @@ static List> create( assert fieldType != null : "Unknown field type for field: [" + sourceField + "]"; if (fieldType instanceof AggregateMetricDoubleFieldMapper.AggregateMetricDoubleFieldType aggMetricFieldType) { - downsamplers.addAll(AggregateMetricDoubleFieldDownsampler.create(context, aggMetricFieldType, samplingMethod)); + downsamplers.addAll(AggregateMetricDoubleFieldDownsampler.create(context, aggMetricFieldType, samplingMethod, fieldCounts)); } else { if (context.fieldExistsInIndex(field)) { final IndexFieldData fieldData; @@ -97,7 +98,7 @@ static List> create( } else { fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); } - downsamplers.add(create(field, fieldType, fieldData, samplingMethod)); + downsamplers.add(create(field, fieldType, fieldData, samplingMethod, fieldCounts)); } } } @@ -111,24 +112,85 @@ private static AbstractFieldDownsampler create( String fieldName, MappedFieldType fieldType, IndexFieldData fieldData, - DownsampleConfig.SamplingMethod samplingMethod + DownsampleConfig.SamplingMethod samplingMethod, + DownsamplerCountPerValueType fieldCounts ) { assert AggregateMetricDoubleFieldDownsampler.supportsFieldType(fieldType) == false : "Aggregate metric double should be handled by a dedicated downsampler"; if (TDigestHistogramFieldDownsampler.supportsFieldType(fieldType)) { + fieldCounts.increaseTDigestHistogramFields(); return TDigestHistogramFieldDownsampler.create(fieldName, fieldType, fieldData, samplingMethod); } if (ExponentialHistogramFieldDownsampler.supportsFieldType(fieldType)) { + fieldCounts.increaseExponentialHistogramFields(); return ExponentialHistogramFieldDownsampler.create(fieldName, fieldData, samplingMethod); } if (NumericMetricFieldDownsampler.supportsFieldType(fieldType)) { - return NumericMetricFieldDownsampler.create(fieldName, fieldType, fieldData, samplingMethod); + return NumericMetricFieldDownsampler.create(fieldName, fieldType, fieldData, samplingMethod, fieldCounts); } // TODO: Support POSITION in downsampling if (fieldType.getMetricType() == POSITION) { throw new IllegalArgumentException("Unsupported metric type [position] for downsampling"); } // If a field is not a metric, we downsample it as a label - return LastValueFieldDownsampler.create(fieldName, fieldType, fieldData); + return LastValueFieldDownsampler.create(fieldName, fieldType, fieldData, fieldCounts); + } + + static class DownsamplerCountPerValueType { + private int numericFields = 0; + private int aggregateCounterFields = 0; + private int formattedValueFields = 0; + private int dimensionFields = 0; + private int exponentialHistogramFields = 0; + private int tDigestHistogramFields = 0; + + void increaseNumericFields() { + numericFields++; + } + + void increaseAggregateCounterFields() { + aggregateCounterFields++; + } + + void increaseFormattedValueFields() { + formattedValueFields++; + } + + void increaseDimensionFields() { + dimensionFields++; + formattedValueFields++; + } + + void increaseExponentialHistogramFields() { + exponentialHistogramFields++; + } + + void increaseTDigestHistogramFields() { + tDigestHistogramFields++; + } + + int numericFields() { + return numericFields; + } + + int aggregateCounterFields() { + return aggregateCounterFields; + } + + int formattedValueFields() { + return formattedValueFields; + } + + int dimensionFields() { + return dimensionFields; + } + + int exponentialHistogramFields() { + return exponentialHistogramFields; + } + + int tDigestHistogramFields() { + return tDigestHistogramFields; + } } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricDoubleFieldDownsampler.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricDoubleFieldDownsampler.java index 1821013432427..1a40076c22a4f 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricDoubleFieldDownsampler.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricDoubleFieldDownsampler.java @@ -206,7 +206,8 @@ private boolean isEmpty() { static List create( SearchExecutionContext context, AggregateMetricDoubleFieldMapper.AggregateMetricDoubleFieldType aggMetricFieldType, - DownsampleConfig.SamplingMethod samplingMethod + DownsampleConfig.SamplingMethod samplingMethod, + DownsamplerCountPerValueType fieldCounts ) { List downsamplers = new ArrayList<>(); // If the field is an aggregate_metric_double field, we should load all its subfields @@ -217,6 +218,7 @@ static List create( if (context.fieldExistsInIndex(metricSubField.name())) { IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); downsamplers.add(create(aggMetricFieldType, metric, fieldData, samplingMethod)); + fieldCounts.increaseNumericFields(); } } return downsamplers; diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/CounterResetDataPoints.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/CounterResetDataPoints.java new file mode 100644 index 0000000000000..d083671fcda5a --- /dev/null +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/CounterResetDataPoints.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.elasticsearch.core.Tuple; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * Helper class that stores counter measurements necessary to detect a reset during downsampling. These measurements are + * the last measurement before a reset and the next measurement. + * + * Note: this code is used sequentially, and therefore there is no thread-safety built-in. + */ +class CounterResetDataPoints { + + /** + * Tracks timestamp measurements and the counter values at that moment. + */ + private final Map>> dataPoints = new HashMap<>(); + + void addDataPoint(String counterName, ResetPoint resetPoint) { + dataPoints.computeIfAbsent(resetPoint.timestamp, k -> new ArrayList<>()).add(Tuple.tuple(counterName, resetPoint.value)); + } + + public boolean isEmpty() { + return dataPoints.isEmpty(); + } + + /** + * Apply the processor consumer on each tracked measurement. + */ + public void processDataPoints(BiConsumer>> processor) { + if (isEmpty() == false) { + dataPoints.forEach(processor); + } + } + + public int countResetDocuments() { + return dataPoints.size(); + } + + record ResetPoint(long timestamp, double value) {} +} diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java index b0e0240bfe169..f1e3560c22796 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java @@ -89,7 +89,8 @@ public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) thro static List create( final SearchExecutionContext context, final String[] dimensions, - final Map multiFieldSources + final Map multiFieldSources, + DownsamplerCountPerValueType fieldCounts ) { List downsamplers = new ArrayList<>(); for (String dimension : dimensions) { @@ -98,6 +99,7 @@ static List create( assert fieldType != null : "Unknown type for dimension field: [" + sourceFieldName + "]"; if (context.fieldExistsInIndex(fieldType.name())) { + fieldCounts.increaseDimensionFields(); final IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); if (fieldType instanceof FlattenedFieldMapper.KeyedFlattenedFieldType flattenedFieldType) { // Name of the field type and name of the dimension are different in this case. diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index 842dde6294182..77c2da34ddd7d 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -31,11 +31,13 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.HistogramValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.index.fielddata.SortedNumericLongValues; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DocCountFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; @@ -69,6 +71,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import static java.util.stream.Collectors.groupingBy; @@ -98,9 +102,11 @@ class DownsampleShardIndexer { private final DocValueFormat timestampFormat; private final Rounding.Prepared rounding; private final List> fieldDownsamplers; + private final TimestampValueFetcher timestampValueFetcher; private final DownsampleShardTask task; private final DownsampleShardPersistentTaskState state; private final String[] dimensions; + private final AbstractFieldDownsampler.DownsamplerCountPerValueType fieldCounts; private volatile boolean abort = false; ByteSizeValue downsampleBulkSize = DOWNSAMPLE_BULK_SIZE; ByteSizeValue downsampleMaxBytesInFlight = DOWNSAMPLE_MAX_BYTES_IN_FLIGHT; @@ -142,20 +148,25 @@ class DownsampleShardIndexer { this.timestampField = (DateFieldMapper.DateFieldType) searchExecutionContext.getFieldType(config.getTimestampField()); this.timestampFormat = timestampField.docValueFormat(null, null); this.rounding = config.createRounding(); + this.fieldCounts = new AbstractFieldDownsampler.DownsamplerCountPerValueType(); var samplingMethod = config.getSamplingMethodOrDefault(); List> downsamplers = new ArrayList<>(metrics.length + labels.length + dimensions.length); - downsamplers.addAll(AbstractFieldDownsampler.create(searchExecutionContext, metrics, multiFieldSources, samplingMethod)); + downsamplers.addAll( + AbstractFieldDownsampler.create(searchExecutionContext, metrics, multiFieldSources, samplingMethod, fieldCounts) + ); // Labels are downsampled using the last value, they are not influenced by the requested sampling method downsamplers.addAll( AbstractFieldDownsampler.create( searchExecutionContext, labels, multiFieldSources, - DownsampleConfig.SamplingMethod.LAST_VALUE + DownsampleConfig.SamplingMethod.LAST_VALUE, + fieldCounts ) ); - downsamplers.addAll(DimensionFieldDownsampler.create(searchExecutionContext, dimensions, multiFieldSources)); + downsamplers.addAll(DimensionFieldDownsampler.create(searchExecutionContext, dimensions, multiFieldSources, fieldCounts)); + this.timestampValueFetcher = new TimestampValueFetcher(timestampField, searchExecutionContext); this.fieldDownsamplers = Collections.unmodifiableList(downsamplers); toClose = null; } finally { @@ -353,13 +364,21 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) } private class TimeSeriesBucketCollector extends BucketCollector { + // Constants to reduce object allocation when we do not need documents for counter resets + private static final DimensionFieldDownsampler[] EMPTY_DIMENSIONS_FOR_COUNTER_RESETS = new DimensionFieldDownsampler[0]; + private static final NumericMetricFieldDownsampler.AggregateCounter[] EMPTY_AGGREGATE_COUNTERS = + new NumericMetricFieldDownsampler.AggregateCounter[0]; private final BulkProcessor2 bulkProcessor; private final DownsampleBucketBuilder downsampleBucketBuilder; private LeafDownsampleCollector currentLeafCollector; + // Downsamplers grouped by the doc value input they expect, we use primitive arrays to reduce the footprint. private final LastValueFieldDownsampler[] formattedDocValuesDownsamplers; private final ExponentialHistogramFieldDownsampler[] exponentialHistogramDownsamplers; private final TDigestHistogramFieldDownsampler[] tDigestHistogramDownsamplers; private final NumericMetricFieldDownsampler[] numericDownsamplers; + // Aggregate counter downsampler is dealt with separately than the numeric ones because + // they additionally require timestamps. + private final NumericMetricFieldDownsampler.AggregateCounter[] aggregateCounterDownsamplers; private long docsProcessed; private long bucketsCreated; long lastTimestamp = Long.MAX_VALUE; @@ -367,19 +386,60 @@ private class TimeSeriesBucketCollector extends BucketCollector { TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor, String[] dimensions) { this.bulkProcessor = bulkProcessor; - this.numericDownsamplers = fieldDownsamplers.stream() - .filter(NumericMetricFieldDownsampler.class::isInstance) - .toArray(NumericMetricFieldDownsampler[]::new); - this.formattedDocValuesDownsamplers = fieldDownsamplers.stream() - .filter(LastValueFieldDownsampler.class::isInstance) - .toArray(LastValueFieldDownsampler[]::new); - this.exponentialHistogramDownsamplers = fieldDownsamplers.stream() - .filter(ExponentialHistogramFieldDownsampler.class::isInstance) - .toArray(ExponentialHistogramFieldDownsampler[]::new); - this.tDigestHistogramDownsamplers = fieldDownsamplers.stream() - .filter(TDigestHistogramFieldDownsampler.class::isInstance) - .toArray(TDigestHistogramFieldDownsampler[]::new); - this.downsampleBucketBuilder = new DownsampleBucketBuilder(fieldDownsamplers, dimensions); + int numericFieldIndex = 0; + this.numericDownsamplers = new NumericMetricFieldDownsampler[fieldCounts.numericFields()]; + int formattedValueFieldIndex = 0; + this.formattedDocValuesDownsamplers = new LastValueFieldDownsampler[fieldCounts.formattedValueFields()]; + int exponentialHistogramFieldIndex = 0; + this.exponentialHistogramDownsamplers = new ExponentialHistogramFieldDownsampler[fieldCounts.exponentialHistogramFields()]; + int tDigestHistogramFieldIndex = 0; + this.tDigestHistogramDownsamplers = new TDigestHistogramFieldDownsampler[fieldCounts.tDigestHistogramFields()]; + int aggregateCounterFieldIndex = 0; + this.aggregateCounterDownsamplers = fieldCounts.aggregateCounterFields() == 0 + ? EMPTY_AGGREGATE_COUNTERS + : new NumericMetricFieldDownsampler.AggregateCounter[fieldCounts.aggregateCounterFields()]; + int dimensionFieldIndex = 0; + DimensionFieldDownsampler[] dimensionDownsamplers = fieldCounts.aggregateCounterFields() == 0 + ? EMPTY_DIMENSIONS_FOR_COUNTER_RESETS + : new DimensionFieldDownsampler[fieldCounts.dimensionFields()]; + + for (AbstractFieldDownsampler fieldDownsampler : fieldDownsamplers) { + switch (fieldDownsampler) { + case NumericMetricFieldDownsampler.AggregateCounter aggregateCounter -> { + assert aggregateCounterFieldIndex < aggregateCounterDownsamplers.length; + aggregateCounterDownsamplers[aggregateCounterFieldIndex++] = aggregateCounter; + } + case NumericMetricFieldDownsampler numericMetricDownsampler -> { + assert numericFieldIndex < numericDownsamplers.length; + numericDownsamplers[numericFieldIndex++] = numericMetricDownsampler; + } + case LastValueFieldDownsampler lastValueDownsampler -> { + assert formattedValueFieldIndex < formattedDocValuesDownsamplers.length; + formattedDocValuesDownsamplers[formattedValueFieldIndex++] = lastValueDownsampler; + if (dimensionDownsamplers.length > 0 + && lastValueDownsampler instanceof DimensionFieldDownsampler dimensionFieldDownsampler) { + assert dimensionFieldIndex < dimensionDownsamplers.length; + dimensionDownsamplers[dimensionFieldIndex++] = dimensionFieldDownsampler; + } + } + case ExponentialHistogramFieldDownsampler exponentialHistogramDownsampler -> { + assert exponentialHistogramFieldIndex < exponentialHistogramDownsamplers.length; + exponentialHistogramDownsamplers[exponentialHistogramFieldIndex++] = exponentialHistogramDownsampler; + } + case TDigestHistogramFieldDownsampler tDigestDownsampler -> { + assert tDigestHistogramFieldIndex < tDigestHistogramDownsamplers.length; + tDigestHistogramDownsamplers[tDigestHistogramFieldIndex++] = tDigestDownsampler; + } + default -> throw new IllegalArgumentException("Unknown field downsampler type: " + fieldDownsampler.getClass()); + } + } + + this.downsampleBucketBuilder = new DownsampleBucketBuilder( + fieldDownsamplers, + aggregateCounterDownsamplers, + dimensionDownsamplers, + dimensions + ); } @Override @@ -405,6 +465,12 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag for (int i = 0; i < tDigestHistogramDownsamplers.length; i++) { tDigestHistogramValues[i] = tDigestHistogramDownsamplers[i].getLeaf(ctx); } + var aggregateCounterValues = new SortedNumericDoubleValues[aggregateCounterDownsamplers.length]; + for (int i = 0; i < aggregateCounterDownsamplers.length; i++) { + aggregateCounterValues[i] = aggregateCounterDownsamplers[i].getLeaf(ctx); + } + // If there are no aggregate counters we can skip fetching the timestamps + var timestampValues = aggregateCounterDownsamplers.length == 0 ? null : timestampValueFetcher.getLeaf(ctx); return new LeafDownsampleCollector( aggCtx, @@ -412,7 +478,9 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag numericValues, formattedDocValues, exponentialHistogramValues, - tDigestHistogramValues + tDigestHistogramValues, + aggregateCounterValues, + timestampValues ); } @@ -430,6 +498,8 @@ class LeafDownsampleCollector extends LeafBucketCollector { final FormattedDocValues[] formattedDocValues; final ExponentialHistogramValuesReader[] exponentialHistogramValues; final HistogramValues[] tDigestHistogramValues; + private final SortedNumericDoubleValues[] aggregateCounterValues; + final SortedNumericLongValues timestampValues; final IntArrayList docIdBuffer = new IntArrayList(DOCID_BUFFER_SIZE); final long timestampBoundStartTime = searchExecutionContext.getIndexSettings().getTimestampBounds().startTime(); @@ -440,7 +510,9 @@ class LeafDownsampleCollector extends LeafBucketCollector { SortedNumericDoubleValues[] numericValues, FormattedDocValues[] formattedDocValues, ExponentialHistogramValuesReader[] exponentialHistogramValues, - HistogramValues[] tDigestHistogramValues + HistogramValues[] tDigestHistogramValues, + SortedNumericDoubleValues[] aggregateCounterValues, + SortedNumericLongValues timestampValues ) { this.aggCtx = aggCtx; this.docCountProvider = docCountProvider; @@ -448,6 +520,8 @@ class LeafDownsampleCollector extends LeafBucketCollector { this.formattedDocValues = formattedDocValues; this.exponentialHistogramValues = exponentialHistogramValues; this.tDigestHistogramValues = tDigestHistogramValues; + this.aggregateCounterValues = aggregateCounterValues; + this.timestampValues = timestampValues; } @Override @@ -484,11 +558,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException { if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) { bulkCollection(); - // Flush downsample doc if not empty - if (downsampleBucketBuilder.isEmpty() == false) { - XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument(); - indexBucket(doc); - } + flushIfNotEmpty(); // Create new downsample bucket if (tsidChanged) { @@ -522,6 +592,13 @@ void leafBulkCollection() throws IOException { collect(formattedDocValuesDownsamplers, formattedDocValues); collect(exponentialHistogramDownsamplers, exponentialHistogramValues); collect(tDigestHistogramDownsamplers, tDigestHistogramValues); + if (aggregateCounterDownsamplers.length > 0) { + assert timestampValues != null; + long[] timestamps = TimestampValueFetcher.fetch(timestampValues, docIdBuffer); + for (int i = 0; i < aggregateCounterDownsamplers.length; i++) { + aggregateCounterDownsamplers[i].collect(aggregateCounterValues[i], timestamps, docIdBuffer); + } + } docsProcessed += docIdBuffer.size(); task.setDocsProcessed(docsProcessed); @@ -561,6 +638,16 @@ boolean assertTsidAndTimestamp(BytesRef tsidHash, long timestamp) { } } + private void flushIfNotEmpty() throws IOException { + if (downsampleBucketBuilder.isEmpty() == false) { + downsampleBucketBuilder.updateResetDataPoints(); + XContentBuilder downsampleDocument = downsampleBucketBuilder.buildDownsampleDocument(); + indexBucket(downsampleDocument); + + downsampleBucketBuilder.flushResetDocumentsIfNeeded(this::indexBucket); + } + } + private void indexBucket(XContentBuilder doc) { IndexRequestBuilder request = client.prepareIndex(downsampleIndex); request.setSource(doc); @@ -582,10 +669,7 @@ public void preCollection() { public void postCollection() throws IOException { // Flush downsample doc if not empty bulkCollection(); - if (downsampleBucketBuilder.isEmpty() == false) { - XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument(); - indexBucket(doc); - } + flushIfNotEmpty(); // check cancel after the flush all data checkCancelled(); @@ -609,31 +693,39 @@ private class DownsampleBucketBuilder { private int tsidOrd = -1; private long timestamp; private int docCount; + private CounterResetDataPoints counterResetDataPoints; + // A list of all the downsamplers so we can reset them before moving on to the next bucket private final List> fieldDownsamplers; - private final DownsampleFieldSerializer[] groupedDownsamplers; - private final String[] dimensions; - - DownsampleBucketBuilder(List> fieldDownsamplers, String[] dimensions) { + // An array of field serializers, each field has one serializer which can group one or more AbstractFieldDownsamplers + private final DownsampleFieldSerializer[] fieldSerializers; + // We track the dimensions and aggregate counter downsamplers separately to serialise the extra counter reset documents + private final DimensionFieldDownsampler[] dimensionDownsamplers; + private final NumericMetricFieldDownsampler.AggregateCounter[] aggregateCounterDownsamplers; + private final boolean legacyDimensions; + + DownsampleBucketBuilder( + List> fieldDownsamplers, + NumericMetricFieldDownsampler.AggregateCounter[] aggregateCounterDownsamplers, + DimensionFieldDownsampler[] dimensionDownsamplers, + String[] dimensions + ) { this.fieldDownsamplers = fieldDownsamplers; - this.dimensions = dimensions; + this.legacyDimensions = dimensions.length == 0; + this.dimensionDownsamplers = dimensionDownsamplers; + this.aggregateCounterDownsamplers = aggregateCounterDownsamplers; /* * The field downsamplers for aggregate_metric_double all share the same name (this is * the name they will be serialized in the target index). We group all field downsamplers by * name. If grouping yields multiple field downsamplers, we delegate serialization to * the AggregateMetricFieldSerializer class. */ - groupedDownsamplers = fieldDownsamplers.stream() - .collect(groupingBy(AbstractFieldDownsampler::name)) - .entrySet() - .stream() - .map(e -> { - if (e.getValue().size() == 1) { - return e.getValue().get(0); - } else { - return new AggregateMetricDoubleFieldDownsampler.Serializer(e.getKey(), e.getValue()); - } - }) - .toArray(DownsampleFieldSerializer[]::new); + fieldSerializers = fieldDownsamplers.stream().collect(groupingBy(AbstractFieldDownsampler::name)).entrySet().stream().map(e -> { + if (e.getValue().size() == 1) { + return e.getValue().get(0); + } else { + return new AggregateMetricDoubleFieldDownsampler.Serializer(e.getKey(), e.getValue()); + } + }).toArray(DownsampleFieldSerializer[]::new); } /** @@ -643,6 +735,10 @@ public void resetTsid(BytesRef tsid, int tsidOrd, long timestamp) { this.tsid = BytesRef.deepCopyOf(tsid); this.tsidOrd = tsidOrd; resetTimestamp(timestamp); + // In case of tsid change, the aggregate counter downsamplers need to reset the previous value + for (int i = 0; i < aggregateCounterDownsamplers.length; i++) { + aggregateCounterDownsamplers[i].tsidReset(); + } } /** @@ -654,6 +750,8 @@ public void resetTimestamp(long timestamp) { for (AbstractFieldDownsampler downsampler : fieldDownsamplers) { downsampler.reset(); } + // We need to clear the extra data points for this bucket + this.counterResetDataPoints = aggregateCounterDownsamplers.length > 0 ? new CounterResetDataPoints() : null; if (logger.isTraceEnabled()) { logger.trace( "New bucket for _tsid: [{}], @timestamp: [{}]", @@ -674,6 +772,14 @@ public void collectDocCount(IntArrayList buffer, DocCountProvider docCountProvid } } + public void updateResetDataPoints() { + if (counterResetDataPoints != null) { + for (int i = 0; i < aggregateCounterDownsamplers.length; i++) { + aggregateCounterDownsamplers[i].updateResetDataPoints(counterResetDataPoints); + } + } + } + public XContentBuilder buildDownsampleDocument() throws IOException { XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE); builder.startObject(); @@ -682,14 +788,64 @@ public XContentBuilder buildDownsampleDocument() throws IOException { return builder; } builder.field(timestampField.name(), timestampFormat.format(timestamp)); - builder.field(DocCountFieldMapper.NAME, docCount); + // We remove the reset documents from the doc count otherwise in every downsample round + // the doc count will re-count the reset documents. + int resetDocCount = counterResetDataPoints == null ? 0 : counterResetDataPoints.countResetDocuments(); + int downsampledDocumentDocCount = docCount - resetDocCount; + assert downsampledDocumentDocCount > 0 : "Reset documents should already be included in the processed document count"; + builder.field(DocCountFieldMapper.NAME, downsampledDocumentDocCount); // Serialize fields - for (DownsampleFieldSerializer fieldDownsampler : groupedDownsamplers) { + for (DownsampleFieldSerializer fieldDownsampler : fieldSerializers) { fieldDownsampler.write(builder); } - if (dimensions.length == 0) { + extractLegacyDimensionsIfNeeded(builder); + + builder.endObject(); + return builder; + } + + public XContentBuilder buildExtraCounterDocument(long timestamp, List> counterValues) throws IOException { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE); + builder.startObject(); + builder.field(timestampField.name(), timestampFormat.format(timestamp)); + + // Serialize fields + for (DimensionFieldDownsampler dimensionFieldDownsampler : dimensionDownsamplers) { + dimensionFieldDownsampler.write(builder); + } + for (Tuple counterValue : counterValues) { + builder.field(counterValue.v1(), counterValue.v2()); + } + + extractLegacyDimensionsIfNeeded(builder); + + builder.endObject(); + return builder; + } + + void flushResetDocumentsIfNeeded(Consumer indexResetDoc) throws IOException { + if (counterResetDataPoints == null || counterResetDataPoints.isEmpty()) { + return; + } + + AtomicReference error = new AtomicReference<>(); + counterResetDataPoints.processDataPoints((timestamp, counterValues) -> { + try { + XContentBuilder resetDoc = buildExtraCounterDocument(timestamp, counterValues); + indexResetDoc.accept(resetDoc); + } catch (IOException e) { + error.set(e); + } + }); + if (error.get() != null) { + throw error.get(); + } + } + + private void extractLegacyDimensionsIfNeeded(XContentBuilder builder) throws IOException { + if (legacyDimensions) { logger.debug("extracting dimensions from legacy tsid"); Map dimensions = (Map) DocValueFormat.TIME_SERIES_ID.format(tsid); for (Map.Entry e : dimensions.entrySet()) { @@ -697,9 +853,6 @@ public XContentBuilder buildDownsampleDocument() throws IOException { builder.field((String) e.getKey(), e.getValue()); } } - - builder.endObject(); - return builder; } public long timestamp() { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsampler.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsampler.java index ea9b593e41723..2b00f1c230404 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsampler.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsampler.java @@ -38,11 +38,17 @@ class LastValueFieldDownsampler extends AbstractFieldDownsampler fieldData) { + static LastValueFieldDownsampler create( + String name, + MappedFieldType fieldType, + IndexFieldData fieldData, + DownsamplerCountPerValueType fieldCounts + ) { assert AggregateMetricDoubleFieldDownsampler.supportsFieldType(fieldType) == false && ExponentialHistogramFieldDownsampler.supportsFieldType(fieldType) == false && TDigestHistogramFieldDownsampler.supportsFieldType(fieldType) == false : "field '" + name + "' of type '" + fieldType.typeName() + "' should be processed by a dedicated downsampler"; + fieldCounts.increaseFormattedValueFields(); if ("flattened".equals(fieldType.typeName())) { return new LastValueFieldDownsampler.FlattenedFieldProducer(name, fieldType, fieldData); } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/NumericMetricFieldDownsampler.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/NumericMetricFieldDownsampler.java index 85caa630148d1..a43f8f2ed6890 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/NumericMetricFieldDownsampler.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/NumericMetricFieldDownsampler.java @@ -19,6 +19,8 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; /** * Class that collects all raw values for a numeric metric field and computes its aggregate (downsampled) @@ -26,7 +28,8 @@ * gauge and metric types. */ abstract sealed class NumericMetricFieldDownsampler extends AbstractFieldDownsampler permits - AggregateMetricDoubleFieldDownsampler, NumericMetricFieldDownsampler.AggregateGauge, NumericMetricFieldDownsampler.LastValue { + AggregateMetricDoubleFieldDownsampler, NumericMetricFieldDownsampler.AggregateGauge, NumericMetricFieldDownsampler.LastValue, + NumericMetricFieldDownsampler.AggregateCounter { NumericMetricFieldDownsampler(String name, IndexFieldData fieldData) { super(name, fieldData); @@ -47,14 +50,22 @@ static NumericMetricFieldDownsampler create( String fieldName, MappedFieldType fieldType, IndexFieldData fieldData, - DownsampleConfig.SamplingMethod samplingMethod + DownsampleConfig.SamplingMethod samplingMethod, + DownsamplerCountPerValueType fieldCounts ) { assert supportsFieldType(fieldType) : "only gauges and counters accepted, other metrics should have been handled by dedicated downsamplers"; - if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE && fieldType.getMetricType() == TimeSeriesParams.MetricType.GAUGE) { + TimeSeriesParams.MetricType metricType = fieldType.getMetricType(); + if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) { + fieldCounts.increaseNumericFields(); + return new NumericMetricFieldDownsampler.LastValue(fieldName, fieldData); + } + if (metricType == TimeSeriesParams.MetricType.GAUGE) { + fieldCounts.increaseNumericFields(); return new NumericMetricFieldDownsampler.AggregateGauge(fieldName, fieldData); } - return new NumericMetricFieldDownsampler.LastValue(fieldName, fieldData); + fieldCounts.increaseAggregateCounterFields(); + return new NumericMetricFieldDownsampler.AggregateCounter(fieldName, fieldData); } static final double MAX_NO_VALUE = -Double.MAX_VALUE; @@ -163,4 +174,123 @@ public void write(XContentBuilder builder) throws IOException { } } } + + /** + * {@link NumericMetricFieldDownsampler} implementation for creating the required downsampling doc to support a pre-aggregated + * counter metric field. This producer tracks the following: + * - The first value for this counter per tsid and bucket, so it can be stored in the downsampled document. + * - The last-seen timestamp, so it can update the extraDataPoints structure which is shared across all aggregate counter producers. + * Important note: This class assumes that field values are collected and sorted by descending order by time. + */ + static final class AggregateCounter extends NumericMetricFieldDownsampler { + + final Deque resetStack = new ArrayDeque<>(); + double downsampledValue = Double.NaN; + long lastTimestamp = -1; + // Cross bucket value + double previousValue = Double.NaN; + // This value captures the persisted value of the previous bucket for the same tsid and + // allows us to avoid persisting the after-the-reset-document + double previousBucketValue = Double.NaN; + + AggregateCounter(String name, IndexFieldData fieldData) { + super(name, fieldData); + } + + public void collect(SortedNumericDoubleValues counterDocValues, long[] timestamps, IntArrayList docIdBuffer) throws IOException { + assert timestamps.length == docIdBuffer.size() : "timestamps and docIdBuffer should have the same size"; + for (int i = 0; i < docIdBuffer.size(); i++) { + int docId = docIdBuffer.get(i); + var currentTimestamp = timestamps[i]; + if (counterDocValues.advanceExact(docId) == false || currentTimestamp < 0) { + continue; + } + int docValuesCount = counterDocValues.docValueCount(); + assert docValuesCount > 0; + isEmpty = false; + + var currentCounterValue = counterDocValues.nextValue(); + // If this the first time we encounter a value for this tsid + if (Double.isNaN(previousValue)) { + downsampledValue = currentCounterValue; + previousValue = currentCounterValue; + lastTimestamp = currentTimestamp; + continue; + } + + // when we detect a reset, (remember that field values are collected and sorted by descending order by time) + if (currentCounterValue > previousValue) { + // We check if we need to persist the previous value too + // If timestamp -1 means that the previous value is already persisted by a previous bucket, nothing extra to persist + if (lastTimestamp > 0) { + // If we have a previous value in this bucket, we need to see if the last persisted value is enough to capture the + // reset or not. + double lastPersisted = Double.NaN; + if (resetStack.isEmpty() == false) { + lastPersisted = resetStack.peek().value(); + } else if (Double.isNaN(previousBucketValue) == false) { + lastPersisted = previousBucketValue; + } + // If there is no known last persisted value or the last persisted is larger than the current value, + // we need to store the previous document to capture the reset. + if (Double.isNaN(lastPersisted) || Double.compare(currentCounterValue, lastPersisted) < 0) { + resetStack.push(new CounterResetDataPoints.ResetPoint(lastTimestamp, previousValue)); + } + } + // This is the last value before reset, which we always need to persist + resetStack.push(new CounterResetDataPoints.ResetPoint(currentTimestamp, currentCounterValue)); + } + downsampledValue = currentCounterValue; + previousValue = currentCounterValue; + assert lastTimestamp == -1 || currentTimestamp < lastTimestamp; + lastTimestamp = currentTimestamp; + } + } + + public void reset() { + isEmpty = true; + previousBucketValue = downsampledValue; + downsampledValue = Double.NaN; + lastTimestamp = -1; + resetStack.clear(); + } + + public void tsidReset() { + reset(); + previousValue = Double.NaN; + previousBucketValue = Double.NaN; + } + + @Override + public void write(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.field(name(), downsampledValue); + } + } + + @Override + public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException { + throw new UnsupportedOperationException("This producer should never be called without timestamps"); + } + + /** + * Update {@link CounterResetDataPoints} which contains all reset counter values, + * with the latest reset points of this counter field. + * @param counterResetDataPoints the extra reset data values for every counter for this bucket + */ + public void updateResetDataPoints(CounterResetDataPoints counterResetDataPoints) { + if (resetStack.isEmpty()) { + return; + } + // It is possible that the first reset data point is the same with the first data point + // we skip this if this is the case + var firstResetPoint = resetStack.pop(); + if (firstResetPoint.value() != downsampledValue) { + counterResetDataPoints.addDataPoint(name(), firstResetPoint); + } + while (resetStack.isEmpty() == false) { + counterResetDataPoints.addDataPoint(name(), resetStack.pop()); + } + } + } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RestDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RestDownsampleAction.java index 14d29f29873db..67987b42bab44 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RestDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RestDownsampleAction.java @@ -32,7 +32,8 @@ public class RestDownsampleAction extends BaseRestHandler { "downsample.multi_field_fix", "downsampling.exponential_histograms", "downsampling.tdigest_histograms", - "downsampling.tdigest" + "downsampling.tdigest", + "downsampling.store_reset_counters" ); @Override diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimestampValueFetcher.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimestampValueFetcher.java new file mode 100644 index 0000000000000..3b0bc7f2d1289 --- /dev/null +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimestampValueFetcher.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.internal.hppc.IntArrayList; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.SortedNumericLongValues; +import org.elasticsearch.index.fielddata.plain.LeafLongFieldData; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.SearchExecutionContext; + +import java.io.IOException; + +/** + * This class loads the timestamp values for the provided field. + */ +class TimestampValueFetcher { + private final IndexFieldData fieldData; + + TimestampValueFetcher(DateFieldMapper.DateFieldType fieldType, SearchExecutionContext context) { + fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); + } + + SortedNumericLongValues getLeaf(LeafReaderContext context) { + LeafLongFieldData numericFieldData = (LeafLongFieldData) fieldData.load(context); + return numericFieldData.getLongValues(); + } + + static long[] fetch(SortedNumericLongValues timestampDocValues, IntArrayList docIdBuffer) throws IOException { + long[] timestamps = new long[docIdBuffer.size()]; + for (int i = 0; i < docIdBuffer.size(); i++) { + int docId = docIdBuffer.get(i); + if (timestampDocValues.advanceExact(docId) == false) { + timestamps[i] = -1; + } + int docValuesCount = timestampDocValues.docValueCount(); + assert docValuesCount == 1; + timestamps[i] = timestampDocValues.nextValue(); + } + return timestamps; + } +} diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/AggregateCounterFieldDownsamplerTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/AggregateCounterFieldDownsamplerTests.java new file mode 100644 index 0000000000000..3c138d1a5a875 --- /dev/null +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/AggregateCounterFieldDownsamplerTests.java @@ -0,0 +1,300 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.apache.lucene.internal.hppc.IntArrayList; +import org.apache.lucene.internal.hppc.IntDoubleHashMap; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.List; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +public class AggregateCounterFieldDownsamplerTests extends ESTestCase { + + /** + * Monotonically increasing counter with no resets within a single bucket. + * Downsampled doc: 1 + */ + public void testAggregateCounter() throws IOException { + CounterResetDataPoints resetDataPoints = new CounterResetDataPoints(); + NumericMetricFieldDownsampler.AggregateCounter producer = new NumericMetricFieldDownsampler.AggregateCounter("my-counter", null); + IntArrayList docIdBuffer = IntArrayList.from(6, 5, 4, 3, 2, 1, 0); + long[] timeValues = new long[] { 70, 60, 50, 40, 30, 20, 10 }; + SortedNumericDoubleValues counterValues = createNumericValuesInstance(docIdBuffer, 64, 32, 16, 8, 4, 2, 1); + producer.collect(counterValues, timeValues, docIdBuffer); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(1.0)); + assertThat(resetDataPoints.isEmpty(), equalTo(true)); + producer.reset(); + assertThat(producer.downsampledValue, equalTo(Double.NaN)); + assertThat(producer.previousValue, equalTo(1.0)); + assertThat(producer.lastTimestamp, equalTo(-1L)); + producer.tsidReset(); + assertThat(producer.previousValue, equalTo(Double.NaN)); + } + + /** + * Single reset within a bucket. The last-before-reset value (16 at t=50) and the after-reset + * value (5 at t=60) are both stored as reset data points. + * Downsampled doc: 1 + * Reset docs: 16 at 50, 5 at 60 + */ + public void testAggregateCounterWithReset() throws IOException { + CounterResetDataPoints resetDataPoints = new CounterResetDataPoints(); + NumericMetricFieldDownsampler.AggregateCounter producer = new NumericMetricFieldDownsampler.AggregateCounter("my-counter", null); + IntArrayList docIdBuffer = IntArrayList.from(6, 5, 4, 3, 2, 1, 0); + long[] timeValues = new long[] { 70, 60, 50, 40, 30, 20, 10 }; + SortedNumericDoubleValues counterValues = createNumericValuesInstance(docIdBuffer, 8, 5, 16, 8, 4, 2, 1); + producer.collect(counterValues, timeValues, docIdBuffer); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(1.0)); + assertThat(resetDataPoints.countResetDocuments(), equalTo(2)); + resetDataPoints.processDataPoints((timestamp, dataPoints) -> { + assertThat(timestamp, anyOf(equalTo(60L), equalTo(50L))); + if (timestamp == 60L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 5.0)))); + } + if (timestamp == 50L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 16.0)))); + } + }); + producer.reset(); + assertThat(producer.downsampledValue, equalTo(Double.NaN)); + assertThat(producer.previousValue, equalTo(1.0)); + assertThat(producer.lastTimestamp, equalTo(-1L)); + producer.tsidReset(); + assertThat(producer.previousValue, equalTo(Double.NaN)); + } + + /** + * Counter with a reset where the last-before-reset value (1) is also the earliest value in + * the bucket and equals the downsampled value. Only the after-reset value (0 at t=20) is + * stored as a reset data point; the before-reset value is not duplicated. + * Downsampled doc: 1 + * Reset docs: 0 at 20 + */ + public void testAggregateCounterDoesNotDuplicateFirstValue() throws IOException { + CounterResetDataPoints resetDataPoints = new CounterResetDataPoints(); + NumericMetricFieldDownsampler.AggregateCounter producer = new NumericMetricFieldDownsampler.AggregateCounter("my-counter", null); + IntArrayList docIdBuffer = IntArrayList.from(2, 1, 0); + long[] timeValues = new long[] { 30, 20, 10 }; + SortedNumericDoubleValues counterValues = createNumericValuesInstance(docIdBuffer, 7, 0, 1); + producer.collect(counterValues, timeValues, docIdBuffer); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(1.0)); + assertThat(resetDataPoints.countResetDocuments(), equalTo(1)); + resetDataPoints.processDataPoints((timestamp, dataPoints) -> { + assertThat(timestamp, equalTo(20L)); + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 0.0)))); + }); + producer.reset(); + assertThat(producer.downsampledValue, equalTo(Double.NaN)); + assertThat(producer.previousValue, equalTo(1.0)); + assertThat(producer.lastTimestamp, equalTo(-1L)); + producer.tsidReset(); + assertThat(producer.previousValue, equalTo(Double.NaN)); + } + + /** + * Two resets within a single bucket where the last-before-reset value of the earlier reset (8) + * is larger than the last-before-reset value of the later reset (5, which is also the most + * recently persisted reset point). This means the after-reset value (3) is redundant and does + * NOT get stored as a separate reset data point. + * Downsampled doc: 1 + * Reset docs: 8 at 40, 5 at 60, 2 at 70 + */ + public void testAggregateCounterWithMultipleResetsLastBeforeResetLarger() throws IOException { + CounterResetDataPoints resetDataPoints = new CounterResetDataPoints(); + NumericMetricFieldDownsampler.AggregateCounter producer = new NumericMetricFieldDownsampler.AggregateCounter("my-counter", null); + IntArrayList docIdBuffer = IntArrayList.from(7, 6, 5, 4, 3, 2, 1, 0); + long[] timeValues = new long[] { 80, 70, 60, 50, 40, 30, 20, 10 }; + SortedNumericDoubleValues counterValues = createNumericValuesInstance(docIdBuffer, 4, 2, 5, 3, 8, 4, 2, 1); + producer.collect(counterValues, timeValues, docIdBuffer); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(1.0)); + assertThat(resetDataPoints.countResetDocuments(), equalTo(3)); + resetDataPoints.processDataPoints((timestamp, dataPoints) -> { + assertThat(timestamp, anyOf(equalTo(40L), equalTo(60L), equalTo(70L))); + if (timestamp == 40L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 8.0)))); + } + if (timestamp == 60L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 5.0)))); + } + if (timestamp == 70L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 2.0)))); + } + }); + producer.reset(); + assertThat(producer.downsampledValue, equalTo(Double.NaN)); + assertThat(producer.previousValue, equalTo(1.0)); + assertThat(producer.lastTimestamp, equalTo(-1L)); + producer.tsidReset(); + assertThat(producer.previousValue, equalTo(Double.NaN)); + } + + /** + * Two resets within a single bucket where the last-before-reset value of the earlier reset (4) + * is smaller than the last-before-reset value of the later reset (5, which is also the most + * recently persisted reset point). This means the after-reset value (3) is NOT redundant and + * gets stored as a separate reset data point. + * Downsampled doc: 1 + * Reset docs: 4 at 30, 3 at 40, 5 at 50, 2 at 60 + */ + public void testAggregateCounterWithMultipleResetsLastBeforeResetSmaller() throws IOException { + CounterResetDataPoints resetDataPoints = new CounterResetDataPoints(); + NumericMetricFieldDownsampler.AggregateCounter producer = new NumericMetricFieldDownsampler.AggregateCounter("my-counter", null); + IntArrayList docIdBuffer = IntArrayList.from(6, 5, 4, 3, 2, 1, 0); + long[] timeValues = new long[] { 70, 60, 50, 40, 30, 20, 10 }; + SortedNumericDoubleValues counterValues = createNumericValuesInstance(docIdBuffer, 4, 2, 5, 3, 4, 2, 1); + producer.collect(counterValues, timeValues, docIdBuffer); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(1.0)); + assertThat(resetDataPoints.countResetDocuments(), equalTo(4)); + resetDataPoints.processDataPoints((timestamp, dataPoints) -> { + assertThat(timestamp, anyOf(equalTo(30L), equalTo(40L), equalTo(50L), equalTo(60L))); + if (timestamp == 30L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 4.0)))); + } + if (timestamp == 40L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 3.0)))); + } + if (timestamp == 50L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 5.0)))); + } + if (timestamp == 60L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 2.0)))); + } + }); + producer.reset(); + assertThat(producer.downsampledValue, equalTo(Double.NaN)); + assertThat(producer.previousValue, equalTo(1.0)); + assertThat(producer.lastTimestamp, equalTo(-1L)); + producer.tsidReset(); + assertThat(producer.previousValue, equalTo(Double.NaN)); + } + + /** + * Two buckets processed in reverse time order. Bucket #2 (t=50-70) has monotonically + * increasing values 4, 5, 6 with no resets. Bucket #1 (t=10-40) has values 7, 8, 0, 2 + * with a reset at t=30. Both the last-before-reset value (8 at t=20) and the after-reset + * value (0 at t=30) are added as there is no other bucket information for the same tsid. + * Downsampled docs: 7, 4 + * Reset docs: 8 at 20, 0 at 30 + */ + public void testAggregateCounterDoesNotAddNotRedundantValue() throws IOException { + CounterResetDataPoints resetDataPoints = new CounterResetDataPoints(); + NumericMetricFieldDownsampler.AggregateCounter producer = new NumericMetricFieldDownsampler.AggregateCounter("my-counter", null); + // Bucket #2 + IntArrayList docIdBuffer = IntArrayList.from(6, 5, 4); + long[] timeValues = new long[] { 70, 60, 50 }; + SortedNumericDoubleValues counterValues = createNumericValuesInstance(docIdBuffer, 6, 5, 4); + producer.collect(counterValues, timeValues, docIdBuffer); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(4.0)); + assertThat(resetDataPoints.isEmpty(), equalTo(true)); + producer.reset(); + + // Bucket #1 + docIdBuffer = IntArrayList.from(3, 2, 1, 0); + timeValues = new long[] { 40, 30, 20, 10 }; + counterValues = createNumericValuesInstance(docIdBuffer, 2, 0, 8, 7); + producer.collect(counterValues, timeValues, docIdBuffer); + resetDataPoints = new CounterResetDataPoints(); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(7.0)); + assertThat(resetDataPoints.countResetDocuments(), equalTo(1)); + resetDataPoints.processDataPoints((timestamp, dataPoints) -> { + assertThat(timestamp, equalTo(20L)); + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 8.0)))); + }); + producer.reset(); + assertThat(producer.downsampledValue, equalTo(Double.NaN)); + assertThat(producer.previousValue, equalTo(7.0)); + assertThat(producer.lastTimestamp, equalTo(-1L)); + producer.tsidReset(); + assertThat(producer.previousValue, equalTo(Double.NaN)); + } + + /** + * Two buckets with 2 different tsids. Bucket tsid_2 has monotonically increasing values + * with no resets. Bucket tsid_2 has values 7, 8, 0, 2 with a reset at t=30. Only the + * last-before-reset value (8 at t=20) is stored as a reset data point; the after-reset + * value (0 at t=30) is not added as it would be redundant. + * Downsampled docs: 7, 4 + * Reset docs: 8 at 20, 0 at 30 + */ + public void testAggregateCounterResetsWhenTsidChanges() throws IOException { + CounterResetDataPoints resetDataPoints = new CounterResetDataPoints(); + NumericMetricFieldDownsampler.AggregateCounter producer = new NumericMetricFieldDownsampler.AggregateCounter("my-counter", null); + // Bucket tsid_2 + IntArrayList docIdBuffer = IntArrayList.from(6, 5, 4); + long[] timeValues = new long[] { 40, 20, 10 }; + SortedNumericDoubleValues counterValues = createNumericValuesInstance(docIdBuffer, 6, 5, 4); + producer.collect(counterValues, timeValues, docIdBuffer); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(4.0)); + assertThat(resetDataPoints.isEmpty(), equalTo(true)); + producer.tsidReset(); + + // Bucket tsid_1 + docIdBuffer = IntArrayList.from(3, 2, 1, 0); + timeValues = new long[] { 40, 30, 20, 10 }; + counterValues = createNumericValuesInstance(docIdBuffer, 2, 0, 8, 7); + producer.collect(counterValues, timeValues, docIdBuffer); + resetDataPoints = new CounterResetDataPoints(); + producer.updateResetDataPoints(resetDataPoints); + assertThat(producer.downsampledValue, equalTo(7.0)); + assertThat(resetDataPoints.countResetDocuments(), equalTo(2)); + resetDataPoints.processDataPoints((timestamp, dataPoints) -> { + assertThat(timestamp, anyOf(equalTo(20L), equalTo(30L))); + if (timestamp == 20L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 8.0)))); + } + if (timestamp == 30L) { + assertThat(dataPoints, equalTo(List.of(Tuple.tuple("my-counter", 0.0)))); + } + }); + producer.reset(); + assertThat(producer.downsampledValue, equalTo(Double.NaN)); + assertThat(producer.previousValue, equalTo(7.0)); + assertThat(producer.lastTimestamp, equalTo(-1L)); + producer.tsidReset(); + assertThat(producer.previousValue, equalTo(Double.NaN)); + } + + static SortedNumericDoubleValues createNumericValuesInstance(IntArrayList docIdBuffer, double... values) { + return new SortedNumericDoubleValues() { + + final IntDoubleHashMap docIdToValue = IntDoubleHashMap.from(docIdBuffer.toArray(), values); + + int currentDocId = -1; + + @Override + public boolean advanceExact(int target) { + currentDocId = target; + return docIdToValue.containsKey(target); + } + + @Override + public double nextValue() { + return docIdToValue.get(currentDocId); + } + + @Override + public int docValueCount() { + return 1; + } + }; + } +} diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/CounterResetDataPointsTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/CounterResetDataPointsTests.java new file mode 100644 index 0000000000000..380995bab415d --- /dev/null +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/CounterResetDataPointsTests.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; + +public class CounterResetDataPointsTests extends ESTestCase { + + public void testEmptyByDefault() { + CounterResetDataPoints dataPoints = new CounterResetDataPoints(); + assertThat(dataPoints.isEmpty(), equalTo(true)); + assertThat(dataPoints.countResetDocuments(), equalTo(0)); + } + + public void testAddSingleDataPoint() { + CounterResetDataPoints dataPoints = new CounterResetDataPoints(); + dataPoints.addDataPoint( + "counter_a", + new CounterResetDataPoints.ResetPoint(randomLongBetween(100, 10000), randomIntBetween(0, 10000)) + ); + + assertThat(dataPoints.isEmpty(), equalTo(false)); + assertThat(dataPoints.countResetDocuments(), equalTo(1)); + } + + public void testAddMultipleDataPointsAtDifferentTimestamps() { + CounterResetDataPoints dataPoints = new CounterResetDataPoints(); + dataPoints.addDataPoint("counter_a", new CounterResetDataPoints.ResetPoint(100L, randomIntBetween(0, 10000))); + dataPoints.addDataPoint("counter_a", new CounterResetDataPoints.ResetPoint(200L, randomIntBetween(0, 10000))); + dataPoints.addDataPoint("counter_a", new CounterResetDataPoints.ResetPoint(300L, randomIntBetween(0, 10000))); + + assertThat(dataPoints.countResetDocuments(), equalTo(3)); + } + + public void testAddMultipleCountersAtSameTimestamp() { + CounterResetDataPoints dataPoints = new CounterResetDataPoints(); + long timestamp = randomLongBetween(100, 10000); + dataPoints.addDataPoint("counter_a", new CounterResetDataPoints.ResetPoint(timestamp, 10.0)); + dataPoints.addDataPoint("counter_b", new CounterResetDataPoints.ResetPoint(timestamp, 20.0)); + + assertThat(dataPoints.countResetDocuments(), equalTo(1)); + + dataPoints.processDataPoints((t, values) -> { + assertThat(t, equalTo(timestamp)); + assertThat(values, hasItem(Tuple.tuple("counter_a", 10.0))); + assertThat(values, hasItem(Tuple.tuple("counter_b", 20.0))); + }); + } + + public void testProcessDataPointsOnEmpty() { + CounterResetDataPoints dataPoints = new CounterResetDataPoints(); + List visited = new ArrayList<>(); + dataPoints.processDataPoints((timestamp, values) -> visited.add(timestamp)); + + assertThat(visited, hasSize(0)); + } + + public void testProcessDataPointsVisitsAllTimestamps() { + CounterResetDataPoints dataPoints = new CounterResetDataPoints(); + dataPoints.addDataPoint("c1", new CounterResetDataPoints.ResetPoint(10L, 1.0)); + dataPoints.addDataPoint("c2", new CounterResetDataPoints.ResetPoint(20L, 2.0)); + dataPoints.addDataPoint("c3", new CounterResetDataPoints.ResetPoint(30L, 3.0)); + + List timestamps = new ArrayList<>(); + dataPoints.processDataPoints((timestamp, values) -> timestamps.add(timestamp)); + + assertThat(timestamps, hasSize(3)); + assertTrue(timestamps.contains(10L)); + assertTrue(timestamps.contains(20L)); + assertTrue(timestamps.contains(30L)); + } + + public void testProcessDataPointsWithMixedCountersAndTimestamps() { + CounterResetDataPoints dataPoints = new CounterResetDataPoints(); + dataPoints.addDataPoint("cpu", new CounterResetDataPoints.ResetPoint(100L, 50.0)); + dataPoints.addDataPoint("mem", new CounterResetDataPoints.ResetPoint(100L, 80.0)); + dataPoints.addDataPoint("cpu", new CounterResetDataPoints.ResetPoint(200L, 5.0)); + + assertThat(dataPoints.countResetDocuments(), equalTo(2)); + + dataPoints.processDataPoints((timestamp, values) -> { + if (timestamp == 100L) { + assertThat(values, hasItems(Tuple.tuple("cpu", 50.0), Tuple.tuple("mem", 80.0))); + } else if (timestamp == 200L) { + assertThat(values, equalTo(List.of(Tuple.tuple("cpu", 5.0)))); + } else { + fail("unexpected timestamp: " + timestamp); + } + }); + } +} diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index 7f6c521edfdaf..7b2f13d891a91 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -112,7 +112,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -1377,38 +1376,36 @@ private void assertDownsampleIndexAggregations( Map downsampleHitDocumentFields = downsampleHit.getDocumentFields(); List originalFields = originalHitDocumentFields.values().stream().toList(); List downsampleFields = downsampleHitDocumentFields.values().stream().toList(); - List originalFieldsList = originalFields.stream().flatMap(x -> x.getValues().stream()).toList(); - List downsampelFieldsList = downsampleFields.stream().flatMap(x -> x.getValues().stream()).toList(); - if (originalFieldsList.isEmpty() == false && downsampelFieldsList.isEmpty() == false) { - // NOTE: here we take advantage of the fact that a label field is indexed also as a metric of type - // `counter`. This way we can actually check that the label value stored in the downsample index - // is the last value (which is what we store for a metric of type counter) by comparing the metric - // field value to the label field value. - originalFieldsList.forEach( - field -> assertTrue( - "Field [" + field + "] is not included in the downsample fields: " + downsampelFieldsList, - downsampelFieldsList.contains(field) - ) - ); - downsampelFieldsList.forEach( - field -> assertTrue( - "Field [" + field + "] is not included in the source fields: " + originalFieldsList, - originalFieldsList.contains(field) - ) - ); - String labelName = originalHit.getDocumentFields().values().stream().findFirst().get().getName(); - Object originalLabelValue = originalHit.getDocumentFields().values().stream().findFirst().get().getValue(); - Object downsampleLabelValue = downsampleHit.getDocumentFields().values().stream().findFirst().get().getValue(); - Optional labelAsMetric = topHitsOriginalAggregations.stream() - .filter(agg -> agg.getName().equals("metric_" + downsampleTopHits.getName())) - .findFirst(); - // NOTE: this check is possible only if the label can be indexed as a metric (the label is a numeric field) - if (labelAsMetric.isPresent()) { - double metricValue = ((InternalTopHits) labelAsMetric.get()).getHits().getHits()[0].field( - "metric_" + labelName - ).getValue(); - assertEquals(metricValue, downsampleLabelValue); - assertEquals(metricValue, originalLabelValue); + if (originalFields.isEmpty() == false && downsampleFields.isEmpty() == false) { + assertEquals(originalFields.size(), downsampleFields.size()); + for (int fieldIndex = 0; fieldIndex < originalFields.size(); fieldIndex++) { + DocumentField originalField = originalFields.get(fieldIndex); + DocumentField downsampleField = downsampleFields.get(fieldIndex); + assertEquals(originalField.getName(), downsampleField.getName()); + originalField.getValues() + .forEach( + value -> assertTrue( + "Field [" + + originalField.getName() + + "] does not have the value [" + + value + + "] in the downsample field values: " + + downsampleField.getValues(), + downsampleField.getValues().contains(value) + ) + ); + downsampleField.getValues() + .forEach( + value -> assertTrue( + "Field [" + + downsampleField.getName() + + "] does not have the value [" + + value + + "] in the source field values: " + + originalField.getValues(), + originalField.getValues().contains(value) + ) + ); } } } @@ -1573,6 +1570,11 @@ private AggregationBuilder buildAggregations( SortBuilders.fieldSort(timestampField).order(SortOrder.DESC) ).size(1).fetchField(fieldName) ); + case "first_value" -> dateHistogramAggregation.subAggregation( + new TopHitsAggregationBuilder(fieldName + "_" + supportedAggregation).sort( + SortBuilders.fieldSort(timestampField).order(SortOrder.ASC) + ).size(1).fetchField(fieldName) + ); case "sum" -> dateHistogramAggregation.subAggregation( new SumAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName) ); @@ -1599,7 +1601,10 @@ private String[] supportedAggs(TimeSeriesParams.MetricType metricType, Downsampl if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) { return new String[] { "last_value" }; } else { - return metricType.supportedAggs(); + if (metricType == TimeSeriesParams.MetricType.GAUGE) { + return metricType.supportedAggs(); + } + return new String[] { "first_value" }; } } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsamplerTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsamplerTests.java index 9eb6d6948021c..05bb319696b68 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsamplerTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LastValueFieldDownsamplerTests.java @@ -122,9 +122,11 @@ public Object nextValue() { } public void testFlattenedLastValueFieldDownsampler() throws IOException { - var downsampler = LastValueFieldDownsampler.create("dummy", createDummyFlattenedFieldType(), null); + AbstractFieldDownsampler.DownsamplerCountPerValueType fieldCounts = new AbstractFieldDownsampler.DownsamplerCountPerValueType(); + var downsampler = LastValueFieldDownsampler.create("dummy", createDummyFlattenedFieldType(), null, fieldCounts); assertTrue(downsampler.isEmpty()); assertEquals("dummy", downsampler.name()); + assertEquals(1, fieldCounts.formattedValueFields()); var bytes = List.of("a\0value_a", "b\0value_b", "c\0value_c", "d\0value_d"); var docValues = new FormattedDocValues() {