Files
enviPy-bayer/epdb/legacy_api.py
jebus a8554c903c [Enhancement] Swappable Packages (#216)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#216
Reviewed-by: liambrydon <lbry121@aucklanduni.ac.nz>
Reviewed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-14 21:42:39 +13:00

1243 lines
41 KiB
Python

from typing import Any, Dict, List, Optional

from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.http import HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404, redirect
from ninja import Field, Form, Router, Schema
from utilities.chem import FormatConverter

from .logic import PackageManager, SettingManager, UserManager
from .models import (
    Compound,
    CompoundStructure,
    Edge,
    Node,
    Pathway,
    Reaction,
    Rule,
    Scenario,
    SimpleAmbitRule,
    User,
    UserPackagePermission,
)
# Package model class used throughout this module, obtained by calling the
# GET_PACKAGE_MODEL hook on the Django settings object.
Package = s.GET_PACKAGE_MODEL()
def _anonymous_or_real(request):
    """Return the requesting user, or the stored "anonymous" account.

    The request's own user is returned only when it is authenticated and not
    flagged anonymous; otherwise the user named "anonymous" is loaded from the
    active user model.
    """
    current = request.user
    if not current.is_authenticated or current.is_anonymous:
        return get_user_model().objects.get(username="anonymous")
    return current
# router = Router(auth=SessionAuth())
# Router on which every endpoint in this module is registered; it is created
# without an auth backend (the session-auth variant above is disabled).
router = Router()
class Error(Schema):
    """Error payload carrying a single human-readable message."""

    message: str
#################
# SimpleObjects #
#################
class SimpleUser(Schema):
    """Short user representation: url as id, username as name, plus email."""

    id: str = Field(None, alias="url")
    identifier: str = "user"
    name: str = Field(None, alias="username")
    email: str = Field(None, alias="email")
class SimpleGroup(Schema):
    """Short group representation: url as id, plus name."""

    id: str = Field(None, alias="url")
    identifier: str = "group"
    name: str = Field(None, alias="name")
class SimpleSetting(Schema):
    """Short setting representation: url as id, plus name."""

    id: str = Field(None, alias="url")
    identifier: str = "setting"
    name: str = Field(None, alias="name")
class SimpleObject(Schema):
    """Base for short object representations (id, name, review status)."""

    id: str = Field(None, alias="url")
    name: str = Field(None, alias="name")
    reviewStatus: str = Field(None, alias="review_status")

    @staticmethod
    def resolve_review_status(obj):
        """Derive "reviewed"/"unreviewed" from the package the object belongs to.

        The owning package is located in this order: the object itself when it
        is a Package, its ``package`` attribute, the compound's package for a
        CompoundStructure, the pathway's package for a Node or Edge.

        Raises:
            ValueError: if none of the lookups applies.
        """
        if isinstance(obj, Package):
            owner = obj
        elif hasattr(obj, "package"):
            owner = obj.package
        elif isinstance(obj, CompoundStructure):
            owner = obj.compound.package
        elif isinstance(obj, (Node, Edge)):
            owner = obj.pathway.package
        else:
            raise ValueError("Object has no package")
        return "reviewed" if owner.reviewed else "unreviewed"
class SimplePackage(SimpleObject):
    """SimpleObject tagged with the "package" identifier."""

    identifier: str = "package"
class SimpleCompound(SimpleObject):
    """SimpleObject tagged with the "compound" identifier."""

    identifier: str = "compound"
class SimpleCompoundStructure(SimpleObject):
    """SimpleObject tagged with the "structure" identifier."""

    identifier: str = "structure"
class SimpleRule(SimpleObject):
    """SimpleObject tagged "rule" whose id hides the rule engine in the URL."""

    identifier: str = "rule"

    @staticmethod
    def resolve_url(obj: Rule):
        """Return the rule URL with "-ambit-" and "-rdkit-" collapsed to "-"."""
        without_ambit = obj.url.replace("-ambit-", "-")
        return without_ambit.replace("-rdkit-", "-")
class SimpleReaction(SimpleObject):
    """SimpleObject tagged with the "reaction" identifier."""

    identifier: str = "reaction"
class SimpleScenario(SimpleObject):
    """SimpleObject tagged with the "scenario" identifier."""

    identifier: str = "scenario"
class SimplePathway(SimpleObject):
    """SimpleObject tagged with the "pathway" identifier."""

    identifier: str = "pathway"
class SimpleNode(SimpleObject):
    """SimpleObject tagged with the "node" identifier."""

    identifier: str = "node"
class SimpleEdge(SimpleObject):
    """SimpleObject tagged with the "edge" identifier."""

    identifier: str = "edge"
################
# Login/Logout #
################
@router.post("/", response={200: SimpleUser, 403: Error})
def login(request, loginusername: Form[str], loginpassword: Form[str]):
    """Authenticate by username and password and log the user in.

    The account's e-mail address is looked up from ``loginusername`` and handed
    to ``authenticate`` as the ``username`` credential.

    Returns:
        The authenticated user on success, otherwise ``403`` with an Error
        payload. An unknown username yields the same 403 as a wrong password.
    """
    # Aliased so the Django helper does not shadow this view's own name.
    from django.contrib.auth import authenticate
    from django.contrib.auth import login as auth_login

    invalid_credentials = 403, {"message": "Invalid username and/or password"}

    try:
        email = User.objects.get(username=loginusername).email
    except User.DoesNotExist:
        # Previously this escaped as an unhandled exception (HTTP 500).
        return invalid_credentials

    user = authenticate(username=email, password=loginpassword)
    if not user:
        return invalid_credentials

    auth_login(request, user)
    return user
########
# User #
########
class UserWrapper(Schema):
    """Envelope holding a list of users under the "user" key."""

    user: List[SimpleUser]
class UserSchema(Schema):
    """Detailed user representation.

    ``forename``, ``surname`` and ``link`` are fixed placeholder values, and
    ``affiliation`` is a placeholder dictionary (see its resolver).
    """

    affiliation: Dict[str, str] = Field(None, alias="affiliation")
    defaultGroup: "SimpleGroup" = Field(None, alias="default_group")
    defaultPackage: "SimplePackage" = Field(None, alias="default_package")
    defaultSetting: "SimpleSetting" = Field(None, alias="default_setting")
    email: str = Field(None, alias="email")
    forename: str = "not specified"
    groups: List["SimpleGroup"] = Field([], alias="groups")
    id: str = Field(None, alias="url")
    identifier: str = "user"
    link: str = "empty"
    name: str = Field(None, alias="username")
    surname: str = "not specified"
    settings: List["SimpleSetting"] = Field([], alias="settings")

    @staticmethod
    def resolve_affiliation(obj: User):
        """Return a placeholder affiliation; no real data is read yet."""
        # TODO
        placeholder = "not specified"
        return {"city": placeholder, "country": placeholder, "workplace": placeholder}

    @staticmethod
    def resolve_settings(obj: User):
        """Return whatever SettingManager.get_all_settings yields for the user."""
        return SettingManager.get_all_settings(obj)
@router.get("/user", response={200: UserWrapper, 403: Error})
def get_users(request, whoami: str = None):
    """List users.

    Any truthy ``whoami`` value restricts the list to the requesting user;
    otherwise every User row is returned.
    """
    # NOTE(review): the unfiltered branch exposes all users (including e-mail)
    # to any caller of this router - confirm this is intended.
    users = [request.user] if whoami else User.objects.all()
    return {"user": users}
@router.get("/user/{uuid:user_uuid}", response={200: UserSchema, 403: Error})
def get_user(request, user_uuid):
    """Return one user via UserManager, or 403 when the lookup raises ValueError."""
    try:
        return UserManager.get_user_by_id(request.user, user_uuid)
    except ValueError:
        failure = f"Getting User with id {user_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
###########
# Package #
###########
class PackageWrapper(Schema):
    """Envelope holding a list of packages under the "package" key."""

    package: List["PackageSchema"]
class PackageSchema(Schema):
    """Detailed package representation with content links and permissions."""

    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    links: List[Dict[str, List[str | int]]] = Field([], alias="links")
    name: str = Field(None, alias="name")
    primaryGroup: Optional[SimpleGroup] = None
    readers: List[Dict[str, str]] = Field([], alias="readers")
    reviewComment: str = Field(None, alias="review_comment")
    reviewStatus: str = Field(None, alias="review_status")
    writers: List[Dict[str, str]] = Field([], alias="writers")

    @staticmethod
    def resolve_links(obj: Package):
        """Return one ``{label: [url, count]}`` entry per content category."""
        categories = (
            ("Pathways", "pathway", obj.pathways),
            ("Rules", "rule", obj.rules),
            ("Compounds", "compound", obj.compounds),
            ("Reactions", "reaction", obj.reactions),
            ("Relative Reasoning", "relative-reasoning", obj.models),
            ("Scenarios", "scenario", obj.scenarios),
        )
        return [
            {label: [f"{obj.url}/{segment}", related.count()]}
            for label, segment, related in categories
        ]

    @staticmethod
    def resolve_readers(obj: Package):
        """Return ``{id: name}`` for each user holding the READ permission."""
        # NOTE(review): the field is typed Dict[str, str]; verify that User.id
        # and User.name produce string values here.
        reader_ids = UserPackagePermission.objects.filter(
            package=obj, permission=UserPackagePermission.READ[0]
        ).values_list("user", flat=True)
        readers = User.objects.filter(id__in=reader_ids).distinct()
        return [{reader.id: reader.name} for reader in readers]

    @staticmethod
    def resolve_writers(obj: Package):
        """Return ``{id: name}`` for each user holding the WRITE permission."""
        writer_ids = UserPackagePermission.objects.filter(
            package=obj, permission=UserPackagePermission.WRITE[0]
        ).values_list("user", flat=True)
        writers = User.objects.filter(id__in=writer_ids).distinct()
        return [{writer.id: writer.name} for writer in writers]

    @staticmethod
    def resolve_review_comment(obj):
        """Always return an empty review comment."""
        return ""

    @staticmethod
    def resolve_review_status(obj):
        """Map the package's ``reviewed`` flag to "reviewed"/"unreviewed"."""
        if obj.reviewed:
            return "reviewed"
        return "unreviewed"
@router.get("/package", response={200: PackageWrapper, 403: Error})
def get_packages(request):
    """List the packages PackageManager reports as readable, reviewed ones included."""
    readable = PackageManager.get_all_readable_packages(request.user, include_reviewed=True)
    return {"package": readable}
@router.get("/package/{uuid:package_uuid}", response={200: PackageSchema, 403: Error})
def get_package(request, package_uuid):
    """Return one package, or 403 when PackageManager raises ValueError."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        failure = f"Getting Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
    return package
# The 400 status must be declared: django-ninja raises ConfigError (HTTP 500)
# when a view returns a status code that has no response schema.
@router.post("/package", response={400: Error})
def create_packages(
    request, packageName: Form[str], packageDescription: Optional[str] = Form(None)
):
    """Create a package and redirect to its URL.

    Args:
        packageName: name of the new package; must not be blank.
        packageDescription: optional description.

    Returns:
        A redirect to the new package, or ``400`` with an Error payload when
        the name is blank or PackageManager raises ValueError.
    """
    try:
        if packageName.strip() == "":
            raise ValueError("Package name cannot be empty!")
        new_package = PackageManager.create_package(request.user, packageName, packageDescription)
        return redirect(new_package.url)
    except ValueError as e:
        return 400, {"message": str(e)}
@router.post("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 400: Error})
def update_package(
    request,
    package_uuid,
    packageDescription: Optional[str] = Form(None),
    hiddenMethod: Optional[str] = Form(None),
    exportAsJson: Optional[str] = Form(None),
    permissions: Optional[str] = Form(None),
    ppsURI: Optional[str] = Form(None),
    read: Optional[str] = Form(None),
    write: Optional[str] = Form(None),
):
    """Multiplexed package update endpoint.

    The first matching action wins:

    1. ``hiddenMethod`` set: deletes the package when it equals "DELETE";
       any other value does nothing.
    2. non-blank ``packageDescription``: stores the new description.
    3. ``exportAsJson == "true"``: returns the exported package.
    4. ``permissions``, ``ppsURI`` and ``read`` or ``write`` all set:
       permission changes are not implemented and are rejected with 400.

    Returns:
        The export for action 3, ``None`` for the other handled actions, or
        ``400`` with an Error payload on ValueError / unsupported action.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)

        if hiddenMethod:
            if hiddenMethod == "DELETE":
                package.delete()
            return None

        if packageDescription and packageDescription.strip() != "":
            package.description = packageDescription
            package.save()
            return None

        if exportAsJson == "true":
            return PackageManager.export_package(
                package, include_models=False, include_external_identifiers=False
            )

        if all([permissions, ppsURI, read]) or all([permissions, ppsURI, write]):
            # These branches used to do nothing and still answer 200, which
            # told the client the permission change had been applied.
            # TODO: wire up the real permission update.
            return 400, {"message": "Updating package permissions is not supported"}

        return None
    except ValueError as e:
        return 400, {"message": str(e)}
