-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #344 from opencivicdata/clean-cli
Add pupa clean CLI command
- Loading branch information
Showing
4 changed files
with
208 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import sys | ||
from datetime import datetime, timezone, timedelta | ||
|
||
import django | ||
from django.apps import apps | ||
from .base import BaseCommand | ||
|
||
|
||
def get_subclasses(app_list, abstract_class):
    """
    Collect every model in the given apps that derives from ``abstract_class``.

    ``app_list`` is an iterable of Django app labels; each one is looked up in
    the app registry and its models are inspected in registry order.
    ``abstract_class`` itself is never included in the returned list.
    """
    subclasses = []
    for app_label in app_list:
        app_models = apps.get_app_config(app_label).get_models()
        subclasses.extend(
            candidate
            for candidate in app_models
            if issubclass(candidate, abstract_class)
            and candidate is not abstract_class
        )
    return subclasses
|
||
|
||
class Command(BaseCommand):
    """
    ``pupa clean``: report or delete database objects whose ``last_seen``
    timestamp is older than a configurable number of days.
    """

    name = "clean"
    help = "Removes database objects that haven't been seen in recent scrapes"

    def add_args(self):
        """Register the ``--window``, ``--report`` and ``--noinput`` options."""
        self.add_argument(
            "--window",
            type=int,
            default=7,
            help=(
                "objects not seen in this many days will be deleted from the database"
            ),
        )
        self.add_argument(
            "--report",
            action="store_true",
            help=(
                "generate a report of what objects this command"
                " would delete without making any changes to the database"
            ),
        )
        self.add_argument(
            "--noinput",
            action="store_true",
            help="delete objects without getting user confirmation",
        )

    def get_stale_objects(self, window):
        """
        Find all database objects that haven't been seen in {window} days.

        This is a generator: it lazily yields model instances, one model class
        at a time, for every non-Jurisdiction subclass of OCDBase in the
        "core" and "legislative" apps whose ``last_seen`` is at or before
        ``now - window days`` (UTC).
        """
        # NOTE(review): imported lazily, presumably because the models can only
        # be loaded after django.setup() has run (see handle) -- confirm.
        from opencivicdata.core.models.base import OCDBase

        ocd_apps = ["core", "legislative"]
        # Compute the cutoff once so every model is filtered against the same
        # instant rather than a slightly later "now" on each loop iteration.
        cutoff_date = datetime.now(tz=timezone.utc) - timedelta(days=window)

        # Check all subclasses of OCDBase
        for model in get_subclasses(ocd_apps, OCDBase):
            # Jurisdictions are protected from deletion
            if "Jurisdiction" in model.__name__:
                continue
            yield from model.objects.filter(last_seen__lte=cutoff_date).iterator()

    def remove_stale_objects(self, window):
        """
        Remove all database objects that haven't been seen in {window} days.

        Each object is announced on stdout and then deleted individually.
        """
        for obj in self.get_stale_objects(window):
            print(f"Deleting {obj}...")
            obj.delete()

    def report_stale_objects(self, window):
        """
        Print all database objects that haven't been seen in {window} days.
        """
        for obj in self.get_stale_objects(window):
            print(obj)

    def handle(self, args, other):
        """
        Entry point for the command.

        With ``args.report`` set, only list the stale objects. Otherwise delete
        them, first asking for confirmation on stdin unless ``args.noinput`` is
        set; any answer other than exactly "Y" exits without deleting.
        ``other`` is accepted but not used here.
        """
        django.setup()

        if args.report:
            print(
                "These objects have not been seen in a scrape within the last"
                f" {args.window} days:"
            )
            # The window must be forwarded; calling this without it raised
            # TypeError and made --report unusable.
            self.report_stale_objects(args.window)
            return

        if not args.noinput:
            # Count by streaming instead of building a list of every instance.
            num_stale = sum(1 for _ in self.get_stale_objects(args.window))
            print(
                "This will permanently delete"
                f" {num_stale}"
                " objects from your database"
                f" that have not been scraped within the last {args.window}"
                " days. Are you sure? (Y/N)"
            )
            resp = input()
            if resp != "Y":
                sys.exit()

        print(
            "Removing objects that haven't been seen in a scrape within"
            f" the last {args.window} days..."
        )
        self.remove_stale_objects(args.window)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import pytest | ||
