forked from enviPath/enviPy
1901 lines
63 KiB
Python
1901 lines
63 KiB
Python
from typing import Any, Dict, List, Optional
|
|
|
|
import nh3
|
|
from django.conf import settings as s
|
|
from django.contrib.auth import get_user_model
|
|
from django.http import HttpResponse
|
|
from django.shortcuts import redirect
|
|
from ninja import Field, Form, Router, Schema, Query
|
|
from ninja.security import SessionAuth
|
|
|
|
from utilities.chem import FormatConverter
|
|
from utilities.misc import PackageExporter
|
|
|
|
from .logic import GroupManager, PackageManager, SettingManager, UserManager, SearchManager
|
|
from .models import (
|
|
Compound,
|
|
CompoundStructure,
|
|
Edge,
|
|
EPModel,
|
|
Node,
|
|
Pathway,
|
|
Reaction,
|
|
Rule,
|
|
Scenario,
|
|
SimpleAmbitRule,
|
|
User,
|
|
UserPackagePermission,
|
|
ParallelRule,
|
|
)
|
|
|
|
# The concrete Package model class is obtained from the Django settings hook
# rather than imported directly from .models.
Package = s.GET_PACKAGE_MODEL()
|
|
|
|
|
|
def _anonymous_or_real(request):
|
|
if request.user.is_authenticated and not request.user.is_anonymous:
|
|
return request.user
|
|
return get_user_model().objects.get(username="anonymous")
|
|
|
|
|
|
# Router shared by every endpoint in this module; session authentication is
# the default unless an endpoint overrides `auth`.
# NOTE(review): csrf=False presumably disables CSRF checking for these
# session-authenticated endpoints - confirm this is intended.
router = Router(auth=SessionAuth(csrf=False))
|
|
|
|
|
|
class Error(Schema):
    """Error payload returned by the endpoints alongside a non-200 status."""

    message: str
|
|
|
|
|
|
#################
|
|
# SimpleObjects #
|
|
#################
|
|
class SimpleUser(Schema):
    """Compact user representation (url, username and email)."""

    id: str = Field(None, alias="url")
    identifier: str = "user"
    name: str = Field(None, alias="username")
    email: str = Field(None, alias="email")
|
|
|
|
|
|
class SimpleGroup(Schema):
    """Compact group representation (url and name)."""

    id: str = Field(None, alias="url")
    identifier: str = "group"
    name: str = Field(None, alias="name")
|
|
|
|
|
|
class SimpleSetting(Schema):
    """Compact setting representation (url and name)."""

    id: str = Field(None, alias="url")
    identifier: str = "setting"
    name: str = Field(None, alias="name")
|
|
|
|
|
|
class SimpleObject(Schema):
    """Base for the compact object schemas: url, name and review status."""

    id: str = Field(None, alias="url")
    name: str = Field(None, alias="name")
    reviewStatus: str = Field(None, alias="review_status")

    @staticmethod
    def resolve_review_status(obj):
        """Return "reviewed"/"unreviewed" based on the package owning ``obj``.

        Raises:
            ValueError: if no owning package can be determined for ``obj``.
        """
        # Find the owning package first, then map its flag to a label once.
        if isinstance(obj, Package):
            package = obj
        elif hasattr(obj, "package"):
            package = obj.package
        elif isinstance(obj, CompoundStructure):
            package = obj.compound.package
        elif isinstance(obj, (Node, Edge)):
            package = obj.pathway.package
        else:
            raise ValueError("Object has no package")

        return "reviewed" if package.reviewed else "unreviewed"
|
|
|
|
|
|
class SimplePackage(SimpleObject):
    """Compact representation tagged with the "package" identifier."""

    identifier: str = "package"
|
|
|
|
|
|
class SimpleCompound(SimpleObject):
    """Compact representation tagged with the "compound" identifier."""

    identifier: str = "compound"
|
|
|
|
|
|
class SimpleCompoundStructure(SimpleObject):
    """Compact representation tagged with the "structure" identifier."""

    identifier: str = "structure"
|
|
|
|
|
|
class SimpleRule(SimpleObject):
    """Compact representation tagged with the "rule" identifier."""

    identifier: str = "rule"

    @staticmethod
    def resolve_url(obj: Rule):
        """Return the rule URL with the "-ambit-"/"-rdkit-" infix collapsed to "-"."""
        url = obj.url
        for engine_infix in ("-ambit-", "-rdkit-"):
            url = url.replace(engine_infix, "-")
        return url
|
|
|
|
|
|
class SimpleReaction(SimpleObject):
    """Compact representation tagged with the "reaction" identifier."""

    identifier: str = "reaction"
|
|
|
|
|
|
class SimpleScenario(SimpleObject):
    """Compact representation tagged with the "scenario" identifier."""

    identifier: str = "scenario"
|
|
|
|
|
|
class SimplePathway(SimpleObject):
    """Compact representation tagged with the "pathway" identifier."""

    identifier: str = "pathway"
|
|
|
|
|
|
class SimpleNode(SimpleObject):
    """Compact representation tagged with the "node" identifier."""

    identifier: str = "node"
|
|
|
|
|
|
class SimpleEdge(SimpleObject):
    """Compact representation tagged with the "edge" identifier."""

    identifier: str = "edge"
|
|
|
|
|
|
class SimpleModel(SimpleObject):
    """Compact representation tagged with the "relative-reasoning" identifier."""

    identifier: str = "relative-reasoning"
|
|
|
|
|
|
################
|
|
# Login/Logout #
|
|
################
|
|
@router.post("/", response={200: SimpleUser, 403: Error}, auth=None)
def login(request, loginusername: Form[str], loginpassword: Form[str]):
    """Log a user in by username and password.

    The username is translated to the account's e-mail address, which is then
    passed to Django's ``authenticate``.

    Returns:
        The authenticated user on success, otherwise ``403`` with an error
        message. Unknown usernames and wrong passwords produce the same
        response.
    """
    # Imported under an alias so the Django helper does not shadow this view.
    from django.contrib.auth import authenticate
    from django.contrib.auth import login as auth_login

    invalid_credentials = 403, {"message": "Invalid username and/or password"}

    try:
        email = User.objects.get(username=loginusername).email
    except User.DoesNotExist:
        # Previously an unknown username escaped as an unhandled exception.
        return invalid_credentials

    user = authenticate(username=email, password=loginpassword)
    if not user:
        return invalid_credentials

    auth_login(request, user)
    return user
|
|
|
|
|
|
########
|
|
# User #
|
|
########
|
|
class UserWrapper(Schema):
    """Envelope holding a list of users under the "user" key."""

    user: List[SimpleUser]
|
|
|
|
|
|
class UserSchema(Schema):
    """Detailed user representation.

    ``forename``, ``surname`` and ``link`` are fixed placeholder values, and
    the affiliation is currently a static placeholder as well.
    """

    affiliation: Dict[str, str] = Field(None, alias="affiliation")
    defaultGroup: "SimpleGroup" = Field(None, alias="default_group")
    defaultPackage: "SimplePackage" = Field(None, alias="default_package")
    defaultSetting: "SimpleSetting" = Field(None, alias="default_setting")
    email: str = Field(None, alias="email")
    forename: str = "not specified"
    groups: List["SimpleGroup"] = Field([], alias="groups")
    id: str = Field(None, alias="url")
    identifier: str = "user"
    link: str = "empty"
    name: str = Field(None, alias="username")
    surname: str = "not specified"
    settings: List["SimpleSetting"] = Field([], alias="settings")

    @staticmethod
    def resolve_affiliation(obj: User):
        """Return a placeholder affiliation; real data is not wired up yet."""
        # TODO
        return dict.fromkeys(("city", "country", "workplace"), "not specified")

    @staticmethod
    def resolve_settings(obj: User):
        """Return what ``SettingManager.get_all_settings`` yields for the user."""
        return SettingManager.get_all_settings(obj)
|
|
|
|
|
|
class Me(Schema):
    """Query parameters of the user listing; ``whoami`` is optional."""

    whoami: str | None = None
|
|
|
|
|
|
@router.get("/user", response={200: UserWrapper, 403: Error})
def get_users(request, me: Query[Me]):
    """List users.

    With a truthy ``whoami`` query parameter only the requesting user is
    returned; otherwise every user is listed.
    """
    users = [request.user] if me.whoami else User.objects.all()
    return {"user": users}
|
|
|
|
|
|
@router.get("/user/{uuid:user_uuid}", response={200: UserSchema, 403: Error})
def get_user(request, user_uuid):
    """Return the user with the given uuid, or 403 if the lookup is refused."""
    try:
        return UserManager.get_user_by_id(request.user, user_uuid)
    except ValueError:
        message = f"Getting User with id {user_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
class Search(Schema):
    """Query parameters of the search endpoint."""

    # Read from the repeated "packages[]" query parameter.
    packages: List[str] = Field(alias="packages[]")
    search: str
    method: str