################################
# Compound / CompoundStructure #
################################
class CompoundWrapper(Schema):
    """Envelope holding a list of compounds under the "compound" key."""

    compound: List["SimpleCompound"]
class CompoundPathwayScenario(Schema):
    """Scenario reference (id, name, type) attached to a compound."""

    scenarioId: str
    scenarioName: str
    scenarioType: str
class CompoundSchema(Schema):
    """Detailed compound representation.

    Several fields are stubs: external references, half-lives and PubChem
    references resolve to empty values, and ``pathwayScenarios`` resolves to a
    single hard-coded entry.
    """

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
    id: str = Field(None, alias="url")
    halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
    identifier: str = "compound"
    imageSize: int = 600
    name: str = Field(None, alias="name")
    pathwayScenarios: List[CompoundPathwayScenario] = Field([], alias="pathway_scenarios")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    # Default was ``False`` on a str field; None matches the other schemas.
    # The resolver below always supplies the actual value.
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    structures: List["CompoundStructureSchema"] = []

    @staticmethod
    def resolve_review_status(obj: Compound):
        """Map the owning package's ``reviewed`` flag to "reviewed"/"unreviewed"."""
        return "reviewed" if obj.package.reviewed else "unreviewed"

    @staticmethod
    def resolve_external_references(obj: Compound):
        """Stub: no external references are reported yet."""
        # TODO
        return {}

    @staticmethod
    def resolve_structures(obj: Compound):
        """Return all CompoundStructure rows that point at this compound."""
        return CompoundStructure.objects.filter(compound=obj)

    @staticmethod
    def resolve_halflifes(obj: Compound):
        """Stub: no half-lives are reported yet."""
        return []

    @staticmethod
    def resolve_pubchem_compound_references(obj: Compound):
        """Stub: no PubChem references are reported yet."""
        return []

    @staticmethod
    def resolve_pathway_scenarios(obj: Compound):
        """Return a fixed sample entry, independent of ``obj``."""
        # NOTE(review): hard-coded placeholder data is served for every
        # compound - replace with a real lookup or an empty list.
        return [
            {
                "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501",
                "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)",
                "scenarioType": "Soil",
            }
        ]
