Files
enviPy-bayer/epdb/legacy_api.py
2025-11-20 09:56:11 +13:00

1901 lines
63 KiB
Python

from typing import Any, Dict, List, Optional
import nh3
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.http import HttpResponse
from django.shortcuts import redirect
from ninja import Field, Form, Router, Schema, Query
from ninja.security import SessionAuth
from utilities.chem import FormatConverter
from utilities.misc import PackageExporter
from .logic import GroupManager, PackageManager, SettingManager, UserManager, SearchManager
from .models import (
Compound,
CompoundStructure,
Edge,
EPModel,
Node,
Pathway,
Reaction,
Rule,
Scenario,
SimpleAmbitRule,
User,
UserPackagePermission,
ParallelRule,
)
# Package model class used throughout this module; it is obtained by calling
# the GET_PACKAGE_MODEL hook on the Django settings object.
Package = s.GET_PACKAGE_MODEL()
def _anonymous_or_real(request):
if request.user.is_authenticated and not request.user.is_anonymous:
return request.user
return get_user_model().objects.get(username="anonymous")
# Router for every endpoint in this module; requests are authenticated via the
# Django session with the CSRF check switched off (csrf=False).
router = Router(auth=SessionAuth(csrf=False))
# Error payload returned together with a non-2xx status code.
class Error(Schema):
    # Human-readable description of what went wrong.
    message: str
#################
# SimpleObjects #
#################
# Compact user representation; values are read from the source object's
# `url`, `username` and `email` attributes via the field aliases.
class SimpleUser(Schema):
    id: str = Field(None, alias="url")
    # Constant type tag.
    identifier: str = "user"
    name: str = Field(None, alias="username")
    email: str = Field(None, alias="email")
# Compact group representation (`id` is read from `url`).
class SimpleGroup(Schema):
    id: str = Field(None, alias="url")
    # Constant type tag.
    identifier: str = "group"
    name: str = Field(None, alias="name")
# Compact setting representation (`id` is read from `url`).
class SimpleSetting(Schema):
    id: str = Field(None, alias="url")
    # Constant type tag.
    identifier: str = "setting"
    name: str = Field(None, alias="name")
# Base for the compact id/name/reviewStatus representations below.
class SimpleObject(Schema):
    id: str = Field(None, alias="url")
    name: str = Field(None, alias="name")
    reviewStatus: str = Field(None, alias="review_status")

    @staticmethod
    def resolve_review_status(obj):
        # Find the `reviewed` flag of the package the object belongs to. The
        # checks run in this order: a Package itself, anything exposing
        # `.package`, a CompoundStructure (via its compound), then a Node or
        # Edge (via its pathway). Anything else is rejected with ValueError.
        if isinstance(obj, Package):
            reviewed = obj.reviewed
        elif hasattr(obj, "package"):
            reviewed = obj.package.reviewed
        elif isinstance(obj, CompoundStructure):
            reviewed = obj.compound.package.reviewed
        elif isinstance(obj, (Node, Edge)):
            reviewed = obj.pathway.package.reviewed
        else:
            raise ValueError("Object has no package")

        if reviewed:
            return "reviewed"
        return "unreviewed"
# SimpleObject tagged as a package.
class SimplePackage(SimpleObject):
    identifier: str = "package"
# SimpleObject tagged as a compound.
class SimpleCompound(SimpleObject):
    identifier: str = "compound"
# SimpleObject tagged as a compound structure.
class SimpleCompoundStructure(SimpleObject):
    identifier: str = "structure"
# SimpleObject tagged as a rule.
class SimpleRule(SimpleObject):
    identifier: str = "rule"

    @staticmethod
    def resolve_url(obj: Rule):
        # Drop the engine marker ("-ambit-" / "-rdkit-") from the URL.
        url = obj.url
        for engine_marker in ("-ambit-", "-rdkit-"):
            url = url.replace(engine_marker, "-")
        return url
# SimpleObject tagged as a reaction.
class SimpleReaction(SimpleObject):
    identifier: str = "reaction"
# SimpleObject tagged as a scenario.
class SimpleScenario(SimpleObject):
    identifier: str = "scenario"
# SimpleObject tagged as a pathway.
class SimplePathway(SimpleObject):
    identifier: str = "pathway"
# SimpleObject tagged as a pathway node.
class SimpleNode(SimpleObject):
    identifier: str = "node"
# SimpleObject tagged as a pathway edge.
class SimpleEdge(SimpleObject):
    identifier: str = "edge"
# SimpleObject for models; the type tag emitted is "relative-reasoning".
class SimpleModel(SimpleObject):
    identifier: str = "relative-reasoning"
################
# Login/Logout #
################
# Login endpoint (no prior authentication required). Expects the form fields
# `loginusername` and `loginpassword`. The username is first mapped to the
# account's e-mail address, which is then passed to authenticate() as the
# `username` credential. Responds with the user on success and with 403 for
# an unknown username or a wrong password.
@router.post("/", response={200: SimpleUser, 403: Error}, auth=None)
def login(request, loginusername: Form[str], loginpassword: Form[str]):
    # Imported under an alias so Django's login() is not confused with this view.
    from django.contrib.auth import authenticate
    from django.contrib.auth import login as auth_login

    denied = 403, {"message": "Invalid username and/or password"}

    try:
        email = User.objects.get(username=loginusername).email
    except User.DoesNotExist:
        # An unknown username used to escape as an unhandled exception (HTTP
        # 500); answer exactly like a wrong password instead.
        return denied

    user = authenticate(username=email, password=loginpassword)
    if not user:
        return denied

    auth_login(request, user)
    return user
########
# User #
########
# Envelope for user listings: {"user": [...]}.
class UserWrapper(Schema):
    user: List[SimpleUser]
# Detailed user representation. Fields with an alias are read from the
# like-named attribute of the source object; fields without one are constants.
class UserSchema(Schema):
    affiliation: Dict[str, str] = Field(None, alias="affiliation")
    defaultGroup: "SimpleGroup" = Field(None, alias="default_group")
    defaultPackage: "SimplePackage" = Field(None, alias="default_package")
    defaultSetting: "SimpleSetting" = Field(None, alias="default_setting")
    email: str = Field(None, alias="email")
    forename: str = "not specified"
    groups: List["SimpleGroup"] = Field([], alias="groups")
    id: str = Field(None, alias="url")
    identifier: str = "user"
    link: str = "empty"
    name: str = Field(None, alias="username")
    surname: str = "not specified"
    settings: List["SimpleSetting"] = Field([], alias="settings")

    @staticmethod
    def resolve_affiliation(obj: User):
        # TODO
        # Placeholder: every part of the affiliation is reported as unknown.
        unknown = "not specified"
        return {"city": unknown, "country": unknown, "workplace": unknown}

    @staticmethod
    def resolve_settings(obj: User):
        # Settings are supplied by SettingManager for the given user.
        return SettingManager.get_all_settings(obj)
# Query parameters of the user listing; a truthy `whoami` limits the listing
# to the requesting user.
class Me(Schema):
    whoami: str | None = None
# List users: only the requesting user when `whoami` is given (and non-empty),
# otherwise every User.
@router.get("/user", response={200: UserWrapper, 403: Error})
def get_users(request, me: Query[Me]):
    users = [request.user] if me.whoami else User.objects.all()
    return {"user": users}
# Fetch one user through UserManager; a ValueError raised by the lookup is
# reported as 403.
@router.get("/user/{uuid:user_uuid}", response={200: UserSchema, 403: Error})
def get_user(request, user_uuid):
    try:
        return UserManager.get_user_by_id(request.user, user_uuid)
    except ValueError:
        message = f"Getting User with id {user_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Query parameters of the search endpoint.
class Search(Schema):
    # Package URLs to search in (query key "packages[]").
    packages: List[str] = Field(alias="packages[]")
    # Search term.
    search: str
    # One of: text, inchikey, defaultSmiles, canonicalSmiles, exactSmiles.
    method: str
# Search the given packages. The public method name is translated to the name
# SearchManager expects, and the result groups are re-keyed to the legacy
# response keys. Any ValueError (package lookup, unsupported method, search)
# is reported as 403.
@router.get("/search", response={200: Any, 403: Error})
def search(request, search: Query[Search]):
    # Public method name -> SearchManager method name.
    method_names = {
        "text": "text",
        "inchikey": "inchikey",
        "defaultSmiles": "default",
        "canonicalSmiles": "canonical",
        "exactSmiles": "exact",
    }
    # SearchManager result group -> response key, in output order.
    result_keys = (
        ("Compounds", "compound"),
        ("Compound Structures", "structure"),
        ("Reaction", "reaction"),
        ("Pathway", "pathway"),
        ("Rules", "rule"),
    )
    try:
        packs = [
            PackageManager.get_package_by_url(request.user, package_url)
            for package_url in search.packages
        ]

        method = method_names.get(search.method)
        if method is None:
            raise ValueError(f"Search method {search.method} is not supported!")

        search_res = SearchManager.search(packs, search.search, method)
        res = {
            response_key: search_res[group]
            for group, response_key in result_keys
            if group in search_res
        }

        # Every hit gets an `id` derived from its URL, with ambit rules
        # presented under the generic simple-rule path.
        for hits in res.values():
            for hit in hits:
                hit["id"] = hit["url"].replace("simple-ambit-rule", "simple-rule")
        return res
    except ValueError as e:
        return 403, {"message": f"Search failed due to {e}"}