|
|
|
|
|
|
@router.get("/search", response={200: Any, 403: Error})
def search(request, search: Query[Search]):
    """Search the given packages and return the hits grouped by object kind.

    The ``method`` query value is translated to the name SearchManager
    expects, the result sections are renamed to the API keys, and every hit
    receives an ``id`` derived from its ``url``.

    Returns:
        A dict of hit lists, or ``403`` with a message when a ``ValueError``
        is raised (for example for an unsupported search method).
    """
    # Query-parameter value -> SearchManager method name.
    method_names = {
        "text": "text",
        "inchikey": "inchikey",
        "defaultSmiles": "default",
        "canonicalSmiles": "canonical",
        "exactSmiles": "exact",
    }
    # SearchManager section -> key used in the response, in output order.
    section_keys = (
        ("Compounds", "compound"),
        ("Compound Structures", "structure"),
        ("Reaction", "reaction"),
        ("Pathway", "pathway"),
        ("Rules", "rule"),
    )

    try:
        packs = [
            PackageManager.get_package_by_url(request.user, package_url)
            for package_url in search.packages
        ]

        method = method_names.get(search.method)
        if method is None:
            raise ValueError(f"Search method {search.method} is not supported!")

        search_res = SearchManager.search(packs, search.search, method)
        res = {
            api_key: search_res[section]
            for section, api_key in section_keys
            if section in search_res
        }

        for hits in res.values():
            for hit in hits:
                hit["id"] = hit["url"].replace("simple-ambit-rule", "simple-rule")

        return res
    except ValueError as e:
        return 403, {"message": f"Search failed due to {e}"}
|
|
|
|
|
|
###########
|
|
# Package #
|
|
###########
|
|
class PackageWrapper(Schema):
    """Envelope holding a list of packages under the "package" key."""

    package: List["PackageSchema"]
|
|
|
|
|
|
class PackageSchema(Schema):
    """Detailed package representation including links and permissions."""

    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    links: List[Dict[str, List[str | int]]] = Field([], alias="links")
    name: str = Field(None, alias="name")
    primaryGroup: Optional[SimpleGroup] = None
    readers: List[Dict[str, str]] = Field([], alias="readers")
    reviewComment: str = Field(None, alias="review_comment")
    reviewStatus: str = Field(None, alias="review_status")
    writers: List[Dict[str, str]] = Field([], alias="writers")

    @staticmethod
    def resolve_links(obj: Package):
        """Return one ``{label: [url, count]}`` entry per contained object kind."""
        base_url = obj.url
        return [
            {"Pathways": [f"{base_url}/pathway", obj.pathways.count()]},
            {"Rules": [f"{base_url}/rule", obj.rules.count()]},
            {"Compounds": [f"{base_url}/compound", obj.compounds.count()]},
            {"Reactions": [f"{base_url}/reaction", obj.reactions.count()]},
            {"Relative Reasoning": [f"{base_url}/relative-reasoning", obj.models.count()]},
            {"Scenarios": [f"{base_url}/scenario", obj.scenarios.count()]},
        ]

    @staticmethod
    def resolve_readers(obj: Package):
        """Return ``{id: name}`` for each user holding READ permission."""
        reader_ids = UserPackagePermission.objects.filter(
            package=obj, permission=UserPackagePermission.READ[0]
        ).values_list("user", flat=True)
        readers = User.objects.filter(id__in=reader_ids).distinct()
        return [{reader.id: reader.name} for reader in readers]

    @staticmethod
    def resolve_writers(obj: Package):
        """Return ``{id: name}`` for each user holding WRITE permission."""
        writer_ids = UserPackagePermission.objects.filter(
            package=obj, permission=UserPackagePermission.WRITE[0]
        ).values_list("user", flat=True)
        writers = User.objects.filter(id__in=writer_ids).distinct()
        return [{writer.id: writer.name} for writer in writers]

    @staticmethod
    def resolve_review_comment(obj):
        """Always return an empty review comment."""
        return ""

    @staticmethod
    def resolve_review_status(obj):
        """Map the package's ``reviewed`` flag to "reviewed"/"unreviewed"."""
        if obj.reviewed:
            return "reviewed"
        return "unreviewed"
|
|
|
|
|
|
@router.get("/package", response={200: PackageWrapper, 403: Error})
def get_packages(request):
    """List the packages readable by the requesting user, reviewed ones included."""
    packages = PackageManager.get_all_readable_packages(request.user, include_reviewed=True)
    return {"package": packages}
|
|
|
|
|
|
class GetPackage(Schema):
    """Query parameters of the package detail endpoint."""

    # The export is triggered only when this equals "true" after stripping.
    exportAsJson: str | None = None
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 403: Error})
def get_package(request, package_uuid, gp: Query[GetPackage]):
    """Return a package, or its export when ``exportAsJson`` is "true".

    Returns ``403`` with a message when the lookup raises ``ValueError``.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        export_flag = gp.exportAsJson
        if not export_flag or export_flag.strip() != "true":
            return p

        return PackageExporter(p).do_export()
    except ValueError:
        message = f"Getting Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
class CreatePackage(Schema):
    """Form fields accepted when creating a package."""

    packageName: str
    packageDescription: str | None = None
|
|
|
|
|
|
@router.post("/package", response={200: Any, 400: Error})
def create_packages(
    request,
    p: Form[CreatePackage],
):
    """Create a package and redirect to its URL.

    Returns:
        A redirect to the new package, or ``400`` with a message when the
        name is blank or creation raises ``ValueError``.
    """
    # The 400 status must be declared in ``response``; django-ninja refuses
    # to emit a status code that has no schema registered for it.
    try:
        if p.packageName.strip() == "":
            raise ValueError("Package name cannot be empty!")

        new_package = PackageManager.create_package(
            request.user, p.packageName, p.packageDescription
        )
        return redirect(new_package.url)
    except ValueError as e:
        return 400, {"message": str(e)}
|
|
|
|
|
|
class UpdatePackage(Schema):
    """Form fields accepted by the package update endpoint; all are optional."""

    packageDescription: str | None = None
    # "DELETE" requests deletion of the package through a POST.
    hiddenMethod: str | None = None
    permissions: str | None = None
    # URL of the user or group that permissions are granted to.
    ppsURI: str | None = None
    read: str | None = None
    write: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 400: Error})
def update_package(request, package_uuid, pack: Form[UpdatePackage]):
    """Delete a package, update its description, or grant access to it.

    Exactly one action runs, chosen in this order:

    1. ``hiddenMethod`` set: "DELETE" deletes the package (requires
       administrable rights); any other value is ignored.
    2. ``packageDescription`` set: the sanitised description is stored
       (requires write access).
    3. ``permissions`` and ``ppsURI`` set together with ``read`` or ``write``:
       the matching permission is granted, ``read`` taking precedence.

    Returns:
        An empty 200 response after a description update or permission grant,
        ``None`` when a deletion ran or nothing matched, and ``400`` with a
        message on ``ValueError``.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if pack.hiddenMethod:
            if pack.hiddenMethod == "DELETE":
                # Same guard as the DELETE endpoint; previously missing here.
                if not PackageManager.administrable(request.user, p):
                    raise ValueError("You do not have the rights to delete this Package!")
                p.delete()
            return None

        if pack.packageDescription is not None:
            # Modifying a package requires write access, as for deletions.
            if not PackageManager.writable(request.user, p):
                raise ValueError("You do not have the rights to modify this Package!")

            description = nh3.clean(pack.packageDescription, tags=s.ALLOWED_HTML_TAGS).strip()
            if not description:
                raise ValueError("Package description cannot be empty!")

            p.description = description
            p.save()
            return HttpResponse(status=200)

        if pack.permissions and pack.ppsURI and (pack.read or pack.write):
            # The grantee is a group when the URI mentions "group", else a user.
            if "group" in pack.ppsURI:
                grantee = GroupManager.get_group_lp(pack.ppsURI)
            else:
                grantee = UserManager.get_user_lp(pack.ppsURI)

            if pack.read:
                PackageManager.grant_read(request.user, p, grantee)
            else:
                PackageManager.grant_write(request.user, p, grantee)
            return HttpResponse(status=200)

        # Nothing matched: keep the legacy behaviour of an empty 200 result.
        return None
    except ValueError as e:
        return 400, {"message": str(e)}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}", response={200: Any, 403: Error})