class CompoundStructureSchema(Schema):
    """Detailed compound-structure representation.

    InChI, charge, formula and mass are computed from the structure's SMILES
    through FormatConverter; several other fields are stubs.
    """

    InChI: str = Field(None, alias="inchi")
    aliases: List[str] = Field([], alias="aliases")
    canonicalSmiles: str = Field(None, alias="canonical_smiles")
    charge: int = Field(None, alias="charge")
    description: str = Field(None, alias="description")
    externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
    formula: str = Field(None, alias="formula")
    halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
    id: str = Field(None, alias="url")
    identifier: str = "structure"
    imageSize: int = 600
    inchikey: str = Field(None, alias="inchikey")
    isDefaultStructure: bool = Field(None, alias="is_default_structure")
    mass: float = Field(None, alias="mass")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smiles: str = Field(None, alias="smiles")

    @staticmethod
    def resolve_review_status(obj: CompoundStructure):
        """Map the compound's package ``reviewed`` flag to a status string."""
        owner = obj.compound.package
        return "reviewed" if owner.reviewed else "unreviewed"

    @staticmethod
    def resolve_inchi(obj: CompoundStructure):
        """Compute the InChI from the SMILES."""
        return FormatConverter.InChI(obj.smiles)

    @staticmethod
    def resolve_charge(obj: CompoundStructure):
        """Compute the charge from the SMILES."""
        return FormatConverter.charge(obj.smiles)

    @staticmethod
    def resolve_formula(obj: CompoundStructure):
        """Compute the formula from the SMILES."""
        return FormatConverter.formula(obj.smiles)

    @staticmethod
    def resolve_mass(obj: CompoundStructure):
        """Compute the mass from the SMILES."""
        return FormatConverter.mass(obj.smiles)

    @staticmethod
    def resolve_external_references(obj: CompoundStructure):
        """Stub: no external references are reported yet."""
        # TODO
        return {}

    @staticmethod
    def resolve_halflifes(obj: CompoundStructure):
        """Stub: no half-lives are reported yet."""
        return []

    @staticmethod
    def resolve_pubchem_compound_references(obj: CompoundStructure):
        """Stub: no PubChem references are reported yet."""
        return []

    @staticmethod
    def resolve_pathway_scenarios(obj: CompoundStructure):
        """Return a fixed sample entry, independent of ``obj``."""
        # NOTE(review): this schema declares no field aliased
        # "pathway_scenarios", so this resolver looks unused - confirm.
        sample = {
            "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501",
            "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)",
            "scenarioType": "Soil",
        }
        return [sample]
class CompoundStructureWrapper(Schema):
    """Envelope holding a list of structures under the "structure" key."""

    structure: List["SimpleCompoundStructure"]
@router.get("/compound", response={200: CompoundWrapper, 403: Error})
def get_compounds(request):
    """List the compounds of all packages PackageManager reports as reviewed."""
    reviewed = PackageManager.get_reviewed_packages()
    compounds = Compound.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"compound": compounds}
@router.get("/package/{uuid:package_uuid}/compound", response={200: CompoundWrapper, 403: Error})
def get_package_compounds(request, package_uuid):
    """List the compounds of one package, or 403 when a ValueError is raised."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        compounds = Compound.objects.filter(package=package).prefetch_related("package")
        return {"compound": compounds}
    except ValueError:
        failure = f"Getting Compounds for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}",
    response={200: CompoundSchema, 403: Error},
)
def get_package_compound(request, package_uuid, compound_uuid):
    """Return one compound of a package.

    Returns 403 when the package lookup raises ValueError. An unknown compound
    uuid raises Http404 instead of the former unhandled ``DoesNotExist``
    (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting Compound with id {compound_uuid} failed due to insufficient rights!"
        }
    return get_object_or_404(Compound.objects, package=package, uuid=compound_uuid)
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure",
    response={200: CompoundStructureWrapper, 403: Error},
)
def get_package_compound_structures(request, package_uuid, compound_uuid):
    """List the structures of one compound.

    Returns 403 when the package lookup raises ValueError. An unknown compound
    uuid raises Http404 instead of the former unhandled ``DoesNotExist``
    (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting CompoundStructures for Compound with id {compound_uuid} failed due to insufficient rights!"
        }
    compound = get_object_or_404(Compound.objects, package=package, uuid=compound_uuid)
    return {"structure": compound.structures.all()}
@router.get(
    "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}",
    response={200: CompoundStructureSchema, 403: Error},
)
def get_package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
    """Return one structure of a compound.

    Returns 403 when the package lookup raises ValueError. An unknown compound
    or structure uuid raises Http404 instead of the former unhandled
    ``DoesNotExist`` (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting CompoundStructure with id {structure_uuid} failed due to insufficient rights!"
        }
    compound = get_object_or_404(Compound.objects, package=package, uuid=compound_uuid)
    return get_object_or_404(CompoundStructure.objects, uuid=structure_uuid, compound=compound)
