Skip to content

Commit

Permalink
Change way of doing the duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
LePetitTim committed Aug 31, 2022
1 parent d6fcb91 commit 9d8a6d3
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 36 deletions.
35 changes: 3 additions & 32 deletions geotrek/common/mixins/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
import hashlib
import os
import shutil
import uuid

from PIL.Image import DecompressionBombError
from django.conf import settings
from django.core.mail import mail_managers
from django.db import models
from django.db.models import Q
from django.db.models.deletion import Collector
from django.template.defaultfilters import slugify
from django.template.loader import render_to_string
from django.utils.formats import date_format
Expand Down Expand Up @@ -435,34 +433,7 @@ def add_property(cls, name, func, verbose_name):
setattr(cls, '%s_verbose_name' % name, verbose_name)


class DuplicateModelMixin(CheckBoxActionMixin):
class DuplicateModelMixin(object):
def duplicate(self):
"""
Duplicate all related objects of obj setting
field to value. If one of the duplicate
objects has an FK to another duplicate object
does not update that as well.
"""
from geotrek.core.models import Topology

collector = Collector(using='default')
collector.collect([self])
collector.sort()
related_models = [key for key in collector.data.keys() if key is not Topology]
# Sometimes it's good enough just to save in reverse deletion order.
duplicate_order = reversed(related_models)
for model in duplicate_order:
# Replace each `sub_obj` with a duplicate.
sub_objects = collector.data[model]
for obj in sub_objects:
# Duplicate the object and save it.
obj.id = None
obj.pk = None
obj.uuid = uuid.uuid4()
if 'name' in [field.name for field in self.__class__._meta.get_fields()]:
obj.name = f'{self.name} (copy)'
obj.save()
obj.refresh_from_db()
if issubclass(self.__class__, Topology) and settings.TREKKING_TOPOLOGY_ENABLED:
new_topology = Topology.objects.create()
new_topology.mutate(self.topo_object, delete=False)
from geotrek.common.utils.helpers import clone_object
return clone_object(self)
3 changes: 1 addition & 2 deletions geotrek/common/mixins/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from mapentity.helpers import suffix_for
from pdfimpose import PageList

from geotrek.authent.decorators import same_structure_required
from geotrek.common.models import TargetPortal, FileType, Attachment
from geotrek.common.utils import logger
from geotrek.common.utils.portals import smart_get_template_by_portal
Expand Down Expand Up @@ -263,7 +262,7 @@ def duplicate_object(self, request, *args, **kwargs):
raise Exception(_(f"You don't have the right to duplicate this. "
f"This object is not from the same structure."))
else:
obj.duplicate()
obj = obj.duplicate()
messages.success(request, _(f"{self.model._meta.verbose_name} has been duplicated successfully"))
except Exception as exc:
messages.error(request, exc)
Expand Down
19 changes: 17 additions & 2 deletions geotrek/common/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
# Workaround https://code.djangoproject.com/ticket/22865
from freezegun import freeze_time

from geotrek.common.models import FileType # NOQA
from geotrek.common.models import Attachment, AccessibilityAttachment, FileType # NOQA
from geotrek.common.tests.factories import AttachmentFactory, AttachmentAccessibilityFactory
from geotrek.common.utils.testdata import get_dummy_uploaded_image

from mapentity.tests.factories import SuperUserFactory, UserFactory
from mapentity.registry import app_settings
Expand Down Expand Up @@ -112,6 +114,14 @@ def test_duplicate_object_with_structure(self, mock_requests):
structure = StructureFactory.create()
obj_1 = self.modelfactory.create(structure=structure)
obj_1.refresh_from_db()
AttachmentFactory.create(content_object=obj_1,
attachment_file=get_dummy_uploaded_image())

attachments_accessibility = 'attachments_accessibility' in [field.name for field in obj_1._meta.get_fields()]