###########
# Package #
###########
# Envelope for package listings: {"package": [...]}.
class PackageWrapper(Schema):
    package: List["PackageSchema"]
# Detailed package representation.
class PackageSchema(Schema):
    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    links: List[Dict[str, List[str | int]]] = Field([], alias="links")
    name: str = Field(None, alias="name")
    primaryGroup: Optional[SimpleGroup] = None
    readers: List[Dict[str, str]] = Field([], alias="readers")
    reviewComment: str = Field(None, alias="review_comment")
    reviewStatus: str = Field(None, alias="review_status")
    writers: List[Dict[str, str]] = Field([], alias="writers")

    @staticmethod
    def resolve_links(obj: Package):
        # One single-key dict per content type: {label: [listing URL, count]}.
        base = obj.url
        sections = (
            ("Pathways", "pathway", obj.pathways),
            ("Rules", "rule", obj.rules),
            ("Compounds", "compound", obj.compounds),
            ("Reactions", "reaction", obj.reactions),
            ("Relative Reasoning", "relative-reasoning", obj.models),
            ("Scenarios", "scenario", obj.scenarios),
        )
        return [
            {label: [f"{base}/{segment}", related.count()]}
            for label, segment, related in sections
        ]

    @staticmethod
    def resolve_readers(obj: Package):
        # Users holding a READ permission entry on this package, each as
        # {user id: user name}.
        reader_ids = UserPackagePermission.objects.filter(
            package=obj, permission=UserPackagePermission.READ[0]
        ).values_list("user", flat=True)
        readers = User.objects.filter(id__in=reader_ids).distinct()
        return [{reader.id: reader.name} for reader in readers]

    @staticmethod
    def resolve_writers(obj: Package):
        # Users holding a WRITE permission entry on this package, each as
        # {user id: user name}.
        writer_ids = UserPackagePermission.objects.filter(
            package=obj, permission=UserPackagePermission.WRITE[0]
        ).values_list("user", flat=True)
        writers = User.objects.filter(id__in=writer_ids).distinct()
        return [{writer.id: writer.name} for writer in writers]

    @staticmethod
    def resolve_review_comment(obj):
        # Always reported as an empty string.
        return ""

    @staticmethod
    def resolve_review_status(obj):
        if obj.reviewed:
            return "reviewed"
        return "unreviewed"
# List the packages PackageManager reports as readable for the requesting
# user, with include_reviewed=True.
@router.get("/package", response={200: PackageWrapper, 403: Error})
def get_packages(request):
    readable = PackageManager.get_all_readable_packages(request.user, include_reviewed=True)
    return {"package": readable}
# Query parameters of the single-package endpoint; the value "true" (after
# stripping whitespace) requests an export instead of the package details.
class GetPackage(Schema):
    exportAsJson: str | None = None