#########
# Rules #
#########
class RuleWrapper(Schema):
    """Envelope holding a list of rules under the "rule" key."""

    rule: List["SimpleRule"]
class SimpleRuleSchema(Schema):
    """Detailed representation of a non-composite rule."""

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    engine: str = "ambit"
    id: str = Field(None, alias="url")
    identifier: str = Field(None, alias="identifier")
    isCompositeRule: bool = False
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    productFilterSmarts: str = Field("", alias="product_filter_smarts")
    productSmarts: str = Field(None, alias="products_smarts")
    reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
    reactantSmarts: str = Field(None, alias="reactants_smarts")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smirks: str = Field("", alias="smirks")
    # TODO
    transformations: str = Field("", alias="transformations")

    @staticmethod
    def resolve_url(obj: Rule):
        """Return the rule URL with "-ambit-" and "-rdkit-" collapsed to "-"."""
        return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-")

    @staticmethod
    def resolve_identifier(obj: Rule):
        """Return the rule kind found in the URL, or None if none matches.

        The URL is normalised exactly as in ``resolve_url`` first, so engine
        specific segments ("-ambit-" and "-rdkit-") are recognised alike;
        previously "-rdkit-" URLs fell through to None.
        """
        normalised = obj.url.replace("-ambit-", "-").replace("-rdkit-", "-")
        for kind in ("simple-rule", "parallel-rule", "sequential-rule"):
            if kind in normalised:
                return kind
        return None

    @staticmethod
    def resolve_review_status(obj: Rule):
        """Map the owning package's ``reviewed`` flag to a status string."""
        return "reviewed" if obj.package.reviewed else "unreviewed"

    @staticmethod
    def resolve_product_filter_smarts(obj: Rule):
        """Return the product filter SMARTS, or "" when it is falsy."""
        return obj.product_filter_smarts if obj.product_filter_smarts else ""

    @staticmethod
    def resolve_reactant_filter_smarts(obj: Rule):
        """Return the reactant filter SMARTS, or "" when it is falsy."""
        return obj.reactant_filter_smarts if obj.reactant_filter_smarts else ""
class CompositeRuleSchema(Schema):
    """Detailed representation of a composite rule made of simple rules."""

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    id: str = Field(None, alias="url")
    identifier: str = Field(None, alias="identifier")
    isCompositeRule: bool = True
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    productFilterSmarts: str = Field("", alias="product_filter_smarts")
    reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
    reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    simpleRules: List["SimpleRule"] = Field([], alias="simple_rules")

    @staticmethod
    def resolve_ec_numbers(obj: Rule):
        """Stub: no EC numbers are reported."""
        return []

    @staticmethod
    def resolve_url(obj: Rule):
        """Return the rule URL with "-ambit-" and "-rdkit-" collapsed to "-"."""
        return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-")

    @staticmethod
    def resolve_identifier(obj: Rule):
        """Return the rule kind found in the URL, or None if none matches.

        The URL is normalised exactly as in ``resolve_url`` first, so engine
        specific segments ("-ambit-" and "-rdkit-") are recognised alike;
        previously "-rdkit-" URLs fell through to None.
        """
        normalised = obj.url.replace("-ambit-", "-").replace("-rdkit-", "-")
        for kind in ("simple-rule", "parallel-rule", "sequential-rule"):
            if kind in normalised:
                return kind
        return None

    @staticmethod
    def resolve_review_status(obj: Rule):
        """Map the owning package's ``reviewed`` flag to a status string."""
        return "reviewed" if obj.package.reviewed else "unreviewed"

    @staticmethod
    def resolve_product_filter_smarts(obj: Rule):
        """Return the product filter SMARTS, or "" when it is falsy."""
        return obj.product_filter_smarts if obj.product_filter_smarts else ""

    @staticmethod
    def resolve_reactant_filter_smarts(obj: Rule):
        """Return the reactant filter SMARTS, or "" when it is falsy."""
        return obj.reactant_filter_smarts if obj.reactant_filter_smarts else ""
@router.get("/rule", response={200: RuleWrapper, 403: Error})
def get_rules(request):
    """List the rules of all packages PackageManager reports as reviewed."""
    reviewed = PackageManager.get_reviewed_packages()
    rules = Rule.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"rule": rules}
@router.get("/package/{uuid:package_uuid}/rule", response={200: RuleWrapper, 403: Error})
def get_package_rules(request, package_uuid):
    """List the rules of one package, or 403 when a ValueError is raised."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        rules = Rule.objects.filter(package=package).prefetch_related("package")
        return {"rule": rules}
    except ValueError:
        failure = f"Getting Rules for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
@router.get(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_rule(request, package_uuid, rule_uuid):
    """Serve a rule under the generic ``/rule/`` path via ``_get_package_rule``."""
    return _get_package_rule(request, package_uuid=package_uuid, rule_uuid=rule_uuid)
@router.get(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_simple_rule(request, package_uuid, rule_uuid):
    """Serve a rule under the ``/simple-rule/`` path via ``_get_package_rule``."""
    return _get_package_rule(request, package_uuid=package_uuid, rule_uuid=rule_uuid)
@router.get(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
)
def get_package_parallel_rule(request, package_uuid, rule_uuid):
    """Serve a rule under the ``/parallel-rule/`` path via ``_get_package_rule``."""
    return _get_package_rule(request, package_uuid=package_uuid, rule_uuid=rule_uuid)
def _get_package_rule(request, package_uuid, rule_uuid):
    """Shared implementation of the rule GET endpoints.

    Returns the rule, or 403 when the package lookup raises ValueError. An
    unknown rule uuid raises Http404 instead of the former unhandled
    ``DoesNotExist`` (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
        }
    return get_object_or_404(Rule.objects, package=package, uuid=rule_uuid)
