diff --git a/docs/changelog/82410.yaml b/docs/changelog/82410.yaml new file mode 100644 index 0000000000000..b21ebe77f30a3 --- /dev/null +++ b/docs/changelog/82410.yaml @@ -0,0 +1,5 @@ +pr: 82410 +summary: Add an aggregator for IPv4 and IPv6 subnets +area: Aggregations +type: enhancement +issues: [] diff --git a/docs/reference/aggregations/bucket.asciidoc b/docs/reference/aggregations/bucket.asciidoc index dfdaca18e6cfb..88fe92c27f9b3 100644 --- a/docs/reference/aggregations/bucket.asciidoc +++ b/docs/reference/aggregations/bucket.asciidoc @@ -46,6 +46,8 @@ include::bucket/global-aggregation.asciidoc[] include::bucket/histogram-aggregation.asciidoc[] +include::bucket/ipprefix-aggregation.asciidoc[] + include::bucket/iprange-aggregation.asciidoc[] include::bucket/missing-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/bucket/ipprefix-aggregation.asciidoc b/docs/reference/aggregations/bucket/ipprefix-aggregation.asciidoc new file mode 100644 index 0000000000000..2dee6654869f7 --- /dev/null +++ b/docs/reference/aggregations/bucket/ipprefix-aggregation.asciidoc @@ -0,0 +1,387 @@ +[[search-aggregations-bucket-ipprefix-aggregation]] +=== IP prefix aggregation +++++ +IP prefix +++++ + +A bucket aggregation that groups documents based on the network or sub-network of an IP address. An IP address consists of two groups of bits: the most significant bits which represent the network prefix, and the least significant bits which represent the host. 
+ +[[ipprefix-agg-ex]] +==== Example + +For example, consider the following index: +[source,console] +---------------------------------------------- +PUT network-traffic +{ + "mappings": { + "properties": { + "ipv4": { "type": "ip" }, + "ipv6": { "type": "ip" } + } + } +} + +POST /network-traffic/_bulk?refresh +{"index":{"_id":0}} +{"ipv4":"192.168.1.10","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f10"} +{"index":{"_id":1}} +{"ipv4":"192.168.1.12","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f12"} +{"index":{"_id":2}} +{ "ipv4":"192.168.1.33","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f33"} +{"index":{"_id":3}} +{"ipv4":"192.168.1.10","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f10"} +{"index":{"_id":4}} +{"ipv4":"192.168.2.41","ipv6":"2001:db8:a4f8:112c:6001:0:12:7f41"} +{"index":{"_id":5}} +{"ipv4":"192.168.2.10","ipv6":"2001:db8:a4f8:112c:6001:0:12:7f10"} +{"index":{"_id":6}} +{"ipv4":"192.168.2.23","ipv6":"2001:db8:a4f8:112c:6001:0:12:7f23"} +{"index":{"_id":7}} +{"ipv4":"192.168.3.201","ipv6":"2001:db8:a4f8:114f:6001:0:12:7201"} +{"index":{"_id":8}} +{"ipv4":"192.168.3.107","ipv6":"2001:db8:a4f8:114f:6001:0:12:7307"} +---------------------------------------------- +// TESTSETUP + +The following aggregation groups documents into buckets. Each bucket identifies a different sub-network. The sub-network is calculated by applying a netmask with prefix length of `24` to each IP address in the `ipv4` field: + +[source,console,id=ip-prefix-ipv4-example] +-------------------------------------------------- +GET /network-traffic/_search +{ + "size": 0, + "aggs": { + "ipv4-subnets": { + "ip_prefix": { + "field": "ipv4", + "prefix_length": 24 + } + } + } +} +-------------------------------------------------- +// TEST + +Response: + +[source,console-result] +-------------------------------------------------- +{ + ... 
+ + "aggregations": { + "ipv4-subnets": { + "buckets": [ + { + "key": "192.168.1.0", + "is_ipv6": false, + "doc_count": 4, + "prefix_length": 24, + "netmask": "255.255.255.0" + }, + { + "key": "192.168.2.0", + "is_ipv6": false, + "doc_count": 3, + "prefix_length": 24, + "netmask": "255.255.255.0" + }, + { + "key": "192.168.3.0", + "is_ipv6": false, + "doc_count": 2, + "prefix_length": 24, + "netmask": "255.255.255.0" + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + +To aggregate IPv6 addresses, set `is_ipv6` to `true`. + +[source,console,id=ip-prefix-ipv6-example] +-------------------------------------------------- +GET /network-traffic/_search +{ + "size": 0, + "aggs": { + "ipv6-subnets": { + "ip_prefix": { + "field": "ipv6", + "prefix_length": 64, + "is_ipv6": true + } + } + } +} +-------------------------------------------------- +// TEST + +If `is_ipv6` is `true`, the response doesn't include a `netmask` for each bucket. + +[source,console-result] +-------------------------------------------------- +{ + ... + + "aggregations": { + "ipv6-subnets": { + "buckets": [ + { + "key": "2001:db8:a4f8:112a::", + "is_ipv6": true, + "doc_count": 4, + "prefix_length": 64 + }, + { + "key": "2001:db8:a4f8:112c::", + "is_ipv6": true, + "doc_count": 3, + "prefix_length": 64 + }, + { + "key": "2001:db8:a4f8:114f::", + "is_ipv6": true, + "doc_count": 2, + "prefix_length": 64 + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + +[role="child_attributes"] +[[ip-prefix-agg-params]] +==== Parameters + +`field`:: +(Required, string) +The document IP address field to aggregate on. The field mapping type must be <<ip,`ip`>>. + +`prefix_length`:: +(Required, integer) +Length of the network prefix. 
For IPv4 addresses, the accepted range is `[0, 32]`. For IPv6 addresses, the accepted range is `[0, 128]`. + +`is_ipv6`:: +(Optional, boolean) +Defines whether the prefix applies to IPv6 addresses. Just specifying the `prefix_length` parameter is not enough to know if an IP prefix applies to IPv4 or IPv6 addresses. Defaults to `false`. + +`append_prefix_length`:: +(Optional, boolean) +Defines whether the prefix length is appended to IP address keys in the response. Defaults to `false`. + +`keyed`:: +(Optional, boolean) +Defines whether buckets are returned as a hash rather than an array in the response. Defaults to `false`. + +`min_doc_count`:: +(Optional, integer) +Defines the minimum number of documents for buckets to be included in the response. Defaults to `1`. + + +[[ipprefix-agg-response]] +==== Response body + +`key`:: +(string) +The IPv6 or IPv4 subnet. + +`prefix_length`:: +(integer) +The length of the prefix used to aggregate the bucket. + +`doc_count`:: +(integer) +Number of documents matching a specific IP prefix. + +`is_ipv6`:: +(boolean) +Defines whether the netmask is an IPv6 netmask. + +`netmask`:: +(string) +The IPv4 netmask. If `is_ipv6` is `true` in the request, this field is missing in the response. + +[[ipprefix-agg-keyed-response]] +==== Keyed Response + +Set the `keyed` flag to `true` to associate a unique IP address key with each bucket and return sub-networks as a hash rather than an array. + +Example: + +[source,console,id=ip-prefix-keyed-example] +-------------------------------------------------- +GET /network-traffic/_search +{ + "size": 0, + "aggs": { + "ipv4-subnets": { + "ip_prefix": { + "field": "ipv4", + "prefix_length": 24, + "keyed": true + } + } + } +} +-------------------------------------------------- +// TEST + +Response: + +[source,console-result] +-------------------------------------------------- +{ + ... 
+ + "aggregations": { + "ipv4-subnets": { + "buckets": { + "192.168.1.0": { + "is_ipv6": false, + "doc_count": 4, + "prefix_length": 24, + "netmask": "255.255.255.0" + }, + "192.168.2.0": { + "is_ipv6": false, + "doc_count": 3, + "prefix_length": 24, + "netmask": "255.255.255.0" + }, + "192.168.3.0": { + "is_ipv6": false, + "doc_count": 2, + "prefix_length": 24, + "netmask": "255.255.255.0" + } + } + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + +[[ipprefix-agg-append-prefix-length]] +==== Append the prefix length to the IP address key + +Set the `append_prefix_length` flag to `true` to catenate IP address keys with the prefix length of the sub-network. + +Example: + +[source,console,id=ip-prefix-append-prefix-len-example] +-------------------------------------------------- +GET /network-traffic/_search +{ + "size": 0, + "aggs": { + "ipv4-subnets": { + "ip_prefix": { + "field": "ipv4", + "prefix_length": 24, + "append_prefix_length": true + } + } + } +} +-------------------------------------------------- +// TEST + +Response: + +[source,console-result] +-------------------------------------------------- +{ + ... 
+ + "aggregations": { + "ipv4-subnets": { + "buckets": [ + { + "key": "192.168.1.0/24", + "is_ipv6": false, + "doc_count": 4, + "prefix_length": 24, + "netmask": "255.255.255.0" + }, + { + "key": "192.168.2.0/24", + "is_ipv6": false, + "doc_count": 3, + "prefix_length": 24, + "netmask": "255.255.255.0" + }, + { + "key": "192.168.3.0/24", + "is_ipv6": false, + "doc_count": 2, + "prefix_length": 24, + "netmask": "255.255.255.0" + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + +[[ipprefix-agg-min-doc-count]] +==== Minimum document count + +Use the `min_doc_count` parameter to only return buckets with a minimum number of documents. + +[source,console,id=ip-prefix-min-doc-count-example] +-------------------------------------------------- +GET /network-traffic/_search +{ + "size": 0, + "aggs": { + "ipv4-subnets": { + "ip_prefix": { + "field": "ipv4", + "prefix_length": 24, + "min_doc_count": 3 + } + } + } +} +-------------------------------------------------- +// TEST + +Response: + +[source,console-result] +-------------------------------------------------- +{ + ... 
+ + "aggregations": { + "ipv4-subnets": { + "buckets": [ + { + "key": "192.168.1.0", + "is_ipv6": false, + "doc_count": 4, + "prefix_length": 24, + "netmask": "255.255.255.0" + }, + { + "key": "192.168.2.0", + "is_ipv6": false, + "doc_count": 3, + "prefix_length": 24, + "netmask": "255.255.255.0" + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/450_ip_prefix.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/450_ip_prefix.yml new file mode 100644 index 0000000000000..0c1d09b2e770f --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/450_ip_prefix.yml @@ -0,0 +1,485 @@ +setup: + - do: + indices.create: + index: test + body: + settings: + number_of_replicas: 0 + mappings: + properties: + ipv4: + type: ip + ipv6: + type: ip + ip: + type: ip + value: + type: long + + - do: + bulk: + index: test + refresh: true + body: + - { "index": { } } + - { "ipv4": "192.168.1.10", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f10", "value": 10, ip: "192.168.1.10" } + - { "index": { } } + - { "ipv4": "192.168.1.12", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f12", "value": 20, ip: "2001:db8:a4f8:112a:6001:0:12:7f12" } + - { "index": { } } + - { "ipv4": "192.168.1.33", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f33", "value": 40, ip: "192.168.1.33" } + - { "index": { } } + - { "ipv4": "192.168.1.10", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f10", "value": 20, ip: "2001:db8:a4f8:112a:6001:0:12:7f10" } + - { "index": { } } + - { "ipv4": "192.168.1.33", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f33", "value": 70, ip: "192.168.1.33" } + - { "index": { } } + - { "ipv4": "192.168.2.41", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f41", "value": 20, ip: "2001:db8:a4f8:112c:6001:0:12:7f41" } + - { "index": { } } + 
- { "ipv4": "192.168.2.10", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f10", "value": 30, ip: "192.168.2.10" } + - { "index": { } } + - { "ipv4": "192.168.2.23", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f23", "value": 50, ip: "2001:db8:a4f8:112c:6001:0:12:7f23" } + - { "index": { } } + - { "ipv4": "192.168.2.41", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f41", "value": 60, ip: "192.168.2.41" } + - { "index": { } } + - { "ipv4": "192.168.2.10", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f10", "value": 10, ip: "2001:db8:a4f8:112c:6001:0:12:7f10" } + +--- +"IPv4 prefix": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv4" + is_ipv6: false + prefix_length: 24 + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 2 } + - match: { aggregations.ip_prefix.buckets.0.key: "192.168.1.0" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 } + - is_false: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 24 } + - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.255.0" } + - match: { aggregations.ip_prefix.buckets.1.key: "192.168.2.0" } + - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 } + - is_false: aggregations.ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.ip_prefix.buckets.1.prefix_length: 24 } + - match: { aggregations.ip_prefix.buckets.1.netmask: "255.255.255.0" } + +--- +# NOTE: here prefix_length = 24 which means the netmask 255.255.255.0 will be applied to the +# high 24 bits of a field which is an IPv4 address encoded on 16 bytes. As a result the +# network part will just be 0s. 
+"IPv4 prefix with incorrect is_ipv6": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv4" + is_ipv6: true + prefix_length: 24 + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 1 } + - match: { aggregations.ip_prefix.buckets.0.key: "::" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 10 } + - is_true: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 24 } + +--- +"IPv4 short prefix": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + first: + ip_prefix: + field: "ipv4" + is_ipv6: false + prefix_length: 13 + second: + ip_prefix: + field: "ipv4" + is_ipv6: false + prefix_length: 6 + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.first.buckets: 1 } + - match: { aggregations.first.buckets.0.key: "192.168.0.0" } + - match: { aggregations.first.buckets.0.doc_count: 10 } + - is_false: aggregations.first.buckets.0.is_ipv6 + - match: { aggregations.first.buckets.0.prefix_length: 13 } + - match: { aggregations.first.buckets.0.netmask: "255.248.0.0" } + - length: { aggregations.second.buckets: 1 } + - match: { aggregations.second.buckets.0.key: "192.0.0.0" } + - match: { aggregations.second.buckets.0.doc_count: 10 } + - is_false: aggregations.second.buckets.0.is_ipv6 + - match: { aggregations.second.buckets.0.prefix_length: 6 } + - match: { aggregations.second.buckets.0.netmask: "252.0.0.0" } + +--- +"IPv6 prefix": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv6" + is_ipv6: true + prefix_length: 64 + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 2 } + - match: { 
aggregations.ip_prefix.buckets.0.key: "2001:db8:a4f8:112a::" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 } + - is_true: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.0.netmask: null } + - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::" } + - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 } + - is_true: aggregations.ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.1.netmask: null } + +--- +# NOTE: here prefix_length = 16 which means the netmask will be applied to the second +# group of 2 bytes starting from the right (i.e. for "2001:db8:a4f8:112a:6001:0:12:7f10" +# it will be the 2 bytes whose value is set to 12 hexadecimal) which results to 18 decimal, +# with everything else being 0s. +"IPv6 prefix with incorrect is_ipv6": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv6" + is_ipv6: false + prefix_length: 16 + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 1 } + - match: { aggregations.ip_prefix.buckets.0.key: "0.18.0.0" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 10 } + - is_false: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 16 } + - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.0.0" } + +--- +"Invalid IPv4 prefix": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + catch: /\[prefix_length\] must be in range \[0, 32\] while value is \[44\]/ + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv4" + is_ipv6: false + prefix_length: 44 + + +--- +"Invalid IPv6 prefix": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: 
+ catch: /\[prefix_length] must be in range \[0, 128\] while value is \[170]/ + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv6" + is_ipv6: true + prefix_length: 170 + +--- +"IPv4 prefix sub aggregation": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + top_ip_prefix: + ip_prefix: + field: "ipv4" + is_ipv6: false + prefix_length: 16 + aggs: + sub_ip_prefix: + ip_prefix: + field: "ipv4" + is_ipv6: false + prefix_length: 24 + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.top_ip_prefix.buckets: 1 } + - match: { aggregations.top_ip_prefix.buckets.0.key: "192.168.0.0" } + - match: { aggregations.top_ip_prefix.buckets.0.doc_count: 10 } + - is_false: aggregations.top_ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.top_ip_prefix.buckets.0.prefix_length: 16 } + - match: { aggregations.top_ip_prefix.buckets.0.netmask: "255.255.0.0" } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.key: "192.168.1.0" } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.doc_count: 5 } + - is_false: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.prefix_length: 24 } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.netmask: "255.255.255.0" } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.key: "192.168.2.0" } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.doc_count: 5 } + - is_false: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.prefix_length: 24 } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.netmask: "255.255.255.0" } + +--- +"IPv6 prefix sub aggregation": + - skip: + version: " - 8.0.99" + reason: "added 
in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + top_ip_prefix: + ip_prefix: + field: "ipv6" + is_ipv6: true + prefix_length: 48 + aggs: + sub_ip_prefix: + ip_prefix: + field: "ipv6" + is_ipv6: true + prefix_length: 64 + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.top_ip_prefix.buckets: 1 } + - match: { aggregations.top_ip_prefix.buckets.0.key: "2001:db8:a4f8::" } + - match: { aggregations.top_ip_prefix.buckets.0.doc_count: 10 } + - is_true: aggregations.top_ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.top_ip_prefix.buckets.0.prefix_length: 48 } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.key: "2001:db8:a4f8:112a::" } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.doc_count: 5 } + - is_true: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.prefix_length: 64 } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.netmask: null } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::" } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.doc_count: 5 } + - is_true: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.prefix_length: 64 } + - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.netmask: null } + +--- +"IPv6 prefix metric sub aggregation": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv6" + is_ipv6: true + prefix_length: 64 + aggs: + sum: + sum: + field: value + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 2 } + - match: { aggregations.ip_prefix.buckets.0.key: 
"2001:db8:a4f8:112a::" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 } + - is_true: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.0.netmask: null } + - match: { aggregations.ip_prefix.buckets.0.sum.value: 160 } + - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::" } + - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 } + - is_true: aggregations.ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.1.netmask: null } + - match: { aggregations.ip_prefix.buckets.1.sum.value: 170 } + +--- +"IPv4 prefix appended": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv4" + is_ipv6: false + prefix_length: 24 + append_prefix_length: true + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 2 } + - match: { aggregations.ip_prefix.buckets.0.key: "192.168.1.0/24" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 } + - is_false: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 24 } + - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.255.0" } + - match: { aggregations.ip_prefix.buckets.1.key: "192.168.2.0/24" } + - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 } + - is_false: aggregations.ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.ip_prefix.buckets.1.prefix_length: 24 } + - match: { aggregations.ip_prefix.buckets.1.netmask: "255.255.255.0" } + +--- +"IPv6 prefix appended": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ipv6" + is_ipv6: true + prefix_length: 64 + append_prefix_length: true + + + - match: { 
hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 2 } + - match: { aggregations.ip_prefix.buckets.0.key: "2001:db8:a4f8:112a::/64" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 } + - is_true: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.0.netmask: null } + - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::/64" } + - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 } + - is_true: aggregations.ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.1.netmask: null } + +--- +"Mixed IPv4 and IPv6 with is_ipv6 false": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ip" + is_ipv6: false + prefix_length: 16 + + + - match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 2 } + - match: { aggregations.ip_prefix.buckets.0.key: "0.18.0.0" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 } + - is_false: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 16 } + - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.0.0" } + - match: { aggregations.ip_prefix.buckets.1.key: "192.168.0.0" } + - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 } + - is_false: aggregations.ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.ip_prefix.buckets.1.prefix_length: 16 } + - match: { aggregations.ip_prefix.buckets.1.netmask: "255.255.0.0" } + +--- +"Mixed IPv4 and IPv6 with is_ipv6 true": + - skip: + version: " - 8.0.99" + reason: "added in 8.1.0" + - do: + search: + body: + size: 0 + aggs: + ip_prefix: + ip_prefix: + field: "ip" + is_ipv6: true + prefix_length: 64 + + + - 
match: { hits.total.value: 10 } + - match: { hits.total.relation: "eq" } + - length: { aggregations.ip_prefix.buckets: 3 } + - match: { aggregations.ip_prefix.buckets.0.key: "::" } + - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 } + - is_true: aggregations.ip_prefix.buckets.0.is_ipv6 + - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.0.netmask: null } + - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112a::" } + - match: { aggregations.ip_prefix.buckets.1.doc_count: 2 } + - is_true: aggregations.ip_prefix.buckets.1.is_ipv6 + - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.1.netmask: null } + - match: { aggregations.ip_prefix.buckets.2.key: "2001:db8:a4f8:112c::" } + - match: { aggregations.ip_prefix.buckets.2.doc_count: 3 } + - is_true: aggregations.ip_prefix.buckets.2.is_ipv6 + - match: { aggregations.ip_prefix.buckets.2.prefix_length: 64 } + - match: { aggregations.ip_prefix.buckets.2.netmask: null } diff --git a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java index 59e5608338bcf..1a41a4398536d 100644 --- a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java +++ b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java @@ -447,14 +447,14 @@ public double parseDouble(String value, boolean roundUp, LongSupplier now) { } }; - DocValueFormat IP = IpDocValueFormat.INSTANCE; + IpDocValueFormat IP = IpDocValueFormat.INSTANCE; /** * Stateless, singleton formatter for IP address data */ class IpDocValueFormat implements DocValueFormat { - public static final DocValueFormat INSTANCE = new IpDocValueFormat(); + public static final IpDocValueFormat INSTANCE = new IpDocValueFormat(); private IpDocValueFormat() {} diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java 
b/server/src/main/java/org/elasticsearch/search/SearchModule.java index bbd247f7d0ae0..a5030d9accb77 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -119,6 +119,8 @@ import org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNested; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.nested.ReverseNestedAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.prefix.InternalIpPrefix; +import org.elasticsearch.search.aggregations.bucket.prefix.IpPrefixAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.GeoDistanceAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange; @@ -528,6 +530,12 @@ private ValuesSourceRegistry registerAggregations(List plugins) { .setAggregatorRegistrar(DateRangeAggregationBuilder::registerAggregators), builder ); + registerAggregation( + new AggregationSpec(IpPrefixAggregationBuilder.NAME, IpPrefixAggregationBuilder::new, IpPrefixAggregationBuilder.PARSER) + .addResultReader(InternalIpPrefix::new) + .setAggregatorRegistrar(IpPrefixAggregationBuilder::registerAggregators), + builder + ); registerAggregation( new AggregationSpec(IpRangeAggregationBuilder.NAME, IpRangeAggregationBuilder::new, IpRangeAggregationBuilder.PARSER) .addResultReader(InternalBinaryRange::new) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java new file mode 100644 index 0000000000000..a99c78d7150a9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java @@ -0,0 +1,350 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations.bucket.prefix; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class InternalIpPrefix extends InternalMultiBucketAggregation { + + public static class Bucket extends InternalMultiBucketAggregation.InternalBucket + implements + IpPrefix.Bucket, + KeyComparable { + + private final transient DocValueFormat format; + private final BytesRef key; + private final boolean keyed; + private final boolean isIpv6; + private final int prefixLength; + private final boolean appendPrefixLength; + private final long docCount; + private final InternalAggregations aggregations; + + public Bucket( + DocValueFormat format, + BytesRef key, + boolean keyed, + boolean isIpv6, + int prefixLength, + boolean appendPrefixLength, + long docCount, + InternalAggregations 
aggregations + ) { + this.format = format; + this.key = key; + this.keyed = keyed; + this.isIpv6 = isIpv6; + this.prefixLength = prefixLength; + this.appendPrefixLength = appendPrefixLength; + this.docCount = docCount; + this.aggregations = aggregations; + } + + /** + * Read from a stream. + */ + public Bucket(StreamInput in, DocValueFormat format, boolean keyed) throws IOException { + this.format = format; + this.keyed = keyed; + this.key = in.readBytesRef(); + this.isIpv6 = in.readBoolean(); + this.prefixLength = in.readVInt(); + this.appendPrefixLength = in.readBoolean(); + this.docCount = in.readLong(); + this.aggregations = InternalAggregations.readFrom(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + String key = DocValueFormat.IP.format(this.key); + if (appendPrefixLength) { + key = key + "/" + prefixLength; + } + if (keyed) { + builder.startObject(key); + } else { + builder.startObject(); + builder.field(CommonFields.KEY.getPreferredName(), key); + } + if (isIpv6 == false) { + builder.field("netmask", DocValueFormat.IP.format(netmask(prefixLength))); + } + builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount); + builder.field(IpPrefixAggregationBuilder.IS_IPV6_FIELD.getPreferredName(), isIpv6); + builder.field(IpPrefixAggregationBuilder.PREFIX_LENGTH_FIELD.getPreferredName(), prefixLength); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + + private static BytesRef netmask(int prefixLength) { + return IpPrefixAggregationBuilder.extractNetmask(prefixLength, false); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBytesRef(key); + out.writeBoolean(isIpv6); + out.writeVInt(prefixLength); + out.writeBoolean(appendPrefixLength); + out.writeLong(docCount); + aggregations.writeTo(out); + } + + public DocValueFormat getFormat() { + return format; + } + + public BytesRef getKey() { + return 
key; + } + + @Override + public String getKeyAsString() { + return DocValueFormat.IP.format(key); + } + + public boolean isIpv6() { + return isIpv6; + } + + public int getPrefixLength() { + return prefixLength; + } + + public boolean appendPrefixLength() { + return appendPrefixLength; + } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public InternalAggregations getAggregations() { + return aggregations; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Bucket bucket = (Bucket) o; + return isIpv6 == bucket.isIpv6 + && prefixLength == bucket.prefixLength + && appendPrefixLength == bucket.appendPrefixLength + && docCount == bucket.docCount + && Objects.equals(format, bucket.format) + && Objects.equals(key, bucket.key) + && Objects.equals(aggregations, bucket.aggregations); + } + + @Override + public int hashCode() { + return Objects.hash(format, key, isIpv6, prefixLength, appendPrefixLength, docCount, aggregations); + } + + @Override + public int compareKey(Bucket other) { + return this.key.compareTo(other.key); + } + } + + protected final DocValueFormat format; + protected final boolean keyed; + protected final long minDocCount; + private final List buckets; + + public InternalIpPrefix( + String name, + DocValueFormat format, + boolean keyed, + long minDocCount, + List buckets, + Map metadata + ) { + super(name, metadata); + this.keyed = keyed; + this.minDocCount = minDocCount; + this.format = format; + this.buckets = buckets; + } + + /** + * Stream from a stream. 
+ */ + public InternalIpPrefix(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + keyed = in.readBoolean(); + minDocCount = in.readVLong(); + buckets = in.readList(stream -> new Bucket(stream, format, keyed)); + } + + @Override + public String getWriteableName() { + return IpPrefixAggregationBuilder.NAME; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + out.writeBoolean(keyed); + out.writeVLong(minDocCount); + out.writeList(buckets); + } + + @Override + public InternalAggregation reduce(List aggregations, AggregationReduceContext reduceContext) { + List reducedBuckets = reduceBuckets(aggregations, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); + + return new InternalIpPrefix(getName(), format, keyed, minDocCount, reducedBuckets, metadata); + } + + private List reduceBuckets(List aggregations, AggregationReduceContext reduceContext) { + final PriorityQueue> pq = new PriorityQueue<>(aggregations.size()) { + @Override + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return a.current().key.compareTo(b.current().key) < 0; + } + }; + for (InternalAggregation aggregation : aggregations) { + InternalIpPrefix ipPrefix = (InternalIpPrefix) aggregation; + if (ipPrefix.buckets.isEmpty() == false) { + pq.add(new IteratorAndCurrent<>(ipPrefix.buckets.iterator())); + } + } + + List reducedBuckets = new ArrayList<>(); + if (pq.size() > 0) { + // list of buckets coming from different shards that have the same value + List currentBuckets = new ArrayList<>(); + BytesRef value = pq.top().current().key; + + do { + final IteratorAndCurrent top = pq.top(); + if (top.current().key.equals(value) == false) { + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); + if (reduced.getDocCount() >= minDocCount) { + reducedBuckets.add(reduced); + } + currentBuckets.clear(); + value = 
top.current().key; + } + + currentBuckets.add(top.current()); + + if (top.hasNext()) { + top.next(); + assert top.current().key.compareTo(value) > 0 + : "shards must return data sorted by value [" + top.current().key + "] and [" + value + "]"; + pq.updateTop(); + } else { + pq.pop(); + } + } while (pq.size() > 0); + + if (currentBuckets.isEmpty() == false) { + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); + if (reduced.getDocCount() >= minDocCount) { + reducedBuckets.add(reduced); + } + } + } + + return reducedBuckets; + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + if (keyed) { + builder.startObject(CommonFields.BUCKETS.getPreferredName()); + } else { + builder.startArray(CommonFields.BUCKETS.getPreferredName()); + } + for (InternalIpPrefix.Bucket bucket : buckets) { + bucket.toXContent(builder, params); + } + if (keyed) { + builder.endObject(); + } else { + builder.endArray(); + } + return builder; + } + + @Override + public InternalIpPrefix create(List buckets) { + return new InternalIpPrefix(name, format, keyed, minDocCount, buckets, metadata); + } + + @Override + public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) { + return new Bucket( + format, + prototype.key, + prototype.keyed, + prototype.isIpv6, + prototype.prefixLength, + prototype.appendPrefixLength, + prototype.docCount, + prototype.aggregations + ); + } + + @Override + protected Bucket reduceBucket(List buckets, AggregationReduceContext context) { + assert buckets.size() > 0; + List aggregations = new ArrayList<>(buckets.size()); + for (InternalIpPrefix.Bucket bucket : buckets) { + aggregations.add(bucket.getAggregations()); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + return createBucket(aggs, buckets.get(0)); + } + + @Override + public List getBuckets() { + return Collections.unmodifiableList(buckets); + } + + @Override + public 
boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (super.equals(o) == false) return false; + InternalIpPrefix that = (InternalIpPrefix) o; + return minDocCount == that.minDocCount && Objects.equals(format, that.format) && Objects.equals(buckets, that.buckets); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), format, minDocCount, buckets); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefix.java new file mode 100644 index 0000000000000..b4ed37569565d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefix.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations.bucket.prefix; + +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; + +import java.util.List; + +/** + * A {@code ip prefix} aggregation. Defines multiple buckets, each representing a subnet. 
+ */ +public interface IpPrefix extends MultiBucketsAggregation { + + /** + * A bucket in the aggregation where documents fall in + */ + interface Bucket extends MultiBucketsAggregation.Bucket { + + } + + /** + * @return The buckets of this aggregation (each bucket representing a subnet) + */ + @Override + List getBuckets(); +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationBuilder.java new file mode 100644 index 0000000000000..3e4ee317c36fa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationBuilder.java @@ -0,0 +1,314 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.prefix; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +/** + * A builder for IP prefix aggregations. This builder can operate with both IPv4 and IPv6 fields. 
+ */ +public class IpPrefixAggregationBuilder extends ValuesSourceAggregationBuilder { + public static final String NAME = "ip_prefix"; + public static final ValuesSourceRegistry.RegistryKey REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>( + NAME, + IpPrefixAggregationSupplier.class + ); + public static final ObjectParser PARSER = ObjectParser.fromBuilder( + NAME, + IpPrefixAggregationBuilder::new + ); + + public static final ParseField PREFIX_LENGTH_FIELD = new ParseField("prefix_length"); + public static final ParseField IS_IPV6_FIELD = new ParseField("is_ipv6"); + public static final ParseField APPEND_PREFIX_LENGTH_FIELD = new ParseField("append_prefix_length"); + public static final ParseField MIN_DOC_COUNT_FIELD = new ParseField("min_doc_count"); + public static final ParseField KEYED_FIELD = new ParseField("keyed"); + + static { + ValuesSourceAggregationBuilder.declareFields(PARSER, false, false, false); + PARSER.declareInt(IpPrefixAggregationBuilder::prefixLength, PREFIX_LENGTH_FIELD); + PARSER.declareBoolean(IpPrefixAggregationBuilder::isIpv6, IS_IPV6_FIELD); + PARSER.declareLong(IpPrefixAggregationBuilder::minDocCount, MIN_DOC_COUNT_FIELD); + PARSER.declareBoolean(IpPrefixAggregationBuilder::appendPrefixLength, APPEND_PREFIX_LENGTH_FIELD); + PARSER.declareBoolean(IpPrefixAggregationBuilder::keyed, KEYED_FIELD); + } + + private static final int IPV6_MAX_PREFIX_LENGTH = 128; + private static final int IPV4_MAX_PREFIX_LENGTH = 32; + private static final int MIN_PREFIX_LENGTH = 0; + + /** Read from a stream, for internal use only. 
*/ + public IpPrefixAggregationBuilder(StreamInput in) throws IOException { + super(in); + this.prefixLength = in.readVInt(); + this.isIpv6 = in.readBoolean(); + this.minDocCount = in.readVLong(); + this.appendPrefixLength = in.readBoolean(); + this.keyed = in.readBoolean(); + } + + public static void registerAggregators(ValuesSourceRegistry.Builder builder) { + IpPrefixAggregatorFactory.registerAggregators(builder); + } + + private long minDocCount = 1; + private int prefixLength = -1; + private boolean isIpv6 = false; + private boolean appendPrefixLength = false; + private boolean keyed = false; + + private static void throwOnInvalidFieldValue(final String fieldName, final T minValue, final T maxValue, final T fieldValue) { + throw new IllegalArgumentException( + "[" + + fieldName + + "] must be in range [" + + minValue.toString() + + ", " + + maxValue.toString() + + "] while value is [" + + fieldValue.toString() + + "]" + ); + } + + /** Set the minDocCount on this builder, and return the builder so that calls can be chained. */ + public IpPrefixAggregationBuilder minDocCount(long minDocCount) { + if (minDocCount < 1) { + throwOnInvalidFieldValue(MIN_DOC_COUNT_FIELD.getPreferredName(), 1, Integer.MAX_VALUE, minDocCount); + } + this.minDocCount = minDocCount; + return this; + } + + /** + * Set the prefixLength on this builder, and return the builder so that calls can be chained. + * + * @throws IllegalArgumentException if prefixLength is negative. + * */ + public IpPrefixAggregationBuilder prefixLength(int prefixLength) { + if (prefixLength < MIN_PREFIX_LENGTH) { + throwOnInvalidFieldValue( + PREFIX_LENGTH_FIELD.getPreferredName(), + 0, + isIpv6 ? IPV6_MAX_PREFIX_LENGTH : IPV4_MAX_PREFIX_LENGTH, + prefixLength + ); + } + this.prefixLength = prefixLength; + return this; + } + + /** Set the isIpv6 on this builder, and return the builder so that calls can be chained. 
*/ + public IpPrefixAggregationBuilder isIpv6(boolean isIpv6) { + this.isIpv6 = isIpv6; + return this; + } + + /** Set the appendPrefixLength on this builder, and return the builder so that calls can be chained. */ + public IpPrefixAggregationBuilder appendPrefixLength(boolean appendPrefixLength) { + this.appendPrefixLength = appendPrefixLength; + return this; + } + + /** Set the keyed on this builder, and return the builder so that calls can be chained. */ + public IpPrefixAggregationBuilder keyed(boolean keyed) { + this.keyed = keyed; + return this; + } + + /** Create a new builder with the given name. */ + public IpPrefixAggregationBuilder(String name) { + super(name); + } + + protected IpPrefixAggregationBuilder( + IpPrefixAggregationBuilder clone, + AggregatorFactories.Builder factoriesBuilder, + Map metadata + ) { + super(clone, factoriesBuilder, metadata); + this.minDocCount = clone.minDocCount; + this.isIpv6 = clone.isIpv6; + this.prefixLength = clone.prefixLength; + this.appendPrefixLength = clone.appendPrefixLength; + this.keyed = clone.keyed; + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { + return new IpPrefixAggregationBuilder(this, factoriesBuilder, metadata); + } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.MANY; + } + + @Override + public String getType() { + return NAME; + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeVInt(prefixLength); + out.writeBoolean(isIpv6); + out.writeVLong(minDocCount); + out.writeBoolean(appendPrefixLength); + out.writeBoolean(keyed); + } + + @Override + protected ValuesSourceRegistry.RegistryKey getRegistryKey() { + return REGISTRY_KEY; + } + + @Override + protected ValuesSourceType defaultValueSourceType() { + return CoreValuesSourceType.IP; + } + + @Override + protected ValuesSourceAggregatorFactory innerBuild( + AggregationContext context, + 
ValuesSourceConfig config, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder + ) throws IOException { + IpPrefixAggregationSupplier aggregationSupplier = context.getValuesSourceRegistry().getAggregator(REGISTRY_KEY, config); + + if (prefixLength < 0 + || (isIpv6 == false && prefixLength > IPV4_MAX_PREFIX_LENGTH) + || (isIpv6 && prefixLength > IPV6_MAX_PREFIX_LENGTH)) { + throwOnInvalidFieldValue( + PREFIX_LENGTH_FIELD.getPreferredName(), + MIN_PREFIX_LENGTH, + isIpv6 ? IPV6_MAX_PREFIX_LENGTH : IPV4_MAX_PREFIX_LENGTH, + prefixLength + ); + } + + IpPrefixAggregator.IpPrefix ipPrefix = new IpPrefixAggregator.IpPrefix( + isIpv6, + prefixLength, + appendPrefixLength, + extractNetmask(prefixLength, isIpv6) + ); + + return new IpPrefixAggregatorFactory( + name, + config, + keyed, + minDocCount, + ipPrefix, + context, + parent, + subFactoriesBuilder, + metadata, + aggregationSupplier + ); + } + + /** + * @param prefixLength the network prefix length which defines the size of the network. + * @param isIpv6 true for an IPv6 netmask, false for an IPv4 netmask. + * + * @return a 16-bytes representation of the subnet with 1s identifying the network + * part and 0s identifying the host part. + * + * @throws IllegalArgumentException if prefixLength is not in range [0, 128] for an IPv6 + * network, or is not in range [0, 32] for an IPv4 network. + */ + public static BytesRef extractNetmask(int prefixLength, boolean isIpv6) { + if (prefixLength < 0 + || (isIpv6 == false && prefixLength > IPV4_MAX_PREFIX_LENGTH) + || (isIpv6 && prefixLength > IPV6_MAX_PREFIX_LENGTH)) { + throwOnInvalidFieldValue( + PREFIX_LENGTH_FIELD.getPreferredName(), + MIN_PREFIX_LENGTH, + isIpv6 ? IPV6_MAX_PREFIX_LENGTH : IPV4_MAX_PREFIX_LENGTH, + prefixLength + ); + } + + byte[] ipv4Address = { 0, 0, 0, 0 }; + byte[] ipv6Address = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + byte[] ipAddress = (isIpv6) ? 
ipv6Address : ipv4Address; + int bytesCount = prefixLength / 8; + int bitsCount = prefixLength % 8; + int i = 0; + // NOTE: first set whole bytes to 255 (0xFF) + for (; i < bytesCount; i++) { + ipAddress[i] = (byte) 0xFF; + } + // NOTE: then set the remaining bits to 1. + // Trailing bits are already set to 0 at initialization time. + // Example: for prefixLength = 20, we first set 16 bits (2 bytes) + // to 0xFF, then set the remaining 4 bits to 1. + if (bitsCount > 0) { + int rem = 0; + for (int j = 0; j < bitsCount; j++) { + rem |= 1 << (7 - j); + } + ipAddress[i] = (byte) rem; + } + + try { + return new BytesRef(InetAddress.getByAddress(ipAddress).getAddress()); + } catch (UnknownHostException e) { + throw new IllegalArgumentException("Unable to get the ip address for [" + Arrays.toString(ipAddress) + "]", e); + } + } + + @Override + protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(PREFIX_LENGTH_FIELD.getPreferredName(), prefixLength); + builder.field(IS_IPV6_FIELD.getPreferredName(), isIpv6); + builder.field(APPEND_PREFIX_LENGTH_FIELD.getPreferredName(), appendPrefixLength); + builder.field(KEYED_FIELD.getPreferredName(), keyed); + builder.field(MIN_DOC_COUNT_FIELD.getPreferredName(), minDocCount); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (super.equals(o) == false) return false; + IpPrefixAggregationBuilder that = (IpPrefixAggregationBuilder) o; + return minDocCount == that.minDocCount && prefixLength == that.prefixLength && isIpv6 == that.isIpv6; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), minDocCount, prefixLength, isIpv6); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationSupplier.java 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationSupplier.java new file mode 100644 index 0000000000000..aa20d9f0e75ad --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationSupplier.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations.bucket.prefix; + +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.CardinalityUpperBound; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; + +import java.io.IOException; +import java.util.Map; + +public interface IpPrefixAggregationSupplier { + Aggregator build( + String name, + AggregatorFactories factories, + ValuesSourceConfig config, + boolean keyed, + long minDocCount, + IpPrefixAggregator.IpPrefix ipPrefix, + AggregationContext context, + Aggregator parent, + CardinalityUpperBound cardinality, + Map metadata + ) throws IOException; +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java new file mode 100644 index 0000000000000..a195e48feccef --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java @@ -0,0 +1,269 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations.bucket.prefix; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.CardinalityUpperBound; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.NonCollectingAggregator; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * An IP prefix aggregator for IPv6 or IPv4 subnets. 
+ */ +public final class IpPrefixAggregator extends BucketsAggregator { + + public static class IpPrefix { + final boolean isIpv6; + final int prefixLength; + final boolean appendPrefixLength; + final BytesRef netmask; + + public IpPrefix(boolean isIpv6, int prefixLength, boolean appendPrefixLength, BytesRef netmask) { + this.isIpv6 = isIpv6; + this.prefixLength = prefixLength; + this.appendPrefixLength = appendPrefixLength; + this.netmask = netmask; + } + + public boolean isIpv6() { + return isIpv6; + } + + public int getPrefixLength() { + return prefixLength; + } + + public boolean appendPrefixLength() { + return appendPrefixLength; + } + + public BytesRef getNetmask() { + return netmask; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IpPrefix ipPrefix = (IpPrefix) o; + return isIpv6 == ipPrefix.isIpv6 + && prefixLength == ipPrefix.prefixLength + && appendPrefixLength == ipPrefix.appendPrefixLength + && Objects.equals(netmask, ipPrefix.netmask); + } + + @Override + public int hashCode() { + return Objects.hash(isIpv6, prefixLength, appendPrefixLength, netmask); + } + } + + final ValuesSourceConfig config; + final long minDocCount; + final boolean keyed; + final BytesKeyedBucketOrds bucketOrds; + final IpPrefix ipPrefix; + + public IpPrefixAggregator( + String name, + AggregatorFactories factories, + ValuesSourceConfig config, + boolean keyed, + long minDocCount, + IpPrefix ipPrefix, + AggregationContext context, + Aggregator parent, + CardinalityUpperBound cardinality, + Map metadata + ) throws IOException { + super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata); + this.config = config; + this.keyed = keyed; + this.minDocCount = minDocCount; + this.bucketOrds = BytesKeyedBucketOrds.build(bigArrays(), cardinality); + this.ipPrefix = ipPrefix; + } + + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, 
LeafBucketCollector sub) throws IOException { + return new IpPrefixLeafCollector(sub, config.getValuesSource().bytesValues(ctx), ipPrefix); + } + + private class IpPrefixLeafCollector extends LeafBucketCollectorBase { + private final IpPrefix ipPrefix; + private final LeafBucketCollector sub; + private final SortedBinaryDocValues values; + + IpPrefixLeafCollector(final LeafBucketCollector sub, final SortedBinaryDocValues values, final IpPrefix ipPrefix) { + super(sub, values); + this.sub = sub; + this.values = values; + this.ipPrefix = ipPrefix; + } + + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + BytesRef previousSubnet = null; + BytesRef subnet = new BytesRef(new byte[ipPrefix.netmask.length]); + BytesRef ipAddress; + if (values.advanceExact(doc)) { + int valuesCount = values.docValueCount(); + + for (int i = 0; i < valuesCount; ++i) { + ipAddress = values.nextValue(); + maskIpAddress(ipAddress, ipPrefix.netmask, subnet); + if (previousSubnet != null && subnet.bytesEquals(previousSubnet)) { + continue; + } + long bucketOrd = bucketOrds.add(owningBucketOrd, subnet); + if (bucketOrd < 0) { + bucketOrd = -1 - bucketOrd; + collectExistingBucket(sub, doc, bucketOrd); + } else { + collectBucket(sub, doc, bucketOrd); + } + previousSubnet = subnet; + } + } + } + + private void maskIpAddress(final BytesRef ipAddress, final BytesRef subnetMask, final BytesRef subnet) { + assert ipAddress.length == 16 : "Invalid length for ip address [" + ipAddress.length + "] expected 16 bytes"; + // NOTE: IPv4 addresses are encoded as 16-bytes. As a result, we use an + // offset (12) to apply the subnet to the last 4 bytes (byes 12, 13, 14, 15) + // if the subnet mask is just a 4-bytes subnet mask. + int offset = subnetMask.length == 4 ? 
12 : 0; + for (int i = 0; i < subnetMask.length; ++i) { + subnet.bytes[i] = (byte) (ipAddress.bytes[i + offset] & subnetMask.bytes[i]); + } + } + } + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + long totalOrdsToCollect = 0; + final int[] bucketsInOrd = new int[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); + bucketsInOrd[ordIdx] = (int) bucketCount; + totalOrdsToCollect += bucketCount; + } + + long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect]; + int b = 0; + for (long owningBucketOrd : owningBucketOrds) { + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + while (ordsEnum.next()) { + bucketOrdsToCollect[b++] = ordsEnum.ord(); + } + } + + InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); + InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; + b = 0; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + List buckets = new ArrayList<>(bucketsInOrd[ordIdx]); + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); + while (ordsEnum.next()) { + long ordinal = ordsEnum.ord(); + if (bucketOrdsToCollect[b] != ordinal) { + throw new AggregationExecutionException( + "Iteration order of [" + + bucketOrds + + "] changed without mutating. 
[" + + ordinal + + "] should have been [" + + bucketOrdsToCollect[b] + + "]" + ); + } + BytesRef ipAddress = new BytesRef(); + ordsEnum.readValue(ipAddress); + long docCount = bucketDocCount(ordinal); + buckets.add( + new InternalIpPrefix.Bucket( + config.format(), + BytesRef.deepCopyOf(ipAddress), + keyed, + ipPrefix.isIpv6, + ipPrefix.prefixLength, + ipPrefix.appendPrefixLength, + docCount, + subAggregationResults[b++] + ) + ); + + // NOTE: the aggregator is expected to return sorted results + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); + } + results[ordIdx] = new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata()); + } + return results; + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalIpPrefix(name, config.format(), keyed, minDocCount, Collections.emptyList(), metadata()); + } + + @Override + public void doClose() { + Releasables.close(bucketOrds); + } + + public static class Unmapped extends NonCollectingAggregator { + + private final ValuesSourceConfig config; + private final boolean keyed; + private final long minDocCount; + + protected Unmapped( + String name, + AggregatorFactories factories, + ValuesSourceConfig config, + boolean keyed, + long minDocCount, + AggregationContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, context, parent, factories, metadata); + this.config = config; + this.keyed = keyed; + this.minDocCount = minDocCount; + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalIpPrefix(name, config.format(), keyed, minDocCount, Collections.emptyList(), metadata()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorFactory.java new file mode 100644 index 0000000000000..ad14a2acbf6f1 --- /dev/null +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorFactory.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations.bucket.prefix; + +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.CardinalityUpperBound; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; + +import java.io.IOException; +import java.util.Map; + +public class IpPrefixAggregatorFactory extends ValuesSourceAggregatorFactory { + private final boolean keyed; + private final long minDocCount; + private final IpPrefixAggregator.IpPrefix ipPrefix; + private final IpPrefixAggregationSupplier aggregationSupplier; + + public IpPrefixAggregatorFactory( + String name, + ValuesSourceConfig config, + boolean keyed, + long minDocCount, + IpPrefixAggregator.IpPrefix ipPrefix, + AggregationContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata, + IpPrefixAggregationSupplier aggregationSupplier + ) throws IOException { + super(name, config, context, parent, subFactoriesBuilder, metadata); + this.keyed = keyed; + this.minDocCount = minDocCount; 
+ this.ipPrefix = ipPrefix; + this.aggregationSupplier = aggregationSupplier; + } + + public static void registerAggregators(ValuesSourceRegistry.Builder builder) { + builder.register(IpPrefixAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.IP, IpPrefixAggregator::new, true); + } + + @Override + protected Aggregator createUnmapped(Aggregator parent, Map metadata) throws IOException { + return new IpPrefixAggregator.Unmapped(name, factories, config, keyed, minDocCount, context, parent, metadata); + } + + @Override + protected Aggregator doCreateInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) + throws IOException { + return aggregationSupplier.build(name, factories, config, keyed, minDocCount, ipPrefix, context, parent, cardinality, metadata); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/IpPrefixTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/IpPrefixTests.java new file mode 100644 index 0000000000000..7b7602ec67816 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/IpPrefixTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.search.aggregations.BaseAggregationTestCase; +import org.elasticsearch.search.aggregations.bucket.prefix.IpPrefixAggregationBuilder; + +import static org.hamcrest.Matchers.startsWith; + +public class IpPrefixTests extends BaseAggregationTestCase { + @Override + protected IpPrefixAggregationBuilder createTestAggregatorBuilder() { + final String name = randomAlphaOfLengthBetween(3, 10); + final IpPrefixAggregationBuilder factory = new IpPrefixAggregationBuilder(name); + boolean isIpv6 = randomBoolean(); + int prefixLength = isIpv6 ? randomIntBetween(1, 128) : randomIntBetween(1, 32); + factory.field(IP_FIELD_NAME); + + factory.appendPrefixLength(randomBoolean()); + factory.isIpv6(isIpv6); + factory.prefixLength(prefixLength); + factory.keyed(randomBoolean()); + factory.minDocCount(randomIntBetween(1, 3)); + + return factory; + } + + public void testNegativePrefixLength() { + final IpPrefixAggregationBuilder factory = new IpPrefixAggregationBuilder(randomAlphaOfLengthBetween(3, 10)); + boolean isIpv6 = randomBoolean(); + final String rangeAsString = isIpv6 ? 
"[0, 128]" : "[0, 32]"; + factory.isIpv6(isIpv6); + int randomPrefixLength = randomIntBetween(-1000, -1); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> factory.prefixLength(randomPrefixLength)); + assertThat( + ex.getMessage(), + startsWith("[prefix_length] must be in range " + rangeAsString + " while value is [" + randomPrefixLength + "]") + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorTests.java new file mode 100644 index 0000000000000..8b032e1690923 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorTests.java @@ -0,0 +1,1077 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.prefix; + +import org.apache.lucene.document.InetAddressPoint; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.IpFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; +import org.elasticsearch.search.aggregations.metrics.InternalSum; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.Collections.singleton; + +public class IpPrefixAggregatorTests extends AggregatorTestCase { + + private static final class TestIpDataHolder { + private final String ipAddressAsString; + private final InetAddress ipAddress; + private final String subnetAsString; + private final InetAddress subnet; + private final int prefixLength; + private final long time; + + TestIpDataHolder(final String ipAddressAsString, 
final String subnetAsString, final int prefixLength, final long time) { + this.ipAddressAsString = ipAddressAsString; + this.ipAddress = InetAddresses.forString(ipAddressAsString); + this.subnetAsString = subnetAsString; + this.subnet = InetAddresses.forString(subnetAsString); + this.prefixLength = prefixLength; + this.time = time; + } + + public String getIpAddressAsString() { + return ipAddressAsString; + } + + public InetAddress getIpAddress() { + return ipAddress; + } + + public InetAddress getSubnet() { + return subnet; + } + + public String getSubnetAsString() { + return subnetAsString; + } + + public int getPrefixLength() { + return prefixLength; + } + + public long getTime() { + return time; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestIpDataHolder that = (TestIpDataHolder) o; + return prefixLength == that.prefixLength + && time == that.time + && Objects.equals(ipAddressAsString, that.ipAddressAsString) + && Objects.equals(ipAddress, that.ipAddress) + && Objects.equals(subnetAsString, that.subnetAsString) + && Objects.equals(subnet, that.subnet); + } + + @Override + public int hashCode() { + return Objects.hash(ipAddressAsString, ipAddress, subnetAsString, subnet, prefixLength, time); + } + + @Override + public String toString() { + return "TestIpDataHolder{" + + "ipAddressAsString='" + + ipAddressAsString + + '\'' + + ", ipAddress=" + + ipAddress + + ", subnetAsString='" + + subnetAsString + + '\'' + + ", subnet=" + + subnet + + ", prefixLength=" + + prefixLength + + ", time=" + + time + + '}'; + } + } + + public void testEmptyDocument() throws IOException { + // GIVEN + final int prefixLength = 16; + final String field = "ipv4"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + 
.prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = Collections.emptyList(); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertTrue(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testIpv4Addresses() throws IOException { + // GIVEN + final int prefixLength = 16; + final String field = "ipv4"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()), + new 
TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testIpv6Addresses() throws IOException { + // GIVEN + final int prefixLength = 64; + final String field = "ipv6"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(true) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, defaultTime()), + new 
TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertTrue(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testZeroPrefixLength() throws IOException { + // GIVEN + final int prefixLength = 0; + final String field = "ipv4"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", "0.0.0.0", prefixLength, defaultTime()), + 
new TestIpDataHolder("192.168.1.12", "0.0.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.117", "0.0.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.10.27", "0.0.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.169.0.88", "0.0.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.19.0.44", "0.0.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.122.2.67", "0.0.0.0", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testIpv4MaxPrefixLength() throws IOException { + // GIVEN + final int prefixLength = 32; + final String field = "ipv4"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new 
IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", "192.168.1.12", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.12", "192.168.1.12", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.117", "192.168.1.117", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.10.27", "192.168.10.27", prefixLength, defaultTime()), + new TestIpDataHolder("192.169.0.88", "192.169.0.88", prefixLength, defaultTime()), + new TestIpDataHolder("10.19.0.44", "10.19.0.44", prefixLength, defaultTime()), + new TestIpDataHolder("10.122.2.67", "10.122.2.67", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testIpv6MaxPrefixLength() throws IOException { + // GIVEN + final int prefixLength = 128; + final String field = "ipv6"; + final IpPrefixAggregationBuilder aggregationBuilder = new 
IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(true) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a:6001:0:12:7f2a", prefixLength, defaultTime()), + new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a:7044:1f01:0:44f2", prefixLength, defaultTime()), + new TestIpDataHolder("2001:db8:a4ff:112a:0:0:7002:7ff2", "2001:db8:a4ff:112a::7002:7ff2", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f:1212:0:1:3", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f:7770:12f6:0:30", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertTrue(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); 
+ }, fieldType); + } + + public void testAggregateOnIpv4Field() throws IOException { + // GIVEN + final int prefixLength = 16; + final String ipv4FieldName = "ipv4"; + final String ipv6FieldName = "ipv6"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(ipv4FieldName) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType[] fieldTypes = { new IpFieldMapper.IpFieldType(ipv4FieldName), new IpFieldMapper.IpFieldType(ipv6FieldName) }; + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime()) + ); + final String ipv6Value = "2001:db8:a4f8:112a:6001:0:12:7f2a"; + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + List.of( + new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))), + new SortedDocValuesField(ipv6FieldName, new BytesRef(InetAddressPoint.encode(InetAddresses.forString(ipv6Value)))) + ) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) 
+ .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldTypes); + } + + public void testAggregateOnIpv6Field() throws IOException { + // GIVEN + final int prefixLength = 64; + final String ipv4FieldName = "ipv4"; + final String ipv6FieldName = "ipv6"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(ipv6FieldName) + .isIpv6(true) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType[] fieldTypes = { new IpFieldMapper.IpFieldType(ipv4FieldName), new IpFieldMapper.IpFieldType(ipv6FieldName) }; + final List ipAddresses = List.of( + new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, defaultTime()) + ); + final String ipv4Value = "192.168.10.20"; + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + List.of( + new SortedDocValuesField(ipv6FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))), + new 
SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(InetAddresses.forString(ipv4Value)))) + ) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertTrue(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldTypes); + } + + public void testIpv4AggregationAsSubAggregation() throws IOException { + // GIVEN + final int prefixLength = 16; + final String ipv4FieldName = "ipv4"; + final String datetimeFieldName = "datetime"; + final String dateHistogramAggregationName = "date_histogram"; + final String ipPrefixAggregationName = "ip_prefix"; + final AggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder(dateHistogramAggregationName).calendarInterval( + DateHistogramInterval.DAY + ) + .field(datetimeFieldName) + .subAggregation( + new IpPrefixAggregationBuilder(ipPrefixAggregationName).field(ipv4FieldName) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength) + ); + final DateFieldMapper.DateFieldType dateFieldType = new DateFieldMapper.DateFieldType(datetimeFieldName); + final IpFieldMapper.IpFieldType ipFieldType = new IpFieldMapper.IpFieldType(ipv4FieldName); + final MappedFieldType[] fieldTypes = { ipFieldType, dateFieldType }; + + long day1 = 
dateFieldType.parse("2021-10-12"); + long day2 = dateFieldType.parse("2021-10-11"); + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, day1), + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, day2), + new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, day1), + new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, day2), + new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, day1), + new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, day2), + new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, day1), + new TestIpDataHolder("10.19.13.32", "10.19.0.0", prefixLength, day2) + ); + + final Set expectedBucket1Subnets = ipAddresses.stream() + .filter(testIpDataHolder -> testIpDataHolder.getTime() == day1) + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set expectedBucket2Subnets = ipAddresses.stream() + .filter(testIpDataHolder -> testIpDataHolder.getTime() == day2) + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (final TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + List.of( + new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))), + new SortedNumericDocValuesField(datetimeFieldName, ipDataHolder.getTime()) + ) + ); + } + }, agg -> { + final InternalDateHistogram dateHistogram = (InternalDateHistogram) agg; + final List buckets = dateHistogram.getBuckets(); + assertEquals(2, buckets.size()); + + final InternalDateHistogram.Bucket day1Bucket = buckets.stream() + .filter(bucket -> bucket.getKey().equals(Instant.ofEpochMilli(day1).atZone(ZoneOffset.UTC))) + .findAny() + .orElse(null); + final InternalDateHistogram.Bucket day2Bucket = buckets.stream() + .filter(bucket -> 
bucket.getKey().equals(Instant.ofEpochMilli(day2).atZone(ZoneOffset.UTC))) + .findAny() + .orElse(null); + final InternalIpPrefix ipPrefix1 = Objects.requireNonNull(day1Bucket).getAggregations().get(ipPrefixAggregationName); + final InternalIpPrefix ipPrefix2 = Objects.requireNonNull(day2Bucket).getAggregations().get(ipPrefixAggregationName); + assertNotNull(ipPrefix1); + assertNotNull(ipPrefix2); + assertEquals(expectedBucket1Subnets.size(), ipPrefix1.getBuckets().size()); + assertEquals(expectedBucket2Subnets.size(), ipPrefix2.getBuckets().size()); + + final Set bucket1Subnets = ipPrefix1.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set bucket2Subnets = ipPrefix2.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + assertTrue(bucket1Subnets.containsAll(expectedBucket1Subnets)); + assertTrue(bucket2Subnets.containsAll(expectedBucket2Subnets)); + assertTrue(expectedBucket1Subnets.containsAll(bucket1Subnets)); + assertTrue(expectedBucket2Subnets.containsAll(bucket2Subnets)); + }, fieldTypes); + } + + public void testIpv6AggregationAsSubAggregation() throws IOException { + // GIVEN + final int prefixLength = 64; + final String ipv4FieldName = "ipv6"; + final String datetimeFieldName = "datetime"; + final String dateHistogramAggregationName = "date_histogram"; + final String ipPrefixAggregationName = "ip_prefix"; + final AggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder(dateHistogramAggregationName).calendarInterval( + DateHistogramInterval.DAY + ) + .field(datetimeFieldName) + .subAggregation( + new IpPrefixAggregationBuilder(ipPrefixAggregationName).field(ipv4FieldName) + .isIpv6(true) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength) + ); + final DateFieldMapper.DateFieldType dateFieldType = new DateFieldMapper.DateFieldType(datetimeFieldName); 
+ final IpFieldMapper.IpFieldType ipFieldType = new IpFieldMapper.IpFieldType(ipv4FieldName); + final MappedFieldType[] fieldTypes = { ipFieldType, dateFieldType }; + + long day1 = dateFieldType.parse("2021-11-04"); + long day2 = dateFieldType.parse("2021-11-05"); + final List ipAddresses = List.of( + new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, day1), + new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, day1), + new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, day2), + new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, day2), + new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, day1) + ); + + final Set expectedBucket1Subnets = ipAddresses.stream() + .filter(testIpDataHolder -> testIpDataHolder.getTime() == day1) + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set expectedBucket2Subnets = ipAddresses.stream() + .filter(testIpDataHolder -> testIpDataHolder.getTime() == day2) + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (final TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + List.of( + new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))), + new SortedNumericDocValuesField(datetimeFieldName, ipDataHolder.getTime()) + ) + ); + } + }, agg -> { + final InternalDateHistogram dateHistogram = (InternalDateHistogram) agg; + final List buckets = dateHistogram.getBuckets(); + assertEquals(2, buckets.size()); + + final InternalDateHistogram.Bucket day1Bucket = buckets.stream() + .filter(bucket -> bucket.getKey().equals(Instant.ofEpochMilli(day1).atZone(ZoneOffset.UTC))) + .findAny() + .orElse(null); + final 
InternalDateHistogram.Bucket day2Bucket = buckets.stream() + .filter(bucket -> bucket.getKey().equals(Instant.ofEpochMilli(day2).atZone(ZoneOffset.UTC))) + .findAny() + .orElse(null); + final InternalIpPrefix ipPrefix1 = Objects.requireNonNull(day1Bucket).getAggregations().get(ipPrefixAggregationName); + final InternalIpPrefix ipPrefix2 = Objects.requireNonNull(day2Bucket).getAggregations().get(ipPrefixAggregationName); + assertNotNull(ipPrefix1); + assertNotNull(ipPrefix2); + assertEquals(expectedBucket1Subnets.size(), ipPrefix1.getBuckets().size()); + assertEquals(expectedBucket2Subnets.size(), ipPrefix2.getBuckets().size()); + + final Set bucket1Subnets = ipPrefix1.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set bucket2Subnets = ipPrefix2.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + assertTrue(bucket1Subnets.containsAll(expectedBucket1Subnets)); + assertTrue(bucket2Subnets.containsAll(expectedBucket2Subnets)); + assertTrue(expectedBucket1Subnets.containsAll(bucket1Subnets)); + assertTrue(expectedBucket2Subnets.containsAll(bucket2Subnets)); + }, fieldTypes); + } + + public void testIpPrefixSubAggregations() throws IOException { + // GIVEN + final int topPrefixLength = 16; + final int subPrefixLength = 24; + final String ipv4FieldName = "ipv4"; + final String topIpPrefixAggregation = "top_ip_prefix"; + final String subIpPrefixAggregation = "sub_ip_prefix"; + final AggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder(topIpPrefixAggregation).field(ipv4FieldName) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(topPrefixLength) + .subAggregation( + new IpPrefixAggregationBuilder(subIpPrefixAggregation).field(ipv4FieldName) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + 
.prefixLength(subPrefixLength) + ); + final IpFieldMapper.IpFieldType ipFieldType = new IpFieldMapper.IpFieldType(ipv4FieldName); + final MappedFieldType[] fieldTypes = { ipFieldType }; + + final String FIRST_SUBNET = "192.168.0.0"; + final String SECOND_SUBNET = "192.169.0.0"; + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", FIRST_SUBNET, topPrefixLength, defaultTime()), + new TestIpDataHolder("192.168.10.12", FIRST_SUBNET, topPrefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.117", FIRST_SUBNET, topPrefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.27", FIRST_SUBNET, topPrefixLength, defaultTime()), + new TestIpDataHolder("192.169.1.18", SECOND_SUBNET, topPrefixLength, defaultTime()), + new TestIpDataHolder("192.168.2.129", FIRST_SUBNET, topPrefixLength, defaultTime()), + new TestIpDataHolder("192.169.2.49", SECOND_SUBNET, topPrefixLength, defaultTime()), + new TestIpDataHolder("192.169.1.201", SECOND_SUBNET, topPrefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (final TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + List.of(new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix topIpPrefix = (InternalIpPrefix) agg; + final List buckets = topIpPrefix.getBuckets(); + assertEquals(2, buckets.size()); + + final InternalIpPrefix.Bucket firstSubnetBucket = topIpPrefix.getBuckets() + .stream() + .filter(bucket -> FIRST_SUBNET.equals(bucket.getKeyAsString())) + .findAny() + .orElse(null); + final InternalIpPrefix.Bucket secondSubnetBucket = topIpPrefix.getBuckets() + .stream() + .filter(bucket -> SECOND_SUBNET.equals(bucket.getKeyAsString())) + .findAny() + .orElse(null); + assertNotNull(firstSubnetBucket); + assertNotNull(secondSubnetBucket); + assertEquals(5, firstSubnetBucket.getDocCount()); + assertEquals(3, 
secondSubnetBucket.getDocCount());
+
+        final InternalIpPrefix firstBucketSubAggregation = firstSubnetBucket.getAggregations().get(subIpPrefixAggregation);
+        final InternalIpPrefix secondBucketSubAggregation = secondSubnetBucket.getAggregations().get(subIpPrefixAggregation);
+        final Set<String> firstSubnetNestedSubnets = firstBucketSubAggregation.getBuckets()
+            .stream()
+            .map(InternalIpPrefix.Bucket::getKeyAsString)
+            .collect(Collectors.toUnmodifiableSet());
+        final Set<String> secondSubnetNestedSubnets = secondBucketSubAggregation.getBuckets()
+            .stream()
+            .map(InternalIpPrefix.Bucket::getKeyAsString)
+            .collect(Collectors.toUnmodifiableSet());
+        final List<String> expectedFirstSubnetNestedSubnets = List.of("192.168.1.0", "192.168.2.0", "192.168.10.0");
+        final List<String> expectedSecondSubnetNestedSubnets = List.of("192.169.1.0", "192.169.2.0");
+        assertTrue(firstSubnetNestedSubnets.containsAll(expectedFirstSubnetNestedSubnets));
+        assertTrue(expectedFirstSubnetNestedSubnets.containsAll(firstSubnetNestedSubnets));
+        assertTrue(secondSubnetNestedSubnets.containsAll(expectedSecondSubnetNestedSubnets));
+        assertTrue(expectedSecondSubnetNestedSubnets.containsAll(secondSubnetNestedSubnets));
+
+    }, fieldTypes);
+    }
+
+    public void testIpv4AppendPrefixLength() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(true)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "192.168.0.0",
prefixLength, defaultTime()), + new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .map(appendPrefixLength(prefixLength)) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .map(appendPrefixLength(prefixLength)) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertTrue(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testIpv6AppendPrefixLength() throws IOException { + // GIVEN + final int prefixLength = 64; + final String field = "ipv6"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(true) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new 
TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, defaultTime()), + new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .map(appendPrefixLength(prefixLength)) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .map(appendPrefixLength(prefixLength)) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertTrue(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testMinDocCount() throws IOException { + // GIVEN + final int prefixLength = 16; + final String field = "ipv4"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + 
.isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(2) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime()) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress())))) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = Set.of("192.168.0.0"); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testAggregationWithQueryFilter() throws IOException { + // GIVEN + final int prefixLength = 16; + final String field = "ipv4"; + final 
IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field) + .isIpv6(false) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength); + final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field); + final List ipAddresses = List.of( + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()), + new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime()) + ); + final Query query = InetAddressPoint.newRangeQuery( + field, + InetAddresses.forString("192.168.0.0"), + InetAddressPoint.nextDown(InetAddresses.forString("192.169.0.0")) + ); + + // WHEN + testCase(aggregationBuilder, query, iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + List.of( + new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))), + new InetAddressPoint(field, ipDataHolder.getIpAddress()) + ) + ); + } + }, agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .filter(subnet -> subnet.startsWith("192.168.")) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertFalse(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + 
assertEquals(prefixLength, bucket.getPrefixLength()); + }); + + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + }, fieldType); + } + + public void testMetricAggregation() throws IOException { + // GIVEN + final int prefixLength = 64; + final String ipField = "ipv6"; + final String timeField = "time"; + final String topAggregationName = "ip_prefix"; + final String subAggregationName = "total_time"; + final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder(topAggregationName).field(ipField) + .isIpv6(true) + .keyed(randomBoolean()) + .appendPrefixLength(false) + .minDocCount(1) + .prefixLength(prefixLength) + .subAggregation(new SumAggregationBuilder(subAggregationName).field(timeField)); + final MappedFieldType[] fieldTypes = { + new IpFieldMapper.IpFieldType(ipField), + new NumberFieldMapper.NumberFieldType(timeField, NumberFieldMapper.NumberType.LONG) }; + final List ipAddresses = List.of( + new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, 100), + new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, 110), + new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, 200), + new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, 170), + new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, 130) + ); + + // WHEN + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for (TestIpDataHolder ipDataHolder : ipAddresses) { + iw.addDocument( + List.of( + new SortedDocValuesField(ipField, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))), + new NumericDocValuesField(timeField, ipDataHolder.getTime()) + ) + ); + } + }, 
agg -> { + final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg; + final Set expectedSubnets = ipAddresses.stream() + .map(TestIpDataHolder::getSubnetAsString) + .collect(Collectors.toUnmodifiableSet()); + final Set ipAddressesAsString = ipPrefix.getBuckets() + .stream() + .map(InternalIpPrefix.Bucket::getKeyAsString) + .collect(Collectors.toUnmodifiableSet()); + + // THEN + ipPrefix.getBuckets().forEach(bucket -> { + assertTrue(bucket.isIpv6()); + assertFalse(bucket.appendPrefixLength()); + assertEquals(prefixLength, bucket.getPrefixLength()); + }); + assertFalse(ipPrefix.getBuckets().isEmpty()); + assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size()); + assertTrue(ipAddressesAsString.containsAll(expectedSubnets)); + assertTrue(expectedSubnets.containsAll(ipAddressesAsString)); + + assertEquals(210, ((InternalSum) ipPrefix.getBuckets().get(0).getAggregations().get(subAggregationName)).getValue(), 0); + assertEquals(200, ((InternalSum) ipPrefix.getBuckets().get(1).getAggregations().get(subAggregationName)).getValue(), 0); + assertEquals(300, ((InternalSum) ipPrefix.getBuckets().get(2).getAggregations().get(subAggregationName)).getValue(), 0); + }, fieldTypes); + } + + private Function appendPrefixLength(int prefixLength) { + return subnetAddress -> subnetAddress + "/" + prefixLength; + } + + private long defaultTime() { + return randomLongBetween(0, Long.MAX_VALUE); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java index 1b20af39fedb7..4275ae3082c93 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java @@ -65,6 +65,7 @@ public final class 
TransformAggregations { "geotile_grid", "global", "histogram", + "ip_prefix", "ip_range", "matrix_stats", "nested",