def delete_package(request, package_uuid):
    """Delete a package the requesting user may administer.

    Returns:
        A redirect to the package listing, or ``403`` with a message when the
        lookup fails or the user lacks administrable rights.
    """
    # The 403 status must be declared in ``response``; django-ninja refuses
    # to emit a status code that has no schema registered for it.
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.administrable(request.user, p):
            raise ValueError("You do not have the rights to delete this Package!")

        p.delete()
        return redirect(f"{s.SERVER_URL}/package")
    except ValueError:
        return 403, {
            "message": f"Deleting Package with id {package_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
################################
|
|
# Compound / CompoundStructure #
|
|
################################
|
|
class CompoundWrapper(Schema):
    """Envelope holding a list of compounds under the "compound" key."""

    compound: List["SimpleCompound"]
|
|
|
|
|
|
class CompoundPathwayScenario(Schema):
    """Scenario reference listed under a compound's ``pathwayScenarios``."""

    scenarioId: str
    scenarioName: str
    scenarioType: str
|
|
|
|
|
|
class CompoundSchema(Schema):
    """Detailed compound representation.

    Several attributes are placeholders: external references, half-lives and
    PubChem references are always empty, and the pathway scenarios are a
    hard-coded sample entry.
    """

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
    id: str = Field(None, alias="url")
    halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
    identifier: str = "compound"
    imageSize: int = 600
    name: str = Field(None, alias="name")
    pathwayScenarios: List[CompoundPathwayScenario] = Field([], alias="pathway_scenarios")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    # Default was ``False``, which is not a valid value for a ``str`` field;
    # ``None`` matches the other schemas in this module.
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    structures: List["CompoundStructureSchema"] = []

    @staticmethod
    def resolve_review_status(obj: Compound):
        """Map the owning package's ``reviewed`` flag to a status label."""
        return "reviewed" if obj.package.reviewed else "unreviewed"

    @staticmethod
    def resolve_external_references(obj: Compound):
        """Return an empty mapping; external references are not implemented."""
        # TODO
        return {}

    @staticmethod
    def resolve_structures(obj: Compound):
        """Return the structures that belong to the compound."""
        return CompoundStructure.objects.filter(compound=obj)

    @staticmethod
    def resolve_halflifes(obj: Compound):
        """Return an empty list; half-lives are not implemented."""
        return []

    @staticmethod
    def resolve_pubchem_compound_references(obj: Compound):
        """Return an empty list; PubChem references are not implemented."""
        return []

    @staticmethod
    def resolve_pathway_scenarios(obj: Compound):
        """Return a fixed sample scenario regardless of the compound."""
        # NOTE(review): hard-coded placeholder data is served for every
        # compound - replace with a real lookup or an empty list.
        return [
            {
                "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501",
                "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)",
                "scenarioType": "Soil",
            }
        ]
|
|
|
|
|
|
class CompoundStructureSchema(Schema):
    """Detailed compound-structure representation.

    InChI, charge, formula and mass are computed from the structure's SMILES
    through ``FormatConverter``; external references, half-lives and PubChem
    references are always empty.
    """

    InChI: str = Field(None, alias="inchi")
    aliases: List[str] = Field([], alias="aliases")
    canonicalSmiles: str = Field(None, alias="canonical_smiles")
    charge: int = Field(None, alias="charge")
    description: str = Field(None, alias="description")
    externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
    formula: str = Field(None, alias="formula")
    halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
    id: str = Field(None, alias="url")
    identifier: str = "structure"
    imageSize: int = 600
    inchikey: str = Field(None, alias="inchikey")
    isDefaultStructure: bool = Field(None, alias="is_default_structure")
    mass: float = Field(None, alias="mass")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smiles: str = Field(None, alias="smiles")

    @staticmethod
    def resolve_review_status(obj: CompoundStructure):
        """Map the ``reviewed`` flag of the compound's package to a label."""
        package = obj.compound.package
        if package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_inchi(obj: CompoundStructure):
        """Return the InChI computed from the structure's SMILES."""
        smiles = obj.smiles
        return FormatConverter.InChI(smiles)

    @staticmethod
    def resolve_charge(obj: CompoundStructure):
        """Return the charge computed from the structure's SMILES."""
        smiles = obj.smiles
        return FormatConverter.charge(smiles)

    @staticmethod
    def resolve_formula(obj: CompoundStructure):
        """Return the formula computed from the structure's SMILES."""
        smiles = obj.smiles
        return FormatConverter.formula(smiles)

    @staticmethod
    def resolve_mass(obj: CompoundStructure):
        """Return the mass computed from the structure's SMILES."""
        smiles = obj.smiles
        return FormatConverter.mass(smiles)

    @staticmethod
    def resolve_external_references(obj: CompoundStructure):
        """Return an empty mapping; external references are not implemented."""
        # TODO
        return {}

    @staticmethod
    def resolve_halflifes(obj: CompoundStructure):
        """Return an empty list; half-lives are not implemented."""
        return []

    @staticmethod
    def resolve_pubchem_compound_references(obj: CompoundStructure):
        """Return an empty list; PubChem references are not implemented."""
        return []

    @staticmethod
    def resolve_pathway_scenarios(obj: CompoundStructure):
        """Return a fixed sample scenario regardless of the structure."""
        # NOTE(review): this schema declares no pathway-scenario field, so
        # this resolver looks unused - confirm before relying on it.
        sample_scenario = {
            "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501",
            "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)",
            "scenarioType": "Soil",
        }
        return [sample_scenario]
|
|
|
|
|
|
class CompoundStructureWrapper(Schema):
    """Envelope holding a list of structures under the "structure" key."""

    structure: List["SimpleCompoundStructure"]
|
|
|
|
|
|
@router.get("/compound", response={200: CompoundWrapper, 403: Error})
def get_compounds(request):
    """List the compounds contained in reviewed packages."""
    reviewed_packages = PackageManager.get_reviewed_packages()
    compounds = Compound.objects.filter(package__in=reviewed_packages)
    return {"compound": compounds.prefetch_related("package")}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/compound", response={200: CompoundWrapper, 403: Error})
def get_package_compounds(request, package_uuid):
    """List the compounds of one package, or 403 if the package lookup is refused."""
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        compounds = Compound.objects.filter(package=p).prefetch_related("package")
        return {"compound": compounds}
    except ValueError:
        message = f"Getting Compounds for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}",
    response={200: CompoundSchema, 403: Error},
)
def get_package_compound(request, package_uuid, compound_uuid):
    """Return one compound of a package, or 403 if the package lookup is refused."""
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        compound = Compound.objects.get(package=p, uuid=compound_uuid)
        return compound
    except ValueError:
        message = f"Getting Compound with id {compound_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure",
    response={200: CompoundStructureWrapper, 403: Error},
)
def get_package_compound_structures(request, package_uuid, compound_uuid):
    """List the structures of a compound, or 403 if the package lookup is refused."""
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        compound = Compound.objects.get(package=p, uuid=compound_uuid)
        return {"structure": compound.structures.all()}
    except ValueError:
        message = f"Getting CompoundStructures for Compound with id {compound_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}",
    response={200: CompoundStructureSchema, 403: Error},
)
def get_package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
    """Return one structure of a compound, or 403 if the package lookup is refused."""
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        compound = Compound.objects.get(package=p, uuid=compound_uuid)
        return CompoundStructure.objects.get(uuid=structure_uuid, compound=compound)
    except ValueError:
        message = f"Getting CompoundStructure with id {structure_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
class CreateCompound(Schema):
    """Form fields accepted when creating a compound; only the SMILES is required."""

    compoundSmiles: str
    compoundName: str | None = None
    compoundDescription: str | None = None
    inchi: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/compound", response={200: Any, 400: Error})