# POST
@router.post(
    "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
    response={200: str | Any, 403: Error},
)
def post_package_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    """Handle a POST on the generic ``/rule/`` path via ``_post_package_rule``."""
    outcome = _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
    return outcome
@router.post(
    "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
    response={200: str | Any, 403: Error},
)
def post_package_simple_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    """Handle a POST on the ``/simple-rule/`` path via ``_post_package_rule``."""
    outcome = _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
    return outcome
@router.post(
    "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
    response={200: str | Any, 403: Error},
)
def post_package_parallel_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
    """Handle a POST on the ``/parallel-rule/`` path via ``_post_package_rule``."""
    outcome = _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
    return outcome
def _post_package_rule(request, package_uuid, rule_uuid, compound: Form[str]):
    """Shared implementation of the rule POST endpoints.

    With ``compound`` set, the rule is applied to it and every product of
    every product set is returned as plain text, one per line. Without it,
    the rule itself is returned.

    Returns:
        * plain-text ``HttpResponse`` with the products, or the rule object;
        * a 400 JSON response when ``compound`` contains only whitespace;
        * ``403`` with an Error payload when a ValueError is raised.

    An unknown rule uuid raises Http404 instead of the former unhandled
    ``DoesNotExist`` (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        rule = get_object_or_404(Rule.objects, package=package, uuid=rule_uuid)

        if compound is None:
            return rule

        if not compound.split():
            # The calling endpoints declare only 200 and 403, so a
            # ``400, {...}`` tuple would make django-ninja raise ConfigError.
            # A ready-made response is passed through untouched.
            return JsonResponse({"message": "Compound is empty"}, status=400)

        products = [product for p_set in rule.apply(compound) for product in p_set]
        return HttpResponse("\n".join(products), content_type="text/plain")
    except ValueError:
        return 403, {
            "message": f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
        }
############
# Reaction #
############
class ReactionWrapper(Schema):
    """Envelope holding a list of reactions under the "reaction" key."""

    reaction: List["SimpleReaction"]
class ReactionCompoundStructure(Schema):
    """Educt/product entry of a reaction: name, url as id, and SMILES."""

    compoundName: str = Field(None, alias="name")
    id: str = Field(None, alias="url")
    smiles: str = Field(None, alias="smiles")
class ReactionSchema(Schema):
    """Detailed reaction representation.

    EC numbers, references and Medline references are stubs that resolve to
    empty lists.
    """

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
    educts: List["ReactionCompoundStructure"] = Field([], alias="educts")
    id: str = Field(None, alias="url")
    identifier: str = "reaction"
    medlineRefs: List[str] = Field([], alias="medline_references")
    multistep: bool = Field(None, alias="multi_step")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    products: List["ReactionCompoundStructure"] = Field([], alias="products")
    references: List[Dict[str, List[str]]] = Field([], alias="references")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smirks: str = Field("", alias="smirks")

    @staticmethod
    def resolve_smirks(obj: Reaction):
        """Return the result of calling the reaction's ``smirks()`` method."""
        return obj.smirks()

    @staticmethod
    def resolve_ec_numbers(obj: Reaction):
        """Stub: no EC numbers are reported yet."""
        # TODO fetch via scenario EnzymeAI
        return []

    @staticmethod
    def resolve_references(obj: Reaction):
        """Stub: no references are reported yet."""
        # TODO
        return []

    @staticmethod
    def resolve_medline_references(obj: Reaction):
        """Stub: no Medline references are reported yet."""
        # TODO
        return []

    @staticmethod
    def resolve_review_status(obj: Reaction):
        """Map the owning package's ``reviewed`` flag to a status string."""
        # The parameter was annotated as Rule; this resolver receives a Reaction.
        return "reviewed" if obj.package.reviewed else "unreviewed"
@router.get("/reaction", response={200: ReactionWrapper, 403: Error})
def get_reactions(request):
    """List the reactions of all packages PackageManager reports as reviewed."""
    reviewed = PackageManager.get_reviewed_packages()
    reactions = Reaction.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"reaction": reactions}
@router.get("/package/{uuid:package_uuid}/reaction", response={200: ReactionWrapper, 403: Error})
def get_package_reactions(request, package_uuid):
    """List the reactions of one package, or 403 when a ValueError is raised."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        reactions = Reaction.objects.filter(package=package).prefetch_related("package")
        return {"reaction": reactions}
    except ValueError:
        failure = f"Getting Reactions for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
@router.get(
    "/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}",
    response={200: ReactionSchema, 403: Error},
)
def get_package_reaction(request, package_uuid, reaction_uuid):
    """Return one reaction of a package.

    Returns 403 when the package lookup raises ValueError. An unknown reaction
    uuid raises Http404 instead of the former unhandled ``DoesNotExist``
    (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting Reaction with id {reaction_uuid} failed due to insufficient rights!"
        }
    return get_object_or_404(Reaction.objects, package=package, uuid=reaction_uuid)
############
# Scenario #
############
class ScenarioWrapper(Schema):
    """Envelope holding a list of scenarios under the "scenario" key."""

    scenario: List["SimpleScenario"]
class ScenarioSchema(Schema):
    """Detailed scenario representation."""

    aliases: List[str] = Field([], alias="aliases")
    # Defaults now match the declared container types: this Dict field used to
    # default to [] and the linkedTo List field to {}.
    collection: Dict[str, List[Dict[str, Any]]] = Field({}, alias="collection")
    collectionID: Optional[str] = None
    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    identifier: str = "scenario"
    linkedTo: List[Dict[str, str]] = Field([], alias="linked_to")
    name: str = Field(None, alias="name")
    pathways: List["SimplePathway"] = Field([], alias="related_pathways")
    relatedScenarios: List[Dict[str, str]] = Field([], alias="related_scenarios")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    type: str = Field(None, alias="scenario_type")

    @staticmethod
    def resolve_collection(obj: Scenario):
        """Return the scenario's ``additional_information`` attribute as is."""
        return obj.additional_information

    @staticmethod
    def resolve_review_status(obj: Scenario):
        """Map the owning package's ``reviewed`` flag to a status string."""
        # The parameter was annotated as Rule; this resolver receives a Scenario.
        return "reviewed" if obj.package.reviewed else "unreviewed"