# Return one package, or its PackageExporter export when exportAsJson is
# "true". A ValueError from the lookup or export is reported as 403.
@router.get("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 403: Error})
def get_package(request, package_uuid, gp: Query[GetPackage]):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        export_flag = (gp.exportAsJson or "").strip()
        if export_flag == "true":
            return PackageExporter(p).do_export()
        return p
    except ValueError:
        message = f"Getting Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Form fields accepted when creating a package.
class CreatePackage(Schema):
    # Required; must not be blank.
    packageName: str
    packageDescription: str | None = None
# Create a package for the requesting user and redirect to its URL. A blank
# name, or any ValueError raised by PackageManager.create_package, is
# reported as 400.
#
# The 400 status has to be declared in the response map: django-ninja raises a
# ConfigError (an HTTP 500) when a handler returns a status code that is not
# declared there. The redirect on success is an HttpResponse and is passed
# through unchanged.
@router.post("/package", response={200: Any, 400: Error})
def create_packages(
    request,
    p: Form[CreatePackage],
):
    try:
        if not p.packageName.strip():
            raise ValueError("Package name cannot be empty!")
        new_package = PackageManager.create_package(
            request.user, p.packageName, p.packageDescription
        )
        return redirect(new_package.url)
    except ValueError as e:
        return 400, {"message": str(e)}
# Form fields of the package update endpoint. Which of them are set decides
# the action taken: hiddenMethod, then packageDescription, then a permission
# grant (permissions + ppsURI + read or write).
class UpdatePackage(Schema):
    packageDescription: str | None = None
    # "DELETE" requests deletion of the package.
    hiddenMethod: str | None = None
    permissions: str | None = None
    # URL of the user or group a permission is granted to.
    ppsURI: str | None = None
    read: str | None = None
    write: str | None = None
# Multi-purpose package update, selected by which form fields are set:
#   * hiddenMethod == "DELETE": delete the package (other values: no action)
#   * packageDescription:       replace the description (sanitised with nh3)
#   * permissions + ppsURI + read:  grant read to the user/group at ppsURI
#   * permissions + ppsURI + write: grant write to the user/group at ppsURI
# Every ValueError is reported as 400. When no action returns a response the
# handler returns None, which is delivered as a 200 with a null body.
@router.post("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 400: Error})
def update_package(request, package_uuid, pack: Form[UpdatePackage]):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if pack.hiddenMethod:
            if pack.hiddenMethod == "DELETE":
                # Same guard as the DELETE route: fetching the package is not
                # proof that the caller may remove it.
                if not PackageManager.administrable(request.user, p):
                    raise ValueError("You do not have the rights to delete this Package!")
                p.delete()
        elif pack.packageDescription is not None:
            # Mutating the package requires write access, as for the other
            # modifying endpoints of this API.
            if not PackageManager.writable(request.user, p):
                raise ValueError("You do not have the rights to modify this Package!")
            description = nh3.clean(pack.packageDescription, tags=s.ALLOWED_HTML_TAGS).strip()
            if not description:
                raise ValueError("Package description cannot be empty!")
            p.description = description
            p.save()
            return HttpResponse(status=200)
        elif pack.permissions and pack.ppsURI and (pack.read or pack.write):
            # The grantee is a group when the URL mentions "group", else a user.
            if "group" in pack.ppsURI:
                grantee = GroupManager.get_group_lp(pack.ppsURI)
            else:
                grantee = UserManager.get_user_lp(pack.ppsURI)
            # `read` takes precedence when both flags are sent.
            if pack.read:
                PackageManager.grant_read(request.user, p, grantee)
            else:
                PackageManager.grant_write(request.user, p, grantee)
            return HttpResponse(status=200)

        return None
    except ValueError as e:
        return 400, {"message": str(e)}
# Delete a package the requesting user may administrate, then redirect to the
# package listing. Missing rights (or any ValueError from the lookup) are
# reported as 403.
#
# The 403 status has to be declared in the response map: django-ninja raises a
# ConfigError (an HTTP 500) for a returned status code that is not declared.
@router.delete("/package/{uuid:package_uuid}", response={200: Any, 403: Error})
def delete_package(request, package_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        if not PackageManager.administrable(request.user, p):
            raise ValueError("You do not have the rights to delete this Package!")
        p.delete()
        return redirect(f"{s.SERVER_URL}/package")
    except ValueError:
        return 403, {
            "message": f"Deleting Package with id {package_uuid} failed due to insufficient rights!"
        }
################################
# Compound / CompoundStructure #
################################
# Envelope for compound listings: {"compound": [...]}.
class CompoundWrapper(Schema):
    compound: List["SimpleCompound"]
# Scenario entry listed under a compound's `pathwayScenarios`.
class CompoundPathwayScenario(Schema):
    scenarioId: str
    scenarioName: str
    scenarioType: str
# Detailed compound representation. Several resolvers below are stubs that
# return empty or fixed values.
class CompoundSchema(Schema):
    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
    id: str = Field(None, alias="url")
    halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
    identifier: str = "compound"
    imageSize: int = 600
    name: str = Field(None, alias="name")
    pathwayScenarios: List[CompoundPathwayScenario] = Field([], alias="pathway_scenarios")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(False, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    structures: List["CompoundStructureSchema"] = []

    @staticmethod
    def resolve_review_status(obj: Compound):
        # Mirrors the `reviewed` flag of the compound's package.
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_external_references(obj: Compound):
        # TODO
        return {}

    @staticmethod
    def resolve_structures(obj: Compound):
        # Every CompoundStructure that points at this compound.
        return CompoundStructure.objects.filter(compound=obj)

    @staticmethod
    def resolve_halflifes(obj: Compound):
        # Stub: no half-lives are reported.
        return []

    @staticmethod
    def resolve_pubchem_compound_references(obj: Compound):
        # Stub: no PubChem references are reported.
        return []

    @staticmethod
    def resolve_pathway_scenarios(obj: Compound):
        # NOTE(review): the same hard-coded scenario is returned for every
        # compound regardless of `obj` - looks like placeholder data; confirm
        # whether clients rely on it before replacing it.
        placeholder = {
            "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501",
            "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)",
            "scenarioType": "Soil",
        }
        return [placeholder]
# Detailed compound-structure representation. InChI, charge, formula and mass
# are computed from the structure's SMILES by FormatConverter.
class CompoundStructureSchema(Schema):
    InChI: str = Field(None, alias="inchi")
    aliases: List[str] = Field([], alias="aliases")
    canonicalSmiles: str = Field(None, alias="canonical_smiles")
    charge: int = Field(None, alias="charge")
    description: str = Field(None, alias="description")
    externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
    formula: str = Field(None, alias="formula")
    halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
    id: str = Field(None, alias="url")
    identifier: str = "structure"
    imageSize: int = 600
    inchikey: str = Field(None, alias="inchikey")
    isDefaultStructure: bool = Field(None, alias="is_default_structure")
    mass: float = Field(None, alias="mass")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smiles: str = Field(None, alias="smiles")

    @staticmethod
    def resolve_review_status(obj: CompoundStructure):
        # Mirrors the `reviewed` flag of the owning compound's package.
        if obj.compound.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_inchi(obj: CompoundStructure):
        smiles = obj.smiles
        return FormatConverter.InChI(smiles)

    @staticmethod
    def resolve_charge(obj: CompoundStructure):
        smiles = obj.smiles
        return FormatConverter.charge(smiles)

    @staticmethod
    def resolve_formula(obj: CompoundStructure):
        smiles = obj.smiles
        return FormatConverter.formula(smiles)

    @staticmethod
    def resolve_mass(obj: CompoundStructure):
        smiles = obj.smiles
        return FormatConverter.mass(smiles)

    @staticmethod
    def resolve_external_references(obj: CompoundStructure):
        # TODO
        return {}

    @staticmethod
    def resolve_halflifes(obj: CompoundStructure):
        # Stub: no half-lives are reported.
        return []

    @staticmethod
    def resolve_pubchem_compound_references(obj: CompoundStructure):
        # Stub: no PubChem references are reported.
        return []

    @staticmethod
    def resolve_pathway_scenarios(obj: CompoundStructure):
        # NOTE(review): this schema declares no field with the alias
        # "pathway_scenarios", so this resolver appears to be unused; it also
        # returns hard-coded placeholder data.
        placeholder = {
            "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501",
            "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)",
            "scenarioType": "Soil",
        }
        return [placeholder]
# Envelope for structure listings: {"structure": [...]}.
class CompoundStructureWrapper(Schema):
    structure: List["SimpleCompoundStructure"]
# List the compounds of all packages returned by
# PackageManager.get_reviewed_packages().
@router.get("/compound", response={200: CompoundWrapper, 403: Error})
def get_compounds(request):
    reviewed = PackageManager.get_reviewed_packages()
    compounds = Compound.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"compound": compounds}
# List the compounds of one package; a ValueError from the package lookup is
# reported as 403.
@router.get("/package/{uuid:package_uuid}/compound", response={200: CompoundWrapper, 403: Error})
def get_package_compounds(request, package_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        compounds = Compound.objects.filter(package=p).prefetch_related("package")
        return {"compound": compounds}
    except ValueError:
        message = f"Getting Compounds for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Return one compound of a package; a ValueError from the package lookup is
# reported as 403.
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}",
    response={200: CompoundSchema, 403: Error},
)
def get_package_compound(request, package_uuid, compound_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        compound = Compound.objects.get(package=p, uuid=compound_uuid)
        return compound
    except ValueError:
        message = f"Getting Compound with id {compound_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# List the structures of one compound; a ValueError from the package lookup
# is reported as 403.
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure",
    response={200: CompoundStructureWrapper, 403: Error},
)
def get_package_compound_structures(request, package_uuid, compound_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        compound = Compound.objects.get(package=p, uuid=compound_uuid)
        return {"structure": compound.structures.all()}
    except ValueError:
        message = f"Getting CompoundStructures for Compound with id {compound_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Return one structure of a compound; a ValueError from the package lookup is
# reported as 403.
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}",
    response={200: CompoundStructureSchema, 403: Error},
)
def get_package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        # The compound is resolved within the package first, so the structure
        # lookup is scoped to it.
        parent = Compound.objects.get(package=p, uuid=compound_uuid)
        return CompoundStructure.objects.get(uuid=structure_uuid, compound=parent)
    except ValueError:
        message = f"Getting CompoundStructure with id {structure_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Form fields accepted when creating a compound.
class CreateCompound(Schema):
    # Required SMILES of the new compound.
    compoundSmiles: str
    compoundName: str | None = None
    compoundDescription: str | None = None
    inchi: str | None = None
# Create a compound in the given package and redirect to its URL. Any
# ValueError (package lookup or Compound.create) is reported as 400.
#
# The 400 status has to be declared in the response map: django-ninja raises a
# ConfigError (an HTTP 500) for a returned status code that is not declared.
@router.post("/package/{uuid:package_uuid}/compound", response={200: Any, 400: Error})
def create_package_compound(
    request,
    package_uuid,
    c: Form[CreateCompound],
):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        # inchi is not used atm
        # The created model gets its own name instead of rebinding the form
        # parameter `c`.
        compound = Compound.create(
            p, c.compoundSmiles, c.compoundName, c.compoundDescription, inchi=c.inchi
        )
        return redirect(compound.url)
    except ValueError as e:
        return 400, {"message": str(e)}
# Delete a compound from a package the requesting user may write to, then
# redirect to the package's compound listing. Missing rights (or any
# ValueError from the lookup) are reported as 403.
#
# The 403 status has to be declared in the response map: django-ninja raises a
# ConfigError (an HTTP 500) for a returned status code that is not declared.
@router.delete(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}",
    response={200: Any, 403: Error},
)
def delete_compound(request, package_uuid, compound_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Compound!")
        Compound.objects.get(package=p, uuid=compound_uuid).delete()
        return redirect(f"{p.url}/compound")
    except ValueError:
        return 403, {
            "message": f"Deleting Compound with id {compound_uuid} failed due to insufficient rights!"
        }
# Delete one structure of a compound (write access to the package required).
#   * If it is the compound's only structure, or it is the normalized
#     structure, the whole compound is deleted and the client is redirected to
#     the package's compound listing.
#   * Otherwise only the structure is deleted and the client is redirected to
#     the compound's structure listing. If it was the default structure,
#     another remaining structure becomes the default.
# Missing rights (or any ValueError from the lookup) are reported as 403.
#
# The 403 status has to be declared in the response map: django-ninja raises a
# ConfigError (an HTTP 500) for a returned status code that is not declared.
@router.delete(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}",
    response={200: Any, 403: Error},
)
def delete_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this CompoundStructure!")

        c = Compound.objects.get(package=p, uuid=compound_uuid)
        cs = CompoundStructure.objects.get(compound=c, uuid=structure_uuid)

        # Deleting the compound removes its structures as well.
        if c.structures.count() == 1 or cs.normalized_structure:
            c.delete()
            return redirect(p.url + "/compound")

        if c.default_structure == cs:
            # Re-point and persist the default before deleting the structure.
            # Previously the new default was assigned in memory only and
            # never saved.
            c.default_structure = c.structures.exclude(pk=cs.pk).first()
            c.save()
        cs.delete()
        return redirect(c.url + "/structure")
    except ValueError:
        # Report the structure's id (the message used to show the compound's).
        return 403, {
            "message": f"Deleting CompoundStructure with id {structure_uuid} failed due to insufficient rights!"
        }
#########
# Rules #
#########
# Envelope for rule listings: {"rule": [...]}.
class RuleWrapper(Schema):
    rule: List["SimpleRule"]
# Detailed representation of a non-composite rule.
class SimpleRuleSchema(Schema):
    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    engine: str = "ambit"
    id: str = Field(None, alias="url")
    identifier: str = Field(None, alias="identifier")
    isCompositeRule: bool = False
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    productFilterSmarts: str = Field("", alias="product_filter_smarts")
    productSmarts: str = Field(None, alias="products_smarts")
    reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
    reactantSmarts: str = Field(None, alias="reactants_smarts")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smirks: str = Field("", alias="smirks")
    # TODO
    transformations: str = Field("", alias="transformations")

    @staticmethod
    def resolve_url(obj: Rule):
        # Drop the engine marker ("-ambit-" / "-rdkit-") from the URL.
        url = obj.url
        for engine_marker in ("-ambit-", "-rdkit-"):
            url = url.replace(engine_marker, "-")
        return url

    @staticmethod
    def resolve_identifier(obj: Rule):
        # The rule kind is derived from the URL; the first matching marker
        # wins, and None is returned when none matches.
        url = obj.url
        markers = (
            ("simple-rule", "simple-rule"),
            ("simple-ambit-rule", "simple-rule"),
            ("parallel-rule", "parallel-rule"),
            ("sequential-rule", "sequential-rule"),
        )
        for marker, identifier in markers:
            if marker in url:
                return identifier
        return None

    @staticmethod
    def resolve_review_status(obj: Rule):
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_product_filter_smarts(obj: Rule):
        # Falsy values (e.g. None) are reported as an empty string.
        return obj.product_filter_smarts or ""

    @staticmethod
    def resolve_reactant_filter_smarts(obj: Rule):
        # Falsy values (e.g. None) are reported as an empty string.
        return obj.reactant_filter_smarts or ""
