Skip to content

Commit

Permalink
[Backport to 3.3.x][Fixes #9056] Layer upload via API stuck in status… (
Browse files Browse the repository at this point in the history
#9060)

* [Backport to 3.3.x][Fixes #9056] Layer upload via API stuck in status waiting

* [CircleCI] Fix test cases

* [CircleCI] Fix test cases
  • Loading branch information
Alessio Fabiani authored Apr 7, 2022
1 parent b3adc55 commit 1fa8fb9
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 40 deletions.
111 changes: 96 additions & 15 deletions geonode/upload/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import os
import shutil
import logging
import tempfile
from unittest import mock

from io import IOBase
from django.core.exceptions import ValidationError
from gisdata import GOOD_DATA
from time import sleep
from unittest import mock
from gisdata import GOOD_DATA, BAD_DATA
from urllib.request import urljoin

from django.conf import settings

from django.urls import reverse
from django.contrib.auth import authenticate, get_user_model
from django.test.utils import override_settings
from django.core.exceptions import ValidationError

from requests_toolbelt.multipart.encoder import MultipartEncoder

Expand All @@ -49,6 +50,7 @@

from geonode.tests.base import GeoNodeLiveTestSupport
from geonode.geoserver.helpers import ogc_server_settings
from geonode.upload.tasks import _update_upload_session_state
from geonode.upload.models import UploadSizeLimit

from ..models import Upload
Expand Down Expand Up @@ -253,7 +255,7 @@ def live_upload_file(self, _file):
f"probably not json, status {response.status_code} / {response.content}"))
return response, response.content

