wip
Some checks failed
API CI / api-tests (pull_request) Failing after 15s
CI / test (pull_request) Failing after 32s

This commit is contained in:
Tim Lorsbach
2026-04-21 10:26:35 +02:00
parent d1a00f71b4
commit f9f65de8b3
30 changed files with 740 additions and 184 deletions

View File

@ -7,6 +7,7 @@ import nh3
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.db import transaction
from django.db.models import QuerySet
from pydantic import ValidationError
from epdb.models import (
@ -94,8 +95,8 @@ class EPDBURLParser:
def contains_package_url(self):
return (
bool(self.MODEL_PATTERNS["epdb.Package"].findall(self.url))
and not self.is_package_url()
bool(self.MODEL_PATTERNS["epdb.Package"].findall(self.url))
and not self.is_package_url()
)
def is_user_url(self) -> bool:
@ -187,7 +188,7 @@ class UserManager(object):
@staticmethod
@transaction.atomic
def create_user(
username, email, password, set_setting=True, add_to_group=True, *args, **kwargs
username, email, password, set_setting=True, add_to_group=True, *args, **kwargs
):
# Clean for potential XSS
clean_username = nh3.clean(username).strip()
@ -345,52 +346,15 @@ class PackageManager(object):
@staticmethod
def readable(user, package):
if (
UserPackagePermission.objects.filter(package=package, user=user).exists()
or GroupPackagePermission.objects.filter(
package=package, group__in=GroupManager.get_groups(user)
)
or package.reviewed is True
or user.is_superuser
):
return True
return False
return PackageManager.has_package_permission(user, package, "read")
@staticmethod
def writable(user, package):
if (
UserPackagePermission.objects.filter(
package=package, user=user, permission=Permission.WRITE[0]
).exists()
or GroupPackagePermission.objects.filter(
package=package,
group__in=GroupManager.get_groups(user),
permission=Permission.WRITE[0],
).exists()
or UserPackagePermission.objects.filter(
package=package, user=user, permission=Permission.ALL[0]
).exists()
or user.is_superuser
):
return True
return False
return PackageManager.has_package_permission(user, package, "write")
@staticmethod
def administrable(user, package):
if (
UserPackagePermission.objects.filter(
package=package, user=user, permission=Permission.ALL[0]
).exists()
or GroupPackagePermission.objects.filter(
package=package,
group__in=GroupManager.get_groups(user),
permission=Permission.ALL[0],
).exists()
or user.is_superuser
):
return True
return False
return PackageManager.has_package_permission(user, package, "all")
@staticmethod
def has_package_permission(user: "User", package: Union[str, UUID, "Package"], permission: str):
@ -399,18 +363,26 @@ class PackageManager(object):
groups = GroupManager.get_groups(user)
# EDIT START
if package.classification_level == Package.Classification.SECRET:
if package.data_pool not in groups:
return False
# EDIT END
perms = {"all": ["all"], "write": ["all", "write"], "read": ["all", "write", "read"]}
valid_perms = perms.get(permission)
if (
UserPackagePermission.objects.filter(
package=package, user=user, permission__in=valid_perms
).exists()
or GroupPackagePermission.objects.filter(
package=package, group__in=groups, permission__in=valid_perms
).exists()
or user.is_superuser
UserPackagePermission.objects.filter(
package=package, user=user, permission__in=valid_perms
).exists()
or GroupPackagePermission.objects.filter(
package=package, group__in=groups, permission__in=valid_perms
).exists()
or user.is_superuser
):
return True
@ -441,6 +413,7 @@ class PackageManager(object):
try:
p = Package.objects.get(uuid=package_id)
if PackageManager.readable(user, p):
p = PackageManager.check_package_classification(user, p)
return p
else:
# FIXME: use custom exception to be translatable to 403 in API
@ -450,6 +423,37 @@ class PackageManager(object):
except Package.DoesNotExist:
raise ValueError("Package with ID {} does not exist!".format(package_id))
# EDIT START
@staticmethod
def check_package_classification(user, pack: Package):
if pack.classification_level == Package.Classification.SECRET:
if pack.data_pool.user_member.filter(id=user.id).exists():
return pack
raise ValueError("Package is secret and not accessible to user!")
else:
return pack
@staticmethod
def check_package_classifications(user, package_qs: QuerySet[Package]):
non_secret = package_qs.exclude(classification_level=Package.Classification.SECRET)
secret = package_qs.filter(classification_level=Package.Classification.SECRET)
# TODO we should be able to do via the db
accessible_secret = []
for s_package in secret:
if s_package.data_pool.user_member.filter(id=user.id).exists():
accessible_secret.append(s_package.pk)
# Cannot combine a unique query with a non-unique query -> we have to call distinct
return Package.objects.filter(pk__in=accessible_secret).distinct() | non_secret.distinct()
# EDIT END
@staticmethod
def get_all_readable_packages(user, include_reviewed=False):
# UserPermission only exists if at least read is granted...
@ -474,7 +478,13 @@ class PackageManager(object):
# remove package if user is owner and package is reviewed e.g. admin
qs = qs.filter(reviewed=False)
return qs.distinct()
qs = qs.distinct()
# EDIT START
qs = PackageManager.check_package_classifications(user, qs)
# EDIT END
return qs
@staticmethod
def get_all_writeable_packages(user):
@ -518,11 +528,13 @@ class PackageManager(object):
qs = qs.filter(reviewed=False)
return qs.distinct()
qs = qs.distinct()
@staticmethod
def get_packages():
return Package.objects.all()
# EDIT START
qs = PackageManager.check_package_classifications(user, qs)
# EDIT END
return qs
@staticmethod
@transaction.atomic
@ -548,7 +560,7 @@ class PackageManager(object):
@staticmethod
@transaction.atomic
def update_permissions(
caller: User, package: Package, grantee: Union[User, Group], new_perm: Optional[str]
caller: User, package: Package, grantee: Union[User, Group], new_perm: Optional[str]
):
caller_perm = None
if not caller.is_superuser:
@ -591,7 +603,7 @@ class PackageManager(object):
@staticmethod
@transaction.atomic
def import_legacy_package(
data: dict, owner: User, keep_ids=False, add_import_timestamp=True, trust_reviewed=False
data: dict, owner: User, keep_ids=False, add_import_timestamp=True, trust_reviewed=False
):
from collections import defaultdict
from datetime import datetime
@ -671,7 +683,7 @@ class PackageManager(object):
# Check if parent exists and park this Scenario to convert it later into an
# AdditionalInformation object
for ex in scenario.get("additionalInformationCollection", {}).get(
"additionalInformation", []
"additionalInformation", []
):
if ex["name"] == "referringscenario":
postponed_scens[ex["data"]].append(scenario)
@ -694,7 +706,7 @@ class PackageManager(object):
mapping[scenario["id"]] = scen.uuid
for ex in scenario.get("additionalInformationCollection", {}).get(
"additionalInformation", []
"additionalInformation", []
):
name = ex["name"]
addinf_data = ex["data"]
@ -963,7 +975,7 @@ class PackageManager(object):
for parent, children in postponed_scens.items():
for child in children:
for ex in child.get("additionalInformationCollection", {}).get(
"additionalInformation", []
"additionalInformation", []
):
child_id = child["id"]
name = ex["name"]
@ -1104,11 +1116,11 @@ class PackageManager(object):
@staticmethod
@transaction.atomic
def import_package(
data: Dict[str, Any],
owner: User,
preserve_uuids=False,
add_import_timestamp=True,
trust_reviewed=False,
data: Dict[str, Any],
owner: User,
preserve_uuids=False,
add_import_timestamp=True,
trust_reviewed=False,
) -> Package:
importer = PackageImporter(data, preserve_uuids, add_import_timestamp, trust_reviewed)
imported_package = importer.do_import()
@ -1123,7 +1135,7 @@ class PackageManager(object):
@staticmethod
def export_package(
package: Package, include_models: bool = False, include_external_identifiers: bool = True
package: Package, include_models: bool = False, include_external_identifiers: bool = True
) -> Dict[str, Any]:
return PackageExporter(package).do_export()
@ -1150,10 +1162,10 @@ class SettingManager(object):
s = Setting.objects.get(uuid=setting_id)
if (
s.global_default
or s.public
or user.is_superuser
or UserSettingPermission.objects.filter(user=user, setting=s).exists()
s.global_default
or s.public
or user.is_superuser
or UserSettingPermission.objects.filter(user=user, setting=s).exists()
):
return s
@ -1163,24 +1175,24 @@ class SettingManager(object):
def get_all_settings(user):
sp = UserSettingPermission.objects.filter(user=user).values("setting")
return (
Setting.objects.filter(id__in=sp)
| Setting.objects.filter(public=True)
| Setting.objects.filter(global_default=True)
Setting.objects.filter(id__in=sp)
| Setting.objects.filter(public=True)
| Setting.objects.filter(global_default=True)
).distinct()
@staticmethod
@transaction.atomic
def create_setting(
user: User,
name: str = None,
description: str = None,
max_nodes: int = None,
max_depth: int = None,
rule_packages: List[Package] | None = None,
model: EPModel = None,
model_threshold: float = None,
expansion_scheme: ExpansionSchemeChoice = ExpansionSchemeChoice.BFS,
property_models: List["PropertyPluginModel"] | None = None,
user: User,
name: str = None,
description: str = None,
max_nodes: int = None,
max_depth: int = None,
rule_packages: List[Package] | None = None,
model: EPModel = None,
model_threshold: float = None,
expansion_scheme: ExpansionSchemeChoice = ExpansionSchemeChoice.BFS,
property_models: List["PropertyPluginModel"] | None = None,
):
new_s = Setting()
@ -1259,8 +1271,8 @@ class SearchManager(object):
pathway_qs = Pathway.objects.filter(
Q(package__in=packages)
& (
Q(edge__edge_label__educts__inchikey=searchterm)
| Q(edge__edge_label__products__inchikey=searchterm)
Q(edge__edge_label__educts__inchikey=searchterm)
| Q(edge__edge_label__products__inchikey=searchterm)
)
).distinct()
@ -1298,8 +1310,8 @@ class SearchManager(object):
pathway_qs = Pathway.objects.filter(
Q(package__in=packages)
& (
Q(edge__edge_label__educts__smiles=searchterm)
| Q(edge__edge_label__products__smiles=searchterm)
Q(edge__edge_label__educts__smiles=searchterm)
| Q(edge__edge_label__products__smiles=searchterm)
)
).distinct()
@ -1335,15 +1347,15 @@ class SearchManager(object):
reactions_qs = Reaction.objects.filter(
Q(package__in=packages)
& (
Q(educts__inchikey__startswith=inchi_front)
| Q(products__inchikey__startswith=inchi_front)
Q(educts__inchikey__startswith=inchi_front)
| Q(products__inchikey__startswith=inchi_front)
)
).distinct()
pathway_qs = Pathway.objects.filter(
Q(package__in=packages)
& (
Q(edge__edge_label__educts__inchikey__startswith=inchi_front)
| Q(edge__edge_label__products__inchikey__startswith=inchi_front)
Q(edge__edge_label__educts__inchikey__startswith=inchi_front)
| Q(edge__edge_label__products__inchikey__startswith=inchi_front)
)
).distinct()
@ -1381,8 +1393,8 @@ class SearchManager(object):
pathway_qs = Pathway.objects.filter(
Q(package__in=packages)
& (
Q(edge__edge_label__educts__canonical_smiles=searchterm)
| Q(edge__edge_label__products__canonical_smiles=searchterm)
Q(edge__edge_label__educts__canonical_smiles=searchterm)
| Q(edge__edge_label__products__canonical_smiles=searchterm)
)
).distinct()
@ -1457,11 +1469,11 @@ class SNode(object):
class SEdge(object):
def __init__(
self,
educts: Union[SNode, List[SNode]],
products: Union[SNode | List[SNode]],
rule: Optional["Rule"] = None,
probability: Optional[float] = None,
self,
educts: Union[SNode, List[SNode]],
products: Union[SNode | List[SNode]],
rule: Optional["Rule"] = None,
probability: Optional[float] = None,
):
if not isinstance(educts, list):
educts = [educts]
@ -1493,11 +1505,11 @@ class SEdge(object):
return False
if (
self.rule is not None
and other.rule is None
or self.rule is None
and other.rule is not None
or self.rule != other.rule
self.rule is not None
and other.rule is None
or self.rule is None
and other.rule is not None
or self.rule != other.rule
):
return False
@ -1505,8 +1517,8 @@ class SEdge(object):
return False
for n1, n2 in zip(
sorted(self.educts, key=lambda x: x.smiles),
sorted(other.educts, key=lambda x: x.smiles),
sorted(self.educts, key=lambda x: x.smiles),
sorted(other.educts, key=lambda x: x.smiles),
):
if n1.smiles != n2.smiles:
return False
@ -1515,8 +1527,8 @@ class SEdge(object):
return False
for n1, n2 in zip(
sorted(self.products, key=lambda x: x.smiles),
sorted(other.products, key=lambda x: x.smiles),
sorted(self.products, key=lambda x: x.smiles),
sorted(other.products, key=lambda x: x.smiles),
):
if n1.smiles != n2.smiles:
return False
@ -1529,10 +1541,10 @@ class SEdge(object):
class SPathway(object):
def __init__(
self,
root_nodes: Optional[Union[str, SNode, List[str | SNode]]] = None,
persist: Optional["Pathway"] = None,
prediction_setting: Optional[Setting] = None,
self,
root_nodes: Optional[Union[str, SNode, List[str | SNode]]] = None,
persist: Optional["Pathway"] = None,
prediction_setting: Optional[Setting] = None,
):
self.root_nodes = []
@ -1677,9 +1689,9 @@ class SPathway(object):
# We don't have any substrate, but technically we have at least one rule that triggered.
# If our substrate is a root node a.k.a. depth == 0 store that info in SPathway
if (
len(expansion_result["transformations"]) == 0
and expansion_result["rule_triggered"]
and sub.depth == 0
len(expansion_result["transformations"]) == 0
and expansion_result["rule_triggered"]
and sub.depth == 0
):
self.empty_due_to_threshold = True
@ -1786,8 +1798,8 @@ class SPathway(object):
for prod in edge.products:
# Either is a new product or a product and we found a path with a higher prob
if (
prod not in node_probs
or current_prob * edge.probability > node_probs[prod]
prod not in node_probs
or current_prob * edge.probability > node_probs[prod]
):
node_probs[prod] = current_prob * edge.probability