diff --git a/src/open_inwoner/accounts/admin.py b/src/open_inwoner/accounts/admin.py index 450decad96..4a2af3f162 100644 --- a/src/open_inwoner/accounts/admin.py +++ b/src/open_inwoner/accounts/admin.py @@ -1,10 +1,11 @@ from django.contrib import admin, messages -from django.contrib.auth.admin import UserAdmin +from django.contrib.auth.admin import GroupAdmin, UserAdmin from django.contrib.auth.forms import UserChangeForm, UserCreationForm +from django.contrib.auth.models import Group from django.forms import ValidationError from django.http.request import HttpRequest from django.urls import reverse, reverse_lazy -from django.utils.html import format_html +from django.utils.html import format_html, format_html_join from django.utils.translation import ngettext, ugettext_lazy as _ from image_cropping import ImageCroppingMixin @@ -13,6 +14,7 @@ from open_inwoner.utils.mixins import UUIDAdminFirstInOrder from .choices import ContactTypeChoices +from .forms import GroupAdminForm from .models import Action, Document, Invite, Message, User @@ -167,6 +169,36 @@ class _UserAdmin(ImageCroppingMixin, UserAdmin): ) +admin.site.unregister(Group) + + +@admin.register(Group) +class _GroupAdmin(GroupAdmin): + form = GroupAdminForm + + list_display = [ + "name", + "get_categories", + ] + filter_horizontal = [ + "permissions", + ] + list_filter = ["managed_categories", "permissions"] + + fieldsets = ( + (None, {"fields": ("name", "permissions")}), + ( + _("Additional permissions"), + {"fields": ("managed_categories",)}, + ), + ) + + def get_categories(self, obj): + return ", ".join(g.name for g in obj.managed_categories.all()) + + get_categories.short_description = _("Admin categories") + + @admin.register(Action) class ActionAdmin( ReadOnlyFileMixin, UUIDAdminFirstInOrder, PrivateMediaMixin, admin.ModelAdmin diff --git a/src/open_inwoner/accounts/forms.py b/src/open_inwoner/accounts/forms.py index 5ff685ec75..17ece22f89 100644 --- a/src/open_inwoner/accounts/forms.py +++ b/src/open_inwoner/accounts/forms.py @@ -1,12 +1,11 @@ -from typing import Optional - from django import forms from django.conf import settings +from django.contrib.admin.widgets import FilteredSelectMultiple from django.contrib.auth import authenticate from django.contrib.auth.forms import PasswordResetForm +from django.contrib.auth.models import Group from django.core.exceptions import ValidationError from django.core.mail import EmailMultiAlternatives -from django.db.models import Q from django.template import loader from django.utils.translation import ugettext_lazy as _ @@ -212,6 +211,35 @@ def __init__(self, user, *args, **kwargs): del self.fields[field_name] +class GroupAdminForm(forms.ModelForm): + """ + Add the ability to add/remove 'managed_categories' to the GroupAdmin. + """ + + managed_categories = forms.ModelMultipleChoiceField( + label=_("Restricted PDC categories"), + queryset=Category.objects.all(), + required=False, + widget=FilteredSelectMultiple(verbose_name=_("Category"), is_stacked=False), + ) + + class Meta: + model = Group + fields = ("name", "permissions") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self.instance and self.instance.pk: + self.fields[ + "managed_categories" + ].initial = self.instance.managed_categories.all() + + def _save_m2m(self): + super()._save_m2m() + self.instance.managed_categories.set(self.cleaned_data["managed_categories"]) + + class CustomPasswordResetForm(PasswordResetForm): def get_users(self, email): users = super().get_users(email) diff --git a/src/open_inwoner/accounts/models.py b/src/open_inwoner/accounts/models.py index 5359bf71e2..b07c5da1f2 100644 --- a/src/open_inwoner/accounts/models.py +++ b/src/open_inwoner/accounts/models.py @@ -3,11 +3,11 @@ from uuid import uuid4 from django.conf import settings -from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin +from django.contrib.auth.models import AbstractBaseUser, Group, PermissionsMixin from django.contrib.contenttypes.fields import GenericRelation from django.core.exceptions import ValidationError from django.db import models -from django.db.models import CheckConstraint, Q, UniqueConstraint +from django.db.models import Q, UniqueConstraint from django.urls import reverse from django.utils import timezone from django.utils.crypto import get_random_string @@ -469,6 +469,16 @@ def is_digid_user_with_brp(self) -> bool: def is_eherkenning_user(self) -> bool: return self.login_type == LoginTypeChoices.eherkenning + def has_group_managed_categories(self) -> bool: + from ..pdc.models import Category + + return Category.objects.filter(access_groups__user=self).exists() + + def get_group_managed_categories(self): + from ..pdc.models import Category + + return Category.objects.filter(access_groups__user=self) + class Document(models.Model): uuid = models.UUIDField( diff --git a/src/open_inwoner/accounts/tests/factories.py b/src/open_inwoner/accounts/tests/factories.py index 3c0f158871..710e908f27 100644 --- a/src/open_inwoner/accounts/tests/factories.py +++ b/src/open_inwoner/accounts/tests/factories.py @@ -10,6 +10,13 @@ from open_inwoner.accounts.choices import LoginTypeChoices +class GroupFactory(factory.django.DjangoModelFactory): + class Meta: + model = "auth.Group" + + name = factory.Faker("word") + + @factory.django.mute_signals(pre_save, post_save) class UserFactory(factory.django.DjangoModelFactory): class Meta: diff --git a/src/open_inwoner/pdc/admin/category.py b/src/open_inwoner/pdc/admin/category.py index 4558fc0649..58b7aba6a5 100644 --- a/src/open_inwoner/pdc/admin/category.py +++ b/src/open_inwoner/pdc/admin/category.py @@ -143,6 +143,7 @@ class CategoryAdmin( "visible_for_anonymous", "visible_for_companies", "visible_for_citizens", + "get_access_groups_label", ) list_editable = ( "highlighted", @@ -176,6 +177,7 @@ class CategoryAdmin( "visible_for_anonymous", "visible_for_companies", "visible_for_citizens", + "access_groups", ), }, ), @@ -189,12 +191,47 @@ class CategoryAdmin( }, ), ) + readonly_fields = [ + "access_groups", + ] + + list_filter = [ + "published", + "visible_for_anonymous", + "visible_for_companies", + "visible_for_citizens", + "access_groups", + ] # import-export import_template_name = "admin/category_import.html" resource_class = CategoryImportResource formats = [base_formats.XLSX, base_formats.CSV] + def get_inlines(self, request, obj): + inlines = list(super().get_inlines(request, obj)) + + # disable product management if we have restrictions + if request.user.has_group_managed_categories(): + try: + inlines.remove(CategoryProductInline) + except ValueError: + pass + + return inlines + + def get_queryset(self, request): + qs = super().get_queryset(request) + categories = request.user.get_group_managed_categories() + if categories: + qs = qs.filter(id__in=categories) + return qs + + def get_access_groups_label(self, obj): + return ", ".join(g.name for g in obj.access_groups.all()) + + get_access_groups_label.short_description = _("Allowed admin groups") + def get_export_resource_class(self): return CategoryExportResource diff --git a/src/open_inwoner/pdc/admin/product.py b/src/open_inwoner/pdc/admin/product.py index 76571498a2..c3989097f7 100644 --- a/src/open_inwoner/pdc/admin/product.py +++ b/src/open_inwoner/pdc/admin/product.py @@ -1,5 +1,6 @@ from django import forms from django.contrib import admin +from django.contrib.admin.widgets import FilteredSelectMultiple from django.utils.translation import gettext as _ from import_export.admin import ImportExportMixin @@ -11,6 +12,7 @@ from open_inwoner.utils.mixins import UUIDAdminFirstInOrder from ..models import ( + Category, Product, ProductCondition, ProductContact, @@ -40,6 +42,47 @@ class Meta: fields = "__all__" widgets = {"content": CKEditorWidget} + categories = forms.ModelMultipleChoiceField( + label=_("Allowed admin categories"), + queryset=Category.objects.all(), + required=False, + widget=FilteredSelectMultiple(verbose_name=_("Category"), is_stacked=False), + ) + + def __init__(self, *args, **kwargs): + self.request = kwargs.pop("request") + super().__init__(*args, **kwargs) + + # we only want to add/remove Categories we have access to and keep te rest + user_categories = self.request.user.get_group_managed_categories() + if user_categories: + self.fields["categories"].queryset = user_categories + if self.instance and self.instance.pk: + self.fields[ + "categories" + ].initial = self.instance.categories.intersection(user_categories) + + def clean(self): + cleaned_data = super().clean() + if len(cleaned_data["categories"]) == 0: + self.add_error("categories", _("At least one category is required")) + return cleaned_data + + def _save_m2m(self): + # remember this before we run regular _save_m2m() + current = set(self.instance.categories.all()) + + super()._save_m2m() + + # we only want to add/remove Categories we have access to and keep the ones we don't, + # so do some set operations to figure it out + managed = set(self.request.user.get_group_managed_categories()) + if managed: + want_managed = managed & set(self.cleaned_data["categories"]) + keep_not_ours = current - managed + combined = keep_not_ours | want_managed + self.instance.categories.set(combined) + @admin.register(Product) class ProductAdmin(ImportExportMixin, admin.ModelAdmin): @@ -48,7 +91,6 @@ class ProductAdmin(ImportExportMixin, admin.ModelAdmin): list_editable = ("published",) date_hierarchy = "created_on" autocomplete_fields = ( - "categories", "related_products", "tags", "organizations", @@ -72,6 +114,17 @@ class ProductAdmin(ImportExportMixin, admin.ModelAdmin): import_template_name = "admin/product_import.html" formats = [base_formats.XLSX, base_formats.CSV] + def get_form(self, request, obj=None, change=False, **kwargs): + # workaround to get the request in the Modelform + Form = super().get_form(request, obj=obj, change=change, **kwargs) + + class RequestForm(Form): + def __new__(cls, *args, **kwargs): + kwargs["request"] = request + return Form(*args, **kwargs) + + return RequestForm + def get_export_resource_class(self): return ProductExportResource @@ -86,6 +139,9 @@ def export_action(self, request, *args, **kwargs): def get_queryset(self, request): qs = super().get_queryset(request) + categories = request.user.get_group_managed_categories() + if categories: + qs = qs.filter(categories__in=categories) return qs.prefetch_related("links", "locations", "contacts") def display_categories(self, obj): diff --git a/src/open_inwoner/pdc/migrations/0066_category_access_groups.py b/src/open_inwoner/pdc/migrations/0066_category_access_groups.py new file mode 100644 index 0000000000..7e7ddcd095 --- /dev/null +++ b/src/open_inwoner/pdc/migrations/0066_category_access_groups.py @@ -0,0 +1,25 @@ +# Generated by Django 3.2.23 on 2024-01-23 10:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("auth", "0012_alter_user_first_name_max_length"), + ("pdc", "0065_auto_20240112_1046"), + ] + + operations = [ + migrations.AddField( + model_name="category", + name="access_groups", + field=models.ManyToManyField( + blank=True, + help_text="If set, only users belonging to this group can edit this category and its products", + related_name="managed_categories", + to="auth.Group", + verbose_name="Restrict to admin groups", + ), + ), + ] diff --git a/src/open_inwoner/pdc/models/category.py b/src/open_inwoner/pdc/models/category.py index 37dce6d284..2ff8a732f4 100644 --- a/src/open_inwoner/pdc/models/category.py +++ b/src/open_inwoner/pdc/models/category.py @@ -104,6 +104,16 @@ class Category(MP_Node): ), ) + access_groups = models.ManyToManyField( + "auth.Group", + verbose_name=_("Restrict to admin groups"), + blank=True, + help_text=_( + "If set, only users belonging to this group can edit this category and its products" + ), + related_name="managed_categories", + ) + objects = CategoryPublishedQueryset.as_manager() class Meta: diff --git a/src/open_inwoner/pdc/tests/test_category_admin.py b/src/open_inwoner/pdc/tests/test_category_admin.py index 7538f73196..b44eea245a 100644 --- a/src/open_inwoner/pdc/tests/test_category_admin.py +++ b/src/open_inwoner/pdc/tests/test_category_admin.py @@ -1,11 +1,12 @@ from unittest.mock import patch +from django.contrib.auth.models import Permission from django.urls import reverse from django.utils.translation import gettext as _ from django_webtest import WebTest -from open_inwoner.accounts.tests.factories import UserFactory +from open_inwoner.accounts.tests.factories import GroupFactory, UserFactory from open_inwoner.openzaak.tests.factories import ZaakTypeConfigFactory from ..models.category import Category @@ -95,6 +96,72 @@ def test_user_cannot_publish_child_category_without_root_published_on_list_page( updated_category = Category.objects.get(slug="bar4") self.assertFalse(updated_category.published) + def test_access_limited_to_linked_auth_groups(self): + super_user = UserFactory(is_superuser=True, is_staff=True) + + group = GroupFactory() + group_user = UserFactory(is_staff=True) + group_user.user_permissions.add( + Permission.objects.get(codename="view_category"), + ) + group_user.groups.add(group) + + category_general = CategoryFactory( + path="everyone", name="everyone", published=True + ) + category_grouped = CategoryFactory( + path="grouped", name="grouped", published=True + ) + category_grouped.access_groups.add(group) + + with self.subTest("superuser sees all in list"): + response = self.app.get( + reverse("admin:pdc_category_changelist"), user=super_user + ) + categories = list(response.context["cl"].queryset.all()) + # list shows all categories + self.assertEqual(categories, [category_general, category_grouped]) + + with self.subTest("user list is limited by group"): + response = self.app.get( + reverse("admin:pdc_category_changelist"), user=group_user + ) + categories = list(response.context["cl"].queryset.all()) + # list shows only categories linked to group + self.assertEqual(categories, [category_grouped]) + + with self.subTest("user cannot access not-linked category"): + response = self.app.get( + reverse( + "admin:pdc_category_change", + kwargs={"object_id": category_general.id}, + ), + user=group_user, + ) + # status code is 302 when object is not found and redirects to admin index + self.assertEqual(response.status_code, 302) + self.assertEqual(response.follow().request.path, "/admin/") + + def test_access_categories_cannot_be_edited_by_restricted(self): + group = GroupFactory() + group_user = UserFactory(is_staff=True) + group_user.user_permissions.add( + Permission.objects.get(codename="change_category"), + ) + group_user.groups.add(group) + + category = CategoryFactory(path="grouped", name="grouped", published=True) + category.access_groups.add(group) + response = self.app.get( + reverse( + "admin:pdc_category_change", + kwargs={"object_id": category.id}, + ), + user=group_user, + ) + self.assertIn("name", response.form.fields) + self.assertNotIn("access_groups", response.form.fields) + @patch("open_inwoner.openzaak.models.OpenZaakConfig.get_solo") def test_user_can_link_zaaktypen_if_category_filtering_with_zaken_feature_flag_enabled( self, mock_solo diff --git a/src/open_inwoner/pdc/tests/test_logging.py b/src/open_inwoner/pdc/tests/test_logging.py index 2f2fc0b513..e6186b6758 100644 --- a/src/open_inwoner/pdc/tests/test_logging.py +++ b/src/open_inwoner/pdc/tests/test_logging.py @@ -31,6 +31,7 @@ def test_addition(self): form["slug"] = self.product.slug form["content"] = self.product.content form["summary"] = self.product.summary + form["categories"] = [self.category.id] form["costs"] = 0.0 form.submit() product = Product.objects.filter(slug=self.product.slug).first() diff --git a/src/open_inwoner/pdc/tests/test_product_admin.py b/src/open_inwoner/pdc/tests/test_product_admin.py new file mode 100644 index 0000000000..60b2b6e3c5 --- /dev/null +++ b/src/open_inwoner/pdc/tests/test_product_admin.py @@ -0,0 +1,153 @@ +from django.contrib.auth.models import Permission +from django.urls import reverse +from django.utils.translation import ugettext as _ + +from django_webtest import WebTest + +from open_inwoner.accounts.tests.factories import GroupFactory, UserFactory + +from .factories import CategoryFactory, ProductFactory + + +class TestAdminProductForm(WebTest): + def test_access_limited_to_linked_auth_groups(self): + super_user = UserFactory(is_superuser=True, is_staff=True) + + group = GroupFactory() + group_user = UserFactory(is_staff=True) + group_user.user_permissions.add(Permission.objects.get(codename="view_product")) + group_user.groups.add(group) + + category_general = CategoryFactory( + path="everyone", name="everyone", published=True + ) + category_grouped = CategoryFactory( + path="grouped", name="grouped", published=True + ) + category_grouped.access_groups.add(group) + + product_general = ProductFactory(name="General Product") + category_general.products.add(product_general) + + product_grouped = ProductFactory(name="Grouped Product") + category_grouped.products.add(product_grouped) + + with self.subTest("superuser sees all in list"): + response = self.app.get( + reverse("admin:pdc_product_changelist"), user=super_user + ) + products = list(response.context["cl"].queryset.all()) + # list shows all categories + self.assertEqual(products, [product_general, product_grouped]) + + with self.subTest("user list is limited by group"): + response = self.app.get( + reverse("admin:pdc_product_changelist"), user=group_user + ) + categories = list(response.context["cl"].queryset.all()) + # list shows only product in categories linked to group + self.assertEqual(categories, [product_grouped]) + + with self.subTest("user cannot access not-linked category"): + response = self.app.get( + reverse( + "admin:pdc_product_change", + kwargs={"object_id": product_general.id}, + ), + user=group_user, + ) + # status code is 302 when object is not found and redirects to admin index + self.assertEqual(response.status_code, 302) + self.assertEqual(response.follow().request.path, "/admin/") + + def test_product_keeps_mixed_restricted_and_unrestricted_categories(self): + super_user = UserFactory(is_superuser=True, is_staff=True) + + group = GroupFactory() + group_user = UserFactory(is_staff=True) + group_user.user_permissions.add( + Permission.objects.get(codename="view_product"), + Permission.objects.get(codename="change_product"), + ) + group_user.groups.add(group) + + category_general = CategoryFactory( + path="everyone", name="everyone", published=True + ) + category_grouped = CategoryFactory( + path="grouped", name="grouped", published=True + ) + category_extra = CategoryFactory(path="grouped", name="extra", published=True) + group.managed_categories.add(category_grouped) + group.managed_categories.add(category_extra) + + product = ProductFactory(name="Product") + product.categories.add(category_general) + product.categories.add(category_grouped) + + url = reverse("admin:pdc_product_change", kwargs={"object_id": product.id}) + + with self.subTest("superuser"): + response = self.app.get(url, user=super_user) + + # all categories visible + self.assertEqual(len(response.form["categories"].options), 3) + self.assertEqual( + set(response.form["categories"].value), + {str(category_general.id), str(category_grouped.id)}, + ) + response = response.form.submit("_continue").follow() + + # no changes on resubmit + self.assertEqual( + set(product.categories.all()), {category_general, category_grouped} + ) + + # sanity check with different combinations + response.form["categories"].select_multiple( + value=[str(category_general.id), str(category_extra.id)] + ) + response = response.form.submit("_continue").follow() + self.assertEqual( + set(product.categories.all()), {category_general, category_extra} + ) + + # sanity check with different combinations + response.form["categories"].select_multiple(value=[str(category_extra.id)]) + response = response.form.submit("_continue").follow() + self.assertEqual(set(product.categories.all()), {category_extra}) + + product.categories.set([category_general, category_grouped]) + + with self.subTest("restricted user"): + response = self.app.get(url, user=group_user) + + # only our restricted + self.assertEqual(len(response.form["categories"].options), 2) + self.assertEqual( + set(response.form["categories"].value), + {str(category_grouped.id)}, + ) + response = response.form.submit("_continue").follow() + + # on resubmit the non-group category is still there + self.assertEqual( + set(product.categories.all()), {category_general, category_grouped} + ) + # select different category + response.form["categories"].select_multiple(value=[str(category_extra.id)]) + response.form.submit().follow() + + # swapped different category but the non-group category is still there + self.assertEqual( + set(product.categories.all()), {category_general, category_extra} + ) + + # requires at least one category + response.form["categories"].select_multiple(value=[]) + response = response.form.submit() + self.assertEqual(response.status_code, 200) + self.assertEqual( + response.context["errors"][0][0], + _("At least one category is required"), + )