wip
Some checks failed
API CI / api-tests (pull_request) Failing after 20s
CI / test (pull_request) Failing after 23s

This commit is contained in:
Tim Lorsbach
2026-04-21 10:26:35 +02:00
parent 4012ac356b
commit 170f00504f
28 changed files with 653 additions and 69 deletions

View File

@ -7,6 +7,7 @@ import nh3
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.db import transaction
from django.db.models import QuerySet
from pydantic import ValidationError
from epdb.models import (
@ -364,6 +365,14 @@ class PackageManager(object):
groups = GroupManager.get_groups(user)
# EDIT START
if package.classification_level == Package.Classification.SECRET:
if package.data_pool not in groups:
return False
# EDIT END
perms = {"all": ["all"], "write": ["all", "write"], "read": ["all", "write", "read"]}
valid_perms = perms.get(permission)
@ -406,6 +415,7 @@ class PackageManager(object):
try:
p = Package.objects.get(uuid=package_id)
if PackageManager.readable(user, p):
p = PackageManager.check_package_classification(user, p)
return p
else:
# FIXME: use custom exception to be translatable to 403 in API
@ -415,6 +425,37 @@ class PackageManager(object):
except Package.DoesNotExist:
raise ValueError("Package with ID {} does not exist!".format(package_id))
# EDIT START
@staticmethod
def check_package_classification(user, pack: Package):
if pack.classification_level == Package.Classification.SECRET:
if pack.data_pool.user_member.filter(id=user.id).exists():
return pack
raise ValueError("Package is secret and not accessible to user!")
else:
return pack
@staticmethod
def check_package_classifications(user, package_qs: QuerySet[Package]):
non_secret = package_qs.exclude(classification_level=Package.Classification.SECRET)
secret = package_qs.filter(classification_level=Package.Classification.SECRET)
# TODO we should be able to do via the db
accessible_secret = []
for s_package in secret:
if s_package.data_pool.user_member.filter(id=user.id).exists():
accessible_secret.append(s_package.pk)
# Cannot combine a unique query with a non-unique query -> we have to call distinct
return Package.objects.filter(pk__in=accessible_secret).distinct() | non_secret.distinct()
# EDIT END
@staticmethod
def get_all_readable_packages(user, include_reviewed=False):
# UserPermission only exists if at least read is granted...
@ -441,6 +482,10 @@ class PackageManager(object):
qs = qs.distinct()
# EDIT START
qs = PackageManager.check_package_classifications(user, qs)
# EDIT END
return qs
@staticmethod
@ -487,11 +532,11 @@ class PackageManager(object):
qs = qs.distinct()
return qs
# EDIT START
qs = PackageManager.check_package_classifications(user, qs)
# EDIT END
@staticmethod
def get_packages():
return Package.objects.all()
return qs
@staticmethod
@transaction.atomic