-
Notifications
You must be signed in to change notification settings - Fork 0
/
l9format.py
195 lines (157 loc) · 5.75 KB
/
l9format.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import decimal

from serde import Model, fields
from serde.exceptions import ValidationError
# Imported from https://github.com/rossmacarthur/serde/commit/304884bca3c80e8b9a22a054b64dd94e3324b593#diff-dd2f65f16516311f0183377201a3d0b7894438787da732fc2bbc9c28cbde9fb8
# Helper that rounds a Decimal to a fixed number of decimal places.
def round_decimal(
    decimal_obj: decimal.Decimal, num_of_places: int = 6
) -> decimal.Decimal:
    """Round ``decimal_obj`` to ``num_of_places`` decimal places.

    The value is quantized under the active decimal context and then
    normalized, so trailing zeros are dropped from the result.

    Args:
        decimal_obj: the value to round.
        num_of_places: how many digits to keep after the decimal point.

    Returns:
        The rounded, normalized decimal.
    """
    # Quantum carrying the target exponent, e.g. 0.000001 for six places.
    quantum = decimal.Decimal(10) ** -num_of_places
    rounded = decimal_obj.quantize(quantum)
    # normalize() strips trailing zeros (and may yield exponent notation,
    # e.g. 100 -> 1E+2).
    return rounded.normalize()
class Decimal(fields.Instance):
    """
    A `~decimal.Decimal` field.

    This field serializes `~decimal.Decimal` objects as strings and
    deserializes string representations of Decimals as `~decimal.Decimal`
    objects.

    The resolution of the decimal can be specified. When not specified, the
    number is not rounded. When it is specified, the decimal is rounded to
    this number of decimal places upon serialization and deserialization.

    Note: When float type numbers are not rounded before serialization,
    they will be serialized in exact form, which as they are floats,
    is almost never the exact intended value,
    e.g. ``decimal.Decimal(0.2)`` is approximately 0.2000000000000000111

    Args:
        resolution (Optional[int]): The number of decimal places to round to.
            When None, rounding is disabled.
        **kwargs: keyword arguments for the `Field` constructor.
    """

    ty = decimal.Decimal

    def __init__(self, resolution=None, **kwargs):
        super(Decimal, self).__init__(self.__class__.ty, **kwargs)
        self.resolution = resolution

    def serialize(self, value: decimal.Decimal) -> str:
        """Return ``value`` as a fixed-point string, rounded if configured."""
        if self.resolution is not None:
            value = round_decimal(value, num_of_places=self.resolution)
        # The "f" presentation type never emits exponent notation.
        return "{0:f}".format(value)

    def deserialize(self, value) -> decimal.Decimal:
        """Convert ``value`` to a `~decimal.Decimal`, rounded if configured.

        Raises:
            ValidationError: if ``value`` cannot be converted to a decimal or
                cannot be rounded to the configured resolution.
        """
        try:
            parsed = decimal.Decimal(value)
        except (decimal.DecimalException, TypeError, ValueError) as exc:
            # Decimal() raises TypeError/ValueError for unsupported inputs
            # (e.g. None or a malformed tuple); report those as validation
            # failures too instead of leaking the raw exception.
            raise ValidationError("invalid decimal", value=value) from exc
        if self.resolution is None:
            return parsed
        try:
            return round_decimal(parsed, num_of_places=self.resolution)
        except decimal.DecimalException as exc:
            raise ValidationError("invalid decimal", value=value) from exc
class L9HttpEvent(Model):
    """HTTP section of an event; nested in `L9Event` as its ``http`` field."""

    root: fields.Str()
    url: fields.Str()
    status: fields.Int()
    # FIXME: must be int64
    length: fields.Int()
    # The only optional field of this model: a string-to-string mapping.
    header: fields.Optional(fields.Dict(key=fields.Str(), value=fields.Str()))
    title: fields.Str()
    favicon_hash: fields.Str()
class ServiceCredentials(Model):
    """Credentials section; nested in `L9ServiceEvent` as ``credentials``."""

    noauth: fields.Bool()
    username: fields.Str()
    password: fields.Str()
    key: fields.Str()
    # Bytes (declared as an optional string here).
    # NOTE(review): the encoding used for the raw bytes is not visible in
    # this file - confirm against the producer.
    raw: fields.Optional(fields.Str())
class SoftwareModule(Model):
    """One entry of the optional ``modules`` list of `Software`."""

    name: fields.Str()
    version: fields.Str()
    fingerprint: fields.Str()
class Software(Model):
    """Software section; nested in `L9ServiceEvent` as ``software``."""

    name: fields.Str()
    version: fields.Str()
    os: fields.Str()
    # Optional list of nested `SoftwareModule` models.
    modules: fields.Optional(fields.List(fields.Nested(SoftwareModule)))
    fingerprint: fields.Str()
class Certificate(Model):
    """Certificate section; nested in `L9SSLEvent` as ``certificate``."""

    cn: fields.Str()
    # Optional list of strings.
    domain: fields.Optional(fields.List(fields.Str()))
    fingerprint: fields.Str()
    key_algo: fields.Str()
    key_size: fields.Int()
    issuer_name: fields.Str()
    # Both bounds are declared as datetime fields.
    not_before: fields.DateTime()
    not_after: fields.DateTime()
    valid: fields.Bool()
