Skip to content

Commit

Permalink
[Fixes #9064] Improve Upload Workflow resources state management
Browse files — browse the repository at this point in the history
  • Loading branch information
afabiani committed Apr 11, 2022
1 parent 1a3f068 commit 5dba27f
Show file tree
Hide file tree
Showing 2 changed files with 345 additions and 333 deletions.
278 changes: 131 additions & 147 deletions geonode/upload/tasks.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -24,7 +24,6 @@
from gsimporter.api import NotFound

from django.conf import settings
from django.db import transaction
from django.utils.timezone import timedelta, now

from geonode.celery_app import app
Expand Down Expand Up @@ -58,88 +57,73 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
After removing the stale ones, it collects all the unprocessed and runs them
in parallel."""

lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
try:
with transaction.atomic():
_upload_ids = []
_upload_tasks = []

# Check first if we need to delete stale sessions
expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS)
for _upload in Upload.objects.exclude(state=Upload.STATE_PROCESSED).exclude(date__gt=expiry_time):
_upload.set_processing_state(Upload.STATE_INVALID)
_upload_ids.append(_upload.id)
_upload_tasks.append(
_upload_session_cleanup.signature(
args=(_upload.id,)
)
)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_upload_session_cleanup', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_upload_session_cleanup', _upload_ids,),
immutable=True
)
)
upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
upload_workflow.apply_async()

# Let's finish the valid ones
_exclusion_processing_states = (
Upload.STATE_COMPLETE,
Upload.STATE_PROCESSED)
for _upload in Upload.objects.exclude(state__in=_exclusion_processing_states).exclude(id__in=_upload_ids):
session = None
try:
if not _upload.import_id:
raise NotFound
session = _upload.get_session.import_session
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)
except (NotFound, Exception) as e:
logger.exception(e)
session = None
_upload_ids = []
_upload_tasks = []
_upload_ids_expired = []

if session:
_upload_ids.append(_upload.id)
_upload_tasks.append(
_update_upload_session_state.signature(
args=(_upload.id,)
)
)
else:
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
_upload.set_processing_state(Upload.STATE_INVALID)
_upload_ids.append(_upload.id)
_upload_tasks.append(
_upload_session_cleanup.signature(
args=(_upload.id,)
)
)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
)
)

upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
result = upload_workflow.apply_async()
if result.ready():
with allow_join_result():
return result.get()
return result.state
finally:
lock.release()
# Check first if we need to delete stale sessions
expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS)
for _upload in Upload.objects.exclude(state=Upload.STATE_PROCESSED).exclude(date__gt=expiry_time):
_upload.set_processing_state(Upload.STATE_INVALID)
_upload_ids_expired.append(_upload.id)
_upload_tasks.append(
_upload_session_cleanup.signature(
args=(_upload.id,)
)
)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_upload_session_cleanup', _upload_ids_expired,),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_upload_session_cleanup', _upload_ids_expired,),
immutable=True
)
)
upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
upload_workflow.apply_async()

# Let's finish the valid ones
for _upload in Upload.objects.exclude(state__in=[Upload.STATE_PROCESSED]).exclude(id__in=_upload_ids_expired):
session = None
try:
if not _upload.import_id:
raise NotFound
session = _upload.get_session.import_session if _upload.get_session else None
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)
except (NotFound, Exception) as e:
logger.exception(e)
session = None

if session:
_upload_ids.append(_upload.id)
_upload_tasks.append(
_update_upload_session_state.signature(
args=(_upload.id,)
)
)
elif _upload.state not in (Upload.STATE_READY, Upload.STATE_COMPLETE, Upload.STATE_RUNNING):
if session.state == Upload.STATE_COMPLETE and _upload.layer and _upload.layer.processed:
_upload.set_processing_state(Upload.STATE_PROCESSED)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
)
)

upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
result = upload_workflow.apply_async()
if result.ready():
with allow_join_result():
return result.get()
return result.state


@app.task(
Expand Down Expand Up @@ -177,72 +161,72 @@ def _upload_workflow_error(self, task_name: str, upload_ids: list):
ignore_result=False)
def _update_upload_session_state(self, upload_session_id: int):
"""Task invoked by 'upload_workflow.chord' in order to process all the 'PENDING' Upload tasks."""
_upload = Upload.objects.get(id=upload_session_id)
session = _upload.get_session.import_session
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)

if session:
try:
_response = next_step_response(None, _upload.get_session)
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_redirect_to = _response_json.get('redirect_to', '')

_tasks_failed = any([_task.state in ["BAD_FORMAT", "ERROR", "CANCELED"] for _task in session.tasks])
_tasks_waiting = any([_task.state in ["NO_CRS", "NO_BOUNDS", "NO_FORMAT"] for _task in session.tasks])
if not _tasks_waiting:
_tasks_waiting = (session.state == Upload.STATE_PENDING and any([_task.state in ["READY"] for _task in session.tasks]))

if _success:
if _tasks_failed:
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to and _tasks_waiting:
_upload.set_resume_url(_redirect_to)
_upload.set_processing_state(Upload.STATE_WAITING)
elif session.state in (Upload.STATE_PENDING, Upload.STATE_RUNNING) and not _tasks_waiting:
if _upload.layer and _upload.layer.processed:
_upload.set_processing_state(Upload.STATE_PROCESSED)
else:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif session.state == Upload.STATE_COMPLETE and _upload.state in (Upload.STATE_COMPLETE, Upload.STATE_RUNNING, Upload.STATE_PENDING) and not _tasks_waiting:
if not _upload.layer or not _upload.layer.processed:
_response = final_step_view(None, _upload.get_session)
if _response:
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_status = _response_json.get('status', 'error')
if _status == 'error':
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif _upload.state != Upload.STATE_PROCESSED:
if _upload.layer and _upload.layer.processed:
# GeoNode Layer successfully processed...
_upload.set_processing_state(Upload.STATE_PROCESSED)
else:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif _upload.layer and _upload.layer.processed:
_upload.set_processing_state(Upload.STATE_PROCESSED)
logger.debug(f"Upload {upload_session_id} updated with state {_upload.state}.")
except (NotFound, Exception) as e:
logger.exception(e)
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
lock_id = f'_update_upload_session_state-{upload_session_id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
_upload = Upload.objects.get(id=upload_session_id)
session = _upload.get_session.import_session
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)

if session:
try:
_response = next_step_response(None, _upload.get_session)
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_redirect_to = _response_json.get('redirect_to', '')

_tasks_failed = any([_task.state in ["BAD_FORMAT", "ERROR", "CANCELED"] for _task in session.tasks])
_tasks_waiting = any([_task.state in ["NO_CRS", "NO_BOUNDS", "NO_FORMAT"] for _task in session.tasks])
if not _tasks_waiting:
_tasks_waiting = (session.state == Upload.STATE_PENDING and any([_task.state in ["READY"] for _task in session.tasks]))

if _success:
if _tasks_failed:
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to and _tasks_waiting:
_upload.set_resume_url(_redirect_to)
_upload.set_processing_state(Upload.STATE_WAITING)
elif session.state in (Upload.STATE_PENDING, Upload.STATE_RUNNING) and not _tasks_waiting:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif session.state == Upload.STATE_COMPLETE and _upload.state in (Upload.STATE_COMPLETE, Upload.STATE_RUNNING, Upload.STATE_PENDING) and not _tasks_waiting:
if not _upload.layer or _upload.state == Upload.STATE_RUNNING:
_response = final_step_view(None, _upload.get_session)
if _response:
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_status = _response_json.get('status', 'error')
if _status == 'error':
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif _upload.state != Upload.STATE_PROCESSED:
if _upload.layer and _upload.layer.processed:
# GeoNode Layer successfully processed...
_upload.set_processing_state(Upload.STATE_PROCESSED)
else:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif _upload.layer.processed:
_upload.set_processing_state(Upload.STATE_PROCESSED)
logger.debug(f"Upload {upload_session_id} updated with state {_upload.state}.")
except (NotFound, Exception) as e:
logger.exception(e)
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
_upload.set_processing_state(Upload.STATE_INVALID)
logger.error(f"Upload {upload_session_id} deleted with state {_upload.state}.")
elif _upload.state != Upload.STATE_PROCESSED:
_upload.set_processing_state(Upload.STATE_INVALID)
logger.error(f"Upload {upload_session_id} deleted with state {_upload.state}.")
elif _upload.state != Upload.STATE_PROCESSED:
_upload.set_processing_state(Upload.STATE_INVALID)
logger.error(f"Unable to find the Importer Session - Upload {upload_session_id} deleted with state {_upload.state}.")
logger.error(f"Unable to find the Importer Session - Upload {upload_session_id} deleted with state {_upload.state}.")


@app.task(
Expand Down
Loading

0 comments on commit 5dba27f

Please sign in to comment.