forked from ascemama/nucleusPlugins
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add script which pulls Whitesource reports (JSON) tranforms them (CSV…
…) and sends them to Nucleus
- Loading branch information
1 parent
eb4e8cf
commit 6465838
Showing
1 changed file
with
109 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
#!/usr/bin/python3.7 | ||
|
||
####### Script to download reports (JSON) from whitesource SAAS, transform them (CSV) and upload them to Nucleus SAAS ############ | ||
|
||
import json | ||
import csv | ||
|
||
# Used to post the whitesource file to Nucleus | ||
import requests | ||
import time | ||
import datetime | ||
from pathlib import Path | ||
|
||
# Enter in the root URL of your Nucleus instance. | ||
NUCLEUS_ROOT_URL = "https://XXXXXX.nucleussec.com" | ||
|
||
# retrieve this API_KEY from Nucleus GUI. Must be Admin. | ||
NUCLEUS_API_KEY = "XXXXXXXXXXXXXXXXXXXXXXXXXX" | ||
|
||
#retrieve this API_KEY (of the nucleus service user) in whitesource. Must have whitesource admin user. | ||
WHITESOURCE_NUCLEUS_USER_API_KEY="XXXXXXXXXXXXXXXXXXXXXXXXXXXXX" | ||
|
||
#project ID from the APPSEC project in Nucleus | ||
NUCLEUS_PROJECT_ID="XXXXXX" | ||
|
||
#products ID in whitesource | ||
PRODUCTSTOKEN=""" | ||
{ | ||
"PRODUCT1":"XXXXXXXXXXXXXX", | ||
"PRODUCT2:"XXXXXXXXXX", | ||
"PRODUCT3":"XXXXXXXXXXXX" | ||
} | ||
""" | ||
|
||
def post_to_nucleus(outputfile): | ||
|
||
with open(outputfile, 'rb') as f: | ||
nucleus_url = str(NUCLEUS_ROOT_URL + | ||
'/nucleus/api/projects/'+NUCLEUS_PROJECT_ID+'/scans') | ||
print("Posted to URL:", nucleus_url) | ||
file_upload = requests.post( | ||
nucleus_url, files={outputfile: f}, headers={'x-apikey': NUCLEUS_API_KEY}) | ||
print(file_upload.content) | ||
|
||
|
||
def get_from_whitesource(productToken): | ||
json = { | ||
"requestType" : "getProductVulnerabilityReport", | ||
"userKey": WHITESOURCE_NUCLEUS_USER_API_KEY, | ||
"productToken" : productToken, | ||
"format" : "json" | ||
} | ||
response=requests.post('https://saas.whitesourcesoftware.com/api/v1.2', json=json) | ||
return response.content | ||
|
||
#need to convert JSON report from whitesource to CSV for Nucleus :/ | ||
def customParser(inputJsonString, outputPath): | ||
|
||
jsonObj = json.loads(inputJsonString) | ||
|
||
# For debug | ||
#text_file=open(outputPath+".json","wb") | ||
#text_file.write(inputJsonString) | ||
#text_file.close() | ||
|
||
with open(outputPath, 'w', newline='') as csvfile: | ||
csvwriter = csv.writer(csvfile, delimiter=',') | ||
csvwriter.writerow(['nucleus_import_version', 'host_name', 'scan_type', 'scan_tool', 'finding_type', 'finding_cve', 'finding_number','finding_name', 'finding_severity', 'finding_description', 'finding_solution', 'finding_output', 'finding_path', 'finding_result']) | ||
try: | ||
for vulnerability in jsonObj["vulnerabilities"]: | ||
csv_line = [] | ||
severity = vulnerability["severity"] | ||
vulnDescription = vulnerability["description"] | ||
library = vulnerability["library"]["name"] | ||
vulnName = vulnerability["name"] | ||
if "topFix" not in vulnerability: | ||
solutionDescription="" | ||
else: | ||
solutionDescription = vulnerability["topFix"]["fixResolution"] | ||
host_name = vulnerability["product"] | ||
finding_path=vulnerability["library"]["filename"] | ||
csv_line = ['1', host_name, "Application", "WhiteSource", "Vuln", vulnName, vulnName+host_name,vulnName+": "+library, severity, vulnDescription, solutionDescription, library, finding_path, 'FAILED'] | ||
|
||
if csv_line != []: | ||
csvwriter.writerow(csv_line) | ||
else: | ||
pass | ||
|
||
return csvfile | ||
except Exception as e: | ||
print("Error, probably bad json document. Check that you are trying to parse the correct doc type") | ||
print("Error was the following:"+ str(e)) | ||
error_file=open("error.log","a") | ||
error_file.write(str(datetime.datetime.now()) + " Error was the following:"+ str(e)+"\n") | ||
error_file.close() | ||
|
||
|
||
if __name__ == "__main__": | ||
#get all reports | ||
jsonProductsToken=json.loads(PRODUCTSTOKEN) | ||
#loop over all report (=whitesource project) | ||
for product in jsonProductsToken: | ||
inputJsonFile = get_from_whitesource(jsonProductsToken[product]) | ||
time.sleep(5) | ||
#this path works only on linux. | ||
outputPath=str(Path.home())+"/"+product+".xls" | ||
outputfile = customParser(inputJsonFile, outputPath) | ||
post_to_nucleus(outputPath) | ||
|