Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: move submission files async with retry #8018

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
11 changes: 11 additions & 0 deletions ietf/submit/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,14 @@ class Meta:

class SubmissionExtResource(ExtResource):
submission = ForeignKey(Submission, related_name='external_resources')


class SubmissionFile(models.Model):
    """File associated with a submission

    One row per file belonging to a Submission. Rows are created when files
    are uploaded and when the Datatracker renders missing formats; they are
    read back via ``submission.submissionfile_set`` to decide which staging
    files to move to the repository.
    """
    # Bare file name with no directory component; callers that create rows
    # store only the final path component.
    filename = models.CharField(max_length=255, help_text="Name of file")
    # Defaults to the current time when the row is created.
    time = models.DateTimeField(default=timezone.now, help_text="Creation time of record")
    # False for uploaded files, True for files rendered by the Datatracker.
    generated = models.BooleanField(default=False, help_text="True if file was generated by the Datatracker")
    # Deleting the Submission deletes its SubmissionFile rows (CASCADE).
    # NOTE(review): other models in this file use the bare ForeignKey helper
    # rather than models.ForeignKey - confirm which is intended here.
    submission = models.ForeignKey(Submission, on_delete=models.CASCADE)

    def __str__(self):
        # Display the record as its file name.
        return self.filename
52 changes: 39 additions & 13 deletions ietf/submit/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,51 @@
#
# Celery task definitions
#
import inspect

from celery import shared_task
from typing import Optional

from django.db.models import Min
from django.conf import settings
from django.utils import timezone

from ietf.submit.models import Submission
from ietf.submit.utils import (cancel_submission, create_submission_event, process_uploaded_submission,
process_and_accept_uploaded_submission, run_all_yang_model_checks,
populate_yang_model_dirs)
from ietf.utils import log


def get_submission(submission_id) -> Optional[Submission]:
    """Look up a Submission by primary key, logging if it does not exist

    Meant to be called directly from a task function: when the lookup fails,
    the log message names the immediate caller so the failing task can be
    identified.

    Returns the Submission, or None if no record has pk == submission_id.
    """
    try:
        return Submission.objects.get(pk=submission_id)
    except Submission.DoesNotExist:
        # context=0 skips loading source lines for every frame on the stack;
        # only the caller's function name is needed here.
        caller_frame = inspect.stack(context=0)[1]
        log.log(f"{caller_frame.function} called for missing submission_id={submission_id}")
        return None


@shared_task
def process_uploaded_submission_task(submission_id):
    """Celery task: run process_uploaded_submission() on the Submission with this pk

    Does nothing further when get_submission() returns None.
    """
    # avoid circular imports with ietf.submit.utils
    from ietf.submit.utils import process_uploaded_submission as _process

    found = get_submission(submission_id)
    if found is None:
        return
    _process(found)


@shared_task
def process_and_accept_uploaded_submission_task(submission_id):
    """Celery task: process and accept the uploaded Submission with this pk

    The lookup goes through get_submission(); when that returns None the task
    does nothing further.
    """
    # avoid circular imports with ietf.submit.utils
    from ietf.submit.utils import process_and_accept_uploaded_submission
    submission = get_submission(submission_id)
    if submission is not None:
        process_and_accept_uploaded_submission(submission)


