-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
101 lines (83 loc) · 4.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import argparse
import re
import requests
import types
from colorama import init as colorama_init
from colorama import Fore
from colorama import Style
def debug_print(printable):
if args.verbose:
print(printable)
# return the tag name if it did not match in any of filters
def match_any_pattern(patterns, text):
for pattern in patterns:
if re.search(pattern, text):
return None
return text
def save_safe_tags(deletion_candidate):
grouped_by_repo_and_path = {}
for item in deletion_candidate:
key = f"{item['repo']}-{item['path']}"
if key not in grouped_by_repo_and_path:
grouped_by_repo_and_path[key] = []
grouped_by_repo_and_path[key].append(item)
regex_pattern = re.compile(configfile.SAFE_TAG, re.IGNORECASE)
elements_to_remove = []
for group_key, items in grouped_by_repo_and_path.items():
matching_items = [item for item in items if regex_pattern.search(item['name'])]
if matching_items:
# Keep only the latest matching item
matching_items_to_remove = matching_items[:-1]
artifact_url = configfile.BASE_URL + matching_items[-1]['repo'] + '/' + matching_items[-1]['path'] + '/' + matching_items[-1]['name']
print(f'{Fore.MAGENTA}{artifact_url} -> will be kept with SAVED tag!{Style.RESET_ALL}')
elements_to_remove.extend(matching_items_to_remove)
# Remove ans return elements from the original list
return [item for item in deletion_candidate if item not in elements_to_remove]
def filter_results(results, path):
deletion_candidate = []
for result in results:
artifact_url = configfile.BASE_URL + result['repo'] + '/' + result['path'] + '/' + result['name']
if match_any_pattern(path['keep_filters'], result['name']) is not None:
print(f'{Fore.RED}{artifact_url} -> will be deleted{Style.RESET_ALL}')
deletion_candidate.append(result)
else:
debug_print(f'{Fore.GREEN}{artifact_url} -> will be kept{Style.RESET_ALL}')
if args.safe:
deletion_candidate = save_safe_tags(deletion_candidate)
return(deletion_candidate)
def remove_asset_from_art(subject):
artifact_url = configfile.BASE_URL + subject['repo'] + '/' + subject['path'] + '/' + subject['name']
if not args.dry:
requests.delete(artifact_url, auth=(configfile.USER, configfile.PASSWORD))
def cleanup_repo(path):
query = 'items.find({"repo":"' + path['repo'] + '","path":"'+ path['path'] + '","created":{"$before":"' + path['keep_time'] + '"}},{"path":{"$ne":"."}},{"type":"folder"}).include("repo","name","path","created")'
myResp = requests.post(configfile.BASE_URL + 'api/search/aql', auth=(configfile.USER, configfile.PASSWORD), headers=configfile.HEADERS, data=query)
results = filter_results(eval(myResp.text)["results"], path)
for result in results:
remove_asset_from_art(result)
def prune_unreferenced_data():
myResp = requests.post(configfile.BASE_URL + 'api/system/storage/prune/start', auth=(configfile.USER, configfile.PASSWORD), headers=configfile.HEADERS)
def run_gc():
myResp = requests.post(configfile.BASE_URL + 'api/system/storage/gc', auth=(configfile.USER, configfile.PASSWORD), headers=configfile.HEADERS)
if __name__ == '__main__':
# parsing input arguments
parser = argparse.ArgumentParser(description='Cleaning up artifacts from jfrog artifactory in a granular manner.')
parser.add_argument('-C', '--config', help="config file to use in this directory, defaults to config.py", default='config', type=str)
parser.add_argument('-D', '--dry', help="run in dry-run mode", default=False, action="store_true")
parser.add_argument('-S', '--safe', help="run in safe-mode (keeps at least one of the safe tagged images)", default=False, action="store_true")
parser.add_argument('-v', '--verbose', help="run in verbose mode", default=False, action="store_true")
args = parser.parse_args()
colorama_init()
# import config file
configfile = types.ModuleType(args.config[:-3] if args.config.endswith('.py') else args.config)
with open(args.config + '.py' if not args.config.endswith('.py') else args.config, 'r') as file:
exec(file.read(), configfile.__dict__)
# this where the job gets done
for path in configfile.PATHS:
debug_print(f'{Fore.BLUE}{path} -> is being searched{Style.RESET_ALL}')
cleanup_repo(path)
if not args.dry:
debug_print(f'{Fore.BLUE}pruning unreferenced data{Style.RESET_ALL}')
prune_unreferenced_data()
debug_print(f'{Fore.BLUE}pruning unreferenced data{Style.RESET_ALL}')
run_gc()