@router.get("/scenario", response={200: ScenarioWrapper, 403: Error})
def get_scenarios(request):
    """List the scenarios of all packages PackageManager reports as reviewed."""
    reviewed = PackageManager.get_reviewed_packages()
    scenarios = Scenario.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"scenario": scenarios}
@router.get("/package/{uuid:package_uuid}/scenario", response={200: ScenarioWrapper, 403: Error})
def get_package_scenarios(request, package_uuid):
    """List the scenarios of one package, or 403 when a ValueError is raised."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        scenarios = Scenario.objects.filter(package=package).prefetch_related("package")
        return {"scenario": scenarios}
    except ValueError:
        failure = f"Getting Scenarios for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
@router.get(
    "/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}",
    response={200: ScenarioSchema, 403: Error},
)
def get_package_scenario(request, package_uuid, scenario_uuid):
    """Return one scenario of a package.

    Returns 403 when the package lookup raises ValueError. An unknown scenario
    uuid raises Http404 instead of the former unhandled ``DoesNotExist``
    (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting Scenario with id {scenario_uuid} failed due to insufficient rights!"
        }
    return get_object_or_404(Scenario.objects, package=package, uuid=scenario_uuid)
###########
# Pathway #
###########
class PathwayWrapper(Schema):
    """Envelope holding a list of pathways under the "pathway" key."""

    pathway: List["SimplePathway"]
class PathwayEdge(Schema):
    """Edge entry inside a pathway payload.

    ``pseudo``, ``source`` and ``target`` are fixed values (False, -1, -1).
    """

    ecNumbers: List[str] = Field([], alias="ec_numbers")
    id: str = Field(None, alias="url")
    idreaction: str = Field(None, alias="edge_label.url")
    multstep: bool = Field(None, alias="edge_label.multi_step")
    name: str = Field(None, alias="name")
    pseudo: bool = False
    rule: Optional[str] = Field(None, alias="rule")
    scenarios: List[SimpleScenario] = Field([], alias="scenarios")
    source: int = -1
    target: int = -1

    @staticmethod
    def resolve_rule(obj: Edge):
        """Return the SMIRKS of the edge reaction's first rule.

        None is returned when the reaction has no rules or when that first
        rule is not a SimpleAmbitRule.
        """
        attached_rules = obj.edge_label.rules.all()
        if not attached_rules.exists():
            return None
        first_rule = attached_rules[0]
        return first_rule.smirks if isinstance(first_rule, SimpleAmbitRule) else None
class PathwayNode(Schema):
    """Node entry inside a pathway payload.

    Most values come from the node's ``default_node_label``; dt50s, the
    engineered-intermediate flag and proposed intermediates are stubs.
    """

    atomCount: int = Field(None, alias="atom_count")
    depth: int = Field(None, alias="depth")
    dt50s: List[Dict[str, str]] = Field([], alias="dt50s")
    engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
    id: str = Field(None, alias="url")
    # NOTE(review): idcomp and idreact read the same attribute - confirm intended.
    idcomp: str = Field(None, alias="default_node_label.url")
    idreact: str = Field(None, alias="default_node_label.url")
    image: str = Field(None, alias="image")
    imageSize: int = Field(None, alias="image_size")
    name: str = Field(None, alias="name")
    proposed: List[Dict[str, str]] = Field([], alias="proposed_intermediate")
    smiles: str = Field(None, alias="default_node_label.smiles")

    @staticmethod
    def resolve_atom_count(obj: Node):
        """Count the atoms of the default label's SMILES using RDKit."""
        from rdkit import Chem

        molecule = Chem.MolFromSmiles(obj.default_node_label.smiles)
        return molecule.GetNumAtoms()

    @staticmethod
    def resolve_dt50s(obj: Node):
        """Stub: no DT50 values are reported yet."""
        # TODO
        return []

    @staticmethod
    def resolve_engineered_intermediate(obj: Node):
        """Stub: always False."""
        # TODO
        return False

    @staticmethod
    def resolve_image(obj: Node):
        """Return the default label's URL with ``?image=svg`` appended."""
        label_url = obj.default_node_label.url
        return f"{label_url}?image=svg"

    @staticmethod
    def resolve_image_size(obj: Node):
        """Return the fixed image size of 400."""
        return 400

    @staticmethod
    def resolve_proposed_intermediate(obj: Node):
        """Stub: no proposed intermediates are reported yet."""
        # TODO
        return []
class PathwaySchema(Schema):
    """Detailed pathway representation including its nodes and edges."""

    aliases: List[str] = Field([], alias="aliases")
    completed: str = Field(None, alias="completed")
    description: str = Field(None, alias="description")
    id: str = Field(None, alias="url")
    isIncremental: bool = Field(None, alias="is_incremental")
    isPredicted: bool = Field(None, alias="is_predicted")
    lastModified: int = Field(None, alias="last_modified")
    links: List[PathwayEdge] = Field([], alias="edges")
    name: str = Field(None, alias="name")
    nodes: List[PathwayNode] = Field([], alias="nodes")
    pathwayName: str = Field(None, alias="name")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    upToDate: bool = Field(None, alias="up_to_date")

    @staticmethod
    def resolve_review_status(obj: Pathway):
        """Map the owning package's ``reviewed`` flag to a status string."""
        if obj.package.reviewed:
            return "reviewed"
        return "unreviewed"

    @staticmethod
    def resolve_completed(obj: Pathway):
        """Return ``obj.completed()`` rendered as a lower-case string."""
        done = obj.completed()
        return str(done).lower()

    @staticmethod
    def resolve_up_to_date(obj: Pathway):
        """Always report the pathway as up to date."""
        return "true"

    @staticmethod
    def resolve_last_modified(obj: Pathway):
        """Return the ``modified`` timestamp truncated to whole seconds."""
        seconds = obj.modified.timestamp()
        return int(seconds)
@router.get("/pathway", response={200: PathwayWrapper, 403: Error})
def get_pathways(request):
    """List the pathways of all packages PackageManager reports as reviewed."""
    reviewed = PackageManager.get_reviewed_packages()
    pathways = Pathway.objects.filter(package__in=reviewed).prefetch_related("package")
    return {"pathway": pathways}
