Skip to content

Commit

Permalink
#208 Add support to the new alert artifact APIs provided by thehive 4…
Browse files Browse the repository at this point in the history
….0.4+
  • Loading branch information
nadouani committed Jan 13, 2021
1 parent e10a09a commit a8ad708
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 21 deletions.
102 changes: 102 additions & 0 deletions docs/samples/alert-operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,28 @@ alert_data = response.json()
print(alert_data.get('title'))
```

## Update alert

Update an existing alert

```python
from thehive4py.api import TheHiveApi

THEHIVE_URL = 'http://127.0.0.1:9000'
THEHIVE_API_KEY = '**YOUR_API_KEY**'

api = TheHiveApi(THEHIVE_URL, THEHIVE_API_KEY)

response = api.get_alert(ALERT_ID)

# Update description
alert_data = response.json()
alert_data['description'] = 'Updated alert desciption...'

# Update alert
api.update_alert(alert=Alert(json=alert_data), alert_id=ALERT_ID, fields=['description'])
```

## Search alerts

Search for alerts with `HIGH` severity, `AMBER` TLP and with a title containing `MALSPAM`
Expand Down Expand Up @@ -123,3 +145,83 @@ api = TheHiveApi(THEHIVE_URL, THEHIVE_API_KEY)

response = api.promote_alert_to_case(ALERT_ID, case_template='MALSPAM')
```

## Create an Alert Artifact

Create and add an artifact to an existing alert identified by `ALERT_ID`

!!! Warning
This function is available in TheHive 4 ONLY

```python
from thehive4py.api import TheHiveApi
from thehive4py.models import Tlp

THEHIVE_URL = 'http://127.0.0.1:9000'
THEHIVE_API_KEY = '**YOUR_API_KEY**'

api = TheHiveApi(THEHIVE_URL, THEHIVE_API_KEY)

# Instanciate a new domain artifact
artifact = AlertArtifact(dataType='domain', data='malicious-domain.tld', ignoreSimilarity=True, ioc=True)
api.create_alert_artifact(ALERT_ID, artifact)

# Instanciate a new file artifact
artifact = AlertArtifact(
dataType='file',
data='malicious-file.exe',
ignoreSimilarity=False,
ioc=True,
sighted=True,
tlp=Tlp.RED.value)
api.create_alert_artifact(alert_id, artifact)
```

## Update an Alert Artifact

Update an existing artifact identified by `ALERT_ID`

!!! Warning
This function is available in TheHive 4 ONLY

```python
from thehive4py.api import TheHiveApi
from thehive4py.models import Tlp

THEHIVE_URL = 'http://127.0.0.1:9000'
THEHIVE_API_KEY = '**YOUR_API_KEY**'

api = TheHiveApi(THEHIVE_URL, THEHIVE_API_KEY)

# Create a new domain artifact
artifact = AlertArtifact(dataType='domain', data='malicious-domain.tld', ignoreSimilarity=True, ioc=True)
response = api.create_alert_artifact(ALERT_ID, artifact)

# Update its tlp, sighted and ignoreSimilarity flags
artifact_data = response.json()[0]
artifact_data['tlp'] = Tlp.RED.value
artifact_data['sighted'] = True
artifact_data['ignoreSimilarity'] = False

new_artifact = AlertArtifact(json=artifact_data)
api.update_alert_artifact(artifact_data['id'], new_artifact, fields=['tlp', 'ioc', 'ignoreSimilarity'])
```

## Delete an Alert Artifact

Delete an existing alert artifact identified by `ARTIFACT_ID`

!!! Warning
This function is available in TheHive 4 ONLY

```python
from thehive4py.api import TheHiveApi

THEHIVE_URL = 'http://127.0.0.1:9000'
THEHIVE_API_KEY = '**YOUR_API_KEY**'

api = TheHiveApi(THEHIVE_URL, THEHIVE_API_KEY)

# Delete alert artifact
api.delete_alert_artifact(ARTIFACT_ID)
```
109 changes: 105 additions & 4 deletions thehive4py/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from thehive4py.auth import BasicAuth, BearerAuth
from thehive4py.models import CaseHelper, Version
from thehive4py.query import Parent, Id, And, Eq
from thehive4py.exceptions import TheHiveException, CaseException, CaseTaskException, CaseTaskLogException, CaseTemplateException, AlertException, CaseObservableException, CustomFieldException
from thehive4py.exceptions import *