# Detailed representation of a composite rule, including its simple rules.
class CompositeRuleSchema(Schema):
    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    id: str = Field(None, alias="url")
    identifier: str = Field(None, alias="identifier")
    isCompositeRule: bool = True
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    productFilterSmarts: str = Field("", alias="product_filter_smarts")
    reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    simpleRules: List["SimpleRule"] = Field([], alias="simple_rules")

    @staticmethod
    def resolve_ec_numbers(obj: Rule):
        # Stub: no EC numbers are reported.
        return []

    @staticmethod
    def resolve_url(obj: Rule):
        # Drop the engine marker ("-ambit-" / "-rdkit-") from the URL.
        url = obj.url
        for engine_marker in ("-ambit-", "-rdkit-"):
            url = url.replace(engine_marker, "-")
        return url

    @staticmethod
    def resolve_identifier(obj: Rule):
        # The rule kind is derived from the URL; the first matching marker
        # wins, and None is returned when none matches.
        url = obj.url
        markers = (
            ("simple-rule", "simple-rule"),
            ("simple-ambit-rule", "simple-rule"),
            ("parallel-rule", "parallel-rule"),
            ("sequential-rule", "sequential-rule"),
        )
        for marker, identifier in markers:
            if marker in url:
                return identifier
        return None

    @staticmethod
    def resolve_review_status(obj: Rule):
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_product_filter_smarts(obj: Rule):
        # Falsy values (e.g. None) are reported as an empty string.
        return obj.product_filter_smarts or ""

    @staticmethod
    def resolve_reactant_filter_smarts(obj: Rule):
        # Falsy values (e.g. None) are reported as an empty string.
        return obj.reactant_filter_smarts or ""
# List the rules of all packages returned by
# PackageManager.get_reviewed_packages().
@router.get("/rule", response={200: RuleWrapper, 403: Error})
def get_rules(request):
    reviewed = PackageManager.get_reviewed_packages()
    rules = Rule.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"rule": rules}