import argparse | ||
from datetime import datetime, timezone, timedelta | ||
from freezegun import freeze_time | ||
|
||
from opencivicdata.core.models import Person, Organization, Jurisdiction, Division | ||
|
||
from pupa.cli.commands.clean import Command | ||
|
||
|
||
@pytest.fixture
def subparsers():
    """
    Build a top-level "pupa" argument parser with ``--debug`` and
    ``--loglevel`` options and return its subparsers action (dest
    "subcommand"), which the tests pass to ``Command``.
    """
    loglevel_help = (
        "set log level. options are: "
        "DEBUG|INFO|WARNING|ERROR|CRITICAL "
        "(default is INFO)"
    )

    cli = argparse.ArgumentParser("pupa", description="pupa CLI")
    cli.add_argument("--debug", action="store_true", help="open debugger on error")
    cli.add_argument("--loglevel", default="INFO", help=loglevel_help)
    return cli.add_subparsers(dest="subcommand")
|
||
|
||
def create_jurisdiction():
    """
    Create the "USA" division and return a new jurisdiction with id "jid"
    that belongs to it.
    """
    us_division_id = "ocd-division/country:us"
    Division.objects.create(id=us_division_id, name="USA")
    jurisdiction = Jurisdiction.objects.create(id="jid", division_id=us_division_id)
    return jurisdiction
|
||
|
||
@pytest.mark.django_db
def test_get_stale_objects(subparsers):
    """
    Objects created now must be reported as stale once the clock is a week
    ahead, while objects created at that later time must not be.
    """
    create_jurisdiction()
    org = Organization.objects.create(name="WWE", jurisdiction_id="jid")
    old_person = Person.objects.create(
        name="George Washington", family_name="Washington"
    )
    old_membership = old_person.memberships.create(organization=org)

    expected_stale_objects = {old_person, org, old_membership}

    a_week_from_now = datetime.now(tz=timezone.utc) + timedelta(days=7)
    with freeze_time(a_week_from_now):
        # Created under the frozen clock, so these should not be stale.
        new_person = Person.objects.create(
            name="Thomas Jefferson", family_name="Jefferson"
        )
        new_person.memberships.create(organization=org)

        found = set(Command(subparsers).get_stale_objects(7))
        assert found == expected_stale_objects
|
||
|
||
@pytest.mark.django_db
def test_remove_stale_objects(subparsers):
    """
    After ``remove_stale_objects(7)`` runs a week in the future, none of the
    objects created now may still exist in the database.
    """
    create_jurisdiction()
    org = Organization.objects.create(name="WWE", jurisdiction_id="jid")
    old_person = Person.objects.create(
        name="George Washington", family_name="Washington"
    )
    old_membership = old_person.memberships.create(organization=org)

    expected_stale_objects = {old_person, org, old_membership}

    a_week_from_now = datetime.now(tz=timezone.utc) + timedelta(days=7)
    with freeze_time(a_week_from_now):
        new_person = Person.objects.create(
            name="Thomas Jefferson", family_name="Jefferson"
        )
        new_person.memberships.create(organization=org)

        Command(subparsers).remove_stale_objects(7)
        for stale in expected_stale_objects:
            still_exists = type(stale).objects.filter(id=stale.id).exists()
            assert not still_exists
|
||
|
||
@pytest.mark.django_db
def test_clean_command(subparsers):
    """
    Run the whole command (no prompt, no report, 7-day window) a week in the
    future: objects created now must be deleted, while objects created or
    re-saved under the frozen clock must survive.
    """
    create_jurisdiction()
    org = Organization.objects.create(name="WWE", jurisdiction_id="jid")

    stale_person = Person.objects.create(
        name="George Washington", family_name="Washington"
    )
    stale_membership = stale_person.memberships.create(organization=org)

    a_week_from_now = datetime.now(tz=timezone.utc) + timedelta(days=7)
    with freeze_time(a_week_from_now):
        fresh_person = Person.objects.create(
            name="Thomas Jefferson", family_name="Jefferson"
        )
        fresh_membership = fresh_person.memberships.create(organization=org)
        org.save()  # Update org's last_seen field

        # Call clean command
        cli_args = argparse.Namespace(noinput=True, report=False, window=7)
        Command(subparsers).handle(cli_args, [])

        for removed in (stale_person, stale_membership):
            assert not type(removed).objects.filter(id=removed.id).exists()

        for kept in (org, fresh_person, fresh_membership):
            assert type(kept).objects.filter(id=kept.id).exists()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters