-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpurge-users.py
107 lines (89 loc) · 4.13 KB
/
purge-users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/python3
import sys
import requests
import argparse
import os
from datetime import datetime, timedelta, timezone
# Parse command line arguments
parser = argparse.ArgumentParser(
description="Delete users who haven't received an email in the last 30 days",
epilog="If the MXG_API_KEY environment variable is not set, you will be prompted for the API key.")
parser.add_argument("domain", nargs="?", help="Process a specific domain")
parser.add_argument("-d","--dry-run", action="store_true", help="Print users instead of deleting them")
args = parser.parse_args()
if args.dry_run:
print("================ DRY RUN! No changes will be made =================")
# Get the API key from environment variable or prompt for it
api_key = os.environ.get("MXG_API_KEY")
if not api_key:
print("The MXG_API_KEY environment variable is not set.")
api_key = input("Enter your MXGuardian API key to continue: ")
if not api_key:
print("No API key provided. Exiting.")
sys.exit()
# Set API base URL
base_url = os.environ.get("MXG_API_URL")
if not base_url:
base_url = "https://secure.mxguardian.net/api/v1"
# Verify server certificate (replace with path to CA bundle if using self-signed certificate)
verify = True
# Endpoint for retrieving domain list
domain_list_url = f"{base_url}/domains"
# Endpoint for retrieving user list
user_list_url = f"{base_url}/domains/{{domain}}/users"
# Endpoint for retrieving user messages (pagesize=1 to limit results)
afterDate = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat(timespec='seconds')
messages_url = f"{base_url}/users/{{user}}/messages?mode=I&pagesize=1&filter=after:{afterDate}"
# Endpoint for deleting a user
delete_user_url = f"{base_url}/users/{{user_email}}"
# Set the headers
headers = {"Authorization": f"Bearer {api_key}"}
# Initialize count variables
user_count = 0
domain_dict = {} # Used to count the number of domains that have users deleted
# Get the list of domains
if args.domain:
domain_list = [{"domain_name": args.domain}]
else:
response = requests.get(domain_list_url, headers=headers, verify=verify)
if not response.ok:
print(f"Error getting domain list. Reason: {response.reason}")
sys.exit()
domain_list = response.json()["results"]
# Iterate over the domains
for domain in domain_list:
domain_name = domain["domain_name"]
print(f"Processing domain {domain_name}...")
# Get the list of users for the current domain
user_list_response = requests.get(user_list_url.format(domain=domain_name), headers=headers, verify=verify)
if not user_list_response.ok:
print(f"Error getting user list for domain {domain_name}. Reason: {user_list_response.reason}")
sys.exit()
user_list = user_list_response.json()["results"]
# Iterate over the users
for user in user_list:
user_email = user["user_email"]
# Get the number of messages for the current user in the last 30 days
messages_response = requests.get(messages_url.format(user=user_email), headers=headers, verify=verify)
if not messages_response.ok:
print(f"Error getting messages for {user_email}. Reason: {messages_response.reason}")
sys.exit()
message_count = messages_response.json()["count"]
if message_count == 0:
if args.dry_run:
print(f"{user_email} has not received any messages in the last 30 days.")
user_count += 1
domain_dict[domain_name] = 1
else:
# Delete the user
delete_user_response = requests.delete(delete_user_url.format(user_email=user_email), headers=headers, verify=verify)
if delete_user_response.ok:
print(f"{user_email} has been deleted.")
user_count += 1
domain_dict[domain_name] = 1
else:
print(f"Failed to delete user {user_email}. Reason: {delete_user_response.reason}")
if args.dry_run:
print(f"{user_count} users from {len(domain_dict)} domains would have been deleted.")
else:
print(f"Deleted {user_count} users from {len(domain_dict)} domains.")