class TheHiveApi:
Expand Down Expand Up @@ -409,7 +409,7 @@ def update_case_observable(self, observable_id, case_observable, fields=[]):
response (requests.Response): Response object including a JSON description of the updated case observable
Raises:
CaseObservableException: An error occured during case observable creation
CaseObservableException: An error occured during case observable update
"""

req = self.url + "/api/case/artifact/{}".format(observable_id)
Expand Down Expand Up @@ -1303,6 +1303,107 @@ def download_observable_attachment(self, observable_id, archive=True):
except requests.exceptions.RequestException as e:
raise CaseObservableException("Error on retrieving attachment of case observable {}: {}".format(observable_id, e))

def create_alert_artifact(self, alert_id, alert_artifact):

"""
Create an alert artifact
Arguments:
alert_id (str): Alert identifier
alert_artifact (AlertArtifact): Instance of [AlertArtifact][thehive4py.models.AlertArtifact]
Returns:
response (requests.Response): Response object including a JSON description of an alert artifact
Raises:
AlertArtifactException: An error occured during alert artifact creation
!!! Warning
This function is available in TheHive 4 ONLY
"""

if self.__isVersion(Version.THEHIVE_3.value):
raise AlertArtifactException("This function is available in TheHive 4 ONLY")

# - addObservable(file)
# - addObservable(data)
req = self.url + "/api/alert/{}/artifact".format(alert_id)

if alert_artifact.dataType == 'file':
try:
fields = ["dataType", "message", "tlp", "tags", "ioc", "sighted", "ignoreSimilarity"]
data = {k: v for k, v in alert_artifact.__dict__.items() if k in fields}

data = {"_json": json.dumps(data)}
return requests.post(req, data=data, files=alert_artifact.data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise AlertArtifactException("Alert artifact create error: {}".format(e))
else:
try:
data = alert_artifact.jsonify(excludes=['id'])

return requests.post(req, headers={'Content-Type': 'application/json'}, data=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise AlertArtifactException("Alert artifact create error: {}".format(e))

def delete_alert_artifact(self, artifact_id):
"""
Deletes a TheHive alert artifact.
Arguments:
artifact_id (str): Id of the artifact to delete
Returns:
response (requests.Response): Response object including true or false based on the action's success
Raises:
AlertArtifactException: An error occured during alert artifact deletion
!!! Warning
This function is available in TheHive 4 ONLY
"""

if self.__isVersion(Version.THEHIVE_3.value):
raise AlertArtifactException("This function is available in TheHive 4 ONLY")

req = self.url + "/api/alert/artifact/{}".format(artifact_id)

try:
return requests.delete(req, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise AlertArtifactException("Alert artifact deletion error: {}".format(e))

def update_alert_artifact(self, artifact_id, alert_artifact, fields=[]):

"""
Update an existing alert artifact
Arguments:
artifact_id: Artifact identifier
alert_artifact (AlertArtifact): Instance of [AlertArtifact][thehive4py.models.AlertArtifact]
fields (Array): Optional parameter, an array of fields names, the ones we want to update.
Updatable fields are: [`tlp`, `ioc`, `sighted`, `tags`, `message`, `ignoreSimilarity`]
Returns:
response (requests.Response): Response object including a JSON description of the updated alert artifact
Raises:
AlertArtifactException: An error occured during alert artifact update
!!! Warning
This function is available in TheHive 4 ONLY
"""

if self.__isVersion(Version.THEHIVE_3.value):
raise AlertArtifactException("This function is available in TheHive 4 ONLY")

req = self.url + "/api/alert/artifact/{}".format(artifact_id)

update_keys = ['message', 'tlp', 'tags', 'ioc', 'sighted', 'ignoreSimilarity']

data = {k: v for k, v in alert_artifact.__dict__.items() if (
len(fields) > 0 and k in fields) or (len(fields) == 0 and k in update_keys)}

try:
return requests.patch(req, headers={'Content-Type': 'application/json'}, json=data, proxies=self.proxies, auth=self.auth, verify=self.cert)
except requests.exceptions.RequestException as e:
raise CaseObservableException("Case observable update error: {}".format(e))
14 changes: 14 additions & 0 deletions thehive4py/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,56 @@ class CaseException(TheHiveException):
"""
pass


