Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#912] Harden access control on case-details/documents #329

Merged
merged 2 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/open_inwoner/accounts/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
path("themes/", MyCategoriesView.as_view(), name="my_themes"),
path("cases/", CaseListView.as_view(), name="my_cases"),
path(
"cases/document/<str:object_id>/",
"cases/<str:object_id>/document/<str:info_id>/",
CaseDocumentDownloadView.as_view(),
name="case_document_download",
),
Expand Down
119 changes: 72 additions & 47 deletions src/open_inwoner/accounts/views/cases.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dataclasses
from typing import List

from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.contrib.auth.mixins import AccessMixin
from django.core.cache import cache
from django.core.exceptions import PermissionDenied
from django.http import Http404, StreamingHttpResponse
Expand All @@ -14,9 +14,12 @@

from view_breadcrumbs import BaseBreadcrumbMixin

from open_inwoner.openzaak.api_models import Zaak
from open_inwoner.openzaak.cases import (
fetch_case_information_objects,
fetch_case_information_objects_for_case_and_info,
fetch_cases,
fetch_roles_for_case_and_bsn,
fetch_single_case,
fetch_specific_statuses,
fetch_status_history,
Expand All @@ -35,24 +38,52 @@
from open_inwoner.openzaak.utils import filter_info_object_visibility


class CaseListView(
BaseBreadcrumbMixin, LoginRequiredMixin, UserPassesTestMixin, TemplateView
):
template_name = "pages/cases/list.html"
class CaseAccessMixin(AccessMixin):
"""
Shared authorisation check

@cached_property
def crumbs(self):
return [(_("Mijn aanvragen"), reverse("accounts:my_cases"))]
Base checks:
- user is authenticated
- user has a BSN

When retrieving a case :
- users BSN has a role for this case
"""

case: Zaak = None

def dispatch(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return self.handle_no_permission()

if not request.user.bsn:
return self.handle_no_permission()

if "object_id" in kwargs:
case_uuid = kwargs["object_id"]
self.case = fetch_single_case(case_uuid)

if self.case:
# check if we have a role in this case
if not fetch_roles_for_case_and_bsn(self.case.url, request.user.bsn):
return self.handle_no_permission()

def test_func(self):
return self.request.user.bsn is not None
return super().dispatch(request, *args, **kwargs)

def handle_no_permission(self):
if self.request.user.is_authenticated:
return redirect(reverse("root"))

return super().handle_no_permission()


class CaseListView(BaseBreadcrumbMixin, CaseAccessMixin, TemplateView):
template_name = "pages/cases/list.html"

@cached_property
def crumbs(self):
return [(_("Mijn aanvragen"), reverse("accounts:my_cases"))]

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)

Expand Down Expand Up @@ -122,9 +153,7 @@ class SimpleFile:
url: str


class CaseDetailView(
BaseBreadcrumbMixin, LoginRequiredMixin, UserPassesTestMixin, TemplateView
):
class CaseDetailView(BaseBreadcrumbMixin, CaseAccessMixin, TemplateView):
template_name = "pages/cases/status.html"

@cached_property
Expand All @@ -137,40 +166,30 @@ def crumbs(self):
),
]

def test_func(self):
return self.request.user.bsn is not None

def handle_no_permission(self):
if self.request.user.is_authenticated:
return redirect(reverse("root"))

return super().handle_no_permission()

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)

case_uuid = context["object_id"]
case = fetch_single_case(case_uuid)

if case:
documents = self.get_case_document_files(case)
if self.case:
documents = self.get_case_document_files(self.case)

statuses = fetch_status_history(case.url)
statuses = fetch_status_history(self.case.url)
statuses.sort(key=lambda status: status.datum_status_gezet)

case_type = fetch_single_case_type(case.zaaktype)
status_types = fetch_status_types(case_type=case.zaaktype)
case_type = fetch_single_case_type(self.case.zaaktype)
status_types = fetch_status_types(case_type=self.case.zaaktype)

status_types_mapping = {st.url: st for st in status_types}
for status in statuses:
status_type = status_types_mapping[status.statustype]
status.statustype = status_type