@shared_task
def cancel_stale_submissions():
# avoid circular imports with ietf.submit.utils
from ietf.submit.utils import cancel_submission, create_submission_event
now = timezone.now()
# first check for submissions gone stale awaiting validation
stale_unvalidated_submissions = Submission.objects.filter(
Expand Down Expand Up @@ -69,10 +79,26 @@ def cancel_stale_submissions():

@shared_task
def run_yang_model_checks_task():
    """Celery task: populate the YANG model directories, then run all YANG model checks"""
    # avoid circular imports with ietf.submit.utils
    from ietf.submit import utils as submit_utils

    submit_utils.populate_yang_model_dirs()
    submit_utils.run_all_yang_model_checks()



@shared_task(
    autoretry_for=(FileNotFoundError,),
    retry_backoff=5,  # exponential backoff starting with 5 seconds
    retry_kwargs={"max_retries": 5},  # 5, 10, 20, 40, 80 second delays, then give up
    retry_jitter=True,  # jitter, using retry time as max for a random delay
)
def move_files_to_repository_task(submission_id):
    """Celery task: move a submission's staged files into the repository

    submission_id is the primary key of the Submission (the value passed to
    get_submission()), not a Submission instance. A FileNotFoundError raised
    by move_files_to_repository() triggers the automatic retry configured on
    the decorator.
    """
    # avoid circular imports with ietf.submit.utils
    from ietf.submit.utils import move_files_to_repository as _move_files

    found = get_submission(submission_id)
    if found is None:
        return
    _move_files(found)


@shared_task(bind=True)
def poke(self):
    """Celery task: log this task's name and the id of the current request"""
    task_name = self.name
    request_id = self.request.id
    log.log(f'Poked {task_name}, request id {request_id}')
54 changes: 42 additions & 12 deletions ietf/submit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,9 @@ def post_submission(request, submission, approved_doc_desc, approved_subm_desc):
from ietf.doc.expire import move_draft_files_to_archive
move_draft_files_to_archive(draft, prev_rev)

move_files_to_repository(submission)
submission.state = DraftSubmissionStateName.objects.get(slug="posted")
log.log(f"{submission.name}: moved files")
move_files_to_repository_task.delay(submission)
log.log(f"{submission.name}: queued task to move files")

new_replaces, new_possibly_replaces = update_replaces_from_submission(request, submission, draft)
update_name_contains_indexes_with_new_doc(draft)
Expand Down Expand Up @@ -653,20 +653,35 @@ def rename_submission_files(submission, prev_rev, new_rev):


def move_files_to_repository(submission):
    """Move staging files to the draft repository and create hard links

    The files to move are those recorded as SubmissionFile rows for the
    submission; each is looked up by name under IDSUBMIT_STAGING_PATH. Every
    file that is moved is also hard-linked into the all-drafts archive and the
    FTP internet-drafts directory.

    If any of the expected files are missing and not already in place, raises FileNotFoundError
    after moving as many files as it can. The exception message lists the missing file names.
    """
    # NOTE(review): a submission with no SubmissionFile rows moves nothing and
    # raises nothing - confirm that is acceptable for submissions created
    # before these records existed.
    staging_dir = Path(settings.IDSUBMIT_STAGING_PATH)
    repository_dir = Path(settings.IDSUBMIT_REPOSITORY_PATH)
    files_to_move = [
        staging_dir / sf.filename
        for sf in submission.submissionfile_set.all()
    ]
    missing = []
    for source in files_to_move:
        dest = repository_dir / source.name
        if source.exists():
            move(source, dest)
            all_archive_dest = Path(settings.INTERNET_ALL_DRAFTS_ARCHIVE_DIR) / source.name
            ftp_dest = Path(settings.FTP_DIR) / "internet-drafts" / source.name
            os.link(dest, all_archive_dest)
            os.link(dest, ftp_dest)
            log.log(f"Moved {source.name} from {source.parent} to {dest.parent} and created links.")
        elif dest.exists():
            # Already in place (e.g. moved by an earlier attempt); not an error.
            log.log(
                f"Intended to move {source.name} from {source.parent} to {dest.parent} "
                "but found source missing while destination exists."
            )
        else:
            log.log(f"Unable to move {source.name}: {source} not found.")
            missing.append(source.name)
    if missing:
        # Raised only after the loop so every available file is still moved.
        raise FileNotFoundError(
            f"Unable to move file(s) from {staging_dir}: {', '.join(missing)}"
        )


def remove_staging_files(name, rev, exts=None):
Expand Down Expand Up @@ -753,6 +768,7 @@ def clear_existing_files(form):


def save_files(form):
"""Save files and return map from extension to filename with full path"""
file_name = {}
for ext in list(form.fields.keys()):
if not ext in form.formats:
Expand Down Expand Up @@ -931,7 +947,7 @@ def staging_path(filename, revision, ext):
return pathlib.Path(settings.IDSUBMIT_STAGING_PATH) / f'{filename}-{revision}{ext}'


def render_missing_formats(submission):
def render_missing_formats(submission: Submission):
"""Generate txt and html formats from xml draft

If a txt file already exists, leaves it in place. Overwrites an existing html file
Expand Down Expand Up @@ -975,6 +991,13 @@ def render_missing_formats(submission):
xml_version,
)
)
_, created = submission.submissionfile_set.update_or_create(
filename=txt_path.name,
defaults={"time": timezone.now(), "generated": True},
)
if created:
# We don't expect a SubmissionFile to exist - log the event
log.log(f"Generated replacement for missing submission file: {txt_path.name}")

# --- Convert to html ---
html_path = staging_path(submission.name, submission.rev, '.html')
Expand All @@ -990,6 +1013,13 @@ def render_missing_formats(submission):
xml_version,
)
)
_, created = submission.submissionfile_set.update_or_create(
filename=html_path.name,
defaults={"time": timezone.now(), "generated": True},
)
if created:
# We don't expect a SubmissionFile to exist - log the event
log.log(f"Generated replacement for missing submission file: {html_path.name}")


def accept_submission(submission: Submission, request: Optional[HttpRequest] = None, autopost=False):
Expand Down
15 changes: 13 additions & 2 deletions ietf/submit/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import datetime

from pathlib import Path
from typing import Optional, cast # pyflakes:ignore
from urllib.parse import urljoin

Expand Down Expand Up @@ -58,7 +59,12 @@ def upload_submission(request):
submission.submission_date = date_today()
submission.save()
clear_existing_files(form)
save_files(form)
files_by_ext = save_files(form)
for filename_with_path in files_by_ext.values():
submission.submissionfile_set.create(
filename=Path(filename_with_path).name, # drop the path
generated=False,
)
create_submission_event(request, submission, desc="Uploaded submission")
# Wrap in on_commit so the delayed task cannot start until the view is done with the DB
transaction.on_commit(
Expand Down Expand Up @@ -133,7 +139,12 @@ def err(code, error, messages=None):
submission.replaces = form.cleaned_data['replaces']
submission.save()
clear_existing_files(form)
save_files(form)
files_by_ext = save_files(form)
for filename_with_path in files_by_ext.values():
submission.submissionfile_set.create(
filename=Path(filename_with_path).name, # drop the path
generated=False,
)
create_submission_event(request, submission, desc="Uploaded submission through API")

# Wrap in on_commit so the delayed task cannot start until the view is done with the DB
Expand Down
Loading