class CaseTaskException(CaseException):
"""
Exception raised by failure of API calls related to `Case Task` handling
"""
pass


class CaseTaskLogException(CaseTaskException):
"""
Exception raised by failure of API calls related to `Case Task Log` handling
"""
pass


class CaseObservableException(CaseException):
"""
Exception raised by failure of API calls related to `Case Observable` handling
"""
pass


class ObservableException(TheHiveException):
"""
Exception raised by failure of API calls related to `Observable` handling
"""
pass


class AlertException(TheHiveException):
"""
Exception raised by failure of API calls related to `Alert` handling
"""
pass


class AlertArtifactException(CaseException):
"""
Exception raised by failure of API calls related to `Alert Artifact` handling
"""
pass


class CaseTemplateException(TheHiveException):
"""
Exception raised by failure of API calls related to `Case Template` handling
"""
pass


class CustomFieldException(TheHiveException):
"""
Exception raised by failure of API calls related to `Custom Fields` handling
Expand Down
46 changes: 29 additions & 17 deletions thehive4py/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,9 @@ def __init__(self, **attributes):
self.artifacts = []
for artifact in artifacts:
if type(artifact) == AlertArtifact:
self.artifacts.append(artifact)
self.artifacts.append(artifact.as_base64())
else:
self.artifacts.append(AlertArtifact(json=artifact))
self.artifacts.append(AlertArtifact(json=artifact).as_base64())


class AlertArtifact(JSONSerializable):
Expand All @@ -675,8 +675,8 @@ class AlertArtifact(JSONSerializable):
- Otherwise, the `data` value is the observable's value
json (JSON): If the field is not equal to None, the observable is instantiated using the JSON value instead of the arguements
!!! Warning
`ignoreSimilarity` attribute is available in TheHive 4 ONLY
!!! Warning
`ignoreSimilarity` attribute is available in TheHive 4 ONLY
"""

def __init__(self, **attributes):
Expand All @@ -694,22 +694,34 @@ def __init__(self, **attributes):
self.ignoreSimilarity = attributes.get('ignoreSimilarity', False)

data = attributes.get('data', None)
if self.dataType == 'file':
if isinstance(data, tuple):
file_object, filename = data
if self.dataType != 'file':
self.data = data
else:
if data is not None:
if isinstance(data, tuple):
file_object, filename = data
else:
filename = data
# we are opening this here, but we can't close it
# because it gets passed into requests.post. this is
# the substance of issue #10.
file_object = open(filename, 'rb')

mime = magic.Magic(mime=True).from_buffer(file_object.read())
file_object.seek(0)

self.data = {'attachment': (filename, file_object, mime)}
else:
filename = data
# we are opening this here, but we can't close it
# because it gets passed into requests.post. this is
# the substance of issue #10.
file_object = open(filename, 'rb')
self.attachment = attributes.get('attachment', None)

mime = magic.Magic(mime=True).from_buffer(file_object.read())
file_object.seek(0)
def as_base64(self):
if 'data' in self.__dict__ and self.data is not None and self.dataType == 'file' and isinstance(self.data, dict):
filename, file_object, mime = self.data.get('attachment')

encoded_string = base64.b64encode(file_object.read())
file_object.seek(0)
encoded_string = base64.b64encode(file_object.read())

self.data = "{};{};{}".format(filename, mime, encoded_string.decode())
else:
self.data = data

return self

0 comments on commit a8ad708

Please sign in to comment.