Skip to content

Commit

Permalink
[3.3.x] Upload workflow stabilization (#9085)
Browse files Browse the repository at this point in the history
* - Fixing a variable referenced before assignment

 - Avoid committing the import_session twice

* - Making sure we don't have duplicated Upload Sessions on the DB

* Fix upload workflow

* Fix LGTM issue

Co-authored-by: mattiagiupponi <[email protected]>
  • Loading branch information
Alessio Fabiani and mattiagiupponi authored Apr 13, 2022
1 parent f4f9010 commit 9e21415
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 99 deletions.
20 changes: 20 additions & 0 deletions geonode/upload/migrations/0035_auto_20220412_1700.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 2.2.24 on 2022-04-12 17:00

from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):
    """Make ``Upload.name`` a required field.

    Alters ``upload.Upload.name`` from a nullable ``CharField`` to a plain
    ``CharField(max_length=64)`` (Django's defaults: ``null=False``,
    ``blank=False``), matching the model change that forbids Upload rows
    without a name.
    """

    dependencies = [
        # The Upload model has a FK to the user model, so depend on
        # whichever AUTH_USER_MODEL the project has configured.
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
        ('upload', '0034_auto_20220408_0922'),
    ]

    operations = [
        migrations.AlterField(
            model_name='upload',
            name='name',
            # No null=True here: existing NULL names must be resolved
            # before this migration can apply on a populated database.
            field=models.CharField(max_length=64),
        )
    ]
5 changes: 3 additions & 2 deletions geonode/upload/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def invalidate_from_session(self, upload_session):
).update(state=Upload.STATE_INVALID)

