From 9b4f6821d0f28f24bea40058a9859d4541172d33 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic <119343520+eric-forte-elastic@users.noreply.github.com> Date: Tue, 31 Oct 2023 09:38:42 -0400 Subject: [PATCH] [FR] Add IPv6 Support to CidrMatch using ipaddress lib (#80) * Add stub to make PR * Add base ipv6 functionality * Add short hand ipv6 address support * updated linting * Fixed missing constant definition * Updated code to fix unit test issues * Fix typo * Fix line too long * Updated ipv6 checks * Cleanup ipv6 masking * Fix Typo * Fix regex and add unit tests * linting * Fixed typo * Support for python2 * Removed typo * Added unit tests to python engine * Added randomized testing * Cleanup * updated version * Minor update to docstring * Update eql/functions.py Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com> * Update eql/functions.py Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com> * updated variable names for consistency * Fixed typo missing = * Typo replaced < with > * reverting size logic * ipaddress library implementation * linting * remove unused imports * Python2 support * Py3 linting fix * remove whitespace * Updates py2 support * typo * Moved python2 checks to utils * linting * add default parameter * Moved iscidr to utils * fixed docstrings * Linting --------- Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com> --- CHANGELOG.md | 9 + eql/__init__.py | 2 +- eql/functions.py | 158 +++-------------- eql/utils.py | 29 ++++ setup.py | 1 + tests/test_functions.py | 141 +++++++-------- tests/test_python_engine.py | 337 +++++++++++++++++++++++++++++++++++- 7 files changed, 463 insertions(+), 214 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92cf1e5..c434214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ # Event Query Language - Changelog The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). 
+# Version 0.9.19 + + _Released 2023-10-10_ + +### Added + +* Added IPv6 support for CidrMatch +* Removed the regex support for testing CidrMatch in favor of the native ipaddress module testing + # Version 0.9.18 _Released 2023-09-01_ diff --git a/eql/__init__.py b/eql/__init__.py index 24d682d..e5b867e 100644 --- a/eql/__init__.py +++ b/eql/__init__.py @@ -66,7 +66,7 @@ Walker, ) -__version__ = '0.9.18' +__version__ = '0.9.19' __all__ = ( "__version__", "AnalyticOutput", diff --git a/eql/functions.py b/eql/functions.py index 0679ee9..7191064 100644 --- a/eql/functions.py +++ b/eql/functions.py @@ -1,17 +1,22 @@ """EQL functions.""" import re -import socket -import struct -from .signatures import SignatureMixin from .errors import EqlError +from .signatures import SignatureMixin from .types import TypeHint -from .utils import is_string, to_unicode, is_number, fold_case, is_insensitive - +from .utils import ( + fold_case, + get_ipaddress, + get_subnet, + is_cidr_pattern, + is_insensitive, + is_number, + is_string, + to_unicode, +) _registry = {} REGEX_FLAGS = re.UNICODE | re.DOTALL -MAX_IP = 0xffffffff def regex_flags(): @@ -193,126 +198,17 @@ class CidrMatch(FunctionSignature): additional_types = TypeHint.String.require_literal() return_value = TypeHint.Boolean - octet_re = r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])' - ip_re = r'\.'.join([octet_re, octet_re, octet_re, octet_re]) - ip_compiled = re.compile(r'^{}$'.format(ip_re)) - cidr_compiled = re.compile(r'^{}/(?:3[0-2]|2[0-9]|1[0-9]|[0-9])$'.format(ip_re)) - - # store it in native representation, then recover it in network order - masks = [struct.unpack(">L", struct.pack(">L", MAX_IP & ~(MAX_IP >> b)))[0] for b in range(33)] - mask_addresses = [socket.inet_ntoa(struct.pack(">L", m)) for m in masks] - - @classmethod - def to_mask(cls, cidr_string): - """Split an IP address plus cidr block to the mask.""" - ip_string, size = cidr_string.split("/") - size = int(size) - ip_bytes = socket.inet_aton(ip_string) - 
subnet_int, = struct.unpack(">L", ip_bytes) - - mask = cls.masks[size] - - return subnet_int & mask, mask - - @classmethod - def make_octet_re(cls, start, end): - """Convert an octet-range into a regular expression.""" - combos = [] - - if start == end: - return "{:d}".format(start) - - if start == 0 and end == 255: - return cls.octet_re - - # 0xx, 1xx, 2xx - for hundreds in (0, 100, 200): - h = int(hundreds / 100) - h_digit = "0?" if h == 0 else "{:d}".format(h) - - # if the whole range is included, then add it - if start <= hundreds < hundreds + 99 <= end: - # allow for leading zeros - if h == 0: - combos.append("{:s}[0-9]?[0-9]".format(h_digit)) - else: - combos.append("{:s}[0-9][0-9]".format(h_digit)) - continue - - # determine which of the tens ranges are entirely included - # so that we can do "h[a-b][0-9]" - hundreds_matches = [] - full_tens = [] - - # now loop over h00, h10, h20 - for tens in range(hundreds, hundreds + 100, 10): - t = int(tens / 10) % 10 - t_digit = "0?" if (h == 0 and t == 0) else "{:d}".format(t) - - if start <= tens < tens + 9 <= end: - # fully included, add to the list - full_tens.append(t) - continue - - # now add the final [a-b] - matching_ones = [one % 10 for one in range(tens, tens + 10) if start <= one <= end] - - if matching_ones: - ones_match = t_digit - if len(matching_ones) == 1: - ones_match += "{:d}".format(matching_ones[0]) - else: - ones_match += "[{:d}-{:d}]".format(min(matching_ones), max(matching_ones)) - hundreds_matches.append(ones_match) - - if full_tens: - if len(full_tens) == 1: - tens_match = "{:d}".format(full_tens[0]) - else: - tens_match = "[{:d}-{:d}]".format(min(full_tens), max(full_tens)) - - # allow for 001 - 009 - if h == 0 and 0 in full_tens: - tens_match += "?" 
- - tens_match += "[0-9]" - hundreds_matches.append(tens_match) - - if len(hundreds_matches) == 1: - combos.append("{:s}{:s}".format(h_digit, hundreds_matches[0])) - elif len(hundreds_matches) > 1: - combos.append("{:s}(?:{:s})".format(h_digit, "|".join(hundreds_matches))) - - return "(?:{})".format("|".join(combos)) - - @classmethod - def make_cidr_regex(cls, cidr): - """Convert a list of wildcards strings for matching a cidr.""" - min_octets, max_octets = cls.to_range(cidr) - return r"\.".join(cls.make_octet_re(*pair) for pair in zip(min_octets, max_octets)) - - @classmethod - def to_range(cls, cidr): - """Get the IP range for a list of IP addresses.""" - ip_integer, mask = cls.to_mask(cidr) - max_ip_integer = ip_integer | (MAX_IP ^ mask) - - min_octets = struct.unpack("BBBB", struct.pack(">L", ip_integer)) - max_octets = struct.unpack("BBBB", struct.pack(">L", max_ip_integer)) - - return min_octets, max_octets - @classmethod def get_callback(cls, _, *cidr_matches): """Get the callback function with all the masks converted.""" - masks = [cls.to_mask(cidr.value) for cidr in cidr_matches] + cidr_networks = [get_subnet(cidr.value) for cidr in cidr_matches] def callback(source, *_): - if is_string(source) and cls.ip_compiled.match(source): - ip_integer, _ = cls.to_mask(source + "/32") + if is_string(source): + ip_address = get_ipaddress(source) - for subnet, mask in masks: - if ip_integer & mask == subnet: + for subnet in cidr_networks: + if ip_address in subnet: return True return False @@ -322,13 +218,14 @@ def callback(source, *_): @classmethod def run(cls, ip_address, *cidr_matches): """Compare an IP address against a list of cidr blocks.""" - if is_string(ip_address) and cls.ip_compiled.match(ip_address): - ip_integer, _ = cls.to_mask(ip_address + "/32") + if is_string(ip_address): + ip_address = get_ipaddress(ip_address) for cidr in cidr_matches: - if is_string(cidr) and cls.cidr_compiled.match(cidr): - subnet, mask = cls.to_mask(cidr) - if ip_integer & mask == 
subnet: + if is_string(cidr): + subnet = get_subnet(cidr) + + if ip_address in subnet: return True return False @@ -349,14 +246,13 @@ def validate(cls, arguments): # overwrite the original node text = argument.node.value.strip() - if not cls.cidr_compiled.match(argument.node.value): + if not is_cidr_pattern(text): return pos # Since it does match, we should also rewrite the string to align to the base of the subnet - ip_address, size = text.split("/") - subnet_integer, _ = cls.to_mask(text) - subnet_bytes = struct.pack(">L", subnet_integer) - subnet_base = socket.inet_ntoa(subnet_bytes) + _, size = text.split("/") + subnet = get_subnet(text) + subnet_base = subnet.network_address # overwrite the original argument so it becomes the subnet argument.node = String("{}/{}".format(subnet_base, size)) @@ -704,4 +600,4 @@ def run(cls, source, *wildcards): # circular dependency -from .ast import MathOperation, FunctionCall, Comparison, String # noqa: E402 +from .ast import Comparison, FunctionCall, MathOperation, String # noqa: E402 diff --git a/eql/utils.py b/eql/utils.py index 3066520..ad2a766 100644 --- a/eql/utils.py +++ b/eql/utils.py @@ -2,6 +2,7 @@ import codecs import gzip import io +import ipaddress import json import os import sys @@ -11,6 +12,9 @@ CASE_INSENSITIVE = True _loaded_plugins = False +# Var to check if Python2 or Python3 +py_version = sys.version_info.major + # Python2 and Python3 compatible type checking unicode_t = type(u"") long_t = type(int(1e100)) @@ -65,6 +69,17 @@ def is_insensitive(): return CASE_INSENSITIVE +def is_cidr_pattern(cidr): + """Check if a string is a valid CIDR notation.""" + if "/" not in cidr: + return False + try: + get_subnet(cidr) + return True + except ValueError: + return False + + def fold_case(s): """Helper function for normalizing case for strings.""" if is_insensitive() and is_string(s): @@ -79,6 +94,20 @@ def str_presenter(dumper, data): return dumper.represent_scalar('tag:yaml.org,2002:str', data) +def 
get_ipaddress(ipaddr_string): + """Get an ip_address object from a string containing an ip address.""" + if py_version == 2: + ipaddr_string = ipaddr_string.decode("utf-8") # noqa: F821 + return ipaddress.ip_address(ipaddr_string) + + +def get_subnet(cidr_string, strict=False): + """Get an ip_network object from a string containing a CIDR range.""" + if py_version == 2: + cidr_string = cidr_string.decode("utf-8") # noqa: F821 + return ipaddress.ip_network(cidr_string, strict=strict) + + def get_type_converter(items): """Get a python callback function that can convert None to observed typed values.""" items = iter(items) diff --git a/setup.py b/setup.py index 82dcb59..0783e07 100644 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ install_requires = [ "lark-parser~=0.12.0", "enum34; python_version<'3.4'", + "ipaddress; python_version<'3'", ] test_requires = [ diff --git a/tests/test_functions.py b/tests/test_functions.py index f7dbfba..b276159 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -1,6 +1,4 @@ """Test Python Engine for EQL.""" -import random -import re import unittest from eql.ast import String, Field @@ -99,82 +97,63 @@ def test_cidr_match_rewrite(self): self.assertIsNone(position) - def test_cidr_ranges(self): - """Check that CIDR ranges are correctly identified.""" - cidr_range = CidrMatch.to_range("10.0.0.0/8") - self.assertListEqual(list(cidr_range), [ - (10, 0, 0, 0), (10, 255, 255, 255) - ]) - cidr_range = CidrMatch.to_range("123.45.67.189/32") - self.assertListEqual(list(cidr_range), [ - (123, 45, 67, 189), (123, 45, 67, 189) - ]) - - cidr_range = CidrMatch.to_range("0.0.0.0/0") - self.assertListEqual(list(cidr_range), [ - (0, 0, 0, 0), (255, 255, 255, 255) - ]) - - cidr_range = CidrMatch.to_range("192.168.15.2/22") - self.assertListEqual(list(cidr_range), [ - (192, 168, 12, 0), (192, 168, 15, 255) - ]) - - def test_octet_regex(self): - """Test that octet regex are correctly matching the range.""" - for _ in range(100): - # 
too many possible combos, so we can just randomly generate them - start = random.randrange(256) - end = random.randrange(256) - - # order them correctly - start, end = min(start, end), max(start, end) - - # now build the regex and check that each one matches - regex = re.compile("^(?:" + CidrMatch.make_octet_re(start, end) + ")$") - self.assertEqual(regex.groups, 0) - - for num in range(500): - should_match = start <= num <= end - did_match = regex.match(str(num)) is not None - self.assertEqual(should_match, did_match) - - def test_cidr_regex(self): - """Test that octet regex are correctly matching the range.""" - for _ in range(200): - # make an ip address - ip_addr = ( - random.randrange(256), - random.randrange(256), - random.randrange(256), - random.randrange(256), - ) - size = random.randrange(33) - total_ips = 2 ** (32 - size) - - args = list(ip_addr) - args.append(size) - cidr_mask = "{:d}.{:d}.{:d}.{:d}/{:d}".format(*args) - - pattern = CidrMatch.make_cidr_regex(cidr_mask) - - regex = re.compile("^(?:{})$".format(pattern)) - self.assertEqual(regex.groups, 0) - - min_ip, max_ip = CidrMatch.to_range(cidr_mask) - - # randomly pick IPs that *are* in the range - for _ in range(min(200, total_ips)): - rand_addr = [random.randrange(mn, mx + 1) for mn, mx in zip(min_ip, max_ip)] - rand_ip = "{:d}.{:d}.{:d}.{:d}".format(*rand_addr) - - self.assertIsNotNone(regex.match(rand_ip)) - - # todo: pick IPs that are definitely not in the range - for _ in range(200): - rand_addr = [random.randrange(0, 255) for _ in range(4)] - in_subnet = all(mn <= o <= mx for o, mn, mx in zip(rand_addr, min_ip, max_ip)) - rand_ip = "{:d}.{:d}.{:d}.{:d}".format(*rand_addr) - - rv = regex.match(rand_ip) is not None - self.assertEqual(rv, in_subnet) + def test_ipv6_cidr_match_validation(self): + """Check that invalid CIDR addresses are detected.""" + arguments = [ + Field("ip"), + String("2001:db8::/32"), + String("b"), + String("fe80::/64"), + ] + info = [types.NodeInfo(arg, 
types.TypeHint.String) for arg in arguments] + + position = CidrMatch.validate(info) + self.assertEqual(position, 2) + + # test that missing / causes failure + info[2].node.value = "2001:db8::1" + position = CidrMatch.validate(info) + self.assertEqual(position, 2) + + # test for invalid ip + info[2].node.value = "2001:db8::g/32" + position = CidrMatch.validate(info) + self.assertEqual(position, 2) + + info[2].node.value = "2001:db8::1/32" + position = CidrMatch.validate(info) + self.assertIsNone(position) + + def test_ipv6_cidr_match_rewrite(self): + """Test that cidrMatch() rewrites the arguments.""" + arguments = [ + Field("ip"), + String("2001:db8::/32"), # IPv6 CIDR address + ] + info = [types.NodeInfo(arg, types.TypeHint.String) for arg in arguments] + + position = CidrMatch.validate(info) + self.assertEqual(position, None) + + new_arguments = [arg.node for arg in info] + + # check that the original were only modified to round the values + self.assertIsNot(arguments[0], new_arguments[1]) + self.assertIsNot(arguments[1], new_arguments[1]) + + # and that the values were set to the base of the subnet + self.assertEqual(new_arguments[1].value, "2001:db8::/32") + + # test that /0 is working + info[1].node = String("::/0") + position = CidrMatch.validate(info) + new_arguments = [arg.node for arg in info] + self.assertIsNone(position) + self.assertIsNot(arguments[1], new_arguments[1]) + + # and /128 + self.assertEqual(new_arguments[1].value, "::/0") + info[1].node = String("2001:db8::1/128") + position = CidrMatch.validate(info) + + self.assertIsNone(position) diff --git a/tests/test_python_engine.py b/tests/test_python_engine.py index 424f76a..b759e3a 100644 --- a/tests/test_python_engine.py +++ b/tests/test_python_engine.py @@ -1,12 +1,15 @@ """Test Python Engine for EQL.""" +import ipaddress import random +import sys import uuid from collections import defaultdict from eql import * # noqa: F403 from eql.ast import * # noqa: F403 from eql.engine import Scope -from 
eql.parser import ignore_missing_functions, allow_sample, elasticsearch_syntax +from eql.parser import (allow_sample, elasticsearch_syntax, + ignore_missing_functions) from eql.schema import EVENT_TYPE_GENERIC from eql.tests.base import TestEngine @@ -389,6 +392,338 @@ def test_custom_functions(self): event_ids = [event.data['serial_event_id'] for event in output] self.validate_results(event_ids, [43, 45, 52], "Custom function 'reverse'") + def test_cidrmatch(self): + """Test the cidrMatch custom function.""" + def to_range(cidr): + """Convert a CIDR notation to a tuple of the minimum and maximum IP addresses in the range.""" + # Python 2 support + if sys.version_info.major == 2: + cidr = unicode(cidr) + ip_network = ipaddress.ip_network(cidr, strict=False) + + min_ip_address = ip_network.network_address + max_ip_address = ip_network.broadcast_address + + return min_ip_address, max_ip_address + + def generate_random_ip_from_range(min_ip_address, max_ip_address): + """Generate a random IP address from a given range.""" + ip_constructor = type(min_ip_address) + + # Convert the ipaddress.IPv4Address or ipaddress.IPv6 to integers + min_ip_address_int = int(min_ip_address) + max_ip_address_int = int(max_ip_address) + + # Generate a random IP address within the range. + random_ip_address_int = random.randint(min_ip_address_int, max_ip_address_int) + + # Convert the random integer back to an ipaddress.IPv4Address or ipaddress.IPv6Address object. + random_ip_address = ip_constructor(random_ip_address_int) + + return random_ip_address + + def generate_random_ip_address_not_in_range(min_ip_address, max_ip_address, max_tries=2): + """Generate a random IP address NOT in a given range.""" + ip_constructor = type(min_ip_address) + max_int = 2**32 - 1 if type(min_ip_address) == ipaddress.IPv4Address else 2**128 - 1 + + min_ip_address_int = int(min_ip_address) + max_ip_address_int = int(max_ip_address) + + # Generate a random IP address. 
+ if random.random() < 0.5: + random_ip_address = ip_constructor(random.randint(0, min_ip_address_int)) + else: + random_ip_address = ip_constructor(random.randint(max_ip_address_int, max_int)) + + # Check if the random IP address is within the provided range. + while random_ip_address >= min_ip_address and random_ip_address <= max_ip_address: + # Generate another random IP address. + random_ip_address = ip_constructor(random.randint(0, min_ip_address_int)) + if random_ip_address >= min_ip_address and random_ip_address <= max_ip_address: + random_ip_address = ip_constructor(random.randint(max_ip_address_int, max_int)) + max_tries -= 1 + if max_tries == 0: + return random_ip_address + + return random_ip_address + + config = {"flatten": True} + events = [ + Event.from_data(d) + for d in [ + { + "event_type": "process", + "process_name": "malicious.exe", + "unique_pid": "host1-1", + "timestamp": 116444736000000000, + "source": {"ip": "2001:0db8:0000:0000:0000:0000:0000:0001"}, + }, + { + "event_type": "process", + "process_name": "missing.exe", + "unique_pid": "host1-1", + "timestamp": 116444738000000000, + "source": {"ip": "2001:db8::1"}, + }, + { + "event_type": "file", + "file_name": "suspicious.txt", + "unique_pid": "host1-1", + "timestamp": 116444740000000000, + "source": {"ip": "fe80::1"}, + }, + ] + ] + + # Should return no results since none of the CIDR ranges contain those IPs + query = """ + sequence by unique_pid with maxspan=7m + [ process where cidrMatch(source.ip, "10.0.0.1/8") ] + [ process where cidrMatch(source.ip, "10.0.0.1/16") ] + [ file where cidrMatch(source.ip, "10.0.0.1/32", "2001:0db8::/32") ] + """ + with elasticsearch_syntax: + parsed_query = parse_query(query) + + output = self.get_output(queries=[parsed_query], config=config, events=events) + + self.assertEqual(len(output), 0, "Missing or extra results") + + # Should return results since the cidr ranges match the addresses in the events + query = """ + sequence by unique_pid with maxspan=7m + [ 
process where cidrMatch(source.ip, "2001:0db8::/32") ] + [ process where cidrMatch(source.ip, "2001:0db8:0000:0000:0000:0000:0000:0000/32") ] + [ file where cidrMatch(source.ip, "fe80::/10", "2001:0db8::/32") ] + """ + + with elasticsearch_syntax: + parsed_query = parse_query(query) + + output = self.get_output(queries=[parsed_query], config=config, events=events) + + self.assertEqual(len(output), 3, "Missing or extra results") + + # Randomized IPv4 testing + for _ in range(200): + # make an ip address + ip_addr = ( + random.randrange(256), + random.randrange(256), + random.randrange(256), + random.randrange(256), + ) + size = random.randrange(33) + total_ips = 2 ** (32 - size) + + args = list(ip_addr) + args.append(size) + cidr_mask = "{:d}.{:d}.{:d}.{:d}/{:d}".format(*args) + + min_ip, max_ip = to_range(cidr_mask) + + # randomly pick IPs that *are* in the range + for _ in range(min(200, total_ips)): + rand_addr = generate_random_ip_from_range(min_ip, max_ip) + rand_ip = str(rand_addr) + events = [ + Event.from_data(d) + for d in [ + { + "event_type": "process", + "process_name": "malicious.exe", + "unique_pid": "host1-1", + "timestamp": 116444736000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + "event_type": "process", + "process_name": "missing.exe", + "unique_pid": "host1-1", + "timestamp": 116444738000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + "event_type": "file", + "file_name": "suspicious.txt", + "unique_pid": "host1-1", + "timestamp": 116444740000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + ] + ] + + query = """ + sequence by unique_pid with maxspan=7m + [ process where cidrMatch(source.ip, "{}") ] + [ process where cidrMatch(source.ip, "{}") ] + """.format( + cidr_mask, cidr_mask + ) + + with elasticsearch_syntax: + parsed_query = parse_query(query) + + output = self.get_output(queries=[parsed_query], config=config, events=events) + + self.assertEqual(len(output), 2, "Missing or extra results") + + # pick IPs 
that are definitely not in the range + for _ in range(200): + rand_addr = generate_random_ip_address_not_in_range(min_ip, max_ip) + in_subnet = rand_addr >= min_ip and rand_addr <= max_ip + rand_ip = str(rand_addr) + events = [ + Event.from_data(d) + for d in [ + { + "event_type": "process", + "process_name": "malicious.exe", + "unique_pid": "host1-1", + "timestamp": 116444736000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + "event_type": "process", + "process_name": "missing.exe", + "unique_pid": "host1-1", + "timestamp": 116444738000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + "event_type": "file", + "file_name": "suspicious.txt", + "unique_pid": "host1-1", + "timestamp": 116444740000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + ] + ] + + query = """ + sequence by unique_pid with maxspan=7m + [ process where cidrMatch(source.ip, "{}") ] + [ process where cidrMatch(source.ip, "{}") ] + """.format( + cidr_mask, cidr_mask + ) + + with elasticsearch_syntax: + parsed_query = parse_query(query) + + output = self.get_output(queries=[parsed_query], config=config, events=events) + + # Check against known truth if is in_subnet + rv = len(output) != 0 + self.assertEqual(rv, in_subnet, "Missing or extra results") + + # Randomized IPv6 testing + for _ in range(200): + # make an ip address + ip_addr = tuple(random.randrange(65536) for _ in range(8)) + size = random.randrange(129) + total_ips = 2 ** (128 - size) + + cidr_mask = ":".join("{:x}".format(x) for x in ip_addr) + "/{:d}".format(size) + + min_ip, max_ip = to_range(cidr_mask) + + # randomly pick IPs that *are* in the range + for _ in range(min(200, total_ips)): + rand_addr = generate_random_ip_from_range(min_ip, max_ip) + rand_ip = str(rand_addr) + events = [ + Event.from_data(d) + for d in [ + { + "event_type": "process", + "process_name": "malicious.exe", + "unique_pid": "host1-1", + "timestamp": 116444736000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + 
"event_type": "process", + "process_name": "missing.exe", + "unique_pid": "host1-1", + "timestamp": 116444738000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + "event_type": "file", + "file_name": "suspicious.txt", + "unique_pid": "host1-1", + "timestamp": 116444740000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + ] + ] + + query = """ + sequence by unique_pid with maxspan=7m + [ process where cidrMatch(source.ip, "{}") ] + [ process where cidrMatch(source.ip, "{}") ] + """.format( + cidr_mask, cidr_mask + ) + + with elasticsearch_syntax: + parsed_query = parse_query(query) + + output = self.get_output(queries=[parsed_query], config=config, events=events) + + self.assertEqual(len(output), 2, "Missing or extra results") + + # pick IPs that are definitely not in the range + for _ in range(200): + rand_addr = generate_random_ip_address_not_in_range(min_ip, max_ip) + in_subnet = rand_addr >= min_ip and rand_addr <= max_ip + rand_ip = str(rand_addr) + events = [ + Event.from_data(d) + for d in [ + { + "event_type": "process", + "process_name": "malicious.exe", + "unique_pid": "host1-1", + "timestamp": 116444736000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + "event_type": "process", + "process_name": "missing.exe", + "unique_pid": "host1-1", + "timestamp": 116444738000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + { + "event_type": "file", + "file_name": "suspicious.txt", + "unique_pid": "host1-1", + "timestamp": 116444740000000000, + "source": {"ip": "{}".format(rand_ip)}, + }, + ] + ] + + query = """ + sequence by unique_pid with maxspan=7m + [ process where cidrMatch(source.ip, "{}") ] + [ process where cidrMatch(source.ip, "{}") ] + """.format( + cidr_mask, cidr_mask + ) + + with elasticsearch_syntax: + parsed_query = parse_query(query) + + output = self.get_output(queries=[parsed_query], config=config, events=events) + + # Check against known truth if is in_subnet + rv = len(output) != 0 + self.assertEqual(rv, 
in_subnet, "Missing or extra results") + def test_analytic_output(self): """Confirm that analytics return the same results as queries.""" analytics = [q['analytic'] for q in self.get_example_queries()]