context["case"] = {
"identification": case.identificatie,
"start_date": case.startdatum,
"end_date": (case.einddatum if hasattr(case, "einddatum") else None),
"description": case.omschrijving,
"identification": self.case.identificatie,
"start_date": self.case.startdatum,
"end_date": (
self.case.einddatum if hasattr(self.case, "einddatum") else None
),
"description": self.case.omschrijving,
"type_description": (
case_type.omschrijving if case_type else _("No data available")
),
Expand Down Expand Up @@ -222,7 +241,8 @@ def get_case_document_files(self, case) -> List[SimpleFile]:
url=reverse(
"accounts:case_document_download",
kwargs={
"object_id": info_obj.uuid,
"object_id": case.uuid,
"info_id": info_obj.uuid,
},
),
)
Expand All @@ -241,29 +261,30 @@ def get_anchors(self, statuses, documents):
return anchors


class CaseDocumentDownloadView(LoginRequiredMixin, UserPassesTestMixin, View):
def test_func(self):
return self.request.user.bsn is not None

def handle_no_permission(self):
if self.request.user.is_authenticated:
return redirect(reverse("root"))

return super().handle_no_permission()

def get(self, *args, **kwargs):
info_object_uuid = kwargs["object_id"]
class CaseDocumentDownloadView(CaseAccessMixin, View):
def get(self, request, *args, **kwargs):
if not self.case:
raise Http404

info_object_uuid = kwargs["info_id"]
info_object = fetch_single_information_object(uuid=info_object_uuid)
if not info_object:
raise Http404

# check if this info_object belongs to this case
if not fetch_case_information_objects_for_case_and_info(
self.case.url, info_object.url
):
raise PermissionDenied()

# check if this info_object should be visible
config = OpenZaakConfig.get_solo()
if not filter_info_object_visibility(
info_object, config.document_max_confidentiality
):
raise PermissionDenied()

# retrieve and stream content
content_stream = download_document(info_object.inhoud)
if not content_stream:
raise Http404
Expand All @@ -275,3 +296,7 @@ def get(self, *args, **kwargs):
}
response = StreamingHttpResponse(content_stream, headers=headers)
return response

def handle_no_permission(self):
# plain error and no redirect
raise PermissionDenied()
60 changes: 59 additions & 1 deletion src/open_inwoner/openzaak/cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from requests import RequestException
from zds_client import ClientError
from zgw_consumers.api_models.base import factory
from zgw_consumers.api_models.zaken import Status, Zaak
from zgw_consumers.api_models.zaken import Rol, Status, Zaak
from zgw_consumers.service import get_paginated_results

from .api_models import Zaak, ZaakInformatieObject
Expand Down Expand Up @@ -119,3 +119,61 @@ def fetch_specific_statuses(status_urls: List[str]) -> List[Status]:
statuses = factory(Status, list(_statuses))

return statuses


def fetch_roles_for_case_and_bsn(case_url: str, bsn: str) -> List[Rol]:
client = build_client("zaak")

if client is None:
return []

try:
response = client.list(
"rol",
request_kwargs={
"params": {
"zaak": case_url,
"betrokkeneIdentificatie__natuurlijkPersoon__inpBsn": bsn,
}
},
)
except RequestException as e:
logger.exception("exception while making request", exc_info=e)
return []
except ClientError as e:
logger.exception("exception while making request", exc_info=e)
return []

roles = factory(Rol, response["results"])

return roles


def fetch_case_information_objects_for_case_and_info(
case_url: str, info_object_url: str
) -> List[ZaakInformatieObject]:
client = build_client("zaak")

if client is None:
return []

try:
response = client.list(
"zaakinformatieobject",
request_kwargs={
"params": {
"zaak": case_url,
"informatieobject": info_object_url,
},
},
)
except RequestException as e:
logger.exception("exception while making request", exc_info=e)
return []
except ClientError as e:
logger.exception("exception while making request", exc_info=e)
return []

case_info_objects = factory(ZaakInformatieObject, response)

return case_info_objects
Loading