def update_from_session(self, upload_session, layer=None):
self.get(
return self.get(
user=upload_session.user,
name=upload_session.name,
import_id=upload_session.import_session.id).update_from_session(
Expand Down Expand Up @@ -102,7 +102,7 @@ class Upload(models.Model):
layer = models.ForeignKey(Layer, null=True, on_delete=models.SET_NULL)
upload_dir = models.TextField(null=True)
store_spatial_files = models.BooleanField(default=True)
name = models.CharField(max_length=64, null=True)
name = models.CharField(max_length=64, null=False, blank=False)
complete = models.BooleanField(default=False)
# hold our serialized session object
session = models.TextField(null=True, blank=True)
Expand Down Expand Up @@ -196,6 +196,7 @@ def update_from_session(self, upload_session, layer=None):
self.complete = True

self.save()
return self.get_session

@property
def progress(self):
Expand Down
4 changes: 2 additions & 2 deletions geonode/upload/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
upload_workflow.apply_async()

# Let's finish the valid ones
session = None
for _upload in Upload.objects.exclude(state__in=[Upload.STATE_PROCESSED, Upload.STATE_INVALID]).exclude(id__in=_upload_ids_expired):
session = None
try:
if not _upload.import_id:
raise NotFound
Expand Down Expand Up @@ -195,7 +195,7 @@ def _update_upload_session_state(self, upload_session_id: int):
if _tasks_failed:
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to and (_tasks_waiting or _tasks_ready):
elif 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to and (_tasks_waiting or _upload.get_session.time):
_upload.set_resume_url(_redirect_to)
_upload.set_processing_state(Upload.STATE_WAITING)
elif session.state in (Upload.STATE_PENDING, Upload.STATE_RUNNING) and not (_tasks_waiting or _tasks_ready):
Expand Down
139 changes: 84 additions & 55 deletions geonode/upload/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,15 @@ def _get_layer_type(spatial_files):
return the_layer_type


@transaction.atomic
def save_step(user, layer, spatial_files, overwrite=True, store_spatial_files=True,
mosaic=False, append_to_mosaic_opts=None, append_to_mosaic_name=None,
mosaic_time_regex=None, mosaic_time_value=None,
time_presentation=None, time_presentation_res=None,
time_presentation_default_value=None,
time_presentation_reference_value=None,
charset_encoding="UTF-8", target_store=None):
logger.debug(
_log(
f'Uploading layer: {layer}, files {spatial_files}')
if len(spatial_files) > 1:
# we only support more than one file if they're rasters for mosaicing
Expand All @@ -296,7 +297,7 @@ def save_step(user, layer, spatial_files, overwrite=True, store_spatial_files=Tr
logger.exception(Exception(msg))
raise UploadException(msg)
name = get_valid_layer_name(layer, overwrite)
logger.debug(f'Name for layer: {name}')
_log(f'Name for layer: {name}')
if not any(spatial_files.all_files()):
msg = "Unable to recognize the uploaded file(s)"
logger.exception(Exception(msg))
Expand All @@ -310,25 +311,25 @@ def save_step(user, layer, spatial_files, overwrite=True, store_spatial_files=Tr
logger.exception(Exception(msg))
raise RuntimeError(msg)
files_to_upload = preprocess_files(spatial_files)
logger.debug(f"files_to_upload: {files_to_upload}")
logger.debug(f'Uploading {the_layer_type}')
_log(f"files_to_upload: {files_to_upload}")
_log(f'Uploading {the_layer_type}')
error_msg = None
try:
upload = Upload.objects.filter(
user=user,
name=name,
state=Upload.STATE_READY,
upload_dir=spatial_files.dirname
).first()
upload = None
if Upload.objects.filter(user=user, name=name).exists():
upload = Upload.objects.filter(user=user, name=name).order_by('-date').first()
if upload:
import_session = upload.get_session.import_session
else:
if upload.state == Upload.STATE_READY:
import_session = upload.get_session.import_session
else:
upload = None
if not upload:
next_id = _get_next_id()
# Truncate name to maximum length defined by the field.
max_length = Upload._meta.get_field('name').max_length
name = name[:max_length]
# save record of this whether valid or not - will help w/ debugging
upload, _ = Upload.objects.get_or_create(
upload = Upload.objects.create(
user=user,
name=name,
state=Upload.STATE_READY,
Expand Down Expand Up @@ -633,13 +634,16 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
lock_id = f'final_step-{import_id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
_upload = None
try:
Upload.objects.get(import_id=import_id)
_upload = Upload.objects.get(import_id=import_id)
saved_layer = _upload.layer
except Exception as e:
logger.exception(e)
Upload.objects.invalidate_from_session(upload_session)
raise UploadException.from_exc(
_("The Upload Session is no more available"), e)

_log(f'Reloading session {import_id} to check validity')
try:
import_session = gs_uploader.get_session(import_id)
Expand All @@ -649,8 +653,9 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
raise UploadException.from_exc(
_("The GeoServer Import Session is no more available"), e)

upload_session.import_session = import_session
Upload.objects.update_from_session(upload_session)
upload_session.import_session = import_session.reload()

upload_session = Upload.objects.update_from_session(upload_session)

# Create the style and assign it to the created resource
# FIXME: Put this in gsconfig.py
Expand All @@ -659,9 +664,19 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
overwrite = task.updateMode == 'REPLACE'
# @todo see above in save_step, regarding computed unique name
name = task.layer.name
target = task.target

if layer_id:
name = Layer.objects.get(resourcebase_ptr_id=layer_id).name
if saved_layer:
name = saved_layer.name

_vals = dict(
title=upload_session.layer_title,
abstract=upload_session.layer_abstract,
alternate=task.get_target_layer_name(),
store=target.name,
name=task.layer.name,
workspace=target.workspace_name
)

_log(f'Getting from catalog [{name}]')
try:
Expand All @@ -675,12 +690,24 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
raise LayerNotReady(
_(f"Expected to find layer named '{name}' in geoserver"))

_tasks_ready = any([_task.state in ["READY"] for _task in import_session.tasks])
_tasks_failed = any([_task.state in ["BAD_FORMAT", "ERROR", "CANCELED"] for _task in import_session.tasks])
_tasks_waiting = any([_task.state in ["NO_CRS", "NO_BOUNDS", "NO_FORMAT"] for _task in import_session.tasks])

if not _tasks_failed and not _tasks_waiting and (import_session.state == 'READY' or (import_session.state == 'PENDING' and task.state == 'READY')):
if not saved_layer and not (_tasks_failed or _tasks_waiting) and (
import_session.state == Upload.STATE_READY or (import_session.state == Upload.STATE_PENDING and _tasks_ready)):
_log(f"final_step: Running Import Session {import_session.id} - target: {target.name} - alternate: {task.get_target_layer_name()}")
_log(f" -- session state: {import_session.state} - task state: {task.state}")
import_session.commit()
elif _tasks_failed or (import_session.state == 'INCOMPLETE' and task.state != 'ERROR'):
import_session = import_session.reload()
task = import_session.tasks[0]
name = task.layer.name
_vals['name'] = task.layer.name
_vals['alternate'] = task.get_target_layer_name()
_vals['store'] = task.target.name
_vals['workspace'] = task.target.workspace_name

elif import_session.state == Upload.STATE_INCOMPLETE and task.state != 'ERROR':
Upload.objects.invalidate_from_session(upload_session)
raise Exception(f'unknown item state: {task.state}')
try:
Expand All @@ -691,14 +718,11 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
raise UploadException.from_exc(
_("The GeoServer Import Session is no more available"), e)
upload_session.import_session = import_session
Upload.objects.update_from_session(upload_session)
upload_session = Upload.objects.update_from_session(upload_session)

_tasks_failed = any([_task.state in ["BAD_FORMAT", "ERROR", "CANCELED"] for _task in import_session.tasks])
# _tasks_failed = any([_task.state in ["BAD_FORMAT", "ERROR", "CANCELED"] for _task in import_session.tasks])
_tasks_waiting = any([_task.state in ["NO_CRS", "NO_BOUNDS", "NO_FORMAT"] for _task in import_session.tasks])

if _tasks_failed:
Upload.objects.invalidate_from_session(upload_session)
raise Exception('Import Session failed.')
if import_session.state != Upload.STATE_COMPLETE or _tasks_waiting:
return None

Expand All @@ -708,18 +732,18 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
Upload.objects.invalidate_from_session(upload_session)
raise Exception('Import Session failed. More than Upload Session associated to the Importer ID {import_id}')
saved_layer = Upload.objects.filter(import_id=import_id).get().layer
created = False

layer_uuid = None

if saved_layer:
if saved_layer.processed:
Upload.objects.filter(import_id=import_id).get().set_processing_state(Upload.STATE_PROCESSED)
_vals['name'] = saved_layer.name
_log(f'Django record for [{saved_layer.name}] already exists, updating with vals: {_vals}')
Layer.objects.filter(id=saved_layer.id).update(**_vals)
saved_layer.refresh_from_db()
return saved_layer
else:
_log(f'Django record for [{name}] does not exist, creating with vals: {_vals}')

target = task.target
alternate = task.get_target_layer_name()
layer_uuid = None
title = upload_session.layer_title
abstract = upload_session.layer_abstract
regions = []
keywords = []
vals = {}
Expand Down Expand Up @@ -753,7 +777,7 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
metadata_uploaded = True
except Exception as e:
Upload.objects.invalidate_from_session(upload_session)
logger.error(e)
logger.exception(e)
raise GeoNodeException(
_("Exception occurred while parsing the provided Metadata file."), e)

Expand All @@ -779,20 +803,20 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
if upload_session.mosaic:
if not upload_session.append_to_mosaic_opts:
saved_dataset_filter = Layer.objects.filter(
store=target.name,
alternate=alternate,
workspace=target.workspace_name,
name=task.layer.name)
store=_vals.get('store'),
alternate=_vals.get('alternate'),
workspace=_vals.get('workspace'),
name=_vals.get('name'))
if not saved_dataset_filter.exists():
saved_layer = Layer.objects.create(
uuid=layer_uuid or str(uuid.uuid4()),
store=target.name,
store=_vals.get('store'),
storeType=target.store_type,
alternate=alternate,
workspace=target.workspace_name,
title=title,
name=task.layer.name,
abstract=abstract or '',
alternate=_vals.get('alternate'),
workspace=_vals.get('workspace'),
title=_vals.get('title'),
name=_vals.get('name'),
abstract=_vals.get('abstract', _('No abstract provided')),
owner=user,
temporal_extent_start=start,
temporal_extent_end=end,
Expand Down Expand Up @@ -829,20 +853,20 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
else:
# The dataset is a standard one, no mosaic options enabled...
saved_dataset_filter = Layer.objects.filter(
store=target.name,
alternate=alternate,
workspace=target.workspace_name,
name=task.layer.name)
store=_vals.get('store'),
alternate=_vals.get('alternate'),
workspace=_vals.get('workspace'),
name=_vals.get('name'))
if not saved_dataset_filter.exists():
saved_layer = Layer.objects.create(
uuid=layer_uuid or str(uuid.uuid4()),
store=target.name,
store=_vals.get('store'),
storeType=target.store_type,
alternate=alternate,
workspace=target.workspace_name,
title=title,
name=task.layer.name,
abstract=abstract or '',
alternate=_vals.get('alternate'),
workspace=_vals.get('workspace'),
title=_vals.get('title'),
name=_vals.get('name'),
abstract=_vals.get('abstract', _('No abstract provided')),
owner=user,
temporal_extent_start=start,
temporal_extent_end=end,
Expand All @@ -857,8 +881,12 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
else:
raise GeoNodeException(f"There's an incosistent number of Datasets on the DB for {task.layer.name}")

assert _upload
assert saved_layer

_upload.layer = saved_layer
_upload.save()

if not created and not overwrite:
return saved_layer

Expand All @@ -868,7 +896,7 @@ def final_step(upload_session, user, charset="UTF-8", layer_id=None):
)
geonode_upload_session.processed = False
geonode_upload_session.save()
Upload.objects.update_from_session(upload_session, layer=saved_layer)
upload_session = Upload.objects.update_from_session(upload_session, layer=saved_layer)

# Add them to the upload session (new file fields are created).
assigned_name = None
Expand Down Expand Up @@ -959,6 +987,7 @@ def _store_file(saved_layer,
logger.exception(e)
sld_file = sld_file[0]
except Exception as e:
logger.exception(e)
raise UploadException.from_exc(_('Error uploading Dataset'), e)
sld_uploaded = True
else:
Expand Down
Loading

0 comments on commit 9e21415

Please sign in to comment.