@router.get("/package/{uuid:package_uuid}/pathway", response={200: PathwayWrapper, 403: Error})
def get_package_pathways(request, package_uuid):
    """List the pathways of one package, or 403 when a ValueError is raised."""
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        pathways = Pathway.objects.filter(package=package).prefetch_related("package")
        return {"pathway": pathways}
    except ValueError:
        failure = f"Getting Pathways for Package with id {package_uuid} failed due to insufficient rights!"
        return 403, {"message": failure}
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}",
    response={200: PathwaySchema, 403: Error},
)
def get_package_pathway(request, package_uuid, pathway_uuid):
    """Return one pathway of a package.

    Returns 403 when the package lookup raises ValueError. An unknown pathway
    uuid raises Http404 instead of the former unhandled ``DoesNotExist``
    (HTTP 500).
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting Pathway with id {pathway_uuid} failed due to insufficient rights!"
        }
    return get_object_or_404(Pathway.objects, package=package, uuid=pathway_uuid)
# The 400 status must be declared: django-ninja raises ConfigError (HTTP 500)
# when a view returns a status code that has no response schema.
@router.post("/package/{uuid:package_uuid}/pathway", response={400: Error})
def create_pathway(
    request,
    package_uuid,
    smilesinput: Form[str],
    name: Optional[str] = Form(None),
    description: Optional[str] = Form(None),
    rootOnly: Optional[str] = Form(None),
    selectedSetting: Optional[str] = Form(None),
):
    """Create a pathway from a SMILES string and redirect to it.

    The stripped input is standardised and used as the pathway root. With
    ``rootOnly == "true"`` the pathway is stored in "build" mode; otherwise it
    is stored in "predict" mode and a ``predict`` task is queued, using
    ``selectedSetting`` when given and the user's prediction settings otherwise.

    Returns:
        A redirect to the new pathway, or ``400`` with an Error payload when a
        ValueError is raised. The error was previously only printed, and the
        endpoint answered 200 with an empty body.
    """
    try:
        package = PackageManager.get_package_by_id(request.user, package_uuid)
        stand_smiles = FormatConverter.standardize(smilesinput.strip())
        pw = Pathway.create(package, stand_smiles, name=name, description=description)

        pw_mode = "build" if rootOnly == "true" else "predict"
        pw.kv.update({"mode": pw_mode})
        pw.save()

        if pw_mode == "predict":
            setting = request.user.prediction_settings()
            if selectedSetting:
                setting = SettingManager.get_setting_by_url(request.user, selectedSetting)
            pw.setting = setting
            pw.save()

            # Function-scope import, as in the original code.
            from .tasks import predict

            predict.delay(pw.pk, setting.pk, limit=-1)

        return redirect(pw.url)
    except ValueError as e:
        return 400, {"message": str(e)}
########
# Node #
########
class NodeWrapper(Schema):
    """Envelope holding a list of nodes under the "node" key."""

    node: List["SimpleNode"]
class NodeCompoundStructure(Schema):
    """Structure summary embedded in a node: id, image URL, SMILES and name."""

    id: str = Field(None, alias="url")
    image: str = Field(None, alias="image")
    smiles: str = Field(None, alias="smiles")
    name: str = Field(None, alias="name")

    @staticmethod
    def resolve_image(obj: CompoundStructure):
        """Return the structure's URL with ``?image=svg`` appended."""
        structure_url = obj.url
        return f"{structure_url}?image=svg"
class NodeSchema(Schema):
    """Detailed representation of a single pathway node.

    Field order is kept as declared because it defines the serialized layout.
    ``engineeredIntermediate`` and ``proposedValues`` are placeholders: their
    resolvers currently return a constant ``False`` and an empty list.
    """

    aliases: List[str] = Field([], alias="aliases")
    confidenceScenarios: List[SimpleScenario] = Field([], alias="confidence_scenarios")
    defaultStructure: NodeCompoundStructure = Field(None, alias="default_node_label")
    depth: int = Field(None, alias="depth")
    description: str = Field(None, alias="description")
    engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
    halflifes: Dict[str, str] = Field({}, alias="halflife")
    id: str = Field(None, alias="url")
    identifier: str = "node"
    image: str = Field(None, alias="image")
    name: str = Field(None, alias="name")
    proposedValues: List[Dict[str, str]] = Field([], alias="proposed_values")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    smiles: str = Field(None, alias="default_node_label.smiles")
    structures: List["CompoundStructureSchema"] = Field([], alias="node_labels")

    @staticmethod
    def resolve_engineered_intermediate(obj: Node):
        """Placeholder resolver: always reports ``False``."""
        # TODO
        return False

    @staticmethod
    def resolve_image(obj: Node):
        """Return the SVG image URL of the node's default structure."""
        default_structure_url = obj.default_node_label.url
        return f"{default_structure_url}?image=svg"

    @staticmethod
    def resolve_proposed_values(obj: Node):
        """Placeholder resolver: always returns an empty list."""
        # TODO
        # Intended entry shape:
        # {
        #   "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/1f1a0b67-ce57-4f5a-929c-4fabf3a791fd",
        #   "scenarioName": "Annex IIA 7.1.1 a), Brumhard (2003) - (00000)"
        # }
        return []

    @staticmethod
    def resolve_review_status(obj: Node):
        """Map the ``reviewed`` flag of the node's package to a status string."""
        if obj.pathway.package.reviewed:
            return "reviewed"
        return "unreviewed"
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
    response={200: NodeWrapper, 403: Error, 404: Error},
)
def get_package_pathway_nodes(request, package_uuid, pathway_uuid):
    """Return the nodes of a Pathway, wrapped under the "node" key.

    Returns:
        ``{"node": <pathway.nodes>}`` on success.
        ``403`` with an ``Error`` payload when a ``ValueError`` is raised
        (treated as insufficient rights, as in the sibling routes).
        ``404`` with an ``Error`` payload when the package contains no pathway
        with ``pathway_uuid``.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        return {"node": Pathway.objects.get(package=p, uuid=pathway_uuid).nodes}
    except ValueError:
        return 403, {
            "message": f"Getting Nodes for Pathway with id {pathway_uuid} failed due to insufficient rights!"
        }
    except Pathway.DoesNotExist:
        # An unknown pathway UUID raises DoesNotExist, which the ValueError
        # handler above does not cover.
        return 404, {
            "message": f"Pathway with id {pathway_uuid} not found in Package with id {package_uuid}!"
        }
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}",
    response={200: NodeSchema, 403: Error, 404: Error},
)
def get_package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
    """Return a single Node of a Pathway.

    Returns:
        The matching ``Node`` (serialized as ``NodeSchema``) on success.
        ``403`` with an ``Error`` payload when a ``ValueError`` is raised
        (treated as insufficient rights, as in the sibling routes).
        ``404`` with an ``Error`` payload when either the pathway or the node
        does not exist under the given parent.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        return Node.objects.get(pathway=pw, uuid=node_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting Node with id {node_uuid} failed due to insufficient rights!"
        }
    except Pathway.DoesNotExist:
        # DoesNotExist is not a ValueError; report the missing parent as 404.
        return 404, {
            "message": f"Pathway with id {pathway_uuid} not found in Package with id {package_uuid}!"
        }
    except Node.DoesNotExist:
        return 404, {
            "message": f"Node with id {node_uuid} not found in Pathway with id {pathway_uuid}!"
        }
