Skip to content

Commit

Permalink
Add short hand ipv6 address support
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-forte-elastic committed Oct 23, 2023
1 parent c869ab6 commit efa0174
Showing 1 changed file with 63 additions and 12 deletions.
75 changes: 63 additions & 12 deletions eql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,13 @@ class CidrMatch(FunctionSignature):
h16_re = r'[a-fA-F0-9]{1,4}'
ipv6_re = r'^(?:[a-fA-F0-9]{1,4}:){7}[a-fA-F0-9]{1,4}$'
ipv6_compiled = re.compile(ipv6_re)
ipv6_shorthand_re = r'^([a-fA-F0-9]{1,4}:){0,6}(:[a-fA-F0-9]{1,4}){0,6}$'
ipv6_shorthand_compiled = re.compile(ipv6_shorthand_re)
cidrv6_re = r'(?:[a-fA-F0-9]{1,4}:){0,7}[a-fA-F0-9]{1,4}/(?:12[0-8]|1[01][0-9]|[0-9]{1,2})'
cidrv6_compiled = re.compile('^{}$'.format(cidrv6_re))
cidrv6_shorthand_re = r'^([a-fA-F0-9]{1,4}:){0,7}(:[a-fA-F0-9]{1,4}){0,7}/\d{1,3}$'
cidrv6_shorthand_compiled = re.compile(cidrv6_shorthand_re)


# store it in native representation, then recover it in network order
masks4 = [struct.unpack(">L", struct.pack(">L", MAX_IP & ~(MAX_IP >> b)))[0] for b in range(33)]
Expand All @@ -211,7 +216,47 @@ class CidrMatch(FunctionSignature):
masks6 = [int('1' * i + '0' * (128 - i), 2) for i in range(129)]
mask_addresses6 = [socket.inet_ntop(socket.AF_INET6, struct.pack(">QQ", m >> 64, m & 0xFFFFFFFFFFFFFFFF)) for m in masks6]

# TODO need to add function to expand IPv6 shorthand addresses to full addresses before matching
@classmethod
def expand_ipv6(cls, cidr):
"""Expand a shorthand IPv6 address or CIDR range to full notation."""
if "/" in cidr:
# Split the CIDR range into the address and prefix length
address, prefix_length = cidr.split("/")
# Expand the address
address = cls.expand_ipv6_address(address)
# Construct the full notation CIDR range
full_cidr = "{}/{}".format(address, prefix_length)
else:
# Expand the address
full_cidr = cls.expand_ipv6_address(cidr)

return full_cidr

@classmethod
def expand_ipv6_address(cls, ipv6_address):
"""Expand a shorthand IPv6 address to full notation."""
if "::" in ipv6_address:
# Split the address into the left and right parts
left, right = ipv6_address.split("::")
# Count the number of groups in each part
left_groups = left.split(":")
right_groups = right.split(":")
num_left_groups = len(left_groups)
num_right_groups = len(right_groups)
# Calculate the number of groups in the middle part
num_middle_groups = 8 - num_left_groups - num_right_groups
# Construct the full notation address
middle_groups = ["0000"] * num_middle_groups
full_groups = left_groups + middle_groups + right_groups
full_address = ":".join(full_groups)
else:
full_address = ipv6_address

# Add leading zeros to each group
full_address = re.sub(r"(^|:)([0-9a-fA-F]{1,3})(:|$)", r"\g<1>0\2\3", full_address)

return full_address