# List the rules of one package; a ValueError from the package lookup is
# reported as 403.
@router.get("/package/{uuid:package_uuid}/rule", response={200: RuleWrapper, 403: Error})
def get_package_rules(request, package_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        rules = Rule.objects.filter(package=p).prefetch_related("package")
        return {"rule": rules}
    except ValueError:
        message = f"Getting Rules for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Return one rule of a package, addressed via the generic /rule/ path.
@router.get(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_rule(request, package_uuid, rule_uuid):
    rule_or_error = _get_package_rule(request, package_uuid, rule_uuid)
    return rule_or_error
# Return one rule of a package, addressed via the /simple-rule/ path.
@router.get(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_simple_rule(request, package_uuid, rule_uuid):
    rule_or_error = _get_package_rule(request, package_uuid, rule_uuid)
    return rule_or_error
# Return one rule of a package, addressed via the /parallel-rule/ path.
@router.get(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_parallel_rule(request, package_uuid, rule_uuid):
    rule_or_error = _get_package_rule(request, package_uuid, rule_uuid)
    return rule_or_error
def _get_package_rule(request, package_uuid, rule_uuid):
    """Look up a rule inside a package on behalf of the requesting user.

    Returns the Rule, or the tuple ``(403, {"message": ...})`` when the lookup
    raises ValueError.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        rule = Rule.objects.get(package=package, uuid=rule_uuid)
    except ValueError:
        message = f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
    return rule
# POST
# Form fields accepted when creating a simple rule.
class CreateSimpleRule(Schema):
    # Required SMIRKS of the rule.
    smirks: str
    name: str | None = None
    description: str | None = None
    reactantFilterSmarts: str | None = None
    productFilterSmarts: str | None = None
    immediate: str | None = None
    # "true" requests an RDKit rule.
    rdkitrule: str | None = None
# Create a SimpleAmbitRule in the given package and redirect to its URL.
# Requesting an RDKit rule (rdkitrule == "true") is rejected as not yet
# implemented. Every ValueError is reported as 400.
#
# The 400 status has to be declared in the response map: django-ninja raises a
# ConfigError (an HTTP 500) for a returned status code that is not declared.
@router.post("/package/{uuid:package_uuid}/simple-rule", response={200: Any, 400: Error})
def create_package_simple_rule(
    request,
    package_uuid,
    r: Form[CreateSimpleRule],
):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        if r.rdkitrule and r.rdkitrule.strip() == "true":
            raise ValueError("Not yet implemented!")
        sr = SimpleAmbitRule.create(
            p, r.name, r.description, r.smirks, r.reactantFilterSmarts, r.productFilterSmarts
        )
        return redirect(sr.url)
    except ValueError as e:
        return 400, {"message": str(e)}
# Form fields accepted when creating a parallel rule.
class CreateParallelRule(Schema):
    # Required reference to the simple rules to combine.
    simpleRules: str
    name: str | None = None
    description: str | None = None
    reactantFilterSmarts: str | None = None
    productFilterSmarts: str | None = None
    immediate: str | None = None
# Create a ParallelRule from existing rules of the package and redirect to its
# URL. Every ValueError (package lookup, unknown rule URL, creation) is
# reported as 400.
#
# Fixes over the previous version:
#   * the lookup went through `SimpleRule`, which in this module is the
#     response Schema and has no `.objects` manager, so the endpoint always
#     failed with AttributeError; the Rule model is queried instead
#   * `simpleRules` is a plain string, so `url__in` and `len` used to operate
#     on its characters; it is now split into URLs
#   * 400 is declared in the response map; django-ninja raises a ConfigError
#     (an HTTP 500) for a returned status code that is not declared
# NOTE(review): comma-separated URLs are an assumption about the wire format,
# and Rule is queried because no simple-rule model is visible in this module -
# confirm both against the client and `.models`.
@router.post("/package/{uuid:package_uuid}/parallel-rule", response={200: Any, 400: Error})
def create_package_parallel_rule(
    request,
    package_uuid,
    r: Form[CreateParallelRule],
):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        # De-duplicate while keeping order so the count check below is exact.
        rule_urls = list(
            dict.fromkeys(url.strip() for url in r.simpleRules.split(",") if url.strip())
        )
        if not rule_urls:
            raise ValueError("At least one SimpleRule must be provided!")

        srs = Rule.objects.filter(package=p, url__in=rule_urls)
        if srs.count() != len(rule_urls):
            raise ValueError(
                f"Not all SimpleRules could be found in Package with id {package_uuid}!"
            )

        sr = ParallelRule.create(
            p, list(srs), r.name, r.description, r.reactantFilterSmarts, r.productFilterSmarts
        )
        return redirect(sr.url)
    except ValueError as e:
        return 400, {"message": str(e)}
# POST on a rule via the generic /rule/ path: with a `compound` form value the
# rule is applied to it, without one the rule itself is returned (see
# _post_package_rule).
#
# 400 is declared because the helper answers a blank `compound` with status
# 400; django-ninja raises a ConfigError (an HTTP 500) for a returned status
# code that is not declared in the response map.
@router.post(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: str | Any, 400: Error, 403: Error},
)
def post_package_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
# POST on a rule via the /simple-rule/ path: with a `compound` form value the
# rule is applied to it, without one the rule itself is returned (see
# _post_package_rule).
#
# 400 is declared because the helper answers a blank `compound` with status
# 400; django-ninja raises a ConfigError (an HTTP 500) for a returned status
# code that is not declared in the response map.
@router.post(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: str | Any, 400: Error, 403: Error},
)
def post_package_simple_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
# POST on a rule via the /parallel-rule/ path: with a `compound` form value
# the rule is applied to it, without one the rule itself is returned (see
# _post_package_rule).
#
# 400 is declared because the helper answers a blank `compound` with status
# 400; django-ninja raises a ConfigError (an HTTP 500) for a returned status
# code that is not declared in the response map.
@router.post(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={200: str | Any, 400: Error, 403: Error},
)
def post_package_parallel_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
def _post_package_rule(request, package_uuid, rule_uuid, compound: Form[str]):
    """Apply a rule to a compound, or return the rule when no compound is given.

    With ``compound`` set, the rule's ``apply`` result (an iterable of product
    sets) is flattened and sent back as plain text, one product per line. A
    compound consisting only of whitespace yields ``(400, {...})``. A
    ValueError anywhere in the process yields ``(403, {...})``.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        r = Rule.objects.get(package=p, uuid=rule_uuid)

        if compound is None:
            return r
        if not compound.split():
            return 400, {"message": "Compound is empty"}

        products = [product for p_set in r.apply(compound) for product in p_set]
        return HttpResponse("\n".join(products), content_type="text/plain")
    except ValueError:
        message = f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Delete a rule addressed via the generic /rule/ path (see _delete_rule).
#
# 403 is declared because the helper reports missing rights with that status;
# django-ninja raises a ConfigError (an HTTP 500) for a returned status code
# that is not declared in the response map.
@router.delete(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: Any, 403: Error},
)
def delete_rule(request, package_uuid, rule_uuid):
    return _delete_rule(request, package_uuid, rule_uuid)
# Delete a rule addressed via the /simple-rule/ path (see _delete_rule).
#
# 403 is declared because the helper reports missing rights with that status;
# django-ninja raises a ConfigError (an HTTP 500) for a returned status code
# that is not declared in the response map.
@router.delete(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: Any, 403: Error},
)
def delete_simple_rule(request, package_uuid, rule_uuid):
    return _delete_rule(request, package_uuid, rule_uuid)
# Delete a rule addressed via the /parallel-rule/ path (see _delete_rule).
#
# 403 is declared because the helper reports missing rights with that status;
# django-ninja raises a ConfigError (an HTTP 500) for a returned status code
# that is not declared in the response map.
@router.delete(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={200: Any, 403: Error},
)
def delete_parallel_rule(request, package_uuid, rule_uuid):
    return _delete_rule(request, package_uuid, rule_uuid)
def _delete_rule(request, package_uuid, rule_uuid):
    """Delete a rule from a package the requesting user may write to.

    Returns a redirect to the package's rule listing on success, or the tuple
    ``(403, {"message": ...})`` when write access is missing or the lookup
    raises ValueError.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Rule!")
        rule = Rule.objects.get(package=p, uuid=rule_uuid)
        rule.delete()
        return redirect(f"{p.url}/rule")
    except ValueError:
        message = f"Deleting Rule with id {rule_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
############
# Reaction #
############
# Envelope for reaction listings: {"reaction": [...]}.
class ReactionWrapper(Schema):
    reaction: List["SimpleReaction"]
# Educt/product entry of a reaction (`compoundName` is read from `name`,
# `id` from `url`).
class ReactionCompoundStructure(Schema):
    compoundName: str = Field(None, alias="name")
    id: str = Field(None, alias="url")
    smiles: str = Field(None, alias="smiles")
# Detailed reaction representation. EC numbers, references and Medline
# references are stubs that return empty lists.
class ReactionSchema(Schema):
    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    educts: List["ReactionCompoundStructure"] = Field([], alias="educts")
    id: str = Field(None, alias="url")
    identifier: str = "reaction"
    medlineRefs: List[str] = Field([], alias="medline_references")
    multistep: bool = Field(None, alias="multi_step")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    products: List["ReactionCompoundStructure"] = Field([], alias="products")
    references: List[Dict[str, List[str]]] = Field([], alias="references")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smirks: str = Field("", alias="smirks")

    @staticmethod
    def resolve_smirks(obj: Reaction):
        # `smirks` is a method on the model, so it has to be called.
        reaction_smirks = obj.smirks()
        return reaction_smirks

    @staticmethod
    def resolve_ec_numbers(obj: Reaction):
        # TODO fetch via scenario EnzymeAI
        return []

    @staticmethod
    def resolve_references(obj: Reaction):
        # TODO
        return []

    @staticmethod
    def resolve_medline_references(obj: Reaction):
        # TODO
        return []

    @staticmethod
    def resolve_review_status(obj: Reaction):
        # Mirrors the `reviewed` flag of the reaction's package.
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"
# List the reactions of all packages returned by
# PackageManager.get_reviewed_packages().
@router.get("/reaction", response={200: ReactionWrapper, 403: Error})
def get_reactions(request):
    reviewed = PackageManager.get_reviewed_packages()
    reactions = Reaction.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"reaction": reactions}
# List the reactions of one package; a ValueError from the package lookup is
# reported as 403.
@router.get("/package/{uuid:package_uuid}/reaction", response={200: ReactionWrapper, 403: Error})
def get_package_reactions(request, package_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        reactions = Reaction.objects.filter(package=p).prefetch_related("package")
        return {"reaction": reactions}
    except ValueError:
        message = f"Getting Reactions for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Return one reaction of a package; a ValueError from the package lookup is
# reported as 403.
@router.get(
    "/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}",
    response={200: ReactionSchema, 403: Error},
)
def get_package_reaction(request, package_uuid, reaction_uuid):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        reaction = Reaction.objects.get(package=p, uuid=reaction_uuid)
        return reaction
    except ValueError:
        message = f"Getting Reaction with id {reaction_uuid} failed due to insufficient rights!"
        return 403, {"message": message}
# Form fields accepted when creating a reaction. Either `smirks` or the pair
# `educt`/`product` (structure URLs) must be supplied.
class CreateReaction(Schema):
    reactionName: str | None = None
    reactionDescription: str | None = None
    smirks: str | None = None
    educt: str | None = None
    product: str | None = None
    # Optional URL of a rule in the same package.
    rule: str | None = None
# Create a reaction in the given package and redirect to its URL. The reaction
# is described either by a SMIRKS string ("educts>>products", components
# separated by ".") or by the URLs of one educt and one product structure of
# the package; an optional rule URL may be attached. Every ValueError is
# reported as 400.
#
# Fixes over the previous version:
#   * SMIRKS sides were split with "\\." (a regex-style pattern that str.split
#     treats literally), so multi-component sides were never split
#   * a SMIRKS without ">>" raised IndexError (an HTTP 500)
#   * 400 is declared in the response map; django-ninja raises a ConfigError
#     (an HTTP 500) for a returned status code that is not declared
@router.post("/package/{uuid:package_uuid}/reaction", response={200: Any, 400: Error})
def create_package_reaction(
    request,
    package_uuid,
    r: Form[CreateReaction],
):
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if r.smirks is None and (r.educt is None or r.product is None):
            raise ValueError("Either SMIRKS or educt/product must be provided")
        if r.smirks is not None and (r.educt is not None and r.product is not None):
            raise ValueError("SMIRKS and educt/product provided!")

        rule = None
        if r.rule:
            try:
                rule = Rule.objects.get(package=p, url=r.rule)
            except Rule.DoesNotExist:
                raise ValueError(f"Rule with id {r.rule} does not exist!")

        if r.educt is not None:
            # Explicit structures: both must belong to this package.
            try:
                educt_cs = CompoundStructure.objects.get(compound__package=p, url=r.educt)
            except CompoundStructure.DoesNotExist:
                raise ValueError(f"Compound with id {r.educt} does not exist!")
            try:
                product_cs = CompoundStructure.objects.get(compound__package=p, url=r.product)
            except CompoundStructure.DoesNotExist:
                raise ValueError(f"Compound with id {r.product} does not exist!")
            educts, products = [educt_cs], [product_cs]
        else:
            sides = r.smirks.split(">>")
            if len(sides) < 2:
                raise ValueError("SMIRKS must contain '>>' separating educts and products!")
            educts = sides[0].split(".")
            products = sides[1].split(".")

        new_r = Reaction.create(p, r.reactionName, r.reactionDescription, educts, products, rule)
        return redirect(new_r.url)
    except ValueError as e:
        return 400, {"message": str(e)}
@router.delete(
    "/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}",
    response={403: Error},
)
def delete_reaction(request, package_uuid, reaction_uuid):
    """Delete one reaction of a package and redirect to the package's reaction list.

    The package must be writable for the requesting user. Any ``ValueError``
    (failed package lookup or missing write permission) is answered with a
    403 error payload; that status is declared on the route because the
    handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Reaction!")

        r = Reaction.objects.get(package=p, uuid=reaction_uuid)
        r.delete()
        return redirect(f"{p.url}/reaction")
    except ValueError:
        return 403, {
            "message": f"Deleting Reaction with id {reaction_uuid} failed due to insufficient rights!"
        }
############
# Scenario #
############
class ScenarioWrapper(Schema):
    """Response envelope holding a list of scenarios under the key ``scenario``."""

    scenario: List["SimpleScenario"]
class ScenarioSchema(Schema):
    """Detailed representation of a scenario in the legacy API format.

    Output names are the camelCase attributes; each ``alias`` names the
    attribute (or ``resolve_<alias>`` method) the value is read from.
    """

    aliases: List[str] = Field([], alias="aliases")
    # Mapping-typed field: key type spelled as a real type, default is a dict.
    collection: Dict[str, List[Dict[str, Any]]] = Field({}, alias="collection")
    collectionID: Optional[str] = None
    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    identifier: str = "scenario"
    linkedTo: List[Dict[str, str]] = Field([], alias="linked_to")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    relatedScenarios: List[Dict[str, str]] = Field([], alias="related_scenarios")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    type: str = Field(None, alias="scenario_type")

    @staticmethod
    def resolve_collection(obj: Scenario):
        """Expose the scenario's ``additional_information`` as ``collection``."""
        return obj.additional_information

    @staticmethod
    def resolve_review_status(obj: Scenario):
        """Return "reviewed" or "unreviewed" depending on the scenario's package."""
        return "reviewed" if obj.package.reviewed else "unreviewed"
@router.get("/scenario", response={200: ScenarioWrapper, 403: Error})
def get_scenarios(request):
    """List the scenarios of every package returned by ``get_reviewed_packages()``."""
    reviewed_packages = PackageManager.get_reviewed_packages()
    scenarios = Scenario.objects.filter(package__in=reviewed_packages)
    return {"scenario": scenarios.prefetch_related("package")}
@router.get("/package/{uuid:package_uuid}/scenario", response={200: ScenarioWrapper, 403: Error})
def get_package_scenarios(request, package_uuid):
    """List all scenarios stored in one package.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        scenarios = Scenario.objects.filter(package=package)
        return {"scenario": scenarios.prefetch_related("package")}
    except ValueError:
        denied = f"Getting Scenarios for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
@router.get(
    "/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}",
    response={200: ScenarioSchema, 403: Error},
)
def get_package_scenario(request, package_uuid, scenario_uuid):
    """Return a single scenario of a package.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        scenario = Scenario.objects.get(package=package, uuid=scenario_uuid)
        return scenario
    except ValueError:
        denied = f"Getting Scenario with id {scenario_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
@router.delete("/package/{uuid:package_uuid}/scenario", response={403: Error})
def delete_scenarios(request, package_uuid, scenario_uuid: Optional[str] = None):
    """Delete all scenarios of a package and redirect to its scenario list.

    ``scenario_uuid`` is not part of the route and is not used; it is kept
    for backward compatibility but made optional, since a parameter without
    a default that is absent from the path is treated as a required query
    parameter.

    The package must be writable for the requesting user. Any ``ValueError``
    (failed package lookup or missing write permission) is answered with a
    403 error payload; that status is declared on the route because the
    handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete Scenarios!")

        scens = Scenario.objects.filter(package=p)
        scens.delete()
        return redirect(f"{p.url}/scenario")
    except ValueError:
        return 403, {"message": "Deleting Scenarios failed due to insufficient rights!"}
@router.delete(
    "/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}",
    response={403: Error},
)
def delete_scenario(request, package_uuid, scenario_uuid):
    """Delete one scenario of a package and redirect to the package's scenario list.

    The package must be writable for the requesting user. Any ``ValueError``
    (failed package lookup or missing write permission) is answered with a
    403 error payload; that status is declared on the route because the
    handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Scenario!")

        scen = Scenario.objects.get(package=p, uuid=scenario_uuid)
        scen.delete()
        return redirect(f"{p.url}/scenario")
    except ValueError:
        return 403, {
            "message": f"Deleting Scenario with id {scenario_uuid} failed due to insufficient rights!"
        }
###########
# Pathway #
###########
class PathwayWrapper(Schema):
    """Response envelope holding a list of pathways under the key ``pathway``."""

    pathway: List["SimplePathway"]
class PathwayEdge(Schema):
    """Edge ("link") entry embedded in a pathway response."""

    ecNumbers: List[str] = Field([], alias="ec_numbers")
    id: str = Field(None, alias="url")
    idreaction: str = Field(None, alias="edge_label.url")
    multstep: bool = Field(None, alias="edge_label.multi_step")
    name: str = Field(None, alias="name")
    pseudo: bool = False
    rule: Optional[str] = Field(None, alias="rule")
    scenarios: List[SimpleScenario] = Field([], alias="scenarios")
    source: int = -1
    target: int = -1

    @staticmethod
    def resolve_rule(obj: Edge):
        """Return the SMIRKS of the edge's first rule if it is a SimpleAmbitRule.

        Yields ``None`` when the edge's reaction has no rules or when the
        first rule is of another type.
        """
        rules = obj.edge_label.rules.all()
        if not rules.exists():
            return None

        first_rule = rules[0]
        if not isinstance(first_rule, SimpleAmbitRule):
            return None
        return first_rule.smirks
class PathwayNode(Schema):
    """Node entry embedded in a pathway response."""

    atomCount: int = Field(None, alias="atom_count")
    depth: int = Field(None, alias="depth")
    dt50s: List[Dict[str, str]] = Field([], alias="dt50s")
    engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
    id: str = Field(None, alias="url")
    idcomp: str = Field(None, alias="default_node_label.url")
    idreact: str = Field(None, alias="default_node_label.url")
    image: str = Field(None, alias="image")
    imageSize: int = Field(None, alias="image_size")
    name: str = Field(None, alias="name")
    proposed: List[Dict[str, str]] = Field([], alias="proposed_intermediate")
    smiles: str = Field(None, alias="default_node_label.smiles")

    @staticmethod
    def resolve_atom_count(obj: Node):
        """Count the atoms of the molecule parsed from the default label's SMILES."""
        from rdkit import Chem

        molecule = Chem.MolFromSmiles(obj.default_node_label.smiles)
        return molecule.GetNumAtoms()

    @staticmethod
    def resolve_dt50s(obj: Node):
        """Return the DT50 values of the node; not implemented yet, always empty."""
        # TODO
        dt50s = []
        return dt50s

    @staticmethod
    def resolve_engineered_intermediate(obj: Node):
        """Return the engineered-intermediate flag; not implemented yet, always False."""
        # TODO
        is_engineered = False
        return is_engineered

    @staticmethod
    def resolve_image(obj: Node):
        """Return the default label's URL with the SVG image query appended."""
        structure_url = obj.default_node_label.url
        return f"{structure_url}?image=svg"

    @staticmethod
    def resolve_image_size(obj: Node):
        """Return the fixed image size reported for every node."""
        image_size = 400
        return image_size

    @staticmethod
    def resolve_proposed_intermediate(obj: Node):
        """Return the proposed values of the node; not implemented yet, always empty."""
        # TODO
        proposed = []
        return proposed
class PathwaySchema(Schema):
    """Detailed representation of a pathway, including its nodes and edges.

    Output names are the camelCase attributes; each ``alias`` names the
    attribute (or ``resolve_<alias>`` method) the value is read from.
    """

    aliases: List[str] = Field([], alias="aliases")
    completed: str = Field(None, alias="completed")
    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    isIncremental: bool = Field(None, alias="is_incremental")
    isPredicted: bool = Field(None, alias="is_predicted")
    lastModified: int = Field(None, alias="last_modified")
    links: List[PathwayEdge] = Field([], alias="edges")
    name: str = Field(None, alias="name")
    nodes: List[PathwayNode] = Field([], alias="nodes")
    pathwayName: str = Field(None, alias="name")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    upToDate: bool = Field(None, alias="up_to_date")

    @staticmethod
    def resolve_review_status(obj: Pathway):
        """Return "reviewed" or "unreviewed" depending on the pathway's package."""
        return "reviewed" if obj.package.reviewed else "unreviewed"

    @staticmethod
    def resolve_completed(obj: Pathway):
        """Return the lower-cased string form of ``obj.completed()``."""
        return str(obj.completed()).lower()

    @staticmethod
    def resolve_up_to_date(obj: Pathway):
        """Report every pathway as up to date.

        ``upToDate`` is declared as ``bool``, so a real boolean is returned
        instead of the string "true" that had to be coerced during validation.
        """
        return True

    @staticmethod
    def resolve_last_modified(obj: Pathway):
        """Return the modification time as a timestamp truncated to an integer."""
        return int(obj.modified.timestamp())
@router.get("/pathway", response={200: PathwayWrapper, 403: Error})
def get_pathways(request):
    """List the pathways of every package returned by ``get_reviewed_packages()``."""
    reviewed_packages = PackageManager.get_reviewed_packages()
    pathways = Pathway.objects.filter(package__in=reviewed_packages)
    return {"pathway": pathways.prefetch_related("package")}
@router.get("/package/{uuid:package_uuid}/pathway", response={200: PathwayWrapper, 403: Error})
def get_package_pathways(request, package_uuid):
    """List all pathways stored in one package.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathways = Pathway.objects.filter(package=package)
        return {"pathway": pathways.prefetch_related("package")}
    except ValueError:
        denied = f"Getting Pathways for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}",
    response={200: PathwaySchema, 403: Error},
)
def get_package_pathway(request, package_uuid, pathway_uuid):
    """Return a single pathway of a package.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        return pathway
    except ValueError:
        denied = f"Getting Pathway with id {pathway_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
class CreatePathway(Schema):
    """Form payload accepted when creating a pathway; only ``smilesinput`` is required."""

    smilesinput: str
    name: Optional[str] = None
    description: Optional[str] = None
    rootOnly: Optional[str] = None
    selectedSetting: Optional[str] = None
@router.post("/package/{uuid:package_uuid}/pathway", response={400: Error})
def create_pathway(
    request,
    package_uuid,
    pw: Form[CreatePathway],
):
    """Create a pathway from a SMILES and redirect to its URL.

    The stripped SMILES is standardized and used to create the pathway. The
    mode stored in the pathway's ``kv`` is "build" when ``rootOnly`` is the
    string "true" and "predict" otherwise. In predict mode the setting is the
    one named by ``selectedSetting`` or, if absent, the user's
    ``prediction_settings()``, and the ``predict`` task is dispatched.

    Every ``ValueError`` raised along the way is answered with status 400 and
    the error text as ``message``. The 400 response is declared on the route
    because the handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        stand_smiles = FormatConverter.standardize(pw.smilesinput.strip())

        new_pw = Pathway.create(p, stand_smiles, name=pw.name, description=pw.description)

        pw_mode = "predict"
        if pw.rootOnly and pw.rootOnly.strip() == "true":
            pw_mode = "build"

        new_pw.kv.update({"mode": pw_mode})
        new_pw.save()

        if pw_mode == "predict":
            setting = request.user.prediction_settings()

            if pw.selectedSetting:
                setting = SettingManager.get_setting_by_url(request.user, pw.selectedSetting)

            new_pw.setting = setting
            new_pw.save()

            # Imported here rather than at module level, as in the original.
            from .tasks import dispatch, predict

            dispatch(request.user, predict, new_pw.pk, setting.pk, limit=-1)

        return redirect(new_pw.url)
    except ValueError as e:
        return 400, {"message": str(e)}
@router.delete(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}",
    response={403: Error},
)
def delete_pathway(request, package_uuid, pathway_uuid):
    """Delete one pathway of a package and redirect to the package's pathway list.

    The package must be writable for the requesting user. Any ``ValueError``
    (failed package lookup or missing write permission) is answered with a
    403 error payload; that status is declared on the route because the
    handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this pathway!")

        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        pw.delete()
        return redirect(f"{p.url}/pathway")
    except ValueError:
        return 403, {
            "message": f"Deleting Pathway with id {pathway_uuid} failed due to insufficient rights!"
        }
########
# Node #
########
class NodeWrapper(Schema):
    """Response envelope holding a list of nodes under the key ``node``."""

    node: List["SimpleNode"]
class NodeCompoundStructure(Schema):
    """Compact compound structure embedded in a node response."""

    id: str = Field(None, alias="url")
    image: str = Field(None, alias="image")
    smiles: str = Field(None, alias="smiles")
    name: str = Field(None, alias="name")

    @staticmethod
    def resolve_image(obj: CompoundStructure):
        """Return the structure's URL with the SVG image query appended."""
        structure_url = obj.url
        return f"{structure_url}?image=svg"
class NodeSchema(Schema):
    """Detailed representation of a pathway node in the legacy API format."""

    aliases: List[str] = Field([], alias="aliases")
    confidenceScenarios: List[SimpleScenario] = Field([], alias="confidence_scenarios")
    defaultStructure: NodeCompoundStructure = Field(None, alias="default_node_label")
    depth: int = Field(None, alias="depth")
    description: str = Field(None, alias="description")
    engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
    halflifes: Dict[str, str] = Field({}, alias="halflife")
    id: str = Field(None, alias="url")
    identifier: str = "node"
    image: str = Field(None, alias="image")
    name: str = Field(None, alias="name")
    proposedValues: List[Dict[str, str]] = Field([], alias="proposed_values")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smiles: str = Field(None, alias="default_node_label.smiles")
    structures: List["CompoundStructureSchema"] = Field([], alias="node_labels")

    @staticmethod
    def resolve_engineered_intermediate(obj: Node):
        """Return the engineered-intermediate flag; not implemented yet, always False."""
        # TODO
        is_engineered = False
        return is_engineered

    @staticmethod
    def resolve_image(obj: Node):
        """Return the default label's URL with the SVG image query appended."""
        structure_url = obj.default_node_label.url
        return f"{structure_url}?image=svg"

    @staticmethod
    def resolve_proposed_values(obj: Node):
        """Return the proposed values of the node; not implemented yet, always empty."""
        # TODO
        # Example of one entry in the legacy payload:
        # {
        #     "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/1f1a0b67-ce57-4f5a-929c-4fabf3a791fd",
        #     "scenarioName": "Annex IIA 7.1.1 a), Brumhard (2003) - (00000)"
        # }
        proposed_values = []
        return proposed_values

    @staticmethod
    def resolve_review_status(obj: Node):
        """Return "reviewed" or "unreviewed" depending on the package of the node's pathway."""
        package = obj.pathway.package
        return "reviewed" if package.reviewed else "unreviewed"
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
    response={200: NodeWrapper, 403: Error},
)
def get_package_pathway_nodes(request, package_uuid, pathway_uuid):
    """List the nodes of one pathway.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        return {"node": pathway.nodes}
    except ValueError:
        denied = f"Getting Nodes for Pathway with id {pathway_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}",
    response={200: NodeSchema, 403: Error},
)
def get_package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
    """Return a single node of a pathway.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        node = Node.objects.get(pathway=pathway, uuid=node_uuid)
        return node
    except ValueError:
        denied = f"Getting Node with id {node_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