########
# Edge #
########
# Response envelope for edge listings: a single "edge" key holding a list of
# SimpleEdge entries (used as the 200 response of the pathway edge list route).
class EdgeWrapper(Schema):
    edge: List["SimpleEdge"]
class EdgeNode(SimpleNode):
    """``SimpleNode`` extended with an ``image`` URL, used for edge endpoints."""

    image: str = Field(None, alias="image")

    @staticmethod
    def resolve_image(obj: Node):
        """Return the SVG image URL of the node's default structure."""
        default_structure_url = obj.default_node_label.url
        return f"{default_structure_url}?image=svg"
class EdgeSchema(Schema):
    """Detailed representation of a single pathway edge.

    ``reactionName`` and ``reactionURI`` are read from the edge's
    ``edge_label``; ``reviewStatus`` is computed by ``resolve_review_status``.
    Field order is kept as declared because it defines the serialized layout.
    """

    aliases: List[str] = Field([], alias="aliases")
    description: str = Field(None, alias="description")
    ecNumbers: List[str] = Field([], alias="ec_numbers")
    endNodes: List["EdgeNode"] = Field([], alias="end_nodes")
    id: str = Field(None, alias="url")
    identifier: str = "edge"
    name: str = Field(None, alias="name")
    reactionName: str = Field(None, alias="edge_label.name")
    reactionURI: str = Field(None, alias="edge_label.url")
    reviewStatus: str = Field(None, alias="review_status")
    scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
    startNodes: List["EdgeNode"] = Field([], alias="start_nodes")

    @staticmethod
    def resolve_review_status(obj: Edge):
        """Map the ``reviewed`` flag of the edge's package to a status string."""
        # The object resolved here is an Edge (this schema serializes the
        # result of Edge.objects.get), not a Node as previously annotated.
        return "reviewed" if obj.pathway.package.reviewed else "unreviewed"
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge",
    response={200: EdgeWrapper, 403: Error, 404: Error},
)
def get_package_pathway_edges(request, package_uuid, pathway_uuid):
    """Return the edges of a Pathway, wrapped under the "edge" key.

    Returns:
        ``{"edge": <pathway.edges>}`` on success.
        ``403`` with an ``Error`` payload when a ``ValueError`` is raised
        (treated as insufficient rights, as in the sibling routes).
        ``404`` with an ``Error`` payload when the package contains no pathway
        with ``pathway_uuid``.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        return {"edge": Pathway.objects.get(package=p, uuid=pathway_uuid).edges}
    except ValueError:
        return 403, {
            "message": f"Getting Edges for Pathway with id {pathway_uuid} failed due to insufficient rights!"
        }
    except Pathway.DoesNotExist:
        # An unknown pathway UUID raises DoesNotExist, which the ValueError
        # handler above does not cover.
        return 404, {
            "message": f"Pathway with id {pathway_uuid} not found in Package with id {package_uuid}!"
        }
@router.get(
    "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}",
    response={200: EdgeSchema, 403: Error, 404: Error},
)
def get_package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
    """Return a single Edge of a Pathway.

    Returns:
        The matching ``Edge`` (serialized as ``EdgeSchema``) on success.
        ``403`` with an ``Error`` payload when a ``ValueError`` is raised
        (treated as insufficient rights, as in the sibling routes).
        ``404`` with an ``Error`` payload when either the pathway or the edge
        does not exist under the given parent.
    """
    try:
        p = PackageManager.get_package_by_id(request.user, package_uuid)
        pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
        return Edge.objects.get(pathway=pw, uuid=edge_uuid)
    except ValueError:
        return 403, {
            "message": f"Getting Edge with id {edge_uuid} failed due to insufficient rights!"
        }
    except Pathway.DoesNotExist:
        # DoesNotExist is not a ValueError; report the missing parent as 404.
        return 404, {
            "message": f"Pathway with id {pathway_uuid} not found in Package with id {package_uuid}!"
        }
    except Edge.DoesNotExist:
        return 404, {
            "message": f"Edge with id {edge_uuid} not found in Pathway with id {pathway_uuid}!"
        }
###########
# Setting #
###########
# Response envelope for the setting list route: a single "setting" key holding
# a list of SimpleSetting entries.
class SettingWrapper(Schema):
    setting: List["SimpleSetting"]
# Detailed representation of a single setting; used as the 200 response of the
# "/setting/{uuid:setting_uuid}" route. Each alias names the attribute the
# field is read from on the serialized object.
class SettingSchema(Schema):
    # No alias is given for this field; it defaults to -1.
    duplicationHash: int = -1
    id: str = Field(None, alias="url")
    # Constant type tag for this kind of object.
    identifier: str = "setting"
    includedPackages: List["SimplePackage"] = Field([], alias="rule_packages")
    isPublic: bool = Field(None, alias="public")
    name: str = Field(None, alias="name")
    normalizationRules: List["SimpleRule"] = Field([], alias="normalization_rules")
    propertyPlugins: List[str] = Field([], alias="property_plugins")
    # The only field declared Optional; defaults to None.
    truncationstrategy: Optional[str] = Field(None, alias="truncation_strategy")
@router.get("/setting", response={200: SettingWrapper, 403: Error})
def get_settings(request):
    """Return the settings that SettingManager reports for the requesting user.

    The result is wrapped under the "setting" key to match ``SettingWrapper``.
    """
    user_settings = SettingManager.get_all_settings(request.user)
    return {"setting": user_settings}
@router.get("/setting/{uuid:setting_uuid}", response={200: SettingSchema, 403: Error})
def get_setting(request, setting_uuid):
    """Return a single setting by UUID.

    Responds with ``403`` and an ``Error`` payload when the lookup through
    ``SettingManager.get_setting_by_id`` raises ``ValueError``.
    """
    try:
        found_setting = SettingManager.get_setting_by_id(request.user, setting_uuid)
    except ValueError:
        denial = f"Getting Setting with id {setting_uuid} failed due to insufficient rights!"
        return 403, {"message": denial}
    return found_setting