[Feature] Legacy API (#224)

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#224
This commit is contained in:
2025-11-19 20:45:16 +13:00
parent 89c194dcca
commit 67b1baa5b0
6 changed files with 737 additions and 64 deletions

View File

@ -1,18 +1,21 @@
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import nh3
from django.conf import settings as s from django.conf import settings as s
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.http import HttpResponse from django.http import HttpResponse
from django.shortcuts import redirect from django.shortcuts import redirect
from ninja import Field, Form, Router, Schema from ninja import Field, Form, Router, Schema, Query
from utilities.chem import FormatConverter from utilities.chem import FormatConverter
from utilities.misc import PackageExporter
from .logic import PackageManager, SettingManager, UserManager from .logic import GroupManager, PackageManager, SettingManager, UserManager
from .models import ( from .models import (
Compound, Compound,
CompoundStructure, CompoundStructure,
Edge, Edge,
EPModel,
Node, Node,
Pathway, Pathway,
Reaction, Reaction,
@ -21,6 +24,7 @@ from .models import (
SimpleAmbitRule, SimpleAmbitRule,
User, User,
UserPackagePermission, UserPackagePermission,
ParallelRule,
) )
Package = s.GET_PACKAGE_MODEL() Package = s.GET_PACKAGE_MODEL()
@ -121,6 +125,10 @@ class SimpleEdge(SimpleObject):
identifier: str = "edge" identifier: str = "edge"
class SimpleModel(SimpleObject):
identifier: str = "relative-reasoning"
################ ################
# Login/Logout # # Login/Logout #
################ ################
@ -169,9 +177,13 @@ class UserSchema(Schema):
return SettingManager.get_all_settings(obj) return SettingManager.get_all_settings(obj)
class Me(Schema):
whoami: str | None = None
@router.get("/user", response={200: UserWrapper, 403: Error}) @router.get("/user", response={200: UserWrapper, 403: Error})
def get_users(request, whoami: str = None): def get_users(request, me: Query[Me]):
if whoami: if me.whoami:
return {"user": [request.user]} return {"user": [request.user]}
else: else:
return {"user": User.objects.all()} return {"user": User.objects.all()}
@ -253,67 +265,110 @@ def get_packages(request):
} }
@router.get("/package/{uuid:package_uuid}", response={200: PackageSchema, 403: Error}) class GetPackage(Schema):
def get_package(request, package_uuid): exportAsJson: str | None = None
@router.get("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 403: Error})
def get_package(request, package_uuid, gp: Query[GetPackage]):
try: try:
return PackageManager.get_package_by_id(request.user, package_uuid) p = PackageManager.get_package_by_id(request.user, package_uuid)
if gp.exportAsJson and gp.exportAsJson.strip() == "true":
return PackageExporter(p).do_export()
return p
except ValueError: except ValueError:
return 403, { return 403, {
"message": f"Getting Package with id {package_uuid} failed due to insufficient rights!" "message": f"Getting Package with id {package_uuid} failed due to insufficient rights!"
} }
class CreatePackage(Schema):
packageName: str
packageDescription: str | None = None
@router.post("/package") @router.post("/package")
def create_packages( def create_packages(
request, packageName: Form[str], packageDescription: Optional[str] = Form(None) request,
p: Form[CreatePackage],
): ):
try: try:
if packageName.strip() == "": if p.packageName.strip() == "":
raise ValueError("Package name cannot be empty!") raise ValueError("Package name cannot be empty!")
new_pacakge = PackageManager.create_package(request.user, packageName, packageDescription) new_pacakge = PackageManager.create_package(
request.user, p.packageName, p.packageDescription
)
return redirect(new_pacakge.url) return redirect(new_pacakge.url)
except ValueError as e: except ValueError as e:
return 400, {"message": str(e)} return 400, {"message": str(e)}
class UpdatePackage(Schema):
packageDescription: str | None = None
hiddenMethod: str | None = None
permissions: str | None = None
ppsURI: str | None = None
read: str | None = None
write: str | None = None
@router.post("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 400: Error}) @router.post("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 400: Error})
def update_package( def update_package(request, package_uuid, pack: Form[UpdatePackage]):
request,
package_uuid,
packageDescription: Optional[str] = Form(None),
hiddenMethod: Optional[str] = Form(None),
exportAsJson: Optional[str] = Form(None),
permissions: Optional[str] = Form(None),
ppsURI: Optional[str] = Form(None),
read: Optional[str] = Form(None),
write: Optional[str] = Form(None),
):
try: try:
p = PackageManager.get_package_by_id(request.user, package_uuid) p = PackageManager.get_package_by_id(request.user, package_uuid)
if hiddenMethod: if pack.hiddenMethod:
if hiddenMethod == "DELETE": if pack.hiddenMethod == "DELETE":
p.delete() p.delete()
elif packageDescription and packageDescription.strip() != "": elif pack.packageDescription is not None:
p.description = packageDescription description = nh3.clean(pack.packageDescription, tags=s.ALLOWED_HTML_TAGS).strip()
p.save()
return
elif exportAsJson == "true":
pack_json = PackageManager.export_package(
p, include_models=False, include_external_identifiers=False
)
return pack_json
elif all([permissions, ppsURI, read]):
PackageManager.update_permissions
elif all([permissions, ppsURI, write]):
pass
if description:
p.description = description
p.save()
return HttpResponse(status=200)
else:
raise ValueError("Package description cannot be empty!")
elif all([pack.permissions, pack.ppsURI, pack.read]):
if "group" in pack.ppsURI:
grantee = GroupManager.get_group_lp(pack.ppsURI)
else:
grantee = UserManager.get_user_lp(pack.ppsURI)
PackageManager.grant_read(request.user, p, grantee)
return HttpResponse(status=200)
elif all([pack.permissions, pack.ppsURI, pack.write]):
if "group" in pack.ppsURI:
grantee = GroupManager.get_group_lp(pack.ppsURI)
else:
grantee = UserManager.get_user_lp(pack.ppsURI)
PackageManager.grant_write(request.user, p, grantee)
return HttpResponse(status=200)
except ValueError as e: except ValueError as e:
return 400, {"message": str(e)} return 400, {"message": str(e)}
@router.delete("/package/{uuid:package_uuid}")
def delete_package(request, package_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.administrable(request.user, p):
p.delete()
return redirect(f"{s.SERVER_URL}/package")
else:
raise ValueError("You do not have the rights to delete this Package!")
except ValueError:
return 403, {
"message": f"Deleting Package with id {package_uuid} failed due to insufficient rights!"
}
################################ ################################
# Compound / CompoundStructure # # Compound / CompoundStructure #
################################ ################################
@ -511,6 +566,83 @@ def get_package_compound_structure(request, package_uuid, compound_uuid, structu
} }
class CreateCompound(Schema):
compoundSmiles: str
compoundName: str | None = None
compoundDescription: str | None = None
inchi: str | None = None
@router.post("/package/{uuid:package_uuid}/compound")
def create_package_compound(
request,
package_uuid,
c: Form[CreateCompound],
):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
# inchi is not used atm
c = Compound.create(
p, c.compoundSmiles, c.compoundName, c.compoundDescription, inchi=c.inchi
)
return redirect(c.url)
except ValueError as e:
return 400, {"message": str(e)}
@router.delete("/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}")
def delete_compound(request, package_uuid, compound_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
c = Compound.objects.get(package=p, uuid=compound_uuid)
c.delete()
return redirect(f"{p.url}/compound")
else:
raise ValueError("You do not have the rights to delete this Compound!")
except ValueError:
return 403, {
"message": f"Deleting Compound with id {compound_uuid} failed due to insufficient rights!"
}
@router.delete(
"/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}"
)
def delete_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
c = Compound.objects.get(package=p, uuid=compound_uuid)
cs = CompoundStructure.objects.get(compound=c, uuid=structure_uuid)
# Check if we have to delete the compound as no structure is left
if len(cs.compound.structures.all()) == 1:
# This will delete the structure as well
c.delete()
return redirect(p.url + "/compound")
else:
if cs.normalized_structure:
c.delete()
return redirect(p.url + "/compound")
else:
if c.default_structure == cs:
cs.delete()
c.default_structure = c.structures.all().first()
return redirect(c.url + "/structure")
else:
cs.delete()
return redirect(c.url + "/structure")
else:
raise ValueError("You do not have the rights to delete this CompoundStructure!")
except ValueError:
return 403, {
"message": f"Deleting CompoundStructure with id {compound_uuid} failed due to insufficient rights!"
}
######### #########
# Rules # # Rules #
######### #########
@ -674,6 +806,73 @@ def _get_package_rule(request, package_uuid, rule_uuid):
# POST # POST
class CreateSimpleRule(Schema):
smirks: str
name: str | None = None
description: str | None = None
reactantFilterSmarts: str | None = None
productFilterSmarts: str | None = None
immediate: str | None = None
rdkitrule: str | None = None
@router.post("/package/{uuid:package_uuid}/simple-rule")
def create_package_simple_rule(
request,
package_uuid,
r: Form[CreateSimpleRule],
):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if r.rdkitrule and r.rdkitrule.strip() == "true":
raise ValueError("Not yet implemented!")
else:
sr = SimpleAmbitRule.create(
p, r.name, r.description, r.smirks, r.reactantFilterSmarts, r.productFilterSmarts
)
return redirect(sr.url)
except ValueError as e:
return 400, {"message": str(e)}
class CreateParallelRule(Schema):
simpleRules: str
name: str | None = None
description: str | None = None
reactantFilterSmarts: str | None = None
productFilterSmarts: str | None = None
immediate: str | None = None
@router.post("/package/{uuid:package_uuid}/parallel-rule")
def create_package_parallel_rule(
request,
package_uuid,
r: Form[CreateParallelRule],
):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
srs = SimpleRule.objects.filter(package=p, url__in=r.simpleRules)
if srs.count() != len(r.simpleRules):
raise ValueError(
f"Not all SimpleRules could be found in Package with id {package_uuid}!"
)
sr = ParallelRule.create(
p, list(srs), r.name, r.description, r.reactantFilterSmarts, r.productFilterSmarts
)
return redirect(sr.url)
except ValueError as e:
return 400, {"message": str(e)}
@router.post( @router.post(
"/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}", response={200: str | Any, 403: Error} "/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}", response={200: str | Any, 403: Error}
) )
@ -723,6 +922,41 @@ def _post_package_rule(request, package_uuid, rule_uuid, compound: Form[str]):
} }
@router.delete("/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}")
def delete_rule(request, package_uuid, rule_uuid):
return _delete_rule(request, package_uuid, rule_uuid)
@router.delete(
"/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
)
def delete_simple_rule(request, package_uuid, rule_uuid):
return _delete_rule(request, package_uuid, rule_uuid)
@router.delete(
"/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
)
def delete_parallel_rule(request, package_uuid, rule_uuid):
return _delete_rule(request, package_uuid, rule_uuid)
def _delete_rule(request, package_uuid, rule_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
r = Rule.objects.get(package=p, uuid=rule_uuid)
r.delete()
return redirect(f"{p.url}/rule")
else:
raise ValueError("You do not have the rights to delete this Rule!")
except ValueError:
return 403, {
"message": f"Deleting Rule with id {rule_uuid} failed due to insufficient rights!"
}
############ ############
# Reaction # # Reaction #
############ ############
@ -811,6 +1045,82 @@ def get_package_reaction(request, package_uuid, reaction_uuid):
} }
class CreateReaction(Schema):
reactionName: str | None = None
reactionDescription: str | None = None
smirks: str | None = None
educt: str | None = None
product: str | None = None
rule: str | None = None
@router.post("/package/{uuid:package_uuid}/reaction")
def create_package_reaction(
request,
package_uuid,
r: Form[CreateReaction],
):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if r.smirks is None and (r.educt is None or r.product is None):
raise ValueError("Either SMIRKS or educt/product must be provided")
if r.smirks is not None and (r.educt is not None and r.product is not None):
raise ValueError("SMIRKS and educt/product provided!")
rule = None
if r.rule:
try:
rule = Rule.objects.get(package=p, url=r.rule)
except Rule.DoesNotExist:
raise ValueError(f"Rule with id {r.rule} does not exist!")
if r.educt is not None:
try:
educt_cs = CompoundStructure.objects.get(compound__package=p, url=r.educt)
except CompoundStructure.DoesNotExist:
raise ValueError(f"Compound with id {r.educt} does not exist!")
try:
product_cs = CompoundStructure.objects.get(compound__package=p, url=r.product)
except CompoundStructure.DoesNotExist:
raise ValueError(f"Compound with id {r.product} does not exist!")
new_r = Reaction.create(
p, r.reactionName, r.reactionDescription, [educt_cs], [product_cs], rule
)
else:
educts = r.smirks.split(">>")[0].split("\\.")
products = r.smirks.split(">>")[1].split("\\.")
new_r = Reaction.create(
p, r.reactionName, r.reactionDescription, educts, products, rule
)
return redirect(new_r.url)
except ValueError as e:
return 400, {"message": str(e)}
@router.delete("/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}")
def delete_reaction(request, package_uuid, reaction_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
r = Reaction.objects.get(package=p, uuid=reaction_uuid)
r.delete()
return redirect(f"{p.url}/reaction")
else:
raise ValueError("You do not have the rights to delete this Reaction!")
except ValueError:
return 403, {
"message": f"Deleting Reaction with id {reaction_uuid} failed due to insufficient rights!"
}
############ ############
# Scenario # # Scenario #
############ ############
@ -825,7 +1135,7 @@ class ScenarioSchema(Schema):
description: str = Field(None, alias="description") description: str = Field(None, alias="description")
id: str = Field(None, alias="url") id: str = Field(None, alias="url")
identifier: str = "scenario" identifier: str = "scenario"
linkedTo: List[Dict[str, str]] = Field({}, alias="linked_to") linkedTo: List[Dict[str, str]] = Field([], alias="linked_to")
name: str = Field(None, alias="name") name: str = Field(None, alias="name")
pathways: List["SimplePathway"] = Field([], alias="related_pathways") pathways: List["SimplePathway"] = Field([], alias="related_pathways")
relatedScenarios: List[Dict[str, str]] = Field([], alias="related_scenarios") relatedScenarios: List[Dict[str, str]] = Field([], alias="related_scenarios")
@ -876,6 +1186,38 @@ def get_package_scenario(request, package_uuid, scenario_uuid):
} }
@router.delete("/package/{uuid:package_uuid}/scenario")
def delete_scenarios(request, package_uuid, scenario_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
scens = Scenario.objects.filter(package=p)
scens.delete()
return redirect(f"{p.url}/scenario")
else:
raise ValueError("You do not have the rights to delete Scenarios!")
except ValueError:
return 403, {"message": "Deleting Scenarios failed due to insufficient rights!"}
@router.delete("/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}")
def delete_scenario(request, package_uuid, scenario_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
scen = Scenario.objects.get(package=p, uuid=scenario_uuid)
scen.delete()
return redirect(f"{p.url}/scenario")
else:
raise ValueError("You do not have the rights to delete this Scenario!")
except ValueError:
return 403, {
"message": f"Deleting Scenario with id {scenario_uuid} failed due to insufficient rights!"
}
########### ###########
# Pathway # # Pathway #
########### ###########
@ -1015,46 +1357,67 @@ def get_package_pathway(request, package_uuid, pathway_uuid):
} }
class CreatePathway(Schema):
smilesinput: str
name: str | None = None
description: str | None = None
rootOnly: str | None = None
selectedSetting: str | None = None
@router.post("/package/{uuid:package_uuid}/pathway") @router.post("/package/{uuid:package_uuid}/pathway")
def create_pathway( def create_pathway(
request, request,
package_uuid, package_uuid,
smilesinput: Form[str], pw: Form[CreatePathway],
name: Optional[str] = Form(None),
description: Optional[str] = Form(None),
rootOnly: Optional[str] = Form(None),
selectedSetting: Optional[str] = Form(None),
): ):
try: try:
p = PackageManager.get_package_by_id(request.user, package_uuid) p = PackageManager.get_package_by_id(request.user, package_uuid)
stand_smiles = FormatConverter.standardize(smilesinput.strip()) stand_smiles = FormatConverter.standardize(pw.smilesinput.strip())
pw = Pathway.create(p, stand_smiles, name=name, description=description) new_pw = Pathway.create(p, stand_smiles, name=pw.name, description=pw.description)
pw_mode = "predict" pw_mode = "predict"
if rootOnly and rootOnly == "true": if pw.rootOnly and pw.rootOnly.strip() == "true":
pw_mode = "build" pw_mode = "build"
pw.kv.update({"mode": pw_mode}) new_pw.kv.update({"mode": pw_mode})
pw.save() new_pw.save()
if pw_mode == "predict": if pw_mode == "predict":
setting = request.user.prediction_settings() setting = request.user.prediction_settings()
if selectedSetting: if pw.selectedSetting:
setting = SettingManager.get_setting_by_url(request.user, selectedSetting) setting = SettingManager.get_setting_by_url(request.user, pw.selectedSetting)
pw.setting = setting new_pw.setting = setting
pw.save() new_pw.save()
from .tasks import predict from .tasks import dispatch, predict
predict.delay(pw.pk, setting.pk, limit=-1) dispatch(request.user, predict, new_pw.pk, setting.pk, limit=-1)
return redirect(pw.url) return redirect(new_pw.url)
except ValueError as e: except ValueError as e:
print(e) return 400, {"message": str(e)}
@router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}")
def delete_pathway(request, package_uuid, pathway_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
pw.delete()
return redirect(f"{p.url}/pathway")
else:
raise ValueError("You do not have the rights to delete this pathway!")
except ValueError:
return 403, {
"message": f"Deleting Pathway with id {pathway_uuid} failed due to insufficient rights!"
}
######## ########
@ -1145,6 +1508,52 @@ def get_package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
} }
class CreateNode(Schema):
nodeAsSmiles: str
nodeName: str | None = None
nodeReason: str | None = None
nodeDepth: str | None = None
@router.post(
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
response={200: str | Any, 403: Error},
)
def add_pathway_node(request, package_uuid, pathway_uuid, n: Form[CreateNode]):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
if n.nodeDepth is not None and n.nodeDepth.strip() != "":
node_depth = int(n.nodeDepth)
else:
node_depth = -1
n = Node.create(pw, n.nodeAsSmiles, node_depth, n.nodeName, n.nodeReason)
return redirect(n.url)
except ValueError:
return 403, {"message": "Adding node failed!"}
@router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}")
def delete_node(request, package_uuid, pathway_uuid, node_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
n = Node.objects.get(pathway=pw, uuid=node_uuid)
n.delete()
return redirect(f"{pw.url}/node")
else:
raise ValueError("You do not have the rights to delete this Node!")
except ValueError:
return 403, {
"message": f"Deleting Node with id {node_uuid} failed due to insufficient rights!"
}
######## ########
# Edge # # Edge #
######## ########
@ -1208,6 +1617,200 @@ def get_package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
} }
class CreateEdge(Schema):
edgeAsSmirks: str | None = None
educts: str | None = None # Node URIs comma sep
products: str | None = None # Node URIs comma sep
multistep: str | None = None
edgeReason: str | None = None
@router.post(
"/package/{uuid:package_uuid}/üathway/{uuid:pathway_uuid}/edge",
response={200: str | Any, 403: Error},
)
def add_pathway_edge(request, package_uuid, pathway_uuid, e: Form[CreateEdge]):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
if e.edgeAsSmirks is None and (e.educts is None or e.products is None):
raise ValueError("Either SMIRKS or educt/product must be provided")
if e.edgeAsSmirks is not None and (e.educts is not None and e.products is not None):
raise ValueError("SMIRKS and educt/product provided!")
educts = []
products = []
if e.edgeAsSmirks:
for ed in e.edgeAsSmirks.split(">>")[0].split("\\."):
educts.append(Node.objects.get(pathway=pw, default_node_label__smiles=ed))
for pr in e.edgeAsSmirks.split(">>")[1].split("\\."):
products.append(Node.objects.get(pathway=pw, default_node_label__smiles=pr))
else:
for ed in e.educts.split(","):
educts.append(Node.objects.get(pathway=pw, url=ed.strip()))
for pr in e.products.split(","):
products.append(Node.objects.get(pathway=pw, url=pr.strip()))
new_e = Edge.create(
pathway=pw,
start_nodes=educts,
end_nodes=products,
rule=None,
name=e.name,
description=e.edgeReason,
)
return redirect(new_e.url)
except ValueError:
return 403, {"message": "Adding node failed!"}
@router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}")
def delete_edge(request, package_uuid, pathway_uuid, edge_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
e = Edge.objects.get(pathway=pw, uuid=edge_uuid)
e.delete()
return redirect(f"{pw.url}/edge")
else:
raise ValueError("You do not have the rights to delete this Edge!")
except ValueError:
return 403, {
"message": f"Deleting Edge with id {edge_uuid} failed due to insufficient rights!"
}
#########
# Model #
#########
class ModelWrapper(Schema):
relative_reasoning: List["SimpleModel"] = Field(..., alias="relative-reasoning")
class ModelSchema(Schema):
aliases: List[str] = Field([], alias="aliases")
description: str = Field(None, alias="description")
evalPackages: List["SimplePackage"] = Field([])
id: str = Field(None, alias="url")
identifier: str = "relative-reasoning"
# "info" : {
# "Accuracy (Single-Gen)" : "0.5932962678936605" ,
# "Area under PR-Curve (Single-Gen)" : "0.5654653182134282" ,
# "Area under ROC-Curve (Single-Gen)" : "0.8178302405034772" ,
# "Precision (Single-Gen)" : "0.6978730822873083" ,
# "Probability Threshold" : "0.5" ,
# "Recall/Sensitivity (Single-Gen)" : "0.4484149210261006"
# } ,
name: str = Field(None, alias="name")
pathwayPackages: List["SimplePackage"] = Field([])
reviewStatus: str = Field(None, alias="review_status")
rulePackages: List["SimplePackage"] = Field([])
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
status: str
statusMessage: str
threshold: str
type: str
@router.get("/model", response={200: ModelWrapper, 403: Error})
def get_models(request):
pass
@router.get("/package/{uuid:package_uuid}/model", response={200: ModelWrapper, 403: Error})
def get_package_models(request, package_uuid, model_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
return EPModel.objects.filter(package=p)
except ValueError:
return 403, {
"message": f"Getting Reaction with id {model_uuid} failed due to insufficient rights!"
}
class Classify(Schema):
smiles: str | None = None
@router.get(
"/package/{uuid:package_uuid}/model/{uuid:model_uuid}",
response={200: ModelSchema | Any, 403: Error, 400: Error},
)
def get_model(request, package_uuid, model_uuid, c: Query[Classify]):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
mod = EPModel.objects.get(package=p, uuid=model_uuid)
if c.smiles:
if c.smiles == "":
return 400, {"message": "Received empty SMILES"}
try:
stand_smiles = FormatConverter.standardize(c.smiles)
except ValueError:
return 400, {"message": f'"{c.smiles}" is not a valid SMILES'}
from epdb.tasks import dispatch_eager, predict_simple
pred_res = dispatch_eager(request.user, predict_simple, mod.pk, stand_smiles)
result = []
for pr in pred_res:
if len(pr) > 0:
products = []
for prod_set in pr.product_sets:
products.append(tuple([x for x in prod_set]))
res = {
"probability": pr.probability,
"products": list(set(products)),
}
if pr.rule:
res["id"] = pr.rule.url
res["identifier"] = pr.rule.get_rule_identifier()
res["name"] = pr.rule.name
res["reviewStatus"] = (
"reviewed" if pr.rule.package.reviewed else "unreviewed"
)
result.append(res)
return result
return mod
except ValueError:
return 403, {
"message": f"Getting Reaction with id {model_uuid} failed due to insufficient rights!"
}
@router.delete("/package/{uuid:package_uuid}/model/{uuid:model_uuid}")
def delete_model(request, package_uuid, model_uuid):
try:
p = PackageManager.get_package_by_id(request.user, package_uuid)
if PackageManager.writable(request.user, p):
m = EPModel.objects.get(package=p, uuid=model_uuid)
m.delete()
return redirect(f"{p.url}/model")
else:
raise ValueError("You do not have the rights to delete this Model!")
except ValueError:
return 403, {
"message": f"Deleting Model with id {model_uuid} failed due to insufficient rights!"
}
########### ###########
# Setting # # Setting #
########### ###########

View File

@ -579,6 +579,14 @@ class PackageManager(object):
else: else:
_ = perm_cls.objects.update_or_create(defaults={"permission": new_perm}, **data) _ = perm_cls.objects.update_or_create(defaults={"permission": new_perm}, **data)
@staticmethod
def grant_read(caller: User, package: Package, grantee: Union[User, Group]):
PackageManager.update_permissions(caller, package, grantee, Permission.READ[0])
@staticmethod
def grant_write(caller: User, package: Package, grantee: Union[User, Group]):
PackageManager.update_permissions(caller, package, grantee, Permission.WRITE[0])
@staticmethod @staticmethod
@transaction.atomic @transaction.atomic
def import_legacy_package( def import_legacy_package(
@ -595,7 +603,6 @@ class PackageManager(object):
CompoundStructure, CompoundStructure,
Edge, Edge,
Node, Node,
Package,
ParallelRule, ParallelRule,
Pathway, Pathway,
Reaction, Reaction,

View File

@ -1079,6 +1079,10 @@ class Rule(PolymorphicModel, EnviPathModel, AliasMixin, ScenarioMixin):
def apply(self, *args, **kwargs): def apply(self, *args, **kwargs):
pass pass
@abc.abstractmethod
def get_rule_identifier(self) -> str:
pass
@staticmethod @staticmethod
def cls_for_type(rule_type: str): def cls_for_type(rule_type: str):
if rule_type == "SimpleAmbitRule": if rule_type == "SimpleAmbitRule":
@ -1233,6 +1237,9 @@ class SimpleAmbitRule(SimpleRule):
def _url(self): def _url(self):
return "{}/simple-ambit-rule/{}".format(self.package.url, self.uuid) return "{}/simple-ambit-rule/{}".format(self.package.url, self.uuid)
def get_rule_identifier(self) -> str:
return "simple-rule"
def apply(self, smiles): def apply(self, smiles):
return FormatConverter.apply(smiles, self.smirks) return FormatConverter.apply(smiles, self.smirks)
@ -1278,6 +1285,9 @@ class ParallelRule(Rule):
def _url(self): def _url(self):
return "{}/parallel-rule/{}".format(self.package.url, self.uuid) return "{}/parallel-rule/{}".format(self.package.url, self.uuid)
def get_rule_identifier(self) -> str:
return "parallel-rule"
@cached_property @cached_property
def srs(self) -> QuerySet: def srs(self) -> QuerySet:
return self.simple_rules.all() return self.simple_rules.all()
@ -1309,6 +1319,57 @@ class ParallelRule(Rule):
return res return res
@staticmethod
@transaction.atomic
def create(
package: "Package",
simple_rules: List["SimpleRule"],
name: str = None,
description: str = None,
reactant_filter_smarts: str = None,
product_filter_smarts: str = None,
):
if len(simple_rules) == 0:
raise ValueError("At least one simple rule is required!")
for sr in simple_rules:
if sr.package != package:
raise ValueError(
f"Simple rule {sr.uuid} does not belong to package {package.uuid}!"
)
r = ParallelRule()
r.package = package
if name is not None:
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"Rule {Rule.objects.filter(package=package).count() + 1}"
r.name = name
if description is not None and description.strip() != "":
r.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
if reactant_filter_smarts is not None and reactant_filter_smarts.strip() != "":
if not FormatConverter.is_valid_smarts(reactant_filter_smarts.strip()):
raise ValueError(f'Reactant Filter SMARTS "{reactant_filter_smarts}" is invalid!')
else:
r.reactant_filter_smarts = reactant_filter_smarts.strip()
if product_filter_smarts is not None and product_filter_smarts.strip() != "":
if not FormatConverter.is_valid_smarts(product_filter_smarts.strip()):
raise ValueError(f'Product Filter SMARTS "{product_filter_smarts}" is invalid!')
else:
r.product_filter_smarts = product_filter_smarts.strip()
r.save()
for sr in simple_rules:
r.simple_rules.add(sr)
return r
class SequentialRule(Rule): class SequentialRule(Rule):
simple_rules = models.ManyToManyField( simple_rules = models.ManyToManyField(
@ -1318,6 +1379,9 @@ class SequentialRule(Rule):
def _url(self): def _url(self):
return "{}/sequential-rule/{}".format(self.compound.url, self.uuid) return "{}/sequential-rule/{}".format(self.compound.url, self.uuid)
def get_rule_identifier(self) -> str:
return "sequential-rule"
@property @property
def srs(self): def srs(self):
return self.simple_rules.all() return self.simple_rules.all()

View File

@ -907,9 +907,10 @@ def package_model(request, package_uuid, model_uuid):
if classify: if classify:
from epdb.tasks import dispatch_eager, predict_simple from epdb.tasks import dispatch_eager, predict_simple
res = dispatch_eager(current_user, predict_simple, current_model.pk, stand_smiles) pred_res = dispatch_eager(
current_user, predict_simple, current_model.pk, stand_smiles
)
pred_res = current_model.predict(stand_smiles)
res = [] res = []
for pr in pred_res: for pr in pred_res:
@ -1069,9 +1070,7 @@ def package(request, package_uuid):
return redirect(s.SERVER_URL + "/package") return redirect(s.SERVER_URL + "/package")
elif hidden == "publish-package": elif hidden == "publish-package":
for g in Group.objects.filter(public=True): for g in Group.objects.filter(public=True):
PackageManager.update_permissions( PackageManager.grant_read(current_user, current_package, g)
current_user, current_package, g, Permission.READ[0]
)
return redirect(current_package.url) return redirect(current_package.url)
elif hidden == "copy": elif hidden == "copy":
object_to_copy = request.POST.get("object_to_copy") object_to_copy = request.POST.get("object_to_copy")

View File

@ -87,7 +87,7 @@
data-toggle="collapse" data-toggle="collapse"
data-parent="#model-detail" data-parent="#model-detail"
href="#reaction-package" href="#reaction-package"
>Rule Packages</a >Data Packages</a
> >
</h4> </h4>
</div> </div>
@ -112,7 +112,7 @@
data-toggle="collapse" data-toggle="collapse"
data-parent="#model-detail" data-parent="#model-detail"
href="#eval-package" href="#eval-package"
>Rule Packages</a >Evaluation Packages</a
> >
</h4> </h4>
</div> </div>

View File

@ -65,7 +65,7 @@ class PredictionResult(object):
return iter(self.product_sets) return iter(self.product_sets)
def __repr__(self): def __repr__(self):
return f"--{self.probability}/{self.rule}--> {self.product_sets}" return f"--{self.probability:.2f}/{self.rule}--> {self.product_sets}"
class FormatConverter(object): class FormatConverter(object):