class CreateNode(Schema):
    """Form payload accepted when adding a node; only ``nodeAsSmiles`` is required."""

    nodeAsSmiles: str
    nodeName: Optional[str] = None
    nodeReason: Optional[str] = None
    nodeDepth: Optional[str] = None
@router.post(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
    response={200: str | Any, 403: Error},
)
def add_pathway_node(request, package_uuid, pathway_uuid, n: Form[CreateNode]):
    """Add a node to a pathway and redirect to the new node's URL.

    ``nodeDepth`` is parsed as an integer when it is present and non-blank;
    otherwise a depth of -1 is passed on. Any ``ValueError`` (including an
    unparsable depth) is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)

        depth_given = n.nodeDepth is not None and n.nodeDepth.strip() != ""
        node_depth = int(n.nodeDepth) if depth_given else -1

        created = Node.create(pathway, n.nodeAsSmiles, node_depth, n.nodeName, n.nodeReason)
        return redirect(created.url)
    except ValueError:
        return 403, {"message": "Adding node failed!"}
@router.delete(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}",
    response={403: Error},
)
def delete_node(request, package_uuid, pathway_uuid, node_uuid):
    """Delete one node of a pathway and redirect to the pathway's node list.

    The package must be writable for the requesting user. Any ``ValueError``
    (failed package lookup or missing write permission) is answered with a
    403 error payload; that status is declared on the route because the
    handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Node!")

        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        n = Node.objects.get(pathway=pw, uuid=node_uuid)
        n.delete()
        return redirect(f"{pw.url}/node")
    except ValueError:
        return 403, {
            "message": f"Deleting Node with id {node_uuid} failed due to insufficient rights!"
        }
