Skip to content

Commit

Permalink
[3.3.x] Upload Workflow Stabilization and correct state management (#9083)
Browse files Browse the repository at this point in the history

* [3.3.x] Upload Workflow Stabilization and correct state management

* [3.3.x] Upload Workflow Stabilization and correct state management

* [3.3.x] Upload Workflow Stabilization and correct state management

* - minor typos/logging infos

* Fix upload workflow logs and Invalid state

* Fix invalid value

* - Align "_log" between "views.py" and "upload.py"

Co-authored-by: mattiagiupponi <[email protected]>
  • Loading branch information
Alessio Fabiani and mattiagiupponi authored Apr 12, 2022
1 parent 5dba27f commit f4f9010
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 31 deletions.
6 changes: 5 additions & 1 deletion geonode/geoserver/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ def geoserver_upload(
logger.warn(msg)
e.args = (msg,)
raise
logger.debug(f'Finished upload of {name} to GeoServer without errors.')
except Exception as e:
logger.error("Error during the creation of the resource in GeoServer", exc_info=e)
raise e

logger.debug(f'The File {name} has been sent to GeoServer without errors.')

# Step 5. Create the resource in GeoServer
logger.debug(f'>>> Step 5. Generating the metadata for {name} after successful import to GeoSever')
Expand Down
14 changes: 9 additions & 5 deletions geonode/upload/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,16 @@ def update_from_session(self, upload_session, layer=None):
_f,
assigned_name)

if "COMPLETE" == self.state:
self.complete = True
if self.layer and self.layer.processed:
self.state = Upload.STATE_RUNNING
if self.layer:
if self.layer.processed:
self.state = Upload.STATE_PROCESSED
else:
self.state = Upload.STATE_RUNNING
elif self.state in (Upload.STATE_READY, Upload.STATE_PENDING):
self.state = upload_session.import_session.state
if self.state == Upload.STATE_COMPLETE:
self.complete = True

self.save()

@property
Expand Down Expand Up @@ -235,7 +239,7 @@ def get_import_url(self):
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(self.import_id)
except (NotFound, Exception):
if self.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
if not session and self.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
self.set_processing_state(Upload.STATE_INVALID)
if session and self.state != Upload.STATE_INVALID:
return f"{ogc_server_settings.LOCATION}rest/imports/{session.id}"
Expand Down
46 changes: 29 additions & 17 deletions geonode/upload/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
upload_workflow.apply_async()

# Let's finish the valid ones
for _upload in Upload.objects.exclude(state__in=[Upload.STATE_PROCESSED]).exclude(id__in=_upload_ids_expired):
for _upload in Upload.objects.exclude(state__in=[Upload.STATE_PROCESSED, Upload.STATE_INVALID]).exclude(id__in=_upload_ids_expired):
session = None
try:
if not _upload.import_id:
Expand All @@ -105,18 +105,24 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
)
)
elif _upload.state not in (Upload.STATE_READY, Upload.STATE_COMPLETE, Upload.STATE_RUNNING):
if session.state == Upload.STATE_COMPLETE and _upload.layer and _upload.layer.processed:
if session and session.state == Upload.STATE_COMPLETE and _upload.layer and _upload.layer.processed:
_upload.set_processing_state(Upload.STATE_PROCESSED)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
).on_error(
if session and any([_task.state in ["ERROR"] for _task in session.tasks]):
_upload_workflow_error.signature(
args=('_upload_workflow_error', _upload_ids,),
immutable=True
)
else:
upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
)
)
)

upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
result = upload_workflow.apply_async()
Expand All @@ -135,7 +141,8 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
def _upload_workflow_finalizer(self, task_name: str, upload_ids: list):
"""Task invoked at 'upload_workflow.chord' end in the case everything went well.
"""
logger.info(f"Task {task_name} upload ids: {upload_ids} finished successfully!")
if upload_ids:
logger.info(f"Task {task_name} upload ids: {upload_ids} finished successfully!")