if attachments_accessibility:
AttachmentAccessibilityFactory.create(content_object=obj_1,
attachment_accessibility_file=get_dummy_uploaded_image())

response_duplicate = self.client.get(
reverse(f'{self.model._meta.app_label}:{self.model._meta.model_name}-drf-duplicate-object',
Expand All @@ -129,7 +139,12 @@ def test_duplicate_object_with_structure(self, mock_requests):
kwargs={"obj_pk": obj_1.pk})
)
msg = [str(message) for message in messages.get_messages(response.wsgi_request)]

self.assertEqual(self.model.objects.count(), 2)
self.assertEqual(Attachment.objects.filter(object_id=obj_1.pk).count(), 1)
self.assertEqual(Attachment.objects.filter(object_id=self.model.objects.last().pk).count(), 1)
if attachments_accessibility:
self.assertEqual(AccessibilityAttachment.objects.filter(object_id=obj_1.pk).count(), 1)
self.assertEqual(AccessibilityAttachment.objects.filter(object_id=self.model.objects.last().pk).count(), 1)
self.assertEqual(msg[1],
f"{self.model._meta.verbose_name} has been duplicated successfully")
self.assertEqual(response_duplicate.status_code, 302)
Expand Down
82 changes: 82 additions & 0 deletions geotrek/common/utils/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import uuid

from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile

from geotrek.core.models import Topology
from geotrek.common.models import Attachment
from geotrek.common.models import AccessibilityAttachment


def clone_attachment(attachment, clone, field_file):
fields = attachment._meta.get_fields()
clone_values = {}
for field in fields:
if not field.auto_created:
if field.name == "pk":
continue
elif field.name == "uuid":
clone_values['uuid'] = uuid.uuid4()
elif field.name == "content_object":
clone_values['content_object'] = clone
elif field.name == field_file:
attachment_content = getattr(attachment, field_file).read()
attachment_name = getattr(attachment, field_file).name.split("/")[-1]
clone_values[field_file] = SimpleUploadedFile(attachment_name, attachment_content)
else:
clone_values[field.name] = getattr(attachment, field.name, None)
attachment._meta.model.objects.create(**clone_values)


def clone_object(obj, attrs={}):

# we start by building a "flat" clone
clone = obj._meta.model.objects.get(pk=obj.pk)
clone.pk = None
clone.id = None
clone.uuid = uuid.uuid4()
if 'name' in [field.name for field in obj._meta.get_fields()]:
clone.name = f'{obj.name} (copy)'
# if caller specified some attributes to be overridden,
# use them

for key, value in attrs.items():
setattr(clone, key, value)

# save the partial clone to have a valid ID assigned
clone.save()

if issubclass(obj.__class__, Topology) and settings.TREKKING_TOPOLOGY_ENABLED:
new_topology = Topology.objects.create()
new_topology.mutate(obj.topo_object, delete=False)

# Scan field to further investigate relations
fields = clone._meta.get_fields()
for field in fields:
# Manage M2M fields by replicating all related records
# found on parent "obj" into "clone"
if not field.auto_created and field.many_to_many:
for row in getattr(obj, field.name).all():
getattr(clone, field.name).add(row)

# Manage 1-N and 1-1 relations by cloning child objects
if field.auto_created and field.is_relation:
if field.many_to_many:
# do nothing
pass
else:
# provide "clone" object to replace "obj"
# on remote field
attrs = {
field.remote_field.name: clone
}
children = field.related_model.objects.filter(**{field.remote_field.name: obj})

for child in children:
clone_object(child, attrs)
for attachment in Attachment.objects.filter(object_id=obj.pk):
clone_attachment(attachment, clone, 'attachment_file')
for accessibility_attachment in AccessibilityAttachment.objects.filter(object_id=obj.pk):
clone_attachment(accessibility_attachment, clone, 'attachment_accessibility_file')
return clone

0 comments on commit 9d8a6d3

Please sign in to comment.