def rest_upload_file(self, _file, username=GEONODE_USER, password=GEONODE_PASSWD, non_interactive=False):
def rest_upload_by_path(self, _file, username=GEONODE_USER, password=GEONODE_PASSWD, non_interactive=False):
""" function that uploads a file, or a collection of files, to
the GeoNode"""
assert authenticate(username=username, password=password)
Expand Down Expand Up @@ -360,6 +362,7 @@ def rest_upload_file_by_path(self, _file, username=GEONODE_USER, password=GEONOD
# """
# pass

# AF: Intermittent failures on CircleCI
# @as_superuser
# def test_live_uploads(self):
# """
Expand All @@ -370,13 +373,12 @@ def rest_upload_file_by_path(self, _file, username=GEONODE_USER, password=GEONOD
# resp, data = self.live_upload_file(fname)
# self.assertEqual(resp.status_code, 200)
# self.assertTrue(data['success'])

# self.assertIn('redirect_to', data)
# headers = {
# 'X-CSRFToken': self.csrf_token,
# 'X-Requested-With': 'XMLHttpRequest',
# 'Cookie': f'csrftoken={self.csrf_token}; sessionid={self.session_id}'
# }

# url = urljoin(
# settings.SITEURL,
# f"{reverse('uploads-list')}.json")
Expand Down Expand Up @@ -474,7 +476,7 @@ def test_rest_uploads(self):
"""
# Try to upload a good raster file and check the session IDs
fname = os.path.join(GOOD_DATA, 'raster', 'relief_san_andres.tif')
resp, data = self.rest_upload_file(fname)
resp, data = self.rest_upload_by_path(fname)
self.assertEqual(resp.status_code, 200)
self.assertTrue(data['success'])

Expand Down Expand Up @@ -549,7 +551,7 @@ def test_rest_uploads_non_interactive(self):
"""
# Try to upload a good raster file and check the session IDs
fname = os.path.join(GOOD_DATA, 'raster', 'relief_san_andres.tif')
resp, data = self.rest_upload_file(fname, non_interactive=True)
resp, data = self.rest_upload_by_path(fname, non_interactive=True)
self.assertEqual(resp.status_code, 200)
self.assertTrue(data['success'])

Expand Down Expand Up @@ -624,7 +626,7 @@ def test_rest_uploads_by_path(self):
"""
# Try to upload a good raster file and check the session IDs
fname = os.path.join(GOOD_DATA, 'raster', 'relief_san_andres.tif')
resp, data = self.rest_upload_file_by_path(fname)
resp, data = self.rest_upload_by_path_by_path(fname)
self.assertEqual(resp.status_code, 200)
self.assertTrue(data['success'])

Expand Down Expand Up @@ -697,9 +699,9 @@ def test_rest_uploads_no_crs(self):
"""
Ensure the upload process turns to `WAITING` status whenever a `CRS` info is missing from the GIS backend.
"""
# Try to upload a good raster file and check the session IDs
# Try to upload a shapefile without a CRS def.
fname = os.path.join(os.getcwd(), 'geonode/tests/data/san_andres_y_providencia_coastline_no_prj.zip')
resp, data = self.rest_upload_file(fname)
resp, data = self.rest_upload_by_path(fname)
self.assertEqual(resp.status_code, 200)
self.assertEqual(data['status'], 'incomplete')
self.assertTrue(data['success'])
Expand All @@ -713,12 +715,91 @@ def test_rest_uploads_no_crs(self):
self.assertEqual(response.data['total'], 1, response.data['total'])
# Pagination
self.assertEqual(len(response.data['uploads']), 1)
logger.debug(response.data)
self.assertEqual(response.status_code, 200)
upload_data = response.data['uploads'][0]
self.assertIsNotNone(upload_data)
self.assertEqual(upload_data['name'], 'san_andres_y_providencia_coastline_no_prj', upload_data['name'])
if upload_data['state'] != Upload.STATE_WAITING:
for _cnt in range(0, 10):
response = self.client.get(url, format='json')
self.assertEqual(response.status_code, 200)
logger.error(f"[{_cnt + 1}] ... {response.data}")
upload_data = response.data['uploads'][0]
self.assertIsNotNone(upload_data)
self.assertEqual(upload_data['name'], 'san_andres_y_providencia_coastline_no_prj', upload_data['name'])
if upload_data['state'] == Upload.STATE_WAITING:
break
else:
for _upload in Upload.objects.filter(state=upload_data['state']):
_update_upload_session_state.apply((_upload.id,))
sleep(3.0)
self.assertEqual(upload_data['state'], Upload.STATE_WAITING, upload_data['state'])

def test_emulate_upload_through_rest_apis(self):
"""
Emulating Upload via REST APIs with several datasets.
"""
# Try to upload a bad ESRI shapefile with no CRS available
fname = os.path.join(BAD_DATA, 'points_epsg2249_no_prj.shp')
resp, data = self.rest_upload_by_path(fname)
self.assertEqual(resp.status_code, 200)
self.assertEqual(data['status'], 'incomplete')
self.assertTrue(data['success'])

# Try to upload a good raster file and check the session IDs
fname = os.path.join(GOOD_DATA, 'raster', 'relief_san_andres.tif')
resp, data = self.rest_upload_by_path(fname)
self.assertEqual(resp.status_code, 200)
self.assertEqual(data['status'], 'finished')
self.assertTrue(data['success'])

# Try to upload a good ESRI shapefile and check the session IDs
fname = os.path.join(GOOD_DATA, 'vector', 'san_andres_y_providencia_coastline.shp')
resp, data = self.rest_upload_by_path(fname)
self.assertEqual(resp.status_code, 200)
self.assertEqual(data['status'], 'finished')
self.assertTrue(data['success'])

def assert_processed_or_failed(total_uploads, upload_index, dataset_name, state):
url = reverse('uploads-list')
# Admin
self.assertTrue(self.client.login(username=GEONODE_USER, password=GEONODE_PASSWD))
response = self.client.get(url, format='json')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 5, response.data)
self.assertEqual(response.data['total'], total_uploads, response.data['total'])
# Pagination
self.assertEqual(len(response.data['uploads']), total_uploads)
self.assertEqual(response.status_code, 200)

upload_data = response.data['uploads'][upload_index]
self.assertIsNotNone(upload_data)
self.assertEqual(upload_data['name'], dataset_name, upload_data['name'])
if upload_data['state'] != state:
for _cnt in range(0, 10):
response = self.client.get(url, format='json')
self.assertEqual(response.status_code, 200)
logger.error(f"[{_cnt + 1}] ... {response.data}")
upload_data = response.data['uploads'][upload_index]
self.assertIsNotNone(upload_data)
self.assertEqual(upload_data['name'], dataset_name, upload_data['name'])
if upload_data['state'] == state:
break
else:
for _upload in Upload.objects.filter(state=upload_data['state']):
_update_upload_session_state.apply((_upload.id,))
sleep(3.0)
self.assertEqual(upload_data['state'], state, upload_data['state'])

# Vector
assert_processed_or_failed(3, 0, 'san_andres_y_providencia_coastline', Upload.STATE_PROCESSED)

# Raster
assert_processed_or_failed(3, 1, 'relief_san_andres', Upload.STATE_PROCESSED)

# Unsupported
assert_processed_or_failed(3, 2, 'points_epsg2249_no_prj', Upload.STATE_WAITING)

@mock.patch("geonode.upload.forms.ValidationError")
@mock.patch("geonode.upload.uploadhandler.SimpleUploadedFile")
def test_rest_uploads_with_size_limit(self, mocked_uploaded_file, mocked_validation_error):
Expand Down Expand Up @@ -747,7 +828,7 @@ def test_rest_uploads_with_size_limit(self, mocked_uploaded_file, mocked_validat
with mock.patch(max_size_path, new_callable=mock.PropertyMock) as max_size_mock:
max_size_mock.return_value = lambda x: 209715200

resp, data = self.rest_upload_file(fname)
resp, data = self.rest_upload_by_path(fname)
self.assertEqual(resp.status_code, 400)
mocked_validation_error.assert_called_once_with(expected_error)
mocked_uploaded_file.assert_not_called()
Expand Down Expand Up @@ -777,7 +858,7 @@ def test_rest_uploads_with_size_limit_before_upload(self, mocked_uploaded_file,
with mock.patch(max_size_path, new_callable=mock.PropertyMock) as max_size_mock:
max_size_mock.return_value = lambda x: 1

resp, data = self.rest_upload_file(fname)
resp, data = self.rest_upload_by_path(fname)
# Assertions
self.assertEqual(resp.status_code, 400)
mocked_validation_error.assert_called_once_with(expected_error)
Expand Down
58 changes: 33 additions & 25 deletions geonode/upload/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ def _upload_workflow_error(self, task_name: str, upload_ids: list):
"""Task invoked at 'upload_workflow.chord' end in the case some error occurred.
"""
logger.error(f"Task {task_name} upload ids: {upload_ids} did not finish correctly!")
for _upload in Upload.objects.filter(id__in=upload_ids):
_upload.set_processing_state(Upload.STATE_INVALID)


@app.task(
Expand All @@ -184,34 +186,40 @@ def _update_upload_session_state(self, upload_session_id: int):
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_redirect_to = _response_json.get('redirect_to', '')

_tasks_failed = any([_task.state in ["BAD_FORMAT", "ERROR", "CANCELED"] for _task in session.tasks])
_tasks_waiting = any([_task.state in ["NO_CRS", "NO_BOUNDS", "NO_FORMAT"] for _task in session.tasks])

if _success:
if 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to:
if _tasks_failed:
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to and _tasks_waiting:
_upload.set_resume_url(_redirect_to)
_upload.set_processing_state(Upload.STATE_WAITING)
else:
if session.state == Upload.STATE_COMPLETE and _upload.state == Upload.STATE_PENDING:
if not _upload.layer or not _upload.layer.processed:
_response = final_step_view(None, _upload.get_session)
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_status = _response_json.get('status', 'error')
if _status == 'error':
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif _status == 'pending':
# GeoNode Layer not ready yet...
_upload.set_processing_state(Upload.STATE_PENDING)
elif _upload.state != Upload.STATE_PROCESSED:
if _upload.layer and _upload.layer.processed:
# GeoNode Layer successfully processed...
_upload.set_processing_state(Upload.STATE_PROCESSED)
else:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
elif session.state == Upload.STATE_COMPLETE and _upload.state == Upload.STATE_PENDING:
if not _upload.layer or not _upload.layer.processed:
_response = final_step_view(None, _upload.get_session)
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_status = _response_json.get('status', 'error')
if _status == 'error':
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif _status == 'pending':
# GeoNode Layer not ready yet...
_upload.set_processing_state(Upload.STATE_PENDING)
elif _upload.state != Upload.STATE_PROCESSED:
if _upload.layer and _upload.layer.processed:
# GeoNode Layer successfully processed...
_upload.set_processing_state(Upload.STATE_PROCESSED)
else:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
logger.debug(f"Upload {upload_session_id} updated with state {_upload.state}.")
except (NotFound, Exception) as e:
logger.exception(e)
Expand Down

0 comments on commit 1fa8fb9

Please sign in to comment.