def create_package_compound(
    request,
    package_uuid,
    c: Form[CreateCompound],
):
    """Create a compound in a package and redirect to its URL.

    Returns:
        A redirect to the new compound, or ``400`` with a message when the
        package lookup or the creation raises ``ValueError``.
    """
    # The 400 status must be declared in ``response``; django-ninja refuses
    # to emit a status code that has no schema registered for it.
    # NOTE(review): unlike the delete endpoints, no explicit writable check
    # is made here - confirm that creation is guarded elsewhere.
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        # inchi is not used atm
        compound = Compound.create(
            p, c.compoundSmiles, c.compoundName, c.compoundDescription, inchi=c.inchi
        )
        return redirect(compound.url)
    except ValueError as e:
        return 400, {"message": str(e)}
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}",
    response={200: Any, 403: Error},
)
def delete_compound(request, package_uuid, compound_uuid):
    """Delete a compound from a package the requesting user may write to.

    Returns:
        A redirect to the package's compound listing, or ``403`` with a
        message when the lookup fails or write access is missing.
    """
    # The 403 status must be declared in ``response``; django-ninja refuses
    # to emit a status code that has no schema registered for it.
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Compound!")

        Compound.objects.get(package=p, uuid=compound_uuid).delete()
        return redirect(f"{p.url}/compound")
    except ValueError:
        return 403, {
            "message": f"Deleting Compound with id {compound_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}",
    response={200: Any, 403: Error},
)
def delete_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
    """Delete a compound structure, removing the compound itself when required.

    The whole compound is deleted when the structure is its only one or is
    its normalized structure. Otherwise only the structure is removed; if it
    was the default structure, another structure becomes the default first.

    Returns:
        A redirect to the compound listing (compound deleted) or to the
        compound's structure listing (structure deleted), or ``403`` with a
        message when the lookup fails or write access is missing.
    """
    # The 403 status must be declared in ``response``; django-ninja refuses
    # to emit a status code that has no schema registered for it.
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this CompoundStructure!")

        c = Compound.objects.get(package=p, uuid=compound_uuid)
        cs = CompoundStructure.objects.get(compound=c, uuid=structure_uuid)

        # A compound cannot outlive its last or its normalized structure;
        # deleting the compound removes the structure as well.
        if c.structures.count() == 1 or cs.normalized_structure:
            c.delete()
            return redirect(p.url + "/compound")

        if c.default_structure == cs:
            # Re-point and persist the default BEFORE deleting the structure,
            # so the compound never references a deleted row. The original
            # assigned it afterwards and never saved it.
            c.default_structure = c.structures.exclude(pk=cs.pk).first()
            c.save()

        cs.delete()
        return redirect(c.url + "/structure")
    except ValueError:
        return 403, {
            "message": f"Deleting CompoundStructure with id {structure_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
#########
|
|
# Rules #
|
|
#########
|
|
class RuleWrapper(Schema):
    """Envelope holding a list of rules under the "rule" key."""

    rule: List["SimpleRule"]
|
|
|
|
|
|
class SimpleRuleSchema(Schema):
    """Detailed representation of a non-composite rule."""

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    engine: str = "ambit"
    id: str = Field(None, alias="url")
    identifier: str = Field(None, alias="identifier")
    isCompositeRule: bool = False
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    productFilterSmarts: str = Field("", alias="product_filter_smarts")
    productSmarts: str = Field(None, alias="products_smarts")
    reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
    reactantSmarts: str = Field(None, alias="reactants_smarts")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smirks: str = Field("", alias="smirks")
    # TODO
    transformations: str = Field("", alias="transformations")

    @staticmethod
    def resolve_url(obj: Rule):
        """Return the rule URL with the "-ambit-"/"-rdkit-" infix collapsed to "-"."""
        url = obj.url
        for engine_infix in ("-ambit-", "-rdkit-"):
            url = url.replace(engine_infix, "-")
        return url

    @staticmethod
    def resolve_identifier(obj: Rule):
        """Derive the rule kind from its URL; ``None`` if no known kind matches."""
        url = obj.url
        if "simple-rule" in url or "simple-ambit-rule" in url:
            return "simple-rule"
        for kind in ("parallel-rule", "sequential-rule"):
            if kind in url:
                return kind
        return None

    @staticmethod
    def resolve_review_status(obj: Rule):
        """Map the owning package's ``reviewed`` flag to a status label."""
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_product_filter_smarts(obj: Rule):
        """Return the product filter SMARTS, or "" when it is unset."""
        return obj.product_filter_smarts or ""

    @staticmethod
    def resolve_reactant_filter_smarts(obj: Rule):
        """Return the reactant filter SMARTS, or "" when it is unset."""
        return obj.reactant_filter_smarts or ""
|
|
|
|
|
|
class CompositeRuleSchema(Schema):
    """Detailed representation of a composite rule and its simple rules."""

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    id: str = Field(None, alias="url")
    identifier: str = Field(None, alias="identifier")
    isCompositeRule: bool = True
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    productFilterSmarts: str = Field("", alias="product_filter_smarts")
    reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    simpleRules: List["SimpleRule"] = Field([], alias="simple_rules")

    @staticmethod
    def resolve_ec_numbers(obj: Rule):
        """Always return an empty list of EC numbers."""
        return []

    @staticmethod
    def resolve_url(obj: Rule):
        """Return the rule URL with the "-ambit-"/"-rdkit-" infix collapsed to "-"."""
        url = obj.url
        for engine_infix in ("-ambit-", "-rdkit-"):
            url = url.replace(engine_infix, "-")
        return url

    @staticmethod
    def resolve_identifier(obj: Rule):
        """Derive the rule kind from its URL; ``None`` if no known kind matches."""
        url = obj.url
        if "simple-rule" in url or "simple-ambit-rule" in url:
            return "simple-rule"
        for kind in ("parallel-rule", "sequential-rule"):
            if kind in url:
                return kind
        return None

    @staticmethod
    def resolve_review_status(obj: Rule):
        """Map the owning package's ``reviewed`` flag to a status label."""
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_product_filter_smarts(obj: Rule):
        """Return the product filter SMARTS, or "" when it is unset."""
        return obj.product_filter_smarts or ""

    @staticmethod
    def resolve_reactant_filter_smarts(obj: Rule):
        """Return the reactant filter SMARTS, or "" when it is unset."""
        return obj.reactant_filter_smarts or ""
|
|
|
|
|
|
@router.get("/rule", response={200: RuleWrapper, 403: Error})
def get_rules(request):
    """List the rules contained in reviewed packages."""
    reviewed_packages = PackageManager.get_reviewed_packages()
    rules = Rule.objects.filter(package__in=reviewed_packages)
    return {"rule": rules.prefetch_related("package")}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/rule", response={200: RuleWrapper, 403: Error})
def get_package_rules(request, package_uuid):
    """List the rules of one package, or 403 if the package lookup is refused."""
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        rules = Rule.objects.filter(package=p).prefetch_related("package")
        return {"rule": rules}
    except ValueError:
        message = f"Getting Rules for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_rule(request, package_uuid, rule_uuid):
    """Return a rule addressed through the generic ``/rule/`` path."""
    rule_or_error = _get_package_rule(request, package_uuid, rule_uuid)
    return rule_or_error
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_simple_rule(request, package_uuid, rule_uuid):
    """Return a rule addressed through the ``/simple-rule/`` path."""
    rule_or_error = _get_package_rule(request, package_uuid, rule_uuid)
    return rule_or_error
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_parallel_rule(request, package_uuid, rule_uuid):
    """Return a rule addressed through the ``/parallel-rule/`` path."""
    rule_or_error = _get_package_rule(request, package_uuid, rule_uuid)
    return rule_or_error
|
|
|
|
|
|
def _get_package_rule(request, package_uuid, rule_uuid):
    """Shared implementation of the rule detail endpoints.

    Returns the rule with ``rule_uuid`` from the given package, or a
    ``(403, message)`` pair when the package lookup raises ``ValueError``.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        rule = Rule.objects.get(package=p, uuid=rule_uuid)
        return rule
    except ValueError:
        message = f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
# POST
|
|
class CreateSimpleRule(Schema):
    """Form fields accepted when creating a simple rule; only ``smirks`` is required."""

    smirks: str
    name: str | None = None
    description: str | None = None
    reactantFilterSmarts: str | None = None
    productFilterSmarts: str | None = None
    immediate: str | None = None
    # "true" requests an RDKit rule, which the endpoint currently rejects.
    rdkitrule: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/simple-rule", response={200: Any, 400: Error})
def create_package_simple_rule(
    request,
    package_uuid,
    r: Form[CreateSimpleRule],
):
    """Create a simple ambit rule in a package and redirect to its URL.

    RDKit rules (``rdkitrule`` equal to "true") are rejected as not yet
    implemented.

    Returns:
        A redirect to the new rule, or ``400`` with a message on ``ValueError``.
    """
    # The 400 status must be declared in ``response``; django-ninja refuses
    # to emit a status code that has no schema registered for it.
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if r.rdkitrule and r.rdkitrule.strip() == "true":
            raise ValueError("Not yet implemented!")

        sr = SimpleAmbitRule.create(
            p, r.name, r.description, r.smirks, r.reactantFilterSmarts, r.productFilterSmarts
        )
        return redirect(sr.url)
    except ValueError as e:
        return 400, {"message": str(e)}
|
|
|
|
|
|
class CreateParallelRule(Schema):
    """Form fields accepted when creating a parallel rule."""

    # Reference(s) to the simple rules to combine, sent as one string.
    simpleRules: str
    name: str | None = None
    description: str | None = None
    reactantFilterSmarts: str | None = None
    productFilterSmarts: str | None = None
    immediate: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/parallel-rule", response={200: Any, 400: Error})
def create_package_parallel_rule(
    request,
    package_uuid,
    r: Form[CreateParallelRule],
):
    """Create a parallel rule from existing simple rules of the same package.

    ``simpleRules`` holds one or more rule URLs separated by commas and/or
    whitespace. Each URL's trailing uuid is used for the lookup, so both the
    ``simple-rule`` and ``simple-ambit-rule`` URL spellings are accepted.

    Returns:
        A redirect to the new rule, or ``400`` with a message when a URL is
        malformed, a rule is missing from the package, or creation fails.
    """
    from uuid import UUID

    # The 400 status must be declared in ``response``; django-ninja refuses
    # to emit a status code that has no schema registered for it.
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        # ``simpleRules`` is a single string; the original passed it to
        # ``url__in`` and ``len()`` directly, which operate on characters.
        # NOTE(review): the separator convention is assumed - confirm
        # against the client.
        rule_urls = r.simpleRules.replace(",", " ").split()
        if not rule_urls:
            raise ValueError("At least one SimpleRule is required!")

        # UUID() raises ValueError for malformed ids, which maps to the 400.
        rule_uuids = {UUID(url.rstrip("/").rsplit("/", 1)[-1]) for url in rule_urls}

        # ``SimpleRule`` in this module is the response schema, not a model,
        # so the query uses the imported simple-rule model instead.
        srs = SimpleAmbitRule.objects.filter(package=p, uuid__in=rule_uuids)

        if srs.count() != len(rule_uuids):
            raise ValueError(
                f"Not all SimpleRules could be found in Package with id {package_uuid}!"
            )

        sr = ParallelRule.create(
            p, list(srs), r.name, r.description, r.reactantFilterSmarts, r.productFilterSmarts
        )
        return redirect(sr.url)
    except ValueError as e:
        return 400, {"message": str(e)}
|
|
|
|
|
|
@router.post(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: str | Any, 400: Error, 403: Error},
)
def post_package_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    """Apply a rule to ``compound``, or return the rule when none is given.

    Addressed through the generic ``/rule/`` path; the work is delegated to
    ``_post_package_rule``.
    """
    # 400 is declared because the helper answers a blank compound with it;
    # django-ninja refuses to emit a status code that has no schema.
    return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
|
|
|
|
|
|
@router.post(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: str | Any, 400: Error, 403: Error},
)
def post_package_simple_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    """Apply a rule to ``compound``, or return the rule when none is given.

    Addressed through the ``/simple-rule/`` path; the work is delegated to
    ``_post_package_rule``.
    """
    # 400 is declared because the helper answers a blank compound with it;
    # django-ninja refuses to emit a status code that has no schema.
    return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
|
|
|
|
|
|
@router.post(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={200: str | Any, 400: Error, 403: Error},
)
def post_package_parallel_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    """Apply a rule to ``compound``, or return the rule when none is given.

    Addressed through the ``/parallel-rule/`` path; the work is delegated to
    ``_post_package_rule``.
    """
    # 400 is declared because the helper answers a blank compound with it;
    # django-ninja refuses to emit a status code that has no schema.
    return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
|
|
|
|
|
|
def _post_package_rule(request, package_uuid, rule_uuid, compound: Form[str]):
    """Shared implementation of the rule POST endpoints.

    Without a compound the rule itself is returned. With a compound the rule
    is applied to it and all products are returned as newline-separated plain
    text. A blank compound yields ``(400, message)``; a ``ValueError`` yields
    ``(403, message)``.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        r = Rule.objects.get(package=p, uuid=rule_uuid)

        if compound is None:
            return r

        if not compound.split():
            return 400, {"message": "Compound is empty"}

        # Flatten the product sets into one list, keeping their order.
        products = [product for p_set in r.apply(compound) for product in p_set]
        return HttpResponse("\n".join(products), content_type="text/plain")
    except ValueError:
        message = f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: Any, 403: Error},
)
def delete_rule(request, package_uuid, rule_uuid):
    """Delete a rule addressed through the generic ``/rule/`` path."""
    # 403 is declared because ``_delete_rule`` returns it on failure;
    # django-ninja refuses to emit a status code that has no schema.
    return _delete_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: Any, 403: Error},
)
def delete_simple_rule(request, package_uuid, rule_uuid):
    """Delete a rule addressed through the ``/simple-rule/`` path."""
    # 403 is declared because ``_delete_rule`` returns it on failure;
    # django-ninja refuses to emit a status code that has no schema.
    return _delete_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={403: Error},
)
def delete_parallel_rule(request, package_uuid, rule_uuid):
    """Delete a rule via the parallel-rule route; thin wrapper around ``_delete_rule``.

    The 403 schema is declared because the helper returns ``403, {...}`` on
    failure and Django Ninja refuses status codes without a declared schema.
    """
    return _delete_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