@app.task(
Expand Down Expand Up @@ -180,23 +187,28 @@ def _update_upload_session_state(self, upload_session_id: int):
_success = _response_json.get('success', False)
_redirect_to = _response_json.get('redirect_to', '')

_tasks_ready = any([_task.state in ["READY"] for _task in session.tasks])
_tasks_failed = any([_task.state in ["BAD_FORMAT", "ERROR", "CANCELED"] for _task in session.tasks])
_tasks_waiting = any([_task.state in ["NO_CRS", "NO_BOUNDS", "NO_FORMAT"] for _task in session.tasks])
if not _tasks_waiting:
_tasks_waiting = (session.state == Upload.STATE_PENDING and any([_task.state in ["READY"] for _task in session.tasks]))

if _success:
if _tasks_failed:
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to and _tasks_waiting:
elif 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to and (_tasks_waiting or _tasks_ready):
_upload.set_resume_url(_redirect_to)
_upload.set_processing_state(Upload.STATE_WAITING)
elif session.state in (Upload.STATE_PENDING, Upload.STATE_RUNNING) and not _tasks_waiting:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif session.state == Upload.STATE_COMPLETE and _upload.state in (Upload.STATE_COMPLETE, Upload.STATE_RUNNING, Upload.STATE_PENDING) and not _tasks_waiting:
if not _upload.layer or _upload.state == Upload.STATE_RUNNING:
elif session.state in (Upload.STATE_PENDING, Upload.STATE_RUNNING) and not (_tasks_waiting or _tasks_ready):
if _upload.layer and not _upload.layer.processed:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif session.state == Upload.STATE_RUNNING and _upload.layer and _upload.layer.processed:
# GeoNode Layer successfully processed...
_upload.set_processing_state(Upload.STATE_PROCESSED)
elif (session.state == Upload.STATE_COMPLETE and _upload.state in (
Upload.STATE_COMPLETE, Upload.STATE_RUNNING, Upload.STATE_PENDING) and not _tasks_waiting) or (
session.state == Upload.STATE_PENDING and _tasks_ready):
if not _upload.layer:
_response = final_step_view(None, _upload.get_session)
if _response:
_upload.refresh_from_db()
Expand All @@ -216,7 +228,7 @@ def _update_upload_session_state(self, upload_session_id: int):
else:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif _upload.layer.processed:
elif _upload.layer and _upload.layer.processed:
_upload.set_processing_state(Upload.STATE_PROCESSED)
logger.debug(f"Upload {upload_session_id} updated with state {_upload.state}.")
except (NotFound, Exception) as e:
Expand Down
20 changes: 16 additions & 4 deletions geonode/upload/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@
logger = logging.getLogger(__name__)


def _log(msg, *args):
logger.debug(msg, *args)
def _log(msg, *args, level='error'):
# this logger is used also for debug purpose with error level
getattr(logger, level)(msg, *args)


class UploaderSession:
Expand Down Expand Up @@ -187,7 +188,7 @@ def upload(
user = get_default_user()
if isinstance(user, str):
user = get_user_model().objects.get(username=user)
import_session = save_step(
import_session, upload = save_step(
user,
name,
base_file,
Expand Down Expand Up @@ -426,6 +427,10 @@ def save_step(user, layer, spatial_files, overwrite=True, store_spatial_files=Tr
or import_session.tasks[0].state == 'BAD_FORMAT':
error_msg = 'There may be a problem with the data provided - ' \
'we could not identify its format'
elif import_session.tasks[0].state == 'ERROR':
task = import_session.tasks[0]
error_msg = "Unexpected error durng the GeoServer upload" \
"please check GeoServer logs for more information"

if not mosaic and len(import_session.tasks) > 1:
error_msg = "Only a single upload is supported at the moment"
Expand Down Expand Up @@ -455,7 +460,7 @@ def save_step(user, layer, spatial_files, overwrite=True, store_spatial_files=Tr
logger.exception(Exception(error_msg))
raise UploadException(error_msg)
else:
_log("Finished upload of [%s] to GeoServer without errors.", name)
_log("The File [%s] has been sent to GeoServer without errors.", name, level="debug")
return import_session, upload


Expand Down Expand Up @@ -628,6 +633,13 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
lock_id = f'final_step-{import_id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
try:
Upload.objects.get(import_id=import_id)
except Exception as e:
logger.exception(e)
Upload.objects.invalidate_from_session(upload_session)
raise UploadException.from_exc(
_("The Upload Session is no more available"), e)
_log(f'Reloading session {import_id} to check validity')
try:
import_session = gs_uploader.get_session(import_id)
Expand Down
10 changes: 6 additions & 4 deletions geonode/upload/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@
logger = logging.getLogger(__name__)


def _log(msg, *args):
logger.debug(msg, *args)
def _log(msg, *args, level='error'):
# this logger is used also for debug purpose with error level
getattr(logger, level)(msg, *args)


def _get_upload_session(req):
Expand Down Expand Up @@ -534,7 +535,7 @@ def time_step_view(request, upload_session):

form = create_time_form(request, upload_session, request.POST)
if not form.is_valid():
logger.warning('Invalid upload form: %s', form.errors)
logger.exception('Invalid upload form: %s', form.errors)
return error_response(request, errors=["Invalid Submission"])

cleaned = form.cleaned_data
Expand Down Expand Up @@ -639,7 +640,8 @@ def final_step_view(req, upload_session):
)
register_event(req, EventType.EVENT_UPLOAD, saved_layer)
return _json_response
except (LayerNotReady, AssertionError):
except (LayerNotReady, AssertionError) as e:
logger.exception(e)
force_ajax = '&force_ajax=true' if req and 'force_ajax' in req.GET and req.GET['force_ajax'] == 'true' else ''
return json_response(
{
Expand Down

0 comments on commit f4f9010

Please sign in to comment.