########
# Edge #
########
class EdgeWrapper(Schema):
    """Response envelope holding a list of edges under the key ``edge``."""

    edge: List["SimpleEdge"]
class EdgeNode(SimpleNode):
    """``SimpleNode`` extended with an image link, used for an edge's start and end nodes."""

    image: str = Field(None, alias="image")

    @staticmethod
    def resolve_image(obj: Node):
        """Return the default label's URL with the SVG image query appended."""
        structure_url = obj.default_node_label.url
        return f"{structure_url}?image=svg"
class EdgeSchema(Schema):
    """Detailed representation of a pathway edge in the legacy API format.

    Output names are the camelCase attributes; each ``alias`` names the
    attribute (or ``resolve_<alias>`` method) the value is read from.
    """

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[str] = Field([], alias="ec_numbers")
    endNodes: List["EdgeNode"] = Field([], alias="end_nodes")
    id: str = Field(None, alias="url")
    identifier: str = "edge"
    name: str = Field(None, alias="name")
    reactionName: str = Field(None, alias="edge_label.name")
    reactionURI: str = Field(None, alias="edge_label.url")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    startNodes: List["EdgeNode"] = Field([], alias="start_nodes")

    @staticmethod
    def resolve_review_status(obj: Edge):
        """Return "reviewed" or "unreviewed" depending on the package of the edge's pathway.

        The annotation previously named ``Node``; this schema serializes
        ``Edge`` objects, so the hint is corrected.
        """
        return "reviewed" if obj.pathway.package.reviewed else "unreviewed"
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge",
    response={200: EdgeWrapper, 403: Error},
)
def get_package_pathway_edges(request, package_uuid, pathway_uuid):
    """List the edges of one pathway.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        return {"edge": pathway.edges}
    except ValueError:
        denied = f"Getting Edges for Pathway with id {pathway_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}",
    response={200: EdgeSchema, 403: Error},
)
def get_package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
    """Return a single edge of a pathway.

    A ``ValueError`` raised while resolving the package for the requesting
    user is answered with a 403 error payload.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathway = Pathway.objects.get(package=package, uuid=pathway_uuid)
        edge = Edge.objects.get(pathway=pathway, uuid=edge_uuid)
        return edge
    except ValueError:
        denied = f"Getting Edge with id {edge_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}
class CreateEdge(Schema):
    """Form payload accepted when adding an edge; every field is optional."""

    edgeAsSmirks: Optional[str] = None
    educts: Optional[str] = None  # Node URIs comma sep
    products: Optional[str] = None  # Node URIs comma sep
    multistep: Optional[str] = None
    edgeReason: Optional[str] = None