@classmethod
def to_mask(cls, cidr_string):
Expand All @@ -222,7 +267,8 @@ def to_mask(cls, cidr_string):
ip_bytes = socket.inet_aton(ip_string)
subnet_int, = struct.unpack(">L", ip_bytes)
mask = cls.masks4[size]
elif cls.ipv6_compiled.match(ip_string):
elif cls.ipv6_compiled.match(ip_string) or cls.ipv6_shorthand_compiled.match(ip_string):
ip_string = cls.expand_ipv6_address(ip_string)
ip_bytes = socket.inet_pton(socket.AF_INET6, ip_string)
# TODO remove x if possible
subnet_int, x = struct.unpack(">QQ", ip_bytes)
Expand Down Expand Up @@ -259,10 +305,11 @@ def make_octet_re(cls, start, end):
@classmethod
def make_cidr_regex(cls, cidr):
"""Convert a list of wildcards strings for matching a cidr."""
if cls.ipv4_compiled.match(cidr):
if cls.cidrv4_compiled.match(cidr):
min_octets, max_octets = cls.to_range(cidr)
return r"\.".join(cls.make_octet_re(*pair) for pair in zip(min_octets, max_octets))
elif cls.ipv6_compiled.match(cidr):
elif cls.cidrv6_compiled.match(cidr) or cls.cidrv6_shorthand_compiled.match(cidr):
cidr = cls.expand_ipv6(cidr)
h16s, prefix_len = cidr.split("/")
h16s = h16s.split(":")
prefix_len = int(prefix_len)
Expand All @@ -278,15 +325,16 @@ def make_cidr_regex(cls, cidr):
@classmethod
def to_range(cls, cidr):
"""Get the IP range for a list of IP addresses."""
if cls.ipv4_compiled.match(cidr):
if cls.cidrv4_compiled.match(cidr):
ip_string, prefix_len = cidr.split("/")
prefix_len = int(prefix_len)
ip_integer = struct.unpack(">L", socket.inet_aton(ip_string))[0]
mask = cls.masks4[prefix_len]
max_ip_integer = ip_integer | (MAX_IP ^ mask)
min_octets = struct.unpack("BBBB", struct.pack(">L", ip_integer))
max_octets = struct.unpack("BBBB", struct.pack(">L", max_ip_integer))
elif cls.ipv6_compiled.match(cidr):
elif cls.cidrv6_compiled.match(cidr) or cls.cidrv6_shorthand_compiled.match(cidr):
cidr = cls.expand_ipv6(cidr)
ip_string, prefix_len = cidr.split("/")
prefix_len = int(prefix_len)
ip_bytes = socket.inet_pton(socket.AF_INET6, ip_string)
Expand All @@ -310,17 +358,17 @@ def get_callback(cls, _, *cidr_matches):
for cidr in cidr_matches:
if cls.ipv4_compiled.match(cidr.value):
ipv4_masks.append(cls.to_mask(cidr.value))
elif cls.ipv6_compiled.match(cidr.value):
elif cls.ipv6_compiled.match(cidr.value) or cls.ipv6_shorthand_compiled.match(cidr.value):
ipv6_masks.append(cls.to_mask(cidr.value))

def callback(source, *_):
if is_string(source) and (cls.ipv4_compiled.match(source) or cls.ipv6_compiled.match(source)):
if is_string(source) and (cls.ipv4_compiled.match(source) or cls.ipv6_compiled.match(source) or cls.ipv6_shorthand_compiled.match(source)):
if cls.ipv4_compiled.match(source):
ip_integer, _ = cls.to_mask(source + "/32")
for subnet, mask in ipv4_masks:
if ip_integer & mask == subnet:
return True
elif cls.ipv6_compiled.match(source):
elif cls.ipv6_compiled.match(source) or cls.ipv6_shorthand_compiled.match(source):
ip_integer, _ = cls.to_mask(source + "/128")
for subnet, mask in ipv6_masks:
if ip_integer & mask == subnet:
Expand All @@ -330,21 +378,24 @@ def callback(source, *_):

return callback

# TODO maybe only need to run expansions here
@classmethod
def run(cls, ip_address, *cidr_matches):
"""Compare an IP address against a list of cidr blocks."""
if is_string(ip_address) and (cls.ipv4_compiled.match(ip_address) or cls.ipv6_compiled.match(ip_address)):
if is_string(ip_address) and (cls.ipv4_compiled.match(ip_address) or cls.ipv6_compiled.match(ip_address) or cls.ipv6_shorthand_compiled.match(ip_address)):
if cls.ipv4_compiled.match(ip_address):
ip_integer, _ = cls.to_mask(ip_address + "/32")
for cidr in cidr_matches:
if is_string(cidr) and cls.cidrv4_compiled.match(cidr):
subnet, mask = cls.to_mask(cidr)
if ip_integer & mask == subnet:
return True
elif cls.ipv6_compiled.match(ip_address):
elif cls.ipv6_compiled.match(ip_address) or cls.ipv6_shorthand_compiled.match(ip_address):
ip_address = cls.expand_ipv6_address(ip_address)
ip_integer, _ = cls.to_mask(ip_address + "/128")
for cidr in cidr_matches:
if is_string(cidr) and cls.cidrv6_compiled.match(cidr):
if is_string(cidr) and (cls.cidrv6_compiled.match(cidr) or cls.cidrv6_shorthand_compiled.match(cidr)):
cidr = cls.expand_ipv6(cidr)
subnet, mask = cls.to_mask(cidr)
if ip_integer & mask == subnet:
return True
Expand Down

0 comments on commit efa0174

Please sign in to comment.