def _delete_rule(request, package_uuid, rule_uuid):
    """Shared handler behind the three rule DELETE routes.

    Deletes the rule and redirects to the package's rule listing when the
    requesting user may write to the package; any ``ValueError`` (including
    the one raised here for a non-writable package) becomes a 403 error.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)

        # Guard clause: refuse early when the package is not writable.
        if not PackageManager.writable(request.user, package):
            raise ValueError("You do not have the rights to delete this Rule!")

        Rule.objects.get(package=package, uuid=rule_uuid).delete()
        return redirect(f"{package.url}/rule")
    except ValueError:
        failure = f"Deleting Rule with id {rule_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
############
|
|
# Reaction #
|
|
############
|
|
class ReactionWrapper(Schema):
    # Envelope for reaction listings: {"reaction": [...]}.
    reaction: List["SimpleReaction"]
|
|
|
|
|
|
class ReactionCompoundStructure(Schema):
    # Compact view of an educt/product structure inside a reaction.
    # Each field reads the source attribute named by its alias.
    compoundName: str = Field(None, alias="name")
    id: str = Field(None, alias="url")
    smiles: str = Field(None, alias="smiles")
|
|
|
|
|
|
class ReactionSchema(Schema):
    # Detailed serialization of a single Reaction.
    # Each field reads the source attribute named by its alias; the
    # resolve_* static methods below supply the computed values.
    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    educts: List["ReactionCompoundStructure"] = Field([], alias="educts")
    id: str = Field(None, alias="url")
    identifier: str = "reaction"
    medlineRefs: List[str] = Field([], alias="medline_references")
    multistep: bool = Field(None, alias="multi_step")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    products: List["ReactionCompoundStructure"] = Field([], alias="products")
    references: List[Dict[str, List[str]]] = Field([], alias="references")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smirks: str = Field("", alias="smirks")

    @staticmethod
    def resolve_smirks(obj: Reaction):
        """Return the value of the reaction's ``smirks()`` method."""
        return obj.smirks()

    @staticmethod
    def resolve_ec_numbers(obj: Reaction):
        """Placeholder: always an empty list."""
        # TODO fetch via scenario EnzymeAI
        return []

    @staticmethod
    def resolve_references(obj: Reaction):
        """Placeholder: always an empty list."""
        # TODO
        return []

    @staticmethod
    def resolve_medline_references(obj: Reaction):
        """Placeholder: always an empty list."""
        # TODO
        return []

    @staticmethod
    def resolve_review_status(obj: Reaction):
        """Map the owning package's ``reviewed`` flag to a status string."""
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"
|
|
|
|
|
|
@router.get("/reaction", response={200: ReactionWrapper, 403: Error})
def get_reactions(request):
    """List the reactions of every reviewed package."""
    reviewed_packages = PackageManager.get_reviewed_packages()
    reactions = Reaction.objects.filter(package__in=reviewed_packages)
    return {"reaction": reactions.prefetch_related("package")}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/reaction", response={200: ReactionWrapper, 403: Error})
def get_package_reactions(request, package_uuid):
    """List the reactions of one package; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        reactions = Reaction.objects.filter(package=package).prefetch_related("package")
        return {"reaction": reactions}
    except ValueError:
        failure = f"Getting Reactions for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}",
    response={200: ReactionSchema, 403: Error},
)
def get_package_reaction(request, package_uuid, reaction_uuid):
    """Return one reaction of a package; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        reaction = Reaction.objects.get(package=package, uuid=reaction_uuid)
        return reaction
    except ValueError:
        failure = f"Getting Reaction with id {reaction_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
class CreateReaction(Schema):
    # Form payload for creating a reaction. The consuming endpoint expects
    # either `smirks` or the `educt`/`product` pair; `rule` is optional.
    reactionName: str | None = None
    reactionDescription: str | None = None
    smirks: str | None = None
    educt: str | None = None
    product: str | None = None
    rule: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/reaction", response={400: Error})
def create_package_reaction(
    request,
    package_uuid,
    r: Form[CreateReaction],
):
    """Create a reaction in a package and redirect to it.

    The reaction is described either by a SMIRKS string (``educts>>products``,
    components separated by ``.``) or by the URLs of one educt and one product
    compound structure of the package. An optional rule URL links the reaction
    to a rule of the same package.

    Every validation problem is raised as ``ValueError`` and reported as a
    400 error. The 400 schema is declared because Django Ninja refuses status
    codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if r.smirks is None and (r.educt is None or r.product is None):
            raise ValueError("Either SMIRKS or educt/product must be provided")

        if r.smirks is not None and (r.educt is not None and r.product is not None):
            raise ValueError("SMIRKS and educt/product provided!")

        rule = None
        if r.rule:
            try:
                rule = Rule.objects.get(package=p, url=r.rule)
            except Rule.DoesNotExist:
                raise ValueError(f"Rule with id {r.rule} does not exist!") from None

        if r.educt is not None:
            try:
                educt_cs = CompoundStructure.objects.get(compound__package=p, url=r.educt)
            except CompoundStructure.DoesNotExist:
                raise ValueError(f"Compound with id {r.educt} does not exist!") from None

            try:
                product_cs = CompoundStructure.objects.get(compound__package=p, url=r.product)
            except CompoundStructure.DoesNotExist:
                raise ValueError(f"Compound with id {r.product} does not exist!") from None

            new_r = Reaction.create(
                p, r.reactionName, r.reactionDescription, [educt_cs], [product_cs], rule
            )
        else:
            educt_part, separator, product_part = r.smirks.partition(">>")
            if not separator:
                # Previously surfaced as an unhandled IndexError.
                raise ValueError("SMIRKS must contain '>>'")

            # str.split takes a literal separator, not a regex: the former
            # "\\." only matched a backslash followed by a dot.
            educts = educt_part.split(".")
            products = product_part.split(".")

            new_r = Reaction.create(
                p, r.reactionName, r.reactionDescription, educts, products, rule
            )

        return redirect(new_r.url)

    except ValueError as e:
        return 400, {"message": str(e)}
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}", response={403: Error}
)
def delete_reaction(request, package_uuid, reaction_uuid):
    """Delete a reaction and redirect to the package's reaction listing.

    Any ``ValueError`` (including the one raised here for a non-writable
    package) is reported as a 403 error. The 403 schema is declared because
    Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Reaction!")

        r = Reaction.objects.get(package=p, uuid=reaction_uuid)
        r.delete()
        return redirect(f"{p.url}/reaction")
    except ValueError:
        return 403, {
            "message": f"Deleting Reaction with id {reaction_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
############
|
|
# Scenario #
|
|
############
|
|
class ScenarioWrapper(Schema):
    # Envelope for scenario listings: {"scenario": [...]}.
    scenario: List["SimpleScenario"]
|
|
|
|
|
|
class ScenarioSchema(Schema):
    # Detailed serialization of a single Scenario.
    # Each field reads the source attribute named by its alias.
    aliases: List[str] = Field([], alias="aliases")
    # The default is an empty dict so that it matches the declared mapping
    # type (it used to be a list); the key type is the plain `str` class.
    collection: Dict[str, List[Dict[str, Any]]] = Field({}, alias="collection")
    collectionID: Optional[str] = None
    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    identifier: str = "scenario"
    linkedTo: List[Dict[str, str]] = Field([], alias="linked_to")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    relatedScenarios: List[Dict[str, str]] = Field([], alias="related_scenarios")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    type: str = Field(None, alias="scenario_type")

    @staticmethod
    def resolve_collection(obj: Scenario):
        """Expose the scenario's ``additional_information`` as ``collection``."""
        return obj.additional_information

    @staticmethod
    def resolve_review_status(obj: Scenario):
        """Map the owning package's ``reviewed`` flag to a status string."""
        return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
|
|
|
|
@router.get("/scenario", response={200: ScenarioWrapper, 403: Error})
def get_scenarios(request):
    """List the scenarios of every reviewed package."""
    reviewed_packages = PackageManager.get_reviewed_packages()
    scenarios = Scenario.objects.filter(package__in=reviewed_packages)
    return {"scenario": scenarios.prefetch_related("package")}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/scenario", response={200: ScenarioWrapper, 403: Error})
def get_package_scenarios(request, package_uuid):
    """List the scenarios of one package; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        scenarios = Scenario.objects.filter(package=package).prefetch_related("package")
        return {"scenario": scenarios}
    except ValueError:
        failure = f"Getting Scenarios for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}",
    response={200: ScenarioSchema, 403: Error},
)
def get_package_scenario(request, package_uuid, scenario_uuid):
    """Return one scenario of a package; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        scenario = Scenario.objects.get(package=package, uuid=scenario_uuid)
        return scenario
    except ValueError:
        failure = f"Getting Scenario with id {scenario_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/scenario", response={403: Error})