@router.post(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge",
    response={200: str | Any, 403: Error},
)
def add_pathway_edge(request, package_uuid, pathway_uuid, e: Form[CreateEdge]):
    """Add an edge to a pathway and redirect to the new edge's URL.

    The edge is described either by ``edgeAsSmirks`` ("educts>>products",
    with "." separating the components of each side, each matched against
    the SMILES of a node's default label) or by comma-separated node URLs in
    ``educts`` and ``products``. Supplying a SMIRKS together with both URL
    lists is rejected.

    Any ``ValueError`` (failed package lookup, invalid or missing input) is
    answered with a 403 error payload.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)

        if e.edgeAsSmirks is None and (e.educts is None or e.products is None):
            raise ValueError("Either SMIRKS or educt/product must be provided")

        if e.edgeAsSmirks is not None and (e.educts is not None and e.products is not None):
            raise ValueError("SMIRKS and educt/product provided!")

        if e.edgeAsSmirks:
            sides = e.edgeAsSmirks.split(">>")
            if len(sides) < 2:
                raise ValueError(f'"{e.edgeAsSmirks}" is not a valid SMIRKS!')

            # str.split takes a literal separator, not a regex: "." separates
            # the components, the former "\\." never matched anything.
            educts = [
                Node.objects.get(pathway=pw, default_node_label__smiles=ed)
                for ed in sides[0].split(".")
            ]
            products = [
                Node.objects.get(pathway=pw, default_node_label__smiles=pr)
                for pr in sides[1].split(".")
            ]
        elif e.educts is not None and e.products is not None:
            educts = [Node.objects.get(pathway=pw, url=ed.strip()) for ed in e.educts.split(",")]
            products = [Node.objects.get(pathway=pw, url=pr.strip()) for pr in e.products.split(",")]
        else:
            # Empty SMIRKS without a complete educt/product pair.
            raise ValueError("Either SMIRKS or educt/product must be provided")

        new_e = Edge.create(
            pathway=pw,
            start_nodes=educts,
            end_nodes=products,
            rule=None,
            # The form schema visible in this file declares no "name" field;
            # read it defensively instead of failing with AttributeError.
            name=getattr(e, "name", None),
            description=e.edgeReason,
        )

        return redirect(new_e.url)
    except ValueError:
        return 403, {"message": "Adding edge failed!"}
@router.delete(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}",
    response={403: Error},
)
def delete_edge(request, package_uuid, pathway_uuid, edge_uuid):
    """Delete one edge of a pathway and redirect to the pathway's edge list.

    The package must be writable for the requesting user. Any ``ValueError``
    (failed package lookup or missing write permission) is answered with a
    403 error payload; that status is declared on the route because the
    handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Edge!")

        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        e = Edge.objects.get(pathway=pw, uuid=edge_uuid)
        e.delete()
        return redirect(f"{pw.url}/edge")
    except ValueError:
        return 403, {
            "message": f"Deleting Edge with id {edge_uuid} failed due to insufficient rights!"
        }
#########
# Model #
#########
class ModelWrapper(Schema):
    """Response envelope holding a list of models.

    The field is required and is read from / written to the hyphenated key
    ``relative-reasoning`` via its alias.
    """

    relative_reasoning: List["SimpleModel"] = Field(..., alias="relative-reasoning")
class ModelSchema(Schema):
    """Detailed representation of a model in the legacy API format.

    ``status``, ``statusMessage``, ``threshold`` and ``type`` have neither a
    default nor an alias, so they are required and read from attributes of
    the same name.
    """

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    evalPackages: List["SimplePackage"] = Field([])
    id: str = Field(None, alias="url")
    identifier: str = "relative-reasoning"
    # Example of an "info" entry; it is not declared as a field of this schema:
    # "info" : {
    #   "Accuracy (Single-Gen)" : "0.5932962678936605" ,
    #   "Area under PR-Curve (Single-Gen)" : "0.5654653182134282" ,
    #   "Area under ROC-Curve (Single-Gen)" : "0.8178302405034772" ,
    #   "Precision (Single-Gen)" : "0.6978730822873083" ,
    #   "Probability Threshold" : "0.5" ,
    #   "Recall/Sensitivity (Single-Gen)" : "0.4484149210261006"
    # } ,
    name: str = Field(None, alias="name")
    pathwayPackages: List["SimplePackage"] = Field([])
    reviewStatus: str = Field(None, alias="review_status")
    rulePackages: List["SimplePackage"] = Field([])
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    status: str
    statusMessage: str
    threshold: str
    type: str
@router.get("/model", response={200: ModelWrapper, 403: Error})
def get_models(request):
    """List the models of every package returned by ``get_reviewed_packages()``.

    The handler used to be an empty stub returning ``None``, which cannot
    satisfy the declared ``ModelWrapper`` response. It now follows the other
    top-level list endpoints of this module. The key is the wrapper's alias
    ``relative-reasoning``.
    """
    # NOTE(review): assumes SimpleModel can serialize EPModel instances - confirm.
    return {
        "relative-reasoning": EPModel.objects.filter(
            package__in=PackageManager.get_reviewed_packages()
        ).prefetch_related("package")
    }
@router.get("/package/{uuid:package_uuid}/model", response={200: ModelWrapper, 403: Error})
def get_package_models(request, package_uuid, model_uuid: Optional[str] = None):
    """List all models stored in one package.

    ``model_uuid`` is not part of the route and is not used; it is kept for
    backward compatibility but made optional, since a parameter without a
    default that is absent from the path is treated as a required query
    parameter.

    The models are returned under the wrapper's alias ``relative-reasoning``
    so that the payload matches ``ModelWrapper``. A ``ValueError`` raised
    while resolving the package for the requesting user is answered with a
    403 error payload.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        return {"relative-reasoning": EPModel.objects.filter(package=p)}
    except ValueError:
        return 403, {
            "message": f"Getting Models for Package with id {package_uuid} failed due to insufficient rights!"
        }
class Classify(Schema):
    """Query parameters of the model endpoint; ``smiles`` is optional."""

    smiles: Optional[str] = None
@router.get(
    "/package/{uuid:package_uuid}/model/{uuid:model_uuid}",
    response={200: ModelSchema | Any, 403: Error, 400: Error},
)
def get_model(request, package_uuid, model_uuid, c: Query[Classify]):
    """Return a model, or its predictions for a SMILES given as query parameter.

    Without a ``smiles`` parameter the model itself is returned. With one,
    the SMILES is standardized and ``predict_simple`` is run eagerly; the
    response is a list with one entry per non-empty prediction holding its
    probability, its de-duplicated product sets and, when a rule is attached,
    the rule's id, identifier, name and review status.

    Status codes: 400 for an empty or invalid SMILES, 403 for any other
    ``ValueError`` (e.g. raised while resolving the package).
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        mod = EPModel.objects.get(package=p, uuid=model_uuid)

        if c.smiles is None:
            return mod

        # Previously this check sat behind a truthiness test on the same
        # value and could never fire.
        if c.smiles.strip() == "":
            return 400, {"message": "Received empty SMILES"}

        try:
            stand_smiles = FormatConverter.standardize(c.smiles)
        except ValueError:
            return 400, {"message": f'"{c.smiles}" is not a valid SMILES'}

        # Imported here rather than at module level, as in the original.
        from epdb.tasks import dispatch_eager, predict_simple

        pred_res = dispatch_eager(request.user, predict_simple, mod.pk, stand_smiles)

        result = []

        for pr in pred_res:
            if len(pr) > 0:
                products = [tuple(prod_set) for prod_set in pr.product_sets]

                res = {
                    "probability": pr.probability,
                    # dict.fromkeys de-duplicates while keeping first-seen
                    # order, so the output no longer depends on set ordering.
                    "products": list(dict.fromkeys(products)),
                }

                if pr.rule:
                    res["id"] = pr.rule.url
                    res["identifier"] = pr.rule.get_rule_identifier()
                    res["name"] = pr.rule.name
                    res["reviewStatus"] = (
                        "reviewed" if pr.rule.package.reviewed else "unreviewed"
                    )

                result.append(res)

        return result
    except ValueError:
        return 403, {
            "message": f"Getting Model with id {model_uuid} failed due to insufficient rights!"
        }
@router.delete(
    "/package/{uuid:package_uuid}/model/{uuid:model_uuid}",
    response={403: Error},
)
def delete_model(request, package_uuid, model_uuid):
    """Delete one model of a package and redirect to the package's model list.

    The package must be writable for the requesting user. Any ``ValueError``
    (failed package lookup or missing write permission) is answered with a
    403 error payload; that status is declared on the route because the
    handler returns it.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)

        if not PackageManager.writable(request.user, p):
            raise ValueError("You do not have the rights to delete this Model!")

        m = EPModel.objects.get(package=p, uuid=model_uuid)
        m.delete()
        return redirect(f"{p.url}/model")
    except ValueError:
        return 403, {
            "message": f"Deleting Model with id {model_uuid} failed due to insufficient rights!"
        }
###########
# Setting #
###########
class SettingWrapper(Schema):
    """Response envelope holding a list of settings under the key ``setting``."""

    setting: List["SimpleSetting"]
class SettingSchema(Schema):
    """Detailed representation of a setting in the legacy API format.

    Output names are the attribute names below; each ``alias`` names the
    attribute the value is read from.
    """

    # No alias: the value is not read via an alias and defaults to -1.
    duplicationHash: int = -1
    id: str = Field(None, alias="url")
    identifier: str = "setting"
    includedPackages: List["SimplePackage"] = Field([], alias="rule_packages")
    isPublic: bool = Field(None, alias="public")
    name: str = Field(None, alias="name")
    normalizationRules: List["SimpleRule"] = Field([], alias="normalization_rules")
    propertyPlugins: List[str] = Field([], alias="property_plugins")
    truncationstrategy: Optional[str] = Field(None, alias="truncation_strategy")
@router.get("/setting", response={200: SettingWrapper, 403: Error})
def get_settings(request):
    """List the settings ``SettingManager.get_all_settings`` returns for the requesting user."""
    settings_for_user = SettingManager.get_all_settings(request.user)
    return {"setting": settings_for_user}
@router.get("/setting/{uuid:setting_uuid}", response={200: SettingSchema, 403: Error})
def get_setting(request, setting_uuid):
    """Return a single setting by its id.

    A ``ValueError`` raised by the lookup for the requesting user is answered
    with a 403 error payload.
    """
    try:
        setting = SettingManager.get_setting_by_id(request.user, setting_uuid)
        return setting
    except ValueError:
        denied = f"Getting Setting with id {setting_uuid} failed due to insufficient rights!"
        return 403, {"message": denied}