Skip to content

Commit

Permalink
[Fixes #9064] Improve Upload Workflow resources state management
Browse files Browse the repository at this point in the history
  • Loading branch information
afabiani committed Apr 10, 2022
1 parent df2df3a commit b9a76b3
Show file tree
Hide file tree
Showing 7 changed files with 680 additions and 636 deletions.
253 changes: 136 additions & 117 deletions geonode/geoserver/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ def geoserver_update_layers(self, *args, **kwargs):
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
return gs_slurp(*args, **kwargs)
try:
return gs_slurp(*args, **kwargs)
finally:
lock.release()


@app.task(
Expand Down Expand Up @@ -110,6 +113,8 @@ def geoserver_set_style(
base_file=base_file)
except Exception as e:
logger.exception(e)
finally:
lock.release()


@app.task(
Expand Down Expand Up @@ -144,45 +149,48 @@ def geoserver_create_style(
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True and instance:
f = None
if sld_file and os.path.exists(sld_file) and os.access(sld_file, os.R_OK):
if os.path.isfile(sld_file):
try:
f = open(sld_file)
except Exception:
pass
elif tempdir and os.path.exists(tempdir):
if os.path.isfile(os.path.join(tempdir, sld_file)):
try:
f = None
if sld_file and os.path.exists(sld_file) and os.access(sld_file, os.R_OK):
if os.path.isfile(sld_file):
try:
f = open(os.path.join(tempdir, sld_file))
f = open(sld_file)
except Exception:
pass
if f:
sld = f.read()
f.close()
if not gs_catalog.get_style(name=name, workspace=settings.DEFAULT_WORKSPACE):
style = gs_catalog.create_style(
name,
sld,
raw=True,
workspace=settings.DEFAULT_WORKSPACE)
gs_layer = gs_catalog.get_layer(name)
_default_style = gs_layer.default_style
gs_layer.default_style = style
gs_catalog.save(gs_layer)
set_styles(instance, gs_catalog)
try:
gs_catalog.delete(_default_style)
Link.objects.filter(
resource=instance.resourcebase_ptr,
name='Legend',
url__contains=f'STYLE={_default_style.name}').delete()
except Exception as e:
logger.exception(e)
elif tempdir and os.path.exists(tempdir):
if os.path.isfile(os.path.join(tempdir, sld_file)):
try:
f = open(os.path.join(tempdir, sld_file))
except Exception:
pass
if f:
sld = f.read()
f.close()
if not gs_catalog.get_style(name=name, workspace=settings.DEFAULT_WORKSPACE):
style = gs_catalog.create_style(
name,
sld,
raw=True,
workspace=settings.DEFAULT_WORKSPACE)
gs_layer = gs_catalog.get_layer(name)
_default_style = gs_layer.default_style
gs_layer.default_style = style
gs_catalog.save(gs_layer)
set_styles(instance, gs_catalog)
try:
gs_catalog.delete(_default_style)
Link.objects.filter(
resource=instance.resourcebase_ptr,
name='Legend',
url__contains=f'STYLE={_default_style.name}').delete()
except Exception as e:
logger.exception(e)
else:
get_sld_for(gs_catalog, instance)
else:
get_sld_for(gs_catalog, instance)
else:
get_sld_for(gs_catalog, instance)
finally:
lock.release()


@app.task(
Expand Down Expand Up @@ -223,101 +231,104 @@ def geoserver_finalize_upload(
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
# Hide the resource until finished
instance.set_processing_state("RUNNING")

from geonode.upload.models import Upload
upload = Upload.objects.get(import_id=import_id)
upload.layer = instance
upload.save()
upload.set_processing_state(Upload.STATE_RUNNING)

try:
# Update the upload sessions
geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
geonode_upload_sessions.update(processed=False)
instance.upload_session = geonode_upload_sessions.first()
except Exception as e:
logger.exception(e)
geoserver_finalize_upload.retry(exc=e)
# Hide the resource until finished
instance.set_processing_state("RUNNING")

# Sanity checks
if isinstance(xml_file, list):
if len(xml_file) > 0:
xml_file = xml_file[0]
else:
from geonode.upload.models import Upload
upload = Upload.objects.get(import_id=import_id)
upload.layer = instance
upload.save()
upload.set_processing_state(Upload.STATE_RUNNING)

try:
# Update the upload sessions
geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
geonode_upload_sessions.update(processed=False)
instance.upload_session = geonode_upload_sessions.first()
except Exception as e:
logger.exception(e)
geoserver_finalize_upload.retry(exc=e)

# Sanity checks
if isinstance(xml_file, list):
if len(xml_file) > 0:
xml_file = xml_file[0]
else:
xml_file = None
elif not isinstance(xml_file, str):
xml_file = None
elif not isinstance(xml_file, str):
xml_file = None

if xml_file and os.path.exists(xml_file) and os.access(xml_file, os.R_OK):
instance.metadata_uploaded = True
if xml_file and os.path.exists(xml_file) and os.access(xml_file, os.R_OK):
instance.metadata_uploaded = True

try:
gs_resource = gs_catalog.get_resource(
name=instance.name,
store=instance.store,
workspace=instance.workspace)
except Exception:
try:
gs_resource = gs_catalog.get_resource(
name=instance.alternate,
name=instance.name,
store=instance.store,
workspace=instance.workspace)
except Exception:
try:
gs_resource = gs_catalog.get_resource(
name=instance.alternate or instance.typename)
name=instance.alternate,
store=instance.store,
workspace=instance.workspace)
except Exception:
gs_resource = None

if gs_resource:
# Updating GeoServer resource
gs_resource.title = instance.title
gs_resource.abstract = instance.abstract
gs_catalog.save(gs_resource)
if gs_resource.store:
instance.storeType = gs_resource.store.resource_type
if not instance.alternate:
instance.alternate = f"{gs_resource.store.workspace.name}:{gs_resource.name}"

if sld_uploaded:
geoserver_set_style(instance.id, sld_file)
else:
geoserver_create_style(instance.id, instance.name, sld_file, tempdir)

logger.debug(f'Finalizing (permissions and notifications) Layer {instance}')
instance.handle_moderated_uploads()
try:
gs_resource = gs_catalog.get_resource(
name=instance.alternate or instance.typename)
except Exception:
gs_resource = None

if gs_resource:
# Updating GeoServer resource
gs_resource.title = instance.title
gs_resource.abstract = instance.abstract
gs_catalog.save(gs_resource)
if gs_resource.store:
instance.storeType = gs_resource.store.resource_type
if not instance.alternate:
instance.alternate = f"{gs_resource.store.workspace.name}:{gs_resource.name}"

if sld_uploaded:
geoserver_set_style(instance.id, sld_file)
else:
geoserver_create_style(instance.id, instance.name, sld_file, tempdir)

if permissions is not None:
logger.debug(f'Setting permissions {permissions} for {instance.name}')
instance.set_permissions(permissions)
logger.debug(f'Finalizing (permissions and notifications) Layer {instance}')
instance.handle_moderated_uploads()

instance.save(notify=not created)
if permissions is not None:
logger.debug(f'Setting permissions {permissions} for {instance.name}')
instance.set_permissions(permissions)

try:
logger.debug(f"... Cleaning up the temporary folders {tempdir}")
if tempdir and os.path.exists(tempdir):
shutil.rmtree(tempdir)
except Exception as e:
logger.warning(e)
instance.save(notify=not created)

try:
# Update the upload sessions
geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
geonode_upload_sessions.update(processed=True)
instance.upload_session = geonode_upload_sessions.first()
upload.complete = True
upload.save()
upload.set_processing_state(Upload.STATE_PROCESSED)
except Exception as e:
logger.exception(e)
upload.complete = False
upload.save()
upload.set_processing_state(Upload.STATE_INVALID)
geoserver_finalize_upload.retry(exc=e)
try:
logger.debug(f"... Cleaning up the temporary folders {tempdir}")
if tempdir and os.path.exists(tempdir):
shutil.rmtree(tempdir)
except Exception as e:
logger.warning(e)

signals.upload_complete.send(sender=geoserver_finalize_upload, layer=instance)
try:
# Update the upload sessions
geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
geonode_upload_sessions.update(processed=True)
instance.upload_session = geonode_upload_sessions.first()
upload.complete = True
upload.save()
upload.set_processing_state(Upload.STATE_PROCESSED)
except Exception as e:
logger.exception(e)
upload.complete = False
upload.save()
upload.set_processing_state(Upload.STATE_INVALID)
geoserver_finalize_upload.retry(exc=e)

signals.upload_complete.send(sender=geoserver_finalize_upload, layer=instance)
finally:
lock.release()


@app.task(
Expand All @@ -343,11 +354,14 @@ def geoserver_post_save_layers(
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
sync_instance_with_geoserver(instance_id, *args, **kwargs)
try:
sync_instance_with_geoserver(instance_id, *args, **kwargs)

# Updating HAYSTACK Indexes if needed
if settings.HAYSTACK_SEARCH:
call_command('update_index')
# Updating HAYSTACK Indexes if needed
if settings.HAYSTACK_SEARCH:
call_command('update_index')
finally:
lock.release()


@app.task(
Expand Down Expand Up @@ -382,6 +396,8 @@ def geoserver_create_thumbnail(self, instance_id, overwrite=True, check_bbox=Tru
logger.debug(f"... Created Thumbnail for Layer {instance.title}")
except Exception as e:
geoserver_create_thumbnail.retry(exc=e)
finally:
lock.release()


@app.task(
Expand All @@ -404,4 +420,7 @@ def geoserver_cascading_delete(self, *args, **kwargs):
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
return cascading_delete(*args, **kwargs)
try:
return cascading_delete(*args, **kwargs)
finally:
lock.release()
3 changes: 3 additions & 0 deletions geonode/monitoring/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,9 @@ def collect_metric(**options):
except Exception as e:
log.info(f'[{lock_id}] Collecting Metrics - errored @ {_end_time}')
log.exception(e)
finally:
lock.release()

log.info(f'[{lock_id}] Collecting Metrics - exit @ {_end_time}')
return (_start_time, _end_time)

Expand Down
2 changes: 2 additions & 0 deletions geonode/services/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,5 @@ def probe_services(self):
logger.error(e)
except Exception as e:
logger.error(e)
finally:
lock.release()
9 changes: 6 additions & 3 deletions geonode/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def __enter__(self):
self.lock = memcache_lock(self.lock_id)
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
self.release()

def acquire(self):
if settings.ASYNC_SIGNALS:
try:
Expand All @@ -103,12 +106,12 @@ def acquire(self):
logger.warning(e)
return True

def __exit__(self, exc_type, exc_value, exc_traceback):
def release(self):
if self.lock:
try:
self.lock.release()
except Exception:
pass
except Exception as e:
logger.debug(e)


class FaultTolerantTask(celery.Task):
Expand Down
Loading

0 comments on commit b9a76b3

Please sign in to comment.