from typing import Any, Dict, List, Optional import nh3 from django.conf import settings as s from django.contrib.auth import get_user_model from django.http import HttpResponse from django.shortcuts import redirect from ninja import Field, Form, Router, Schema, Query from utilities.chem import FormatConverter from utilities.misc import PackageExporter from .logic import GroupManager, PackageManager, SettingManager, UserManager from .models import ( Compound, CompoundStructure, Edge, EPModel, Node, Pathway, Reaction, Rule, Scenario, SimpleAmbitRule, User, UserPackagePermission, ParallelRule, ) Package = s.GET_PACKAGE_MODEL() def _anonymous_or_real(request): if request.user.is_authenticated and not request.user.is_anonymous: return request.user return get_user_model().objects.get(username="anonymous") # router = Router(auth=SessionAuth()) router = Router() class Error(Schema): message: str ################# # SimpleObjects # ################# class SimpleUser(Schema): id: str = Field(None, alias="url") identifier: str = "user" name: str = Field(None, alias="username") email: str = Field(None, alias="email") class SimpleGroup(Schema): id: str = Field(None, alias="url") identifier: str = "group" name: str = Field(None, alias="name") class SimpleSetting(Schema): id: str = Field(None, alias="url") identifier: str = "setting" name: str = Field(None, alias="name") class SimpleObject(Schema): id: str = Field(None, alias="url") name: str = Field(None, alias="name") reviewStatus: str = Field(None, alias="review_status") @staticmethod def resolve_review_status(obj): if isinstance(obj, Package): return "reviewed" if obj.reviewed else "unreviewed" elif hasattr(obj, "package"): return "reviewed" if obj.package.reviewed else "unreviewed" elif isinstance(obj, CompoundStructure): return "reviewed" if obj.compound.package.reviewed else "unreviewed" elif isinstance(obj, Node) or isinstance(obj, Edge): return "reviewed" if obj.pathway.package.reviewed else "unreviewed" else: raise ValueError("Object has no package") class SimplePackage(SimpleObject): identifier: str = "package" class SimpleCompound(SimpleObject): identifier: str = "compound" class SimpleCompoundStructure(SimpleObject): identifier: str = "structure" class SimpleRule(SimpleObject): identifier: str = "rule" @staticmethod def resolve_url(obj: Rule): return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-") class SimpleReaction(SimpleObject): identifier: str = "reaction" class SimpleScenario(SimpleObject): identifier: str = "scenario" class SimplePathway(SimpleObject): identifier: str = "pathway" class SimpleNode(SimpleObject): identifier: str = "node" class SimpleEdge(SimpleObject): identifier: str = "edge" class SimpleModel(SimpleObject): identifier: str = "relative-reasoning" ################ # Login/Logout # ################ @router.post("/", response={200: SimpleUser, 403: Error}) def login(request, loginusername: Form[str], loginpassword: Form[str]): from django.contrib.auth import authenticate, login email = User.objects.get(username=loginusername).email user = authenticate(username=email, password=loginpassword) if user: login(request, user) return user else: return 403, {"message": "Invalid username and/or password"} ######## # User # ######## class UserWrapper(Schema): user: List[SimpleUser] class UserSchema(Schema): affiliation: Dict[str, str] = Field(None, alias="affiliation") defaultGroup: "SimpleGroup" = Field(None, alias="default_group") defaultPackage: "SimplePackage" = Field(None, alias="default_package") defaultSetting: "SimpleSetting" = Field(None, alias="default_setting") email: str = Field(None, alias="email") forename: str = "not specified" groups: List["SimpleGroup"] = Field([], alias="groups") id: str = Field(None, alias="url") identifier: str = "user" link: str = "empty" name: str = Field(None, alias="username") surname: str = "not specified" settings: List["SimpleSetting"] = Field([], alias="settings") @staticmethod def resolve_affiliation(obj: User): # TODO return {"city": "not specified", "country": "not specified", "workplace": "not specified"} @staticmethod def resolve_settings(obj: User): return SettingManager.get_all_settings(obj) class Me(Schema): whoami: str | None = None @router.get("/user", response={200: UserWrapper, 403: Error}) def get_users(request, me: Query[Me]): if me.whoami: return {"user": [request.user]} else: return {"user": User.objects.all()} @router.get("/user/{uuid:user_uuid}", response={200: UserSchema, 403: Error}) def get_user(request, user_uuid): try: u = UserManager.get_user_by_id(request.user, user_uuid) return u except ValueError: return 403, { "message": f"Getting User with id {user_uuid} failed due to insufficient rights!" } ########### # Package # ########### class PackageWrapper(Schema): package: List["PackageSchema"] class PackageSchema(Schema): description: str = Field(None, alias="description") id: str = Field(None, alias="url") links: List[Dict[str, List[str | int]]] = Field([], alias="links") name: str = Field(None, alias="name") primaryGroup: Optional[SimpleGroup] = None readers: List[Dict[str, str]] = Field([], alias="readers") reviewComment: str = Field(None, alias="review_comment") reviewStatus: str = Field(None, alias="review_status") writers: List[Dict[str, str]] = Field([], alias="writers") @staticmethod def resolve_links(obj: Package): return [ {"Pathways": [f"{obj.url}/pathway", obj.pathways.count()]}, {"Rules": [f"{obj.url}/rule", obj.rules.count()]}, {"Compounds": [f"{obj.url}/compound", obj.compounds.count()]}, {"Reactions": [f"{obj.url}/reaction", obj.reactions.count()]}, {"Relative Reasoning": [f"{obj.url}/relative-reasoning", obj.models.count()]}, {"Scenarios": [f"{obj.url}/scenario", obj.scenarios.count()]}, ] @staticmethod def resolve_readers(obj: Package): users = User.objects.filter( id__in=UserPackagePermission.objects.filter( package=obj, permission=UserPackagePermission.READ[0] ).values_list("user", flat=True) ).distinct() return [{u.id: u.name} for u in users] @staticmethod def resolve_writers(obj: Package): users = User.objects.filter( id__in=UserPackagePermission.objects.filter( package=obj, permission=UserPackagePermission.WRITE[0] ).values_list("user", flat=True) ).distinct() return [{u.id: u.name} for u in users] @staticmethod def resolve_review_comment(obj): return "" @staticmethod def resolve_review_status(obj): return "reviewed" if obj.reviewed else "unreviewed" @router.get("/package", response={200: PackageWrapper, 403: Error}) def get_packages(request): return { "package": PackageManager.get_all_readable_packages(request.user, include_reviewed=True) } class GetPackage(Schema): exportAsJson: str | None = None @router.get("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 403: Error}) def get_package(request, package_uuid, gp: Query[GetPackage]): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if gp.exportAsJson and gp.exportAsJson.strip() == "true": return PackageExporter(p).do_export() return p except ValueError: return 403, { "message": f"Getting Package with id {package_uuid} failed due to insufficient rights!" } class CreatePackage(Schema): packageName: str packageDescription: str | None = None @router.post("/package") def create_packages( request, p: Form[CreatePackage], ): try: if p.packageName.strip() == "": raise ValueError("Package name cannot be empty!") new_pacakge = PackageManager.create_package( request.user, p.packageName, p.packageDescription ) return redirect(new_pacakge.url) except ValueError as e: return 400, {"message": str(e)} class UpdatePackage(Schema): packageDescription: str | None = None hiddenMethod: str | None = None permissions: str | None = None ppsURI: str | None = None read: str | None = None write: str | None = None @router.post("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 400: Error}) def update_package(request, package_uuid, pack: Form[UpdatePackage]): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if pack.hiddenMethod: if pack.hiddenMethod == "DELETE": p.delete() elif pack.packageDescription is not None: description = nh3.clean(pack.packageDescription, tags=s.ALLOWED_HTML_TAGS).strip() if description: p.description = description p.save() return HttpResponse(status=200) else: raise ValueError("Package description cannot be empty!") elif all([pack.permissions, pack.ppsURI, pack.read]): if "group" in pack.ppsURI: grantee = GroupManager.get_group_lp(pack.ppsURI) else: grantee = UserManager.get_user_lp(pack.ppsURI) PackageManager.grant_read(request.user, p, grantee) return HttpResponse(status=200) elif all([pack.permissions, pack.ppsURI, pack.write]): if "group" in pack.ppsURI: grantee = GroupManager.get_group_lp(pack.ppsURI) else: grantee = UserManager.get_user_lp(pack.ppsURI) PackageManager.grant_write(request.user, p, grantee) return HttpResponse(status=200) except ValueError as e: return 400, {"message": str(e)} @router.delete("/package/{uuid:package_uuid}") def delete_package(request, package_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.administrable(request.user, p): p.delete() return redirect(f"{s.SERVER_URL}/package") else: raise ValueError("You do not have the rights to delete this Package!") except ValueError: return 403, { "message": f"Deleting Package with id {package_uuid} failed due to insufficient rights!" } ################################ # Compound / CompoundStructure # ################################ class CompoundWrapper(Schema): compound: List["SimpleCompound"] class CompoundPathwayScenario(Schema): scenarioId: str scenarioName: str scenarioType: str class CompoundSchema(Schema): aliases: List[str] = Field([], alias="aliases") description: str = Field(None, alias="description") externalReferences: Dict[str, List[str]] = Field(None, alias="external_references") id: str = Field(None, alias="url") halflifes: List[Dict[str, str]] = Field([], alias="halflifes") identifier: str = "compound" imageSize: int = 600 name: str = Field(None, alias="name") pathwayScenarios: List[CompoundPathwayScenario] = Field([], alias="pathway_scenarios") pathways: List["SimplePathway"] = Field([], alias="related_pathways") pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references") reactions: List["SimpleReaction"] = Field([], alias="related_reactions") reviewStatus: str = Field(False, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") structures: List["CompoundStructureSchema"] = [] @staticmethod def resolve_review_status(obj: CompoundStructure): return "reviewed" if obj.package.reviewed else "unreviewed" @staticmethod def resolve_external_references(obj: Compound): # TODO return {} @staticmethod def resolve_structures(obj: Compound): return CompoundStructure.objects.filter(compound=obj) @staticmethod def resolve_halflifes(obj: Compound): return [] @staticmethod def resolve_pubchem_compound_references(obj: Compound): return [] @staticmethod def resolve_pathway_scenarios(obj: Compound): return [ { "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501", "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)", "scenarioType": "Soil", } ] class CompoundStructureSchema(Schema): InChI: str = Field(None, alias="inchi") aliases: List[str] = Field([], alias="aliases") canonicalSmiles: str = Field(None, alias="canonical_smiles") charge: int = Field(None, alias="charge") description: str = Field(None, alias="description") externalReferences: Dict[str, List[str]] = Field(None, alias="external_references") formula: str = Field(None, alias="formula") halflifes: List[Dict[str, str]] = Field([], alias="halflifes") id: str = Field(None, alias="url") identifier: str = "structure" imageSize: int = 600 inchikey: str = Field(None, alias="inchikey") isDefaultStructure: bool = Field(None, alias="is_default_structure") mass: float = Field(None, alias="mass") name: str = Field(None, alias="name") pathways: List["SimplePathway"] = Field([], alias="related_pathways") pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references") reactions: List["SimpleReaction"] = Field([], alias="related_reactions") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") smiles: str = Field(None, alias="smiles") @staticmethod def resolve_review_status(obj: CompoundStructure): return "reviewed" if obj.compound.package.reviewed else "unreviewed" @staticmethod def resolve_inchi(obj: CompoundStructure): return FormatConverter.InChI(obj.smiles) @staticmethod def resolve_charge(obj: CompoundStructure): return FormatConverter.charge(obj.smiles) @staticmethod def resolve_formula(obj: CompoundStructure): return FormatConverter.formula(obj.smiles) @staticmethod def resolve_mass(obj: CompoundStructure): return FormatConverter.mass(obj.smiles) @staticmethod def resolve_external_references(obj: CompoundStructure): # TODO return {} @staticmethod def resolve_halflifes(obj: CompoundStructure): return [] @staticmethod def resolve_pubchem_compound_references(obj: CompoundStructure): return [] @staticmethod def resolve_pathway_scenarios(obj: CompoundStructure): return [ { "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501", "scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)", "scenarioType": "Soil", } ] class CompoundStructureWrapper(Schema): structure: List["SimpleCompoundStructure"] @router.get("/compound", response={200: CompoundWrapper, 403: Error}) def get_compounds(request): return { "compound": Compound.objects.filter( package__in=PackageManager.get_reviewed_packages() ).prefetch_related("package") } @router.get("/package/{uuid:package_uuid}/compound", response={200: CompoundWrapper, 403: Error}) def get_package_compounds(request, package_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"compound": Compound.objects.filter(package=p).prefetch_related("package")} except ValueError: return 403, { "message": f"Getting Compounds for Package with id {package_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}", response={200: CompoundSchema, 403: Error}, ) def get_package_compound(request, package_uuid, compound_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return Compound.objects.get(package=p, uuid=compound_uuid) except ValueError: return 403, { "message": f"Getting Compound with id {compound_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure", response={200: CompoundStructureWrapper, 403: Error}, ) def get_package_compound_structures(request, package_uuid, compound_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"structure": Compound.objects.get(package=p, uuid=compound_uuid).structures.all()} except ValueError: return 403, { "message": f"Getting CompoundStructures for Compound with id {compound_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}", response={200: CompoundStructureSchema, 403: Error}, ) def get_package_compound_structure(request, package_uuid, compound_uuid, structure_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return CompoundStructure.objects.get( uuid=structure_uuid, compound=Compound.objects.get(package=p, uuid=compound_uuid) ) except ValueError: return 403, { "message": f"Getting CompoundStructure with id {structure_uuid} failed due to insufficient rights!" } class CreateCompound(Schema): compoundSmiles: str compoundName: str | None = None compoundDescription: str | None = None inchi: str | None = None @router.post("/package/{uuid:package_uuid}/compound") def create_package_compound( request, package_uuid, c: Form[CreateCompound], ): try: p = PackageManager.get_package_by_id(request.user, package_uuid) # inchi is not used atm c = Compound.create( p, c.compoundSmiles, c.compoundName, c.compoundDescription, inchi=c.inchi ) return redirect(c.url) except ValueError as e: return 400, {"message": str(e)} @router.delete("/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}") def delete_compound(request, package_uuid, compound_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): c = Compound.objects.get(package=p, uuid=compound_uuid) c.delete() return redirect(f"{p.url}/compound") else: raise ValueError("You do not have the rights to delete this Compound!") except ValueError: return 403, { "message": f"Deleting Compound with id {compound_uuid} failed due to insufficient rights!" } @router.delete( "/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}" ) def delete_compound_structure(request, package_uuid, compound_uuid, structure_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): c = Compound.objects.get(package=p, uuid=compound_uuid) cs = CompoundStructure.objects.get(compound=c, uuid=structure_uuid) # Check if we have to delete the compound as no structure is left if len(cs.compound.structures.all()) == 1: # This will delete the structure as well c.delete() return redirect(p.url + "/compound") else: if cs.normalized_structure: c.delete() return redirect(p.url + "/compound") else: if c.default_structure == cs: cs.delete() c.default_structure = c.structures.all().first() return redirect(c.url + "/structure") else: cs.delete() return redirect(c.url + "/structure") else: raise ValueError("You do not have the rights to delete this CompoundStructure!") except ValueError: return 403, { "message": f"Deleting CompoundStructure with id {compound_uuid} failed due to insufficient rights!" } ######### # Rules # ######### class RuleWrapper(Schema): rule: List["SimpleRule"] class SimpleRuleSchema(Schema): aliases: List[str] = Field([], alias="aliases") description: str = Field(None, alias="description") ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers") engine: str = "ambit" id: str = Field(None, alias="url") identifier: str = Field(None, alias="identifier") isCompositeRule: bool = False name: str = Field(None, alias="name") pathways: List["SimplePathway"] = Field([], alias="related_pathways") productFilterSmarts: str = Field("", alias="product_filter_smarts") productSmarts: str = Field(None, alias="products_smarts") reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts") reactantSmarts: str = Field(None, alias="reactants_smarts") reactions: List["SimpleReaction"] = Field([], alias="related_reactions") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") smirks: str = Field("", alias="smirks") # TODO transformations: str = Field("", alias="transformations") @staticmethod def resolve_url(obj: Rule): return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-") @staticmethod def resolve_identifier(obj: Rule): if "simple-rule" in obj.url: return "simple-rule" if "simple-ambit-rule" in obj.url: return "simple-rule" elif "parallel-rule" in obj.url: return "parallel-rule" elif "sequential-rule" in obj.url: return "sequential-rule" else: return None @staticmethod def resolve_review_status(obj: Rule): return "reviewed" if obj.package.reviewed else "unreviewed" @staticmethod def resolve_product_filter_smarts(obj: Rule): return obj.product_filter_smarts if obj.product_filter_smarts else "" @staticmethod def resolve_reactant_filter_smarts(obj: Rule): return obj.reactant_filter_smarts if obj.reactant_filter_smarts else "" class CompositeRuleSchema(Schema): aliases: List[str] = Field([], alias="aliases") description: str = Field(None, alias="description") ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers") id: str = Field(None, alias="url") identifier: str = Field(None, alias="identifier") isCompositeRule: bool = True name: str = Field(None, alias="name") pathways: List["SimplePathway"] = Field([], alias="related_pathways") productFilterSmarts: str = Field("", alias="product_filter_smarts") reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts") reactions: List["SimpleReaction"] = Field([], alias="related_reactions") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") simpleRules: List["SimpleRule"] = Field([], alias="simple_rules") @staticmethod def resolve_ec_numbers(obj: Rule): return [] @staticmethod def resolve_url(obj: Rule): return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-") @staticmethod def resolve_identifier(obj: Rule): if "simple-rule" in obj.url: return "simple-rule" if "simple-ambit-rule" in obj.url: return "simple-rule" elif "parallel-rule" in obj.url: return "parallel-rule" elif "sequential-rule" in obj.url: return "sequential-rule" else: return None @staticmethod def resolve_review_status(obj: Rule): return "reviewed" if obj.package.reviewed else "unreviewed" @staticmethod def resolve_product_filter_smarts(obj: Rule): return obj.product_filter_smarts if obj.product_filter_smarts else "" @staticmethod def resolve_reactant_filter_smarts(obj: Rule): return obj.reactant_filter_smarts if obj.reactant_filter_smarts else "" @router.get("/rule", response={200: RuleWrapper, 403: Error}) def get_rules(request): return { "rule": Rule.objects.filter( package__in=PackageManager.get_reviewed_packages() ).prefetch_related("package") } @router.get("/package/{uuid:package_uuid}/rule", response={200: RuleWrapper, 403: Error}) def get_package_rules(request, package_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"rule": Rule.objects.filter(package=p).prefetch_related("package")} except ValueError: return 403, { "message": f"Getting Rules for Package with id {package_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}", response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error}, ) def get_package_rule(request, package_uuid, rule_uuid): return _get_package_rule(request, package_uuid, rule_uuid) @router.get( "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}", response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error}, ) def get_package_simple_rule(request, package_uuid, rule_uuid): return _get_package_rule(request, package_uuid, rule_uuid) @router.get( "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}", response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error}, ) def get_package_parallel_rule(request, package_uuid, rule_uuid): return _get_package_rule(request, package_uuid, rule_uuid) def _get_package_rule(request, package_uuid, rule_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return Rule.objects.get(package=p, uuid=rule_uuid) except ValueError: return 403, { "message": f"Getting Rule with id {rule_uuid} failed due to insufficient rights!" } # POST class CreateSimpleRule(Schema): smirks: str name: str | None = None description: str | None = None reactantFilterSmarts: str | None = None productFilterSmarts: str | None = None immediate: str | None = None rdkitrule: str | None = None @router.post("/package/{uuid:package_uuid}/simple-rule") def create_package_simple_rule( request, package_uuid, r: Form[CreateSimpleRule], ): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if r.rdkitrule and r.rdkitrule.strip() == "true": raise ValueError("Not yet implemented!") else: sr = SimpleAmbitRule.create( p, r.name, r.description, r.smirks, r.reactantFilterSmarts, r.productFilterSmarts ) return redirect(sr.url) except ValueError as e: return 400, {"message": str(e)} class CreateParallelRule(Schema): simpleRules: str name: str | None = None description: str | None = None reactantFilterSmarts: str | None = None productFilterSmarts: str | None = None immediate: str | None = None @router.post("/package/{uuid:package_uuid}/parallel-rule") def create_package_parallel_rule( request, package_uuid, r: Form[CreateParallelRule], ): try: p = PackageManager.get_package_by_id(request.user, package_uuid) srs = SimpleRule.objects.filter(package=p, url__in=r.simpleRules) if srs.count() != len(r.simpleRules): raise ValueError( f"Not all SimpleRules could be found in Package with id {package_uuid}!" ) sr = ParallelRule.create( p, list(srs), r.name, r.description, r.reactantFilterSmarts, r.productFilterSmarts ) return redirect(sr.url) except ValueError as e: return 400, {"message": str(e)} @router.post( "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}", response={200: str | Any, 403: Error} ) def post_package_rule(request, package_uuid, rule_uuid, compound: Form[str] = None): return _post_package_rule(request, package_uuid, rule_uuid, compound=compound) @router.post( "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}", response={200: str | Any, 403: Error}, ) def post_package_simple_rule(request, package_uuid, rule_uuid, compound: Form[str] = None): return _post_package_rule(request, package_uuid, rule_uuid, compound=compound) @router.post( "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}", response={200: str | Any, 403: Error}, ) def post_package_parallel_rule(request, package_uuid, rule_uuid, compound: Form[str] = None): return _post_package_rule(request, package_uuid, rule_uuid, compound=compound) def _post_package_rule(request, package_uuid, rule_uuid, compound: Form[str]): try: p = PackageManager.get_package_by_id(request.user, package_uuid) r = Rule.objects.get(package=p, uuid=rule_uuid) if compound is not None: if not compound.split(): return 400, {"message": "Compound is empty"} product_sets = r.apply(compound) res = [] for p_set in product_sets: for product in p_set: res.append(product) return HttpResponse("\n".join(res), content_type="text/plain") return r except ValueError: return 403, { "message": f"Getting Rule with id {rule_uuid} failed due to insufficient rights!" } @router.delete("/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}") def delete_rule(request, package_uuid, rule_uuid): return _delete_rule(request, package_uuid, rule_uuid) @router.delete( "/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}", ) def delete_simple_rule(request, package_uuid, rule_uuid): return _delete_rule(request, package_uuid, rule_uuid) @router.delete( "/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}", ) def delete_parallel_rule(request, package_uuid, rule_uuid): return _delete_rule(request, package_uuid, rule_uuid) def _delete_rule(request, package_uuid, rule_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): r = Rule.objects.get(package=p, uuid=rule_uuid) r.delete() return redirect(f"{p.url}/rule") else: raise ValueError("You do not have the rights to delete this Rule!") except ValueError: return 403, { "message": f"Deleting Rule with id {rule_uuid} failed due to insufficient rights!" } ############ # Reaction # ############ class ReactionWrapper(Schema): reaction: List["SimpleReaction"] class ReactionCompoundStructure(Schema): compoundName: str = Field(None, alias="name") id: str = Field(None, alias="url") smiles: str = Field(None, alias="smiles") class ReactionSchema(Schema): aliases: List[str] = Field([], alias="aliases") description: str = Field(None, alias="description") ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers") educts: List["ReactionCompoundStructure"] = Field([], alias="educts") id: str = Field(None, alias="url") identifier: str = "reaction" medlineRefs: List[str] = Field([], alias="medline_references") multistep: bool = Field(None, alias="multi_step") name: str = Field(None, alias="name") pathways: List["SimplePathway"] = Field([], alias="related_pathways") products: List["ReactionCompoundStructure"] = Field([], alias="products") references: List[Dict[str, List[str]]] = Field([], alias="references") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") smirks: str = Field("", alias="smirks") @staticmethod def resolve_smirks(obj: Reaction): return obj.smirks() @staticmethod def resolve_ec_numbers(obj: Reaction): # TODO fetch via scenario EnzymeAI return [] @staticmethod def resolve_references(obj: Reaction): # TODO return [] @staticmethod def resolve_medline_references(obj: Reaction): # TODO return [] @staticmethod def resolve_review_status(obj: Rule): return "reviewed" if obj.package.reviewed else "unreviewed" @router.get("/reaction", response={200: ReactionWrapper, 403: Error}) def get_reactions(request): return { "reaction": Reaction.objects.filter( package__in=PackageManager.get_reviewed_packages() ).prefetch_related("package") } @router.get("/package/{uuid:package_uuid}/reaction", response={200: ReactionWrapper, 403: Error}) def get_package_reactions(request, package_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"reaction": Reaction.objects.filter(package=p).prefetch_related("package")} except ValueError: return 403, { "message": f"Getting Reactions for Package with id {package_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}", response={200: ReactionSchema, 403: Error}, ) def get_package_reaction(request, package_uuid, reaction_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return Reaction.objects.get(package=p, uuid=reaction_uuid) except ValueError: return 403, { "message": f"Getting Reaction with id {reaction_uuid} failed due to insufficient rights!" } class CreateReaction(Schema): reactionName: str | None = None reactionDescription: str | None = None smirks: str | None = None educt: str | None = None product: str | None = None rule: str | None = None @router.post("/package/{uuid:package_uuid}/reaction") def create_package_reaction( request, package_uuid, r: Form[CreateReaction], ): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if r.smirks is None and (r.educt is None or r.product is None): raise ValueError("Either SMIRKS or educt/product must be provided") if r.smirks is not None and (r.educt is not None and r.product is not None): raise ValueError("SMIRKS and educt/product provided!") rule = None if r.rule: try: rule = Rule.objects.get(package=p, url=r.rule) except Rule.DoesNotExist: raise ValueError(f"Rule with id {r.rule} does not exist!") if r.educt is not None: try: educt_cs = CompoundStructure.objects.get(compound__package=p, url=r.educt) except CompoundStructure.DoesNotExist: raise ValueError(f"Compound with id {r.educt} does not exist!") try: product_cs = CompoundStructure.objects.get(compound__package=p, url=r.product) except CompoundStructure.DoesNotExist: raise ValueError(f"Compound with id {r.product} does not exist!") new_r = Reaction.create( p, r.reactionName, r.reactionDescription, [educt_cs], [product_cs], rule ) else: educts = r.smirks.split(">>")[0].split("\\.") products = r.smirks.split(">>")[1].split("\\.") new_r = Reaction.create( p, r.reactionName, r.reactionDescription, educts, products, rule ) return redirect(new_r.url) except ValueError as e: return 400, {"message": str(e)} @router.delete("/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}") def delete_reaction(request, package_uuid, reaction_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): r = Reaction.objects.get(package=p, uuid=reaction_uuid) r.delete() return redirect(f"{p.url}/reaction") else: raise ValueError("You do not have the rights to delete this Reaction!") except ValueError: return 403, { "message": f"Deleting Reaction with id {reaction_uuid} failed due to insufficient rights!" } ############ # Scenario # ############ class ScenarioWrapper(Schema): scenario: List["SimpleScenario"] class ScenarioSchema(Schema): aliases: List[str] = Field([], alias="aliases") collection: Dict["str", List[Dict[str, Any]]] = Field([], alias="collection") collectionID: Optional[str] = None description: str = Field(None, alias="description") id: str = Field(None, alias="url") identifier: str = "scenario" linkedTo: List[Dict[str, str]] = Field([], alias="linked_to") name: str = Field(None, alias="name") pathways: List["SimplePathway"] = Field([], alias="related_pathways") relatedScenarios: List[Dict[str, str]] = Field([], alias="related_scenarios") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") type: str = Field(None, alias="scenario_type") @staticmethod def resolve_collection(obj: Scenario): return obj.additional_information @staticmethod def resolve_review_status(obj: Rule): return "reviewed" if obj.package.reviewed else "unreviewed" @router.get("/scenario", response={200: ScenarioWrapper, 403: Error}) def get_scenarios(request): return { "scenario": Scenario.objects.filter( package__in=PackageManager.get_reviewed_packages() ).prefetch_related("package") } @router.get("/package/{uuid:package_uuid}/scenario", response={200: ScenarioWrapper, 403: Error}) def get_package_scenarios(request, package_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"scenario": Scenario.objects.filter(package=p).prefetch_related("package")} except ValueError: return 403, { "message": f"Getting Scenarios for Package with id {package_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}", response={200: ScenarioSchema, 403: Error}, ) def get_package_scenario(request, package_uuid, scenario_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return Scenario.objects.get(package=p, uuid=scenario_uuid) except ValueError: return 403, { "message": f"Getting Scenario with id {scenario_uuid} failed due to insufficient rights!" } @router.delete("/package/{uuid:package_uuid}/scenario") def delete_scenarios(request, package_uuid, scenario_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): scens = Scenario.objects.filter(package=p) scens.delete() return redirect(f"{p.url}/scenario") else: raise ValueError("You do not have the rights to delete Scenarios!") except ValueError: return 403, {"message": "Deleting Scenarios failed due to insufficient rights!"} @router.delete("/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}") def delete_scenario(request, package_uuid, scenario_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): scen = Scenario.objects.get(package=p, uuid=scenario_uuid) scen.delete() return redirect(f"{p.url}/scenario") else: raise ValueError("You do not have the rights to delete this Scenario!") except ValueError: return 403, { "message": f"Deleting Scenario with id {scenario_uuid} failed due to insufficient rights!" } ########### # Pathway # ########### class PathwayWrapper(Schema): pathway: List["SimplePathway"] class PathwayEdge(Schema): ecNumbers: List[str] = Field([], alias="ec_numbers") id: str = Field(None, alias="url") idreaction: str = Field(None, alias="edge_label.url") multstep: bool = Field(None, alias="edge_label.multi_step") name: str = Field(None, alias="name") pseudo: bool = False rule: Optional[str] = Field(None, alias="rule") scenarios: List[SimpleScenario] = Field([], alias="scenarios") source: int = -1 target: int = -1 @staticmethod def resolve_rule(obj: Edge): if obj.edge_label.rules.all().exists(): r = obj.edge_label.rules.all()[0] if isinstance(r, SimpleAmbitRule): return r.smirks return None class PathwayNode(Schema): atomCount: int = Field(None, alias="atom_count") depth: int = Field(None, alias="depth") dt50s: List[Dict[str, str]] = Field([], alias="dt50s") engineeredIntermediate: bool = Field(None, alias="engineered_intermediate") id: str = Field(None, alias="url") idcomp: str = Field(None, alias="default_node_label.url") idreact: str = Field(None, alias="default_node_label.url") image: str = Field(None, alias="image") imageSize: int = Field(None, alias="image_size") name: str = Field(None, alias="name") proposed: List[Dict[str, str]] = Field([], alias="proposed_intermediate") smiles: str = Field(None, alias="default_node_label.smiles") @staticmethod def resolve_atom_count(obj: Node): from rdkit import Chem return Chem.MolFromSmiles(obj.default_node_label.smiles).GetNumAtoms() @staticmethod def resolve_dt50s(obj: Node): # TODO return [] @staticmethod def resolve_engineered_intermediate(obj: Node): # TODO return False @staticmethod def resolve_image(obj: Node): return f"{obj.default_node_label.url}?image=svg" @staticmethod def resolve_image_size(obj: Node): return 400 @staticmethod def resolve_proposed_intermediate(obj: Node): # TODO return [] class PathwaySchema(Schema): aliases: List[str] = Field([], alias="aliases") completed: str = Field(None, alias="completed") description: str = Field(None, alias="description") id: str = Field(None, alias="url") isIncremental: bool = Field(None, alias="is_incremental") isPredicted: bool = Field(None, alias="is_predicted") lastModified: int = Field(None, alias="last_modified") links: List[PathwayEdge] = Field([], alias="edges") name: str = Field(None, alias="name") nodes: List[PathwayNode] = Field([], alias="nodes") pathwayName: str = Field(None, alias="name") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") upToDate: bool = Field(None, alias="up_to_date") @staticmethod def resolve_review_status(obj: Pathway): return "reviewed" if obj.package.reviewed else "unreviewed" @staticmethod def resolve_completed(obj: Pathway): return str(obj.completed()).lower() @staticmethod def resolve_up_to_date(obj: Pathway): return "true" @staticmethod def resolve_last_modified(obj: Pathway): return int(obj.modified.timestamp()) @router.get("/pathway", response={200: PathwayWrapper, 403: Error}) def get_pathways(request): return { "pathway": Pathway.objects.filter( package__in=PackageManager.get_reviewed_packages() ).prefetch_related("package") } @router.get("/package/{uuid:package_uuid}/pathway", response={200: PathwayWrapper, 403: Error}) def get_package_pathways(request, package_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"pathway": Pathway.objects.filter(package=p).prefetch_related("package")} except ValueError: return 403, { "message": f"Getting Pathways for Package with id {package_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}", response={200: PathwaySchema, 403: Error}, ) def get_package_pathway(request, package_uuid, pathway_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return Pathway.objects.get(package=p, uuid=pathway_uuid) except ValueError: return 403, { "message": f"Getting Pathway with id {pathway_uuid} failed due to insufficient rights!" } class CreatePathway(Schema): smilesinput: str name: str | None = None description: str | None = None rootOnly: str | None = None selectedSetting: str | None = None @router.post("/package/{uuid:package_uuid}/pathway") def create_pathway( request, package_uuid, pw: Form[CreatePathway], ): try: p = PackageManager.get_package_by_id(request.user, package_uuid) stand_smiles = FormatConverter.standardize(pw.smilesinput.strip()) new_pw = Pathway.create(p, stand_smiles, name=pw.name, description=pw.description) pw_mode = "predict" if pw.rootOnly and pw.rootOnly.strip() == "true": pw_mode = "build" new_pw.kv.update({"mode": pw_mode}) new_pw.save() if pw_mode == "predict": setting = request.user.prediction_settings() if pw.selectedSetting: setting = SettingManager.get_setting_by_url(request.user, pw.selectedSetting) new_pw.setting = setting new_pw.save() from .tasks import dispatch, predict dispatch(request.user, predict, new_pw.pk, setting.pk, limit=-1) return redirect(new_pw.url) except ValueError as e: return 400, {"message": str(e)} @router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}") def delete_pathway(request, package_uuid, pathway_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): pw = Pathway.objects.get(package=p, uuid=pathway_uuid) pw.delete() return redirect(f"{p.url}/pathway") else: raise ValueError("You do not have the rights to delete this pathway!") except ValueError: return 403, { "message": f"Deleting Pathway with id {pathway_uuid} failed due to insufficient rights!" } ######## # Node # ######## class NodeWrapper(Schema): node: List["SimpleNode"] class NodeCompoundStructure(Schema): id: str = Field(None, alias="url") image: str = Field(None, alias="image") smiles: str = Field(None, alias="smiles") name: str = Field(None, alias="name") @staticmethod def resolve_image(obj: CompoundStructure): return f"{obj.url}?image=svg" class NodeSchema(Schema): aliases: List[str] = Field([], alias="aliases") confidenceScenarios: List[SimpleScenario] = Field([], alias="confidence_scenarios") defaultStructure: NodeCompoundStructure = Field(None, alias="default_node_label") depth: int = Field(None, alias="depth") description: str = Field(None, alias="description") engineeredIntermediate: bool = Field(None, alias="engineered_intermediate") halflifes: Dict[str, str] = Field({}, alias="halflife") id: str = Field(None, alias="url") identifier: str = "node" image: str = Field(None, alias="image") name: str = Field(None, alias="name") proposedValues: List[Dict[str, str]] = Field([], alias="proposed_values") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") smiles: str = Field(None, alias="default_node_label.smiles") structures: List["CompoundStructureSchema"] = Field([], alias="node_labels") @staticmethod def resolve_engineered_intermediate(obj: Node): # TODO return False @staticmethod def resolve_image(obj: Node): return f"{obj.default_node_label.url}?image=svg" @staticmethod def resolve_proposed_values(obj: Node): # TODO # { # "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/1f1a0b67-ce57-4f5a-929c-4fabf3a791fd", # "scenarioName": "Annex IIA 7.1.1 a), Brumhard (2003) - (00000)" # } return [] @staticmethod def resolve_review_status(obj: Node): return "reviewed" if obj.pathway.package.reviewed else "unreviewed" @router.get( "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node", response={200: NodeWrapper, 403: Error}, ) def get_package_pathway_nodes(request, package_uuid, pathway_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"node": Pathway.objects.get(package=p, uuid=pathway_uuid).nodes} except ValueError: return 403, { "message": f"Getting Nodes for Pathway with id {pathway_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}", response={200: NodeSchema, 403: Error}, ) def get_package_pathway_node(request, package_uuid, pathway_uuid, node_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) pw = Pathway.objects.get(package=p, uuid=pathway_uuid) return Node.objects.get(pathway=pw, uuid=node_uuid) except ValueError: return 403, { "message": f"Getting Node with id {node_uuid} failed due to insufficient rights!" } class CreateNode(Schema): nodeAsSmiles: str nodeName: str | None = None nodeReason: str | None = None nodeDepth: str | None = None @router.post( "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node", response={200: str | Any, 403: Error}, ) def add_pathway_node(request, package_uuid, pathway_uuid, n: Form[CreateNode]): try: p = PackageManager.get_package_by_id(request.user, package_uuid) pw = Pathway.objects.get(package=p, uuid=pathway_uuid) if n.nodeDepth is not None and n.nodeDepth.strip() != "": node_depth = int(n.nodeDepth) else: node_depth = -1 n = Node.create(pw, n.nodeAsSmiles, node_depth, n.nodeName, n.nodeReason) return redirect(n.url) except ValueError: return 403, {"message": "Adding node failed!"} @router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}") def delete_node(request, package_uuid, pathway_uuid, node_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): pw = Pathway.objects.get(package=p, uuid=pathway_uuid) n = Node.objects.get(pathway=pw, uuid=node_uuid) n.delete() return redirect(f"{pw.url}/node") else: raise ValueError("You do not have the rights to delete this Node!") except ValueError: return 403, { "message": f"Deleting Node with id {node_uuid} failed due to insufficient rights!" } ######## # Edge # ######## class EdgeWrapper(Schema): edge: List["SimpleEdge"] class EdgeNode(SimpleNode): image: str = Field(None, alias="image") @staticmethod def resolve_image(obj: Node): return f"{obj.default_node_label.url}?image=svg" class EdgeSchema(Schema): aliases: List[str] = Field([], alias="aliases") description: str = Field(None, alias="description") ecNumbers: List[str] = Field([], alias="ec_numbers") endNodes: List["EdgeNode"] = Field([], alias="end_nodes") id: str = Field(None, alias="url") identifier: str = "edge" name: str = Field(None, alias="name") reactionName: str = Field(None, alias="edge_label.name") reactionURI: str = Field(None, alias="edge_label.url") reviewStatus: str = Field(None, alias="review_status") scenarios: List["SimpleScenario"] = Field([], alias="scenarios") startNodes: List["EdgeNode"] = Field([], alias="start_nodes") @staticmethod def resolve_review_status(obj: Node): return "reviewed" if obj.pathway.package.reviewed else "unreviewed" @router.get( "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge", response={200: EdgeWrapper, 403: Error}, ) def get_package_pathway_edges(request, package_uuid, pathway_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return {"edge": Pathway.objects.get(package=p, uuid=pathway_uuid).edges} except ValueError: return 403, { "message": f"Getting Edges for Pathway with id {pathway_uuid} failed due to insufficient rights!" } @router.get( "/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}", response={200: EdgeSchema, 403: Error}, ) def get_package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) pw = Pathway.objects.get(package=p, uuid=pathway_uuid) return Edge.objects.get(pathway=pw, uuid=edge_uuid) except ValueError: return 403, { "message": f"Getting Edge with id {edge_uuid} failed due to insufficient rights!" } class CreateEdge(Schema): edgeAsSmirks: str | None = None educts: str | None = None # Node URIs comma sep products: str | None = None # Node URIs comma sep multistep: str | None = None edgeReason: str | None = None @router.post( "/package/{uuid:package_uuid}/üathway/{uuid:pathway_uuid}/edge", response={200: str | Any, 403: Error}, ) def add_pathway_edge(request, package_uuid, pathway_uuid, e: Form[CreateEdge]): try: p = PackageManager.get_package_by_id(request.user, package_uuid) pw = Pathway.objects.get(package=p, uuid=pathway_uuid) if e.edgeAsSmirks is None and (e.educts is None or e.products is None): raise ValueError("Either SMIRKS or educt/product must be provided") if e.edgeAsSmirks is not None and (e.educts is not None and e.products is not None): raise ValueError("SMIRKS and educt/product provided!") educts = [] products = [] if e.edgeAsSmirks: for ed in e.edgeAsSmirks.split(">>")[0].split("\\."): educts.append(Node.objects.get(pathway=pw, default_node_label__smiles=ed)) for pr in e.edgeAsSmirks.split(">>")[1].split("\\."): products.append(Node.objects.get(pathway=pw, default_node_label__smiles=pr)) else: for ed in e.educts.split(","): educts.append(Node.objects.get(pathway=pw, url=ed.strip())) for pr in e.products.split(","): products.append(Node.objects.get(pathway=pw, url=pr.strip())) new_e = Edge.create( pathway=pw, start_nodes=educts, end_nodes=products, rule=None, name=e.name, description=e.edgeReason, ) return redirect(new_e.url) except ValueError: return 403, {"message": "Adding node failed!"} @router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}") def delete_edge(request, package_uuid, pathway_uuid, edge_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): pw = Pathway.objects.get(package=p, uuid=pathway_uuid) e = Edge.objects.get(pathway=pw, uuid=edge_uuid) e.delete() return redirect(f"{pw.url}/edge") else: raise ValueError("You do not have the rights to delete this Edge!") except ValueError: return 403, { "message": f"Deleting Edge with id {edge_uuid} failed due to insufficient rights!" } ######### # Model # ######### class ModelWrapper(Schema): relative_reasoning: List["SimpleModel"] = Field(..., alias="relative-reasoning") class ModelSchema(Schema): aliases: List[str] = Field([], alias="aliases") description: str = Field(None, alias="description") evalPackages: List["SimplePackage"] = Field([]) id: str = Field(None, alias="url") identifier: str = "relative-reasoning" # "info" : { # "Accuracy (Single-Gen)" : "0.5932962678936605" , # "Area under PR-Curve (Single-Gen)" : "0.5654653182134282" , # "Area under ROC-Curve (Single-Gen)" : "0.8178302405034772" , # "Precision (Single-Gen)" : "0.6978730822873083" , # "Probability Threshold" : "0.5" , # "Recall/Sensitivity (Single-Gen)" : "0.4484149210261006" # } , name: str = Field(None, alias="name") pathwayPackages: List["SimplePackage"] = Field([]) reviewStatus: str = Field(None, alias="review_status") rulePackages: List["SimplePackage"] = Field([]) scenarios: List["SimpleScenario"] = Field([], alias="scenarios") status: str statusMessage: str threshold: str type: str @router.get("/model", response={200: ModelWrapper, 403: Error}) def get_models(request): pass @router.get("/package/{uuid:package_uuid}/model", response={200: ModelWrapper, 403: Error}) def get_package_models(request, package_uuid, model_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) return EPModel.objects.filter(package=p) except ValueError: return 403, { "message": f"Getting Reaction with id {model_uuid} failed due to insufficient rights!" } class Classify(Schema): smiles: str | None = None @router.get( "/package/{uuid:package_uuid}/model/{uuid:model_uuid}", response={200: ModelSchema | Any, 403: Error, 400: Error}, ) def get_model(request, package_uuid, model_uuid, c: Query[Classify]): try: p = PackageManager.get_package_by_id(request.user, package_uuid) mod = EPModel.objects.get(package=p, uuid=model_uuid) if c.smiles: if c.smiles == "": return 400, {"message": "Received empty SMILES"} try: stand_smiles = FormatConverter.standardize(c.smiles) except ValueError: return 400, {"message": f'"{c.smiles}" is not a valid SMILES'} from epdb.tasks import dispatch_eager, predict_simple pred_res = dispatch_eager(request.user, predict_simple, mod.pk, stand_smiles) result = [] for pr in pred_res: if len(pr) > 0: products = [] for prod_set in pr.product_sets: products.append(tuple([x for x in prod_set])) res = { "probability": pr.probability, "products": list(set(products)), } if pr.rule: res["id"] = pr.rule.url res["identifier"] = pr.rule.get_rule_identifier() res["name"] = pr.rule.name res["reviewStatus"] = ( "reviewed" if pr.rule.package.reviewed else "unreviewed" ) result.append(res) return result return mod except ValueError: return 403, { "message": f"Getting Reaction with id {model_uuid} failed due to insufficient rights!" } @router.delete("/package/{uuid:package_uuid}/model/{uuid:model_uuid}") def delete_model(request, package_uuid, model_uuid): try: p = PackageManager.get_package_by_id(request.user, package_uuid) if PackageManager.writable(request.user, p): m = EPModel.objects.get(package=p, uuid=model_uuid) m.delete() return redirect(f"{p.url}/model") else: raise ValueError("You do not have the rights to delete this Model!") except ValueError: return 403, { "message": f"Deleting Model with id {model_uuid} failed due to insufficient rights!" } ########### # Setting # ########### class SettingWrapper(Schema): setting: List["SimpleSetting"] class SettingSchema(Schema): duplicationHash: int = -1 id: str = Field(None, alias="url") identifier: str = "setting" includedPackages: List["SimplePackage"] = Field([], alias="rule_packages") isPublic: bool = Field(None, alias="public") name: str = Field(None, alias="name") normalizationRules: List["SimpleRule"] = Field([], alias="normalization_rules") propertyPlugins: List[str] = Field([], alias="property_plugins") truncationstrategy: Optional[str] = Field(None, alias="truncation_strategy") @router.get("/setting", response={200: SettingWrapper, 403: Error}) def get_settings(request): return {"setting": SettingManager.get_all_settings(request.user)} @router.get("/setting/{uuid:setting_uuid}", response={200: SettingSchema, 403: Error}) def get_setting(request, setting_uuid): try: return SettingManager.get_setting_by_id(request.user, setting_uuid) except ValueError: return 403, { "message": f"Getting Setting with id {setting_uuid} failed due to insufficient rights!" }