class GeoPoint(Model):
    """A ``lat``/``lon`` pair; nested in `GeoLocation` as ``location``."""

    # Both use this module's `Decimal` field with no resolution, so the
    # values are not rounded on (de)serialization.
    lat: Decimal()
    lon: Decimal()
class GeoLocation(Model):
    """GeoIP section used by `L9Event` and `L9Aggregation` (``geoip``).

    Every field of this model is declared optional.
    """

    continent_name: fields.Optional(fields.Str())
    region_iso_code: fields.Optional(fields.Str())
    city_name: fields.Optional(fields.Str())
    country_iso_code: fields.Optional(fields.Str())
    country_name: fields.Optional(fields.Str())
    region_name: fields.Optional(fields.Str())
    # Nested `GeoPoint` (lat/lon) when present.
    location: fields.Optional(fields.Nested(GeoPoint))
class L9SSHEvent(Model):
    """SSH section of an event; nested in `L9Event` as its ``ssh`` field."""

    fingerprint: fields.Str()
    # Declared as an integer here, unlike `L9SSLEvent.version` (a string).
    version: fields.Int()
    banner: fields.Str()
    motd: fields.Str()
class DatasetSummary(Model):
    """Dataset summary; nested in `L9LeakEvent` as its ``dataset`` field."""

    rows: fields.Int()
    files: fields.Int()
    # NOTE(review): the unit of `size` is not visible in this file - confirm.
    size: fields.Int()
    collections: fields.Int()
    infected: fields.Bool()
    # Optional list of strings.
    ransom_notes: fields.Optional(fields.List(fields.Str()))
class L9LeakEvent(Model):
    """Leak section of an event; nested (optionally) in `L9Event` as ``leak``."""

    stage: fields.Str()
    # `type` is the schema field name; it only shadows the builtin inside
    # this class body.
    type: fields.Str()
    severity: fields.Str()
    dataset: fields.Nested(DatasetSummary)
class L9SSLEvent(Model):
    """SSL section of an event; nested (optionally) in `L9Event` as ``ssl``."""

    detected: fields.Bool()
    enabled: fields.Bool()
    jarm: fields.Str()
    # Field name is spelled "cypher_suite" in the schema; keep as is.
    cypher_suite: fields.Str()
    version: fields.Str()
    certificate: fields.Nested(Certificate)
class L9ServiceEvent(Model):
    """Service section of an event; nested in `L9Event` as ``service``."""

    credentials: fields.Nested(ServiceCredentials)
    software: fields.Nested(Software)
class Network(Model):
    """Network section used by `L9Event` and `L9Aggregation` (``network``)."""

    organization_name: fields.Str()
    asn: fields.Int()
    # Declared as a plain string.
    # NOTE(review): presumably a CIDR range - confirm against the producer.
    network: fields.Str()
class L9Event(Model):
    """serde model for a single event.

    Combines scalar fields (e.g. ``ip``, ``host``, ``protocol``) with nested
    sub-models for the HTTP, SSL, SSH, service, leak, GeoIP and network
    sections. Fields wrapped in ``fields.Optional`` are declared optional.
    """

    event_type: fields.Str()
    event_source: fields.Str()
    event_pipeline: fields.Optional(fields.List(fields.Str()))
    event_fingerprint: fields.Optional(fields.Str())
    ip: fields.Str()
    host: fields.Str()
    reverse: fields.Str()
    mac: fields.Optional(fields.Str())
    vendor: fields.Optional(fields.Str())
    transport: fields.Optional(fields.List(fields.Str()))
    protocol: fields.Str()
    http: fields.Nested(L9HttpEvent)
    summary: fields.Str()
    time: fields.DateTime()
    # Of the nested sections, only `ssl` and `leak` are declared optional.
    ssl: fields.Optional(fields.Nested(L9SSLEvent))
    ssh: fields.Nested(L9SSHEvent)
    service: fields.Nested(L9ServiceEvent)
    # The extra parentheses around L9LeakEvent are redundant but harmless.
    leak: fields.Optional(fields.Nested((L9LeakEvent)))
    tags: fields.Optional(fields.List(fields.Str()))
    geoip: fields.Nested(GeoLocation)
    network: fields.Nested(Network)
class L9Aggregation(Model):
    """serde model for an aggregation: summary data plus a list of `L9Event`.

    NOTE(review): fields here are declared by assignment (``name = field``)
    while the other models in this file use annotations (``name: field``) -
    confirm that the difference is intentional.
    """

    # The only optional field of this model.
    summary = fields.Optional(fields.Str())
    ip = fields.Str()
    resource_id = fields.Str()
    # Ports are declared as strings, not integers.
    open_ports = fields.List(fields.Str())
    leak_count = fields.Int()
    leak_event_count = fields.Int()
    events = fields.List(fields.Nested(L9Event))
    plugins = fields.List(fields.Str())
    geoip = fields.Nested(GeoLocation)
    network = fields.Nested(Network)
    creation_date = fields.DateTime()
    update_date = fields.DateTime()
    fresh = fields.Bool()