def delete_scenarios(request, package_uuid, scenario_uuid: Optional[str] = None):
    """Delete every scenario of a package and redirect to the scenario listing.

    ``scenario_uuid`` is not part of the route and is never read. It is kept
    only for backward compatibility and is now optional, so it no longer acts
    as a required query parameter.

    Any ``ValueError`` (including the one raised here for a non-writable
    package) is reported as a 403 error. The 403 schema is declared because
    Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete Scenarios!")

        Scenario.objects.filter(package=p).delete()
        return redirect(f"{p.url}/scenario")
    except ValueError:
        return 403, {"message": "Deleting Scenarios failed due to insufficient rights!"}
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}", response={403: Error}
)
def delete_scenario(request, package_uuid, scenario_uuid):
    """Delete one scenario and redirect to the package's scenario listing.

    Any ``ValueError`` (including the one raised here for a non-writable
    package) is reported as a 403 error. The 403 schema is declared because
    Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Scenario!")

        scen = Scenario.objects.get(package=p, uuid=scenario_uuid)
        scen.delete()
        return redirect(f"{p.url}/scenario")
    except ValueError:
        return 403, {
            "message": f"Deleting Scenario with id {scenario_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
###########
|
|
# Pathway #
|
|
###########
|
|
class PathwayWrapper(Schema):
    # Envelope for pathway listings: {"pathway": [...]}.
    pathway: List["SimplePathway"]
|
|
|
|
|
|
class PathwayEdge(Schema):
    # Edge entry embedded in a serialized pathway (see PathwaySchema.links).
    # Dotted aliases read attributes of the edge's `edge_label`.
    ecNumbers: List[str] = Field([], alias="ec_numbers")
    id: str = Field(None, alias="url")
    idreaction: str = Field(None, alias="edge_label.url")
    multstep: bool = Field(None, alias="edge_label.multi_step")
    name: str = Field(None, alias="name")
    pseudo: bool = False
    rule: Optional[str] = Field(None, alias="rule")
    scenarios: List[SimpleScenario] = Field([], alias="scenarios")
    source: int = -1
    target: int = -1

    @staticmethod
    def resolve_rule(obj: Edge):
        """Return the SMIRKS of the first rule attached to the edge label.

        Yields ``None`` when the edge label has no rules or when that first
        rule is not a ``SimpleAmbitRule``.
        """
        attached_rules = obj.edge_label.rules.all()
        if not attached_rules.exists():
            return None

        first_rule = attached_rules[0]
        return first_rule.smirks if isinstance(first_rule, SimpleAmbitRule) else None
|
|
|
|
|
|
class PathwayNode(Schema):
    # Node entry embedded in a serialized pathway (see PathwaySchema.nodes).
    # Dotted aliases read attributes of the node's `default_node_label`.
    atomCount: int = Field(None, alias="atom_count")
    depth: int = Field(None, alias="depth")
    dt50s: List[Dict[str, str]] = Field([], alias="dt50s")
    engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
    id: str = Field(None, alias="url")
    idcomp: str = Field(None, alias="default_node_label.url")
    idreact: str = Field(None, alias="default_node_label.url")
    image: str = Field(None, alias="image")
    imageSize: int = Field(None, alias="image_size")
    name: str = Field(None, alias="name")
    proposed: List[Dict[str, str]] = Field([], alias="proposed_intermediate")
    smiles: str = Field(None, alias="default_node_label.smiles")

    @staticmethod
    def resolve_atom_count(obj: Node):
        """Count the atoms RDKit reports for the default structure's SMILES."""
        # Imported lazily, as in the original code.
        from rdkit import Chem

        molecule = Chem.MolFromSmiles(obj.default_node_label.smiles)
        return molecule.GetNumAtoms()

    @staticmethod
    def resolve_dt50s(obj: Node):
        """Placeholder: always an empty list."""
        # TODO
        return []

    @staticmethod
    def resolve_engineered_intermediate(obj: Node):
        """Placeholder: always ``False``."""
        # TODO
        return False

    @staticmethod
    def resolve_image(obj: Node):
        """Build the SVG image URL of the node's default structure."""
        structure_url = obj.default_node_label.url
        return f"{structure_url}?image=svg"

    @staticmethod
    def resolve_image_size(obj: Node):
        """Return the fixed image size of 400."""
        return 400

    @staticmethod
    def resolve_proposed_intermediate(obj: Node):
        """Placeholder: always an empty list."""
        # TODO
        return []
|
|
|
|
|
|
class PathwaySchema(Schema):
    # Detailed serialization of a single Pathway, including its nodes and
    # edges (exposed as `links`). Each field reads the source attribute named
    # by its alias.
    aliases: List[str] = Field([], alias="aliases")
    completed: str = Field(None, alias="completed")
    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    isIncremental: bool = Field(None, alias="is_incremental")
    isPredicted: bool = Field(None, alias="is_predicted")
    lastModified: int = Field(None, alias="last_modified")
    links: List[PathwayEdge] = Field([], alias="edges")
    name: str = Field(None, alias="name")
    nodes: List[PathwayNode] = Field([], alias="nodes")
    pathwayName: str = Field(None, alias="name")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    upToDate: bool = Field(None, alias="up_to_date")

    @staticmethod
    def resolve_review_status(obj: Pathway):
        """Map the owning package's ``reviewed`` flag to a status string."""
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_completed(obj: Pathway):
        """Return ``obj.completed()`` rendered as a lower-case string."""
        completed_text = str(obj.completed())
        return completed_text.lower()

    @staticmethod
    def resolve_up_to_date(obj: Pathway):
        """Return the constant string "true"."""
        return "true"

    @staticmethod
    def resolve_last_modified(obj: Pathway):
        """Return the ``modified`` timestamp truncated to an integer."""
        modified_at = obj.modified.timestamp()
        return int(modified_at)
|
|
|
|
|
|
@router.get("/pathway", response={200: PathwayWrapper, 403: Error})
def get_pathways(request):
    """List the pathways of every reviewed package."""
    reviewed_packages = PackageManager.get_reviewed_packages()
    pathways = Pathway.objects.filter(package__in=reviewed_packages)
    return {"pathway": pathways.prefetch_related("package")}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/pathway", response={200: PathwayWrapper, 403: Error})
def get_package_pathways(request, package_uuid):
    """List the pathways of one package; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathways = Pathway.objects.filter(package=package).prefetch_related("package")
        return {"pathway": pathways}
    except ValueError:
        failure = f"Getting Pathways for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}",
    response={200: PathwaySchema, 403: Error},
)
def get_package_pathway(request, package_uuid, pathway_uuid):
    """Return one pathway of a package; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        return pathway
    except ValueError:
        failure = f"Getting Pathway with id {pathway_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
class CreatePathway(Schema):
    # Form payload for creating a pathway. `smilesinput` is required.
    # The consuming endpoint switches to "build" mode when `rootOnly` is the
    # string "true"; `selectedSetting` is a setting URL.
    smilesinput: str
    name: str | None = None
    description: str | None = None
    rootOnly: str | None = None
    selectedSetting: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/pathway", response={400: Error})
def create_pathway(
    request,
    package_uuid,
    pw: Form[CreatePathway],
):
    """Create a pathway from a SMILES string and redirect to it.

    The pathway is stored in "build" mode when ``rootOnly`` is the string
    "true" and in "predict" mode otherwise. In predict mode the user's
    prediction setting (or the one given by ``selectedSetting``) is attached
    and a ``predict`` task is dispatched.

    Any ``ValueError`` is reported as a 400 error. The 400 schema is declared
    because Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        stand_smiles = FormatConverter.standardize(pw.smilesinput.strip())

        new_pw = Pathway.create(p, stand_smiles, name=pw.name, description=pw.description)

        pw_mode = "predict"
        if pw.rootOnly and pw.rootOnly.strip() == "true":
            pw_mode = "build"

        new_pw.kv.update({"mode": pw_mode})
        new_pw.save()

        if pw_mode == "predict":
            setting = request.user.prediction_settings()

            if pw.selectedSetting:
                setting = SettingManager.get_setting_by_url(request.user, pw.selectedSetting)

            new_pw.setting = setting
            new_pw.save()

            # Imported lazily, as in the original code.
            from .tasks import dispatch, predict

            dispatch(request.user, predict, new_pw.pk, setting.pk, limit=-1)

        return redirect(new_pw.url)
    except ValueError as e:
        return 400, {"message": str(e)}
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}", response={403: Error}
)
def delete_pathway(request, package_uuid, pathway_uuid):
    """Delete one pathway and redirect to the package's pathway listing.

    Any ``ValueError`` (including the one raised here for a non-writable
    package) is reported as a 403 error. The 403 schema is declared because
    Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this pathway!")

        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        pw.delete()
        return redirect(f"{p.url}/pathway")
    except ValueError:
        return 403, {
            "message": f"Deleting Pathway with id {pathway_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
########
|
|
# Node #
|
|
########
|
|
class NodeWrapper(Schema):
    # Envelope for node listings: {"node": [...]}.
    node: List["SimpleNode"]
|
|
|
|
|
|
class NodeCompoundStructure(Schema):
    # Compact view of a compound structure as embedded in a node.
    id: str = Field(None, alias="url")
    image: str = Field(None, alias="image")
    smiles: str = Field(None, alias="smiles")
    name: str = Field(None, alias="name")

    @staticmethod
    def resolve_image(obj: CompoundStructure):
        """Build the SVG image URL of the structure."""
        structure_url = obj.url
        return f"{structure_url}?image=svg"
|
|
|
|
|
|
class NodeSchema(Schema):
    # Detailed serialization of a single pathway Node. Each field reads the
    # source attribute named by its alias; dotted aliases read attributes of
    # the node's `default_node_label`.
    aliases: List[str] = Field([], alias="aliases")
    confidenceScenarios: List[SimpleScenario] = Field([], alias="confidence_scenarios")
    defaultStructure: NodeCompoundStructure = Field(None, alias="default_node_label")
    depth: int = Field(None, alias="depth")
    description: str = Field(None, alias="description")
    engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
    halflifes: Dict[str, str] = Field({}, alias="halflife")
    id: str = Field(None, alias="url")
    identifier: str = "node"
    image: str = Field(None, alias="image")
    name: str = Field(None, alias="name")
    proposedValues: List[Dict[str, str]] = Field([], alias="proposed_values")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smiles: str = Field(None, alias="default_node_label.smiles")
    structures: List["CompoundStructureSchema"] = Field([], alias="node_labels")

    @staticmethod
    def resolve_engineered_intermediate(obj: Node):
        """Placeholder: always ``False``."""
        # TODO
        return False

    @staticmethod
    def resolve_image(obj: Node):
        """Build the SVG image URL of the node's default structure."""
        structure_url = obj.default_node_label.url
        return f"{structure_url}?image=svg"

    @staticmethod
    def resolve_proposed_values(obj: Node):
        """Placeholder: always an empty list."""
        # TODO
        # Example of an entry the original author noted for this list:
        # {
        #     "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/1f1a0b67-ce57-4f5a-929c-4fabf3a791fd",
        #     "scenarioName": "Annex IIA 7.1.1 a), Brumhard (2003) - (00000)"
        # }
        return []

    @staticmethod
    def resolve_review_status(obj: Node):
        """Map the ``reviewed`` flag of the pathway's package to a status string."""
        if obj.pathway.package.reviewed:
            return "reviewed"
        return "unreviewed"
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
    response={200: NodeWrapper, 403: Error},
)
def get_package_pathway_nodes(request, package_uuid, pathway_uuid):
    """List the nodes of one pathway; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        return {"node": pathway.nodes}
    except ValueError:
        failure = f"Getting Nodes for Pathway with id {pathway_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}",
    response={200: NodeSchema, 403: Error},
)
def get_package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
    """Return one node of a pathway; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        node = Node.objects.get(pathway=pathway, uuid=node_uuid)
        return node
    except ValueError:
        failure = f"Getting Node with id {node_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
class CreateNode(Schema):
    # Form payload for adding a node to a pathway. `nodeAsSmiles` is
    # required; `nodeDepth` arrives as text and the consuming endpoint
    # converts it to an integer.
    nodeAsSmiles: str
    nodeName: str | None = None
    nodeReason: str | None = None
    nodeDepth: str | None = None
|
|
|
|
|
|
@router.post(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
    response={200: str | Any, 403: Error},
)
def add_pathway_node(request, package_uuid, pathway_uuid, n: Form[CreateNode]):
    """Add a node to a pathway and redirect to the new node.

    ``nodeDepth`` is converted to an integer; a missing or blank value means
    depth ``-1``. Any ``ValueError`` (including a non-numeric depth) is
    reported as a 403 error.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)

        depth_text = n.nodeDepth
        has_depth = depth_text is not None and depth_text.strip() != ""
        node_depth = int(depth_text) if has_depth else -1

        created = Node.create(pathway, n.nodeAsSmiles, node_depth, n.nodeName, n.nodeReason)
        return redirect(created.url)
    except ValueError:
        return 403, {"message": "Adding node failed!"}
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}",
    response={403: Error},
)
def delete_node(request, package_uuid, pathway_uuid, node_uuid):
    """Delete one node and redirect to the pathway's node listing.

    Any ``ValueError`` (including the one raised here for a non-writable
    package) is reported as a 403 error. The 403 schema is declared because
    Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Node!")

        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        n = Node.objects.get(pathway=pw, uuid=node_uuid)
        n.delete()
        return redirect(f"{pw.url}/node")
    except ValueError:
        return 403, {
            "message": f"Deleting Node with id {node_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
########
|
|
# Edge #
|
|
########
|
|
class EdgeWrapper(Schema):
    # Envelope for edge listings: {"edge": [...]}.
    edge: List["SimpleEdge"]
|
|
|
|
|
|
class EdgeNode(SimpleNode):
    # SimpleNode extended with an image URL, used for an edge's
    # start and end nodes.
    image: str = Field(None, alias="image")

    @staticmethod
    def resolve_image(obj: Node):
        """Build the SVG image URL of the node's default structure."""
        structure_url = obj.default_node_label.url
        return f"{structure_url}?image=svg"
|
|
|
|
|
|
class EdgeSchema(Schema):
    # Detailed serialization of a single pathway Edge. Each field reads the
    # source attribute named by its alias; dotted aliases read attributes of
    # the edge's `edge_label`.
    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[str] = Field([], alias="ec_numbers")
    endNodes: List["EdgeNode"] = Field([], alias="end_nodes")
    id: str = Field(None, alias="url")
    identifier: str = "edge"
    name: str = Field(None, alias="name")
    reactionName: str = Field(None, alias="edge_label.name")
    reactionURI: str = Field(None, alias="edge_label.url")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    startNodes: List["EdgeNode"] = Field([], alias="start_nodes")

    @staticmethod
    def resolve_review_status(obj: Edge):
        """Map the ``reviewed`` flag of the pathway's package to a status string."""
        if obj.pathway.package.reviewed:
            return "reviewed"
        return "unreviewed"
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge",
    response={200: EdgeWrapper, 403: Error},
)
def get_package_pathway_edges(request, package_uuid, pathway_uuid):
    """List the edges of one pathway; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        return {"edge": pathway.edges}
    except ValueError:
        failure = f"Getting Edges for Pathway with id {pathway_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}",
    response={200: EdgeSchema, 403: Error},
)
def get_package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
    """Return one edge of a pathway; a ``ValueError`` becomes a 403 error."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        edge = Edge.objects.get(pathway=pathway, uuid=edge_uuid)
        return edge
    except ValueError:
        failure = f"Getting Edge with id {edge_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|
|
|
|
|
|
class CreateEdge(Schema):
    # Form payload for adding an edge to a pathway. The consuming endpoint
    # expects either `edgeAsSmirks` or the `educts`/`products` pair.
    edgeAsSmirks: str | None = None
    educts: str | None = None  # Node URIs, comma separated
    products: str | None = None  # Node URIs, comma separated
    multistep: str | None = None
    edgeReason: str | None = None
|
|
|
|
|
|
@router.post(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge",
    response={200: str | Any, 403: Error},
)
def add_pathway_edge(request, package_uuid, pathway_uuid, e: Form[CreateEdge]):
    """Add an edge to a pathway and redirect to the new edge.

    The edge is described either by a SMIRKS string (``educts>>products``,
    components separated by ``.``), whose components are matched against the
    SMILES of the pathway's nodes, or by comma-separated node URLs in
    ``educts`` and ``products``.

    Any ``ValueError`` is reported as a 403 error.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)

        if e.edgeAsSmirks is None and (e.educts is None or e.products is None):
            raise ValueError("Either SMIRKS or educt/product must be provided")

        if e.edgeAsSmirks is not None and (e.educts is not None and e.products is not None):
            raise ValueError("SMIRKS and educt/product provided!")

        if e.edgeAsSmirks:
            educt_part, separator, product_part = e.edgeAsSmirks.partition(">>")
            if not separator:
                # Previously surfaced as an unhandled IndexError.
                raise ValueError("SMIRKS must contain '>>'")

            # str.split takes a literal separator, not a regex: the former
            # "\\." only matched a backslash followed by a dot.
            educts = [
                Node.objects.get(pathway=pw, default_node_label__smiles=ed)
                for ed in educt_part.split(".")
            ]
            products = [
                Node.objects.get(pathway=pw, default_node_label__smiles=pr)
                for pr in product_part.split(".")
            ]
        else:
            if e.educts is None or e.products is None:
                # Reached when edgeAsSmirks is an empty string and a node
                # list is missing; previously an AttributeError on None.
                raise ValueError("Either SMIRKS or educt/product must be provided")

            educts = [Node.objects.get(pathway=pw, url=ed.strip()) for ed in e.educts.split(",")]
            products = [
                Node.objects.get(pathway=pw, url=pr.strip()) for pr in e.products.split(",")
            ]

        new_e = Edge.create(
            pathway=pw,
            start_nodes=educts,
            end_nodes=products,
            rule=None,
            # CreateEdge declares no `name` field, so plain `e.name` raised
            # AttributeError; fall back to None unless the schema gains one.
            name=getattr(e, "name", None),
            description=e.edgeReason,
        )

        return redirect(new_e.url)
    except ValueError:
        return 403, {"message": "Adding edge failed!"}
|
|
|
|
|
|
@router.delete(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}",
    response={403: Error},
)
def delete_edge(request, package_uuid, pathway_uuid, edge_uuid):
    """Delete one edge and redirect to the pathway's edge listing.

    Any ``ValueError`` (including the one raised here for a non-writable
    package) is reported as a 403 error. The 403 schema is declared because
    Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Edge!")

        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        e = Edge.objects.get(pathway=pw, uuid=edge_uuid)
        e.delete()
        return redirect(f"{pw.url}/edge")
    except ValueError:
        return 403, {
            "message": f"Deleting Edge with id {edge_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
#########
|
|
# Model #
|
|
#########
|
|
class ModelWrapper(Schema):
    # Envelope for model listings; the required list is read from the
    # "relative-reasoning" key/attribute of the source object.
    relative_reasoning: List["SimpleModel"] = Field(..., alias="relative-reasoning")
|
|
|
|
|
|
class ModelSchema(Schema):
    # Detailed serialization of a single model. `status`, `statusMessage`,
    # `threshold` and `type` have no default and are therefore required.
    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    evalPackages: List["SimplePackage"] = Field([])
    id: str = Field(None, alias="url")
    identifier: str = "relative-reasoning"
    # Not serialized yet; example of the "info" block noted by the original author:
    # "info" : {
    #   "Accuracy (Single-Gen)" : "0.5932962678936605" ,
    #   "Area under PR-Curve (Single-Gen)" : "0.5654653182134282" ,
    #   "Area under ROC-Curve (Single-Gen)" : "0.8178302405034772" ,
    #   "Precision (Single-Gen)" : "0.6978730822873083" ,
    #   "Probability Threshold" : "0.5" ,
    #   "Recall/Sensitivity (Single-Gen)" : "0.4484149210261006"
    # } ,
    name: str = Field(None, alias="name")
    pathwayPackages: List["SimplePackage"] = Field([])
    reviewStatus: str = Field(None, alias="review_status")
    rulePackages: List["SimplePackage"] = Field([])
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    status: str
    statusMessage: str
    threshold: str
    type: str
|
|
|
|
|
|
@router.get("/model", response={200: ModelWrapper, 403: Error})
def get_models(request):
    """List the models of every reviewed package.

    The previous stub returned ``None``, which cannot satisfy the required
    ``relative-reasoning`` list of ``ModelWrapper``. This mirrors the sibling
    list endpoints for reactions, scenarios and pathways.
    """
    return {
        "relative-reasoning": EPModel.objects.filter(
            package__in=PackageManager.get_reviewed_packages()
        ).prefetch_related("package")
    }
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/model", response={200: ModelWrapper, 403: Error})
def get_package_models(request, package_uuid, model_uuid: Optional[str] = None):
    """List the models of one package; a ``ValueError`` becomes a 403 error.

    ``model_uuid`` is not part of the route and is never read. It is kept
    only for backward compatibility and is now optional, so it no longer acts
    as a required query parameter. The models are wrapped under the
    ``relative-reasoning`` key that ``ModelWrapper`` requires.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        return {"relative-reasoning": EPModel.objects.filter(package=p)}
    except ValueError:
        return 403, {
            "message": f"Getting Models for Package with id {package_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
class Classify(Schema):
    # Query parameters of the model endpoint: an optional SMILES string.
    smiles: str | None = None
|
|
|
|
|
|
@router.get(
    "/package/{uuid:package_uuid}/model/{uuid:model_uuid}",
    response={200: ModelSchema | Any, 403: Error, 400: Error},
)
def get_model(request, package_uuid, model_uuid, c: Query[Classify]):
    """Return a model, or its predictions when a ``smiles`` query is given.

    Without ``smiles`` the model itself is returned. With ``smiles`` the
    string is standardized and ``predict_simple`` is run for the model; each
    non-empty prediction becomes a dict with its probability, its
    de-duplicated product sets and, when a rule is attached, the rule's id,
    identifier, name and review status.

    A blank or invalid SMILES yields a 400 error; any other ``ValueError``
    is reported as a 403 error.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        mod = EPModel.objects.get(package=p, uuid=model_uuid)

        # Test for presence, not truthiness: the empty-SMILES check used to
        # sit inside `if c.smiles:` and could never fire.
        if c.smiles is not None:
            if not c.smiles.strip():
                return 400, {"message": "Received empty SMILES"}

            try:
                stand_smiles = FormatConverter.standardize(c.smiles)
            except ValueError:
                return 400, {"message": f'"{c.smiles}" is not a valid SMILES'}

            # Imported lazily, as in the original code.
            from epdb.tasks import dispatch_eager, predict_simple

            pred_res = dispatch_eager(request.user, predict_simple, mod.pk, stand_smiles)

            result = []

            for pr in pred_res:
                if len(pr) > 0:
                    # Tuples are hashable, so set() can drop duplicate product sets.
                    products = [tuple(prod_set) for prod_set in pr.product_sets]

                    res = {
                        "probability": pr.probability,
                        "products": list(set(products)),
                    }

                    if pr.rule:
                        res["id"] = pr.rule.url
                        res["identifier"] = pr.rule.get_rule_identifier()
                        res["name"] = pr.rule.name
                        res["reviewStatus"] = (
                            "reviewed" if pr.rule.package.reviewed else "unreviewed"
                        )

                    result.append(res)

            return result

        return mod
    except ValueError:
        return 403, {
            "message": f"Getting Model with id {model_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/model/{uuid:model_uuid}", response={403: Error})
def delete_model(request, package_uuid, model_uuid):
    """Delete one model and redirect to the package's model listing.

    Any ``ValueError`` (including the one raised here for a non-writable
    package) is reported as a 403 error. The 403 schema is declared because
    Django Ninja refuses status codes without a declared schema.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Model!")

        m = EPModel.objects.get(package=p, uuid=model_uuid)
        m.delete()
        return redirect(f"{p.url}/model")
    except ValueError:
        return 403, {
            "message": f"Deleting Model with id {model_uuid} failed due to insufficient rights!"
        }
|
|
|
|
|
|
###########
|
|
# Setting #
|
|
###########
|
|
class SettingWrapper(Schema):
    # Envelope for setting listings: {"setting": [...]}.
    setting: List["SimpleSetting"]
|
|
|
|
|
|
class SettingSchema(Schema):
    # Detailed serialization of a single setting. Each field reads the
    # source attribute named by its alias; `duplicationHash` and
    # `identifier` are fixed defaults.
    duplicationHash: int = -1
    id: str = Field(None, alias="url")
    identifier: str = "setting"
    includedPackages: List["SimplePackage"] = Field([], alias="rule_packages")
    isPublic: bool = Field(None, alias="public")
    name: str = Field(None, alias="name")
    normalizationRules: List["SimpleRule"] = Field([], alias="normalization_rules")
    propertyPlugins: List[str] = Field([], alias="property_plugins")
    truncationstrategy: Optional[str] = Field(None, alias="truncation_strategy")
|
|
|
|
|
|
@router.get("/setting", response={200: SettingWrapper, 403: Error})
def get_settings(request):
    """List the settings ``SettingManager.get_all_settings`` returns for the user."""
    settings_for_user = SettingManager.get_all_settings(request.user)
    return {"setting": settings_for_user}
|
|
|
|
|
|
@router.get("/setting/{uuid:setting_uuid}", response={200: SettingSchema, 403: Error})
def get_setting(request, setting_uuid):
    """Return one setting by id; a ``ValueError`` becomes a 403 error."""
    try:
        setting = SettingManager.get_setting_by_id(request.user, setting_uuid)
        return setting
    except ValueError:
        failure = f"Getting Setting with id {setting_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
|