diff --git a/geonode/geoserver/helpers.py b/geonode/geoserver/helpers.py
index 0be2f264513..4a47f778a7a 100755
--- a/geonode/geoserver/helpers.py
+++ b/geonode/geoserver/helpers.py
@@ -66,7 +66,7 @@ from owslib.wms import WebMapService
 
 from geonode import GeoNodeException
 from geonode.utils import http_client
-from geonode.layers.models import Layer, UploadSession, Attribute, Style
+from geonode.layers.models import Layer, Attribute, Style
 from geonode.layers.enumerations import LAYER_ATTRIBUTE_NUMERIC_DATA_TYPES
 from geonode.security.views import _perms_info_json
 from geonode.security.utils import set_geowebcache_invalidate_cache
@@ -2119,10 +2119,6 @@ def sync_instance_with_geoserver(
             sender=instance.__class__, instance=instance, update_fields=['thumbnail_url'])
         return instance
 
-    geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
-    geonode_upload_sessions.update(processed=False)
-    instance.set_dirty_state()
-
     gs_resource = None
     values = None
     _tries = 0
@@ -2315,10 +2311,6 @@ def sync_instance_with_geoserver(
         except Exception as e:
             logger.exception(e)
             return None
-        finally:
-            geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
-            geonode_upload_sessions.update(processed=True)
-            instance.clear_dirty_state()
 
     # Refreshing layer links
     logger.debug(f"... Creating Default Resource Links for Layer {instance.title}")
diff --git a/geonode/geoserver/tasks.py b/geonode/geoserver/tasks.py
index 3b856b11a62..cf7c486197d 100644
--- a/geonode/geoserver/tasks.py
+++ b/geonode/geoserver/tasks.py
@@ -214,6 +214,9 @@ def geoserver_finalize_upload(
     lock_id = f'{self.request.id}'
     with AcquireLock(lock_id) as lock:
         if lock.acquire() is True:
+            # Hide the resource until finished
+            instance.set_dirty_state()
+
             from geonode.upload.models import Upload
             upload = Upload.objects.get(import_id=import_id)
             upload.layer = instance
@@ -287,9 +290,19 @@ def geoserver_finalize_upload(
                     shutil.rmtree(tempdir)
             except Exception as e:
                 logger.warning(e)
+
+            try:
+                # Update the upload sessions
+                geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
+                geonode_upload_sessions.update(processed=True)
+                instance.upload_session = geonode_upload_sessions.first()
+            except Exception as e:
+                logger.exception(e)
             finally:
                 upload.complete = True
                 upload.save()
+                # Show the resource finally finished
+                upload.set_processing_state(Upload.STATE_PROCESSED)
 
             signals.upload_complete.send(sender=geoserver_finalize_upload, layer=instance)
 
@@ -302,7 +315,7 @@ def geoserver_finalize_upload(
     expires=3600,
     acks_late=False,
     autoretry_for=(Exception, ),
-    retry_kwargs={'max_retries': 3, 'countdown': 10},
+    retry_kwargs={'max_retries': 0, 'countdown': 10},
     retry_backoff=True,
     retry_backoff_max=700,
     retry_jitter=True)
@@ -331,7 +344,7 @@ def geoserver_post_save_layers(
     expires=30,
     acks_late=False,
     autoretry_for=(Exception, ),
-    retry_kwargs={'max_retries': 3, 'countdown': 10},
+    retry_kwargs={'max_retries': 0, 'countdown': 10},
     retry_backoff=True,
     retry_backoff_max=700,
     retry_jitter=True)
diff --git a/geonode/upload/__init__.py b/geonode/upload/__init__.py
index 3a0b808b1f1..f2d194a5da2 100644
--- a/geonode/upload/__init__.py
+++ b/geonode/upload/__init__.py
@@ -50,7 +50,7 @@ def run_setup_hooks(sender, **kwargs):
     check_intervals = IntervalSchedule.objects.filter(every=600, period="seconds")
     if not check_intervals.exists():
         check_interval, _ = IntervalSchedule.objects.get_or_create(
-            every=600,
+            every=10,
             period="seconds"
         )
     else:
@@ -76,7 +76,7 @@ def ready(self):
         post_migrate.connect(run_setup_hooks, sender=self)
         settings.CELERY_BEAT_SCHEDULE['finalize-incomplete-session-resources'] = {
             'task': 'geonode.upload.tasks.finalize_incomplete_session_uploads',
-            'schedule': 60.0,
+            'schedule': 10.0,
         }
 
diff --git a/geonode/upload/models.py b/geonode/upload/models.py
index 94d95194d7d..32d9bf15ddc 100644
--- a/geonode/upload/models.py
+++ b/geonode/upload/models.py
@@ -181,12 +181,7 @@ def progress(self):
             return 50.0
         elif self.state == Upload.STATE_PROCESSED:
             return 100.0
-        elif self.complete or self.state in (Upload.STATE_COMPLETE, Upload.STATE_RUNNING):
-            if self.layer and self.layer.processed:
-                self.set_processing_state(Upload.STATE_PROCESSED)
-                return 100.0
-            elif self.state == Upload.STATE_RUNNING:
-                return 66.0
+        elif self.state in (Upload.STATE_COMPLETE, Upload.STATE_RUNNING):
             return 80.0
 
     def set_resume_url(self, resume_url):
diff --git a/geonode/upload/tasks.py b/geonode/upload/tasks.py
index 06cec051b61..22c7f57e550 100644
--- a/geonode/upload/tasks.py
+++ b/geonode/upload/tasks.py
@@ -23,6 +23,7 @@
 from gsimporter.api import NotFound
 
 from django.conf import settings
+from django.db import transaction
 from django.utils.timezone import timedelta, now
 
 from geonode.celery_app import app
@@ -60,72 +61,77 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
     lock_id = f'{self.request.id}'
     with AcquireLock(lock_id) as lock:
         if lock.acquire() is True:
-            _upload_ids = []
-            _upload_tasks = []
-
-            # Check first if we need to delete stale sessions
-            expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS)
-            for _upload in Upload.objects.exclude(state=Upload.STATE_PROCESSED).exclude(date__gt=expiry_time):
-                _upload.set_processing_state(Upload.STATE_INVALID)
-                _upload_ids.append(_upload.id)
-                _upload_tasks.append(
-                    _upload_session_cleanup.signature(
-                        args=(_upload.id,)
-                    )
-                )
-
-            upload_workflow_finalizer = _upload_workflow_finalizer.signature(
-                args=('_upload_session_cleanup', _upload_ids,),
-                immutable=True
-            ).on_error(
-                _upload_workflow_error.signature(
-                    args=('_upload_session_cleanup', _upload_ids,),
-                    immutable=True
-                )
-            )
-            upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
-            upload_workflow.apply_async()
-
-            # Let's finish the valid ones
-            _processing_states = (
-                Upload.STATE_RUNNING,
-                Upload.STATE_INVALID,
-                Upload.STATE_PROCESSED)
-            for _upload in Upload.objects.exclude(state__in=_processing_states):
-                session = None
-                try:
-                    if not _upload.import_id:
-                        raise NotFound
-                    session = _upload.get_session.import_session
-                    if not session or session.state != Upload.STATE_COMPLETE:
-                        session = gs_uploader.get_session(_upload.import_id)
-                except (NotFound, Exception) as e:
-                    logger.exception(e)
-                    session = None
-                    if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
-                        _upload.set_processing_state(Upload.STATE_INVALID)
-                        if _upload.layer:
-                            _upload.layer.delete()
-
-                if session:
-                    _upload_ids.append(_upload.id)
-                    _upload_tasks.append(
-                        _update_upload_session_state.signature(
-                            args=(_upload.id,)
-                        )
-                    )
-
-            upload_workflow_finalizer = _upload_workflow_finalizer.signature(
-                args=('_update_upload_session_state', _upload_ids,),
-                immutable=True
-            ).on_error(
-                _upload_workflow_error.signature(
-                    args=('_update_upload_session_state', _upload_ids,),
-                    immutable=True
-                )
-            )
-            upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
-            upload_workflow.apply_async()
+            with transaction.atomic():
+                _upload_ids = []
+                _upload_tasks = []
+
+                # Check first if we need to delete stale sessions
+                expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS)
+                for _upload in Upload.objects.exclude(state=Upload.STATE_PROCESSED).exclude(date__gt=expiry_time):
+                    _upload.set_processing_state(Upload.STATE_INVALID)
+                    _upload_ids.append(_upload.id)
+                    _upload_tasks.append(
+                        _upload_session_cleanup.signature(
+                            args=(_upload.id,)
+                        )
+                    )
+
+                upload_workflow_finalizer = _upload_workflow_finalizer.signature(
+                    args=('_upload_session_cleanup', _upload_ids,),
+                    immutable=True
+                ).on_error(
+                    _upload_workflow_error.signature(
+                        args=('_upload_session_cleanup', _upload_ids,),
+                        immutable=True
+                    )
+                )
+                upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
+                upload_workflow.apply_async()
+
+                # Let's finish the valid ones
+                _exclusion_processing_states = (
+                    Upload.STATE_COMPLETE,
+                    Upload.STATE_PROCESSED)
+                for _upload in Upload.objects.exclude(state__in=_exclusion_processing_states).exclude(id__in=_upload_ids):
+                    session = None
+                    try:
+                        if not _upload.import_id:
+                            raise NotFound
+                        session = _upload.get_session.import_session
+                        if not session or session.state != Upload.STATE_COMPLETE:
+                            session = gs_uploader.get_session(_upload.import_id)
+                    except (NotFound, Exception) as e:
+                        logger.exception(e)
+                        session = None
+
+                    if session:
+                        _upload_ids.append(_upload.id)
+                        _upload_tasks.append(
+                            _update_upload_session_state.signature(
+                                args=(_upload.id,)
+                            )
+                        )
+                    else:
+                        if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
+                            _upload.set_processing_state(Upload.STATE_INVALID)
+                        _upload_ids.append(_upload.id)
+                        _upload_tasks.append(
+                            _upload_session_cleanup.signature(
+                                args=(_upload.id,)
+                            )
+                        )
+
+                upload_workflow_finalizer = _upload_workflow_finalizer.signature(
+                    args=('_update_upload_session_state', _upload_ids,),
+                    immutable=True
+                ).on_error(
+                    _upload_workflow_error.signature(
+                        args=('_update_upload_session_state', _upload_ids,),
+                        immutable=True
+                    )
+                )
+                upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
+                upload_workflow.apply_async()
 
 
 @app.task(
@@ -163,38 +169,55 @@ def _upload_workflow_error(self, task_name: str, upload_ids: list):
 )
 def _update_upload_session_state(self, upload_session_id: int):
     """Task invoked by 'upload_workflow.chord' in order to process all the 'PENDING' Upload tasks."""
-
-    lock_id = f'{self.request.id}'
-    with AcquireLock(lock_id) as lock:
-        if lock.acquire() is True:
-            _upload = Upload.objects.get(id=upload_session_id)
-            session = _upload.get_session.import_session
-            if not session or session.state != Upload.STATE_COMPLETE:
-                session = gs_uploader.get_session(_upload.import_id)
-
-            if session:
-                try:
-                    content = next_step_response(None, _upload.get_session).content
-                    if isinstance(content, bytes):
-                        content = content.decode('UTF-8')
-                    response_json = json.loads(content)
-                    _success = response_json.get('success', False)
-                    _redirect_to = response_json.get('redirect_to', '')
-                    if _success:
-                        if 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to:
-                            _upload.set_resume_url(_redirect_to)
-                            _upload.set_processing_state(Upload.STATE_WAITING)
-                        else:
-                            if session.state == Upload.STATE_COMPLETE and _upload.state == Upload.STATE_PENDING:
-                                if not _upload.layer or not _upload.layer.processed:
-                                    final_step_view(None, _upload.get_session)
-                                _upload.set_processing_state(Upload.STATE_RUNNING)
-                except (NotFound, Exception) as e:
-                    logger.exception(e)
-                    if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
-                        _upload.set_processing_state(Upload.STATE_INVALID)
-                        if _upload.layer:
-                            _upload.layer.delete()
+    _upload = Upload.objects.get(id=upload_session_id)
+    session = _upload.get_session.import_session
+    if not session or session.state != Upload.STATE_COMPLETE:
+        session = gs_uploader.get_session(_upload.import_id)
+
+    if session:
+        try:
+            _response = next_step_response(None, _upload.get_session)
+            _upload.refresh_from_db()
+            _content = _response.content
+            if isinstance(_content, bytes):
+                _content = _content.decode('UTF-8')
+            _response_json = json.loads(_content)
+            _success = _response_json.get('success', False)
+            _redirect_to = _response_json.get('redirect_to', '')
+            if _success:
+                if 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to:
+                    _upload.set_resume_url(_redirect_to)
+                    _upload.set_processing_state(Upload.STATE_WAITING)
+                else:
+                    if session.state == Upload.STATE_COMPLETE and _upload.state == Upload.STATE_PENDING:
+                        if not _upload.layer or not _upload.layer.processed:
+                            _response = final_step_view(None, _upload.get_session)
+                            _upload.refresh_from_db()
+                            _content = _response.content
+                            if isinstance(_content, bytes):
+                                _content = _content.decode('UTF-8')
+                            _response_json = json.loads(_content)
+                            _success = _response_json.get('success', False)
+                            _status = _response_json.get('status', 'error')
+                            if _status == 'error':
+                                # GeoNode Layer creation errored!
+                                _upload.set_processing_state(Upload.STATE_INVALID)
+                            elif _status == 'pending':
+                                # GeoNode Layer not ready yet...
+                                _upload.set_processing_state(Upload.STATE_PENDING)
+                            elif _upload.state != Upload.STATE_PROCESSED:
+                                if _upload.layer and _upload.layer.processed:
+                                    # GeoNode Layer successfully processed...
+                                    _upload.set_processing_state(Upload.STATE_PROCESSED)
+                                else:
+                                    # GeoNode Layer updating...
+                                    _upload.set_processing_state(Upload.STATE_RUNNING)
+            logger.debug(f"Upload {upload_session_id} updated with state {_upload.state}.")
+        except (NotFound, Exception) as e:
+            logger.exception(e)
+            if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
+                _upload.set_processing_state(Upload.STATE_INVALID)
+            logger.error(f"Upload {upload_session_id} deleted with state {_upload.state}.")
 
 
 @app.task(
@@ -206,15 +229,11 @@ def _update_upload_session_state(self, upload_session_id: int):
 )
 def _upload_session_cleanup(self, upload_session_id: int):
     """Task invoked by 'upload_workflow.chord' in order to remove and cleanup all the 'INVALID' stale Upload tasks."""
-
-    lock_id = f'{self.request.id}'
-    with AcquireLock(lock_id) as lock:
-        if lock.acquire() is True:
-            try:
-                _upload = Upload.objects.get(id=upload_session_id)
-                if _upload.layer:
-                    _upload.layer.delete()
-                _upload.delete()
-                logger.debug(f"Upload {upload_session_id} deleted with state {_upload.state}.")
-            except Exception as e:
-                logger.error(f"Upload {upload_session_id} errored with exception {e}.")
+    try:
+        _upload = Upload.objects.get(id=upload_session_id)
+        if _upload.layer:
+            _upload.layer.delete()
+        _upload.delete()
+        logger.debug(f"Upload {upload_session_id} deleted with state {_upload.state}.")
+    except Exception as e:
+        logger.error(f"Upload {upload_session_id} errored with exception {e}.")
diff --git a/geonode/upload/upload.py b/geonode/upload/upload.py
index 6e67321aad0..6fce1fd1532 100644
--- a/geonode/upload/upload.py
+++ b/geonode/upload/upload.py
@@ -567,6 +567,7 @@ def srs_step(upload_session, source, target):
     Upload.objects.update_from_session(upload_session)
 
 
+@transaction.atomic
 def final_step(upload_session, user, charset="UTF-8", layer_id=None):
     import_session = upload_session.import_session
     import_id = import_session.id
@@ -774,19 +775,16 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
         if not created:
             return saved_layer
 
+        # Hide the resource until finished
+        saved_layer.set_dirty_state()
+
         # Create a new upload session
-        try:
-            with transaction.atomic():
-                geonode_upload_session, created = UploadSession.objects.get_or_create(
-                    resource=saved_layer, user=user
-                )
-                geonode_upload_session.processed = False
-                geonode_upload_session.save()
-                Upload.objects.update_from_session(
-                    upload_session, layer=saved_layer)
-        except IntegrityError as e:
-            Upload.objects.invalidate_from_session(upload_session)
-            raise UploadException.from_exc(_('Error configuring Layer'), e)
+        geonode_upload_session, created = UploadSession.objects.get_or_create(
+            resource=saved_layer, user=user
+        )
+        geonode_upload_session.processed = False
+        geonode_upload_session.save()
+        Upload.objects.update_from_session(upload_session, layer=saved_layer)
 
         # Add them to the upload session (new file fields are created).
         assigned_name = None