forked from enviPath/enviPy
Initial bayer app Show Pack Classification Adjusted docker compose to bayer specifics Adjusted Dockerfile for Bayer Adding secret flags to group, add secret pools to packages Adjusted View for Package creation Prep configs, added Package Create Modal wip More on PES wip wip
2199 lines
71 KiB
Python
2199 lines
71 KiB
Python
import hashlib
|
|
from collections import defaultdict
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import jwt
|
|
import requests
|
|
|
|
import nh3
|
|
from django.conf import settings as s
|
|
from django.contrib.auth import get_user_model
|
|
from django.http import HttpResponse, JsonResponse
|
|
from django.shortcuts import redirect
|
|
from ninja import Field, Form, Query, Router, Schema
|
|
from ninja.errors import HttpError
|
|
from ninja.security import HttpBearer
|
|
from ninja.security import SessionAuth
|
|
|
|
from utilities.chem import FormatConverter
|
|
from utilities.misc import PackageExporter
|
|
from .logic import (
|
|
EPDBURLParser,
|
|
GroupManager,
|
|
PackageManager,
|
|
SearchManager,
|
|
SettingManager,
|
|
UserManager,
|
|
)
|
|
from .models import (
|
|
AdditionalInformation,
|
|
Compound,
|
|
CompoundStructure,
|
|
Edge,
|
|
EnviFormer,
|
|
EPModel,
|
|
Group,
|
|
GroupPackagePermission,
|
|
MLRelativeReasoning,
|
|
Node,
|
|
PackageBasedModel,
|
|
ParallelRule,
|
|
Pathway,
|
|
Reaction,
|
|
Rule,
|
|
RuleBasedRelativeReasoning,
|
|
Scenario,
|
|
SimpleAmbitRule,
|
|
User,
|
|
UserPackagePermission,
|
|
)
|
|
|
|
Package = s.GET_PACKAGE_MODEL()
|
|
|
|
|
|
def get_package_for_write(user, package_uuid):
|
|
p = PackageManager.get_package_by_id(user, package_uuid)
|
|
if not PackageManager.writable(user, p):
|
|
raise ValueError("You do not have the rights to write to this Package!")
|
|
return p
|
|
|
|
|
|
def _anonymous_or_real(request):
|
|
if request.user.is_authenticated and not request.user.is_anonymous:
|
|
return request.user
|
|
return get_user_model().objects.get(username="anonymous")
|
|
|
|
|
|
def validate_token(token: str) -> dict:
|
|
TENANT_ID = s.MS_ENTRA_TENANT_ID
|
|
CLIENT_ID = s.MS_ENTRA_CLIENT_ID
|
|
|
|
# Fetch Microsoft's public keys
|
|
jwks_uri = f"https://login.microsoftonline.com/{TENANT_ID}/discovery/v2.0/keys"
|
|
jwks = requests.get(jwks_uri).json()
|
|
|
|
header = jwt.get_unverified_header(token)
|
|
|
|
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(
|
|
next(k for k in jwks["keys"] if k["kid"] == header["kid"])
|
|
)
|
|
|
|
claims = jwt.decode(
|
|
token,
|
|
public_key,
|
|
algorithms=["RS256"],
|
|
audience=[CLIENT_ID, f"api://{CLIENT_ID}"],
|
|
issuer=f"https://sts.windows.net/{TENANT_ID}/",
|
|
)
|
|
return claims
|
|
|
|
|
|
class MSBearerTokenAuth(HttpBearer):
|
|
|
|
def authenticate(self, request, token):
|
|
if token is None:
|
|
return None
|
|
|
|
claims = validate_token(token)
|
|
|
|
if not User.objects.filter(uuid=claims['oid']).exists():
|
|
return None
|
|
|
|
request.user = User.objects.get(uuid=claims['oid'])
|
|
return request.user
|
|
|
|
|
|
router = Router(auth=MSBearerTokenAuth())
|
|
|
|
|
|
class Error(Schema):
|
|
message: str
|
|
|
|
|
|
#################
|
|
# SimpleObjects #
|
|
#################
|
|
class SimpleUser(Schema):
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "user"
|
|
name: str = Field(None, alias="username")
|
|
email: str = Field(None, alias="email")
|
|
|
|
|
|
class SimpleGroup(Schema):
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "group"
|
|
name: str = Field(None, alias="name")
|
|
|
|
|
|
class SimpleSetting(Schema):
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "setting"
|
|
name: str = Field(None, alias="name")
|
|
|
|
|
|
class SimpleObject(Schema):
|
|
id: str = Field(None, alias="url")
|
|
name: str = Field(None, alias="name")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj):
|
|
if isinstance(obj, Package):
|
|
return "reviewed" if obj.reviewed else "unreviewed"
|
|
elif hasattr(obj, "package"):
|
|
return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
elif isinstance(obj, CompoundStructure):
|
|
return "reviewed" if obj.compound.package.reviewed else "unreviewed"
|
|
elif isinstance(obj, Node) or isinstance(obj, Edge):
|
|
return "reviewed" if obj.pathway.package.reviewed else "unreviewed"
|
|
elif isinstance(obj, dict) and "review_status" in obj:
|
|
return "reviewed" if obj.get("review_status") else "unreviewed"
|
|
else:
|
|
raise ValueError("Object has no package")
|
|
|
|
|
|
class SimplePackage(SimpleObject):
|
|
identifier: str = "package"
|
|
|
|
|
|
class SimpleCompound(SimpleObject):
|
|
identifier: str = "compound"
|
|
|
|
|
|
class SimpleCompoundStructure(SimpleObject):
|
|
identifier: str = "structure"
|
|
|
|
|
|
class SimpleRule(SimpleObject):
|
|
identifier: str = "rule"
|
|
|
|
@staticmethod
|
|
def resolve_url(obj: Rule):
|
|
return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-")
|
|
|
|
|
|
class SimpleReaction(SimpleObject):
|
|
identifier: str = "reaction"
|
|
|
|
|
|
class SimpleScenario(SimpleObject):
|
|
identifier: str = "scenario"
|
|
|
|
|
|
class SimplePathway(SimpleObject):
|
|
identifier: str = "pathway"
|
|
|
|
|
|
class SimpleNode(SimpleObject):
|
|
identifier: str = "node"
|
|
|
|
|
|
class SimpleEdge(SimpleObject):
|
|
identifier: str = "edge"
|
|
|
|
|
|
class SimpleModel(SimpleObject):
|
|
identifier: str = "relative-reasoning"
|
|
|
|
|
|
|
|
########
|
|
# User #
|
|
########
|
|
class UserWrapper(Schema):
|
|
user: List[SimpleUser]
|
|
|
|
|
|
class UserSchema(Schema):
|
|
affiliation: Dict[str, str] = Field(None, alias="affiliation")
|
|
defaultGroup: "SimpleGroup" = Field(None, alias="default_group")
|
|
defaultPackage: "SimplePackage" = Field(None, alias="default_package")
|
|
defaultSetting: "SimpleSetting" = Field(None, alias="default_setting")
|
|
email: str = Field(None, alias="email")
|
|
forename: str = "not specified"
|
|
groups: List["SimpleGroup"] = Field([], alias="groups")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "user"
|
|
link: str = "empty"
|
|
name: str = Field(None, alias="username")
|
|
surname: str = "not specified"
|
|
settings: List["SimpleSetting"] = Field([], alias="settings")
|
|
|
|
@staticmethod
|
|
def resolve_affiliation(obj: User):
|
|
# TODO
|
|
return {"city": "not specified", "country": "not specified", "workplace": "not specified"}
|
|
|
|
@staticmethod
|
|
def resolve_settings(obj: User):
|
|
return SettingManager.get_all_settings(obj)
|
|
|
|
|
|
class Me(Schema):
|
|
whoami: str | None = None
|
|
|
|
|
|
@router.get("/user", response={200: UserWrapper, 403: Error})
|
|
def get_users(request, me: Query[Me]):
|
|
if me.whoami:
|
|
return {"user": [request.user]}
|
|
else:
|
|
return {"user": User.objects.all()}
|
|
|
|
|
|
@router.get("/user/{uuid:user_uuid}", response={200: UserSchema, 403: Error})
|
|
def get_user(request, user_uuid):
|
|
try:
|
|
u = UserManager.get_user_by_id(request.user, user_uuid)
|
|
return u
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting User with id {user_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
########
|
|
# Group #
|
|
########
|
|
|
|
|
|
class GroupMember(Schema):
|
|
id: str
|
|
identifier: str
|
|
name: str
|
|
|
|
|
|
class GroupWrapper(Schema):
|
|
group: List[SimpleGroup]
|
|
|
|
|
|
class GroupSchema(Schema):
|
|
description: str
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "group"
|
|
members: List[GroupMember] = Field([], alias="members")
|
|
name: str = Field(None, alias="name")
|
|
ownerid: str = Field(None, alias="owner.url")
|
|
ownername: str = Field(None, alias="owner.get_name")
|
|
packages: List["SimplePackage"] = Field([], alias="packages")
|
|
readers: List[GroupMember] = Field([], alias="readers")
|
|
writers: List[GroupMember] = Field([], alias="writers")
|
|
|
|
@staticmethod
|
|
def resolve_members(obj: Group):
|
|
res = []
|
|
for member in obj.user_member.all():
|
|
res.append(GroupMember(id=member.url, identifier="usermember", name=member.get_name()))
|
|
|
|
for member in obj.group_member.all():
|
|
res.append(GroupMember(id=member.url, identifier="groupmember", name=member.get_name()))
|
|
|
|
return res
|
|
|
|
@staticmethod
|
|
def resolve_packages(obj: Group):
|
|
return Package.objects.filter(
|
|
id__in=[
|
|
GroupPackagePermission.objects.filter(group=obj).values_list(
|
|
"package_id", flat=True
|
|
)
|
|
]
|
|
)
|
|
|
|
@staticmethod
|
|
def resolve_readers(obj: Group):
|
|
return GroupSchema.resolve_members(obj)
|
|
|
|
@staticmethod
|
|
def resolve_writers(obj: Group):
|
|
return [GroupMember(id=obj.owner.url, identifier="usermember", name=obj.owner.username)]
|
|
|
|
|
|
@router.get("/group", response={200: GroupWrapper, 403: Error})
|
|
def get_groups(request):
|
|
return {"group": GroupManager.get_groups(request.user)}
|
|
|
|
|
|
@router.get("/group/{uuid:group_uuid}", response={200: GroupSchema, 403: Error})
|
|
def get_group(request, group_uuid):
|
|
try:
|
|
g = GroupManager.get_group_by_id(request.user, group_uuid)
|
|
return g
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Group with id {group_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
##########
|
|
# Search #
|
|
##########
|
|
class Search(Schema):
|
|
packages: List[str] = Field(alias="packages[]")
|
|
search: str
|
|
method: str
|
|
|
|
|
|
@router.get("/search", response={200: Any, 403: Error})
|
|
def search(request, search: Query[Search]):
|
|
try:
|
|
packs = []
|
|
for package in search.packages:
|
|
packs.append(PackageManager.get_package_by_url(request.user, package))
|
|
|
|
method = None
|
|
|
|
if search.method == "text":
|
|
method = "text"
|
|
elif search.method == "inchikey":
|
|
method = "inchikey"
|
|
elif search.method == "defaultSmiles":
|
|
method = "default"
|
|
elif search.method == "canonicalSmiles":
|
|
method = "canonical"
|
|
elif search.method == "exactSmiles":
|
|
method = "exact"
|
|
|
|
if method is None:
|
|
raise ValueError(f"Search method {search.method} is not supported!")
|
|
|
|
search_res = SearchManager.search(packs, search.search, method)
|
|
res = {}
|
|
if "Compounds" in search_res:
|
|
res["compound"] = search_res["Compounds"]
|
|
|
|
if "Compound Structures" in search_res:
|
|
res["structure"] = search_res["Compound Structures"]
|
|
|
|
if "Reactions" in search_res:
|
|
res["reaction"] = search_res["Reactions"]
|
|
|
|
if "Pathways" in search_res:
|
|
res["pathway"] = search_res["Pathways"]
|
|
|
|
if "Rules" in search_res:
|
|
res["rule"] = search_res["Rules"]
|
|
|
|
for key in res:
|
|
for v in res[key]:
|
|
v["id"] = v["url"].replace("simple-ambit-rule", "simple-rule")
|
|
|
|
return res
|
|
except ValueError as e:
|
|
return 403, {"message": f"Search failed due to {e}"}
|
|
|
|
|
|
###########
|
|
# Package #
|
|
###########
|
|
class PackageWrapper(Schema):
|
|
package: List["PackageSchema"]
|
|
|
|
|
|
class PackageSchema(Schema):
|
|
description: str = Field(None, alias="description")
|
|
id: str = Field(None, alias="url")
|
|
links: List[Dict[str, List[str | int]]] = Field([], alias="links")
|
|
name: str = Field(None, alias="name")
|
|
primaryGroup: Optional[SimpleGroup] = None
|
|
readers: List[Dict[str, str]] = Field([], alias="readers")
|
|
reviewComment: str = Field(None, alias="review_comment")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
writers: List[Dict[str, str]] = Field([], alias="writers")
|
|
|
|
@staticmethod
|
|
def resolve_links(obj: Package):
|
|
return [
|
|
{"Pathways": [f"{obj.url}/pathway", obj.pathways.count()]},
|
|
{"Rules": [f"{obj.url}/rule", obj.rules.count()]},
|
|
{"Compounds": [f"{obj.url}/compound", obj.compounds.count()]},
|
|
{"Reactions": [f"{obj.url}/reaction", obj.reactions.count()]},
|
|
{"Relative Reasoning": [f"{obj.url}/relative-reasoning", obj.models.count()]},
|
|
{"Scenarios": [f"{obj.url}/scenario", obj.scenarios.count()]},
|
|
]
|
|
|
|
@staticmethod
|
|
def resolve_readers(obj: Package):
|
|
users = User.objects.filter(
|
|
id__in=UserPackagePermission.objects.filter(
|
|
package=obj, permission=UserPackagePermission.READ[0]
|
|
).values_list("user", flat=True)
|
|
).distinct()
|
|
|
|
return [{u.id: u.get_name()} for u in users]
|
|
|
|
@staticmethod
|
|
def resolve_writers(obj: Package):
|
|
users = User.objects.filter(
|
|
id__in=UserPackagePermission.objects.filter(
|
|
package=obj, permission=UserPackagePermission.WRITE[0]
|
|
).values_list("user", flat=True)
|
|
).distinct()
|
|
|
|
return [{u.id: u.get_name()} for u in users]
|
|
|
|
@staticmethod
|
|
def resolve_review_comment(obj):
|
|
return ""
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj):
|
|
return "reviewed" if obj.reviewed else "unreviewed"
|
|
|
|
|
|
@router.get("/package", response={200: PackageWrapper, 403: Error})
|
|
def get_packages(request):
|
|
return {
|
|
"package": PackageManager.get_all_readable_packages(request.user, include_reviewed=True)
|
|
}
|
|
|
|
|
|
class GetPackage(Schema):
|
|
exportAsJson: str | None = None
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 403: Error})
|
|
def get_package(request, package_uuid, gp: Query[GetPackage]):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
|
|
if gp.exportAsJson and gp.exportAsJson.strip() == "true":
|
|
return PackageExporter(p).do_export()
|
|
|
|
return p
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
class CreatePackage(Schema):
|
|
packageName: str
|
|
packageDescription: str | None = None
|
|
|
|
|
|
@router.post("/package")
|
|
def create_packages(
|
|
request,
|
|
p: Form[CreatePackage],
|
|
):
|
|
try:
|
|
if p.packageName.strip() == "":
|
|
raise ValueError("Package name cannot be empty!")
|
|
|
|
new_pacakge = PackageManager.create_package(
|
|
request.user, p.packageName, p.packageDescription
|
|
)
|
|
return redirect(new_pacakge.url)
|
|
except ValueError as e:
|
|
return 400, {"message": str(e)}
|
|
|
|
|
|
class UpdatePackage(Schema):
|
|
packageDescription: str | None = None
|
|
hiddenMethod: str | None = None
|
|
permissions: str | None = None
|
|
ppsURI: str | None = None
|
|
read: str | None = None
|
|
write: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}", response={200: PackageSchema | Any, 400: Error})
|
|
def update_package(request, package_uuid, pack: Form[UpdatePackage]):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
if pack.hiddenMethod:
|
|
if pack.hiddenMethod == "DELETE":
|
|
p.delete()
|
|
|
|
elif pack.packageDescription is not None:
|
|
description = nh3.clean(pack.packageDescription, tags=s.ALLOWED_HTML_TAGS).strip()
|
|
|
|
if description:
|
|
p.description = description
|
|
p.save()
|
|
return HttpResponse(status=200)
|
|
else:
|
|
raise ValueError("Package description cannot be empty!")
|
|
elif all([pack.permissions, pack.ppsURI, pack.read]):
|
|
if "group" in pack.ppsURI:
|
|
grantee = GroupManager.get_group_lp(pack.ppsURI)
|
|
else:
|
|
grantee = UserManager.get_user_lp(pack.ppsURI)
|
|
|
|
PackageManager.grant_read(request.user, p, grantee)
|
|
return HttpResponse(status=200)
|
|
elif all([pack.permissions, pack.ppsURI, pack.write]):
|
|
if "group" in pack.ppsURI:
|
|
grantee = GroupManager.get_group_lp(pack.ppsURI)
|
|
else:
|
|
grantee = UserManager.get_user_lp(pack.ppsURI)
|
|
|
|
PackageManager.grant_write(request.user, p, grantee)
|
|
return HttpResponse(status=200)
|
|
except ValueError as e:
|
|
return 400, {"message": str(e)}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}")
|
|
def delete_package(request, package_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
|
|
if PackageManager.administrable(request.user, p):
|
|
p.delete()
|
|
return redirect(f"{s.SERVER_URL}/package")
|
|
else:
|
|
raise ValueError("You do not have the rights to delete this Package!")
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
################################
|
|
# Compound / CompoundStructure #
|
|
################################
|
|
class CompoundWrapper(Schema):
|
|
compound: List["SimpleCompound"]
|
|
|
|
|
|
class CompoundPathwayScenario(Schema):
|
|
scenarioId: str
|
|
scenarioName: str
|
|
scenarioType: str
|
|
|
|
|
|
class CompoundSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
description: str = Field(None, alias="description")
|
|
externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
|
|
id: str = Field(None, alias="url")
|
|
halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
|
|
identifier: str = "compound"
|
|
imageSize: int = 600
|
|
name: str = Field(None, alias="name")
|
|
pathwayScenarios: List[CompoundPathwayScenario] = Field([], alias="pathway_scenarios")
|
|
pathways: List["SimplePathway"] = Field([], alias="related_pathways")
|
|
pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
|
|
reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
|
|
reviewStatus: str = Field(False, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
structures: List["CompoundStructureSchema"] = []
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: CompoundStructure):
|
|
return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
|
|
@staticmethod
|
|
def resolve_external_references(obj: Compound):
|
|
# TODO
|
|
return {}
|
|
|
|
@staticmethod
|
|
def resolve_structures(obj: Compound):
|
|
return CompoundStructure.objects.filter(compound=obj)
|
|
|
|
@staticmethod
|
|
def resolve_halflifes(obj: Compound):
|
|
res = []
|
|
for scen, hls in obj.half_lifes().items():
|
|
for hl in hls:
|
|
res.append(
|
|
{
|
|
"hl": str(hl.dt50),
|
|
"hlComment": hl.comment,
|
|
"hlFit": hl.fit,
|
|
"hlModel": hl.model,
|
|
"scenarioId": scen.url,
|
|
"scenarioName": scen.name,
|
|
"scenarioType": scen.scenario_type,
|
|
"source": hl.source,
|
|
}
|
|
)
|
|
return res
|
|
|
|
@staticmethod
|
|
def resolve_pubchem_compound_references(obj: Compound):
|
|
# TODO
|
|
return []
|
|
|
|
@staticmethod
|
|
def resolve_pathway_scenarios(obj: Compound):
|
|
res = []
|
|
for pw in obj.related_pathways:
|
|
for scen in pw.scenarios.all():
|
|
res.append(
|
|
{
|
|
"scenarioId": scen.url,
|
|
"scenarioName": scen.name,
|
|
"scenarioType": scen.scenario_type,
|
|
}
|
|
)
|
|
|
|
return res
|
|
|
|
|
|
class CompoundStructureSchema(Schema):
|
|
InChI: str = Field(None, alias="inchi")
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
canonicalSmiles: str = Field(None, alias="canonical_smiles")
|
|
charge: int = Field(None, alias="charge")
|
|
description: str = Field(None, alias="description")
|
|
externalReferences: Dict[str, List[str]] = Field(None, alias="external_references")
|
|
formula: str = Field(None, alias="formula")
|
|
halflifes: List[Dict[str, str]] = Field([], alias="halflifes")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "structure"
|
|
imageSize: int = 600
|
|
inchikey: str = Field(None, alias="inchikey")
|
|
isDefaultStructure: bool = Field(None, alias="is_default_structure")
|
|
mass: float = Field(None, alias="mass")
|
|
name: str = Field(None, alias="name")
|
|
pathways: List["SimplePathway"] = Field([], alias="related_pathways")
|
|
pubchemCompoundReferences: List[str] = Field([], alias="pubchem_compound_references")
|
|
reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
smiles: str = Field(None, alias="smiles")
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: CompoundStructure):
|
|
return "reviewed" if obj.compound.package.reviewed else "unreviewed"
|
|
|
|
@staticmethod
|
|
def resolve_inchi(obj: CompoundStructure):
|
|
return FormatConverter.InChI(obj.smiles)
|
|
|
|
@staticmethod
|
|
def resolve_charge(obj: CompoundStructure):
|
|
return FormatConverter.charge(obj.smiles)
|
|
|
|
@staticmethod
|
|
def resolve_formula(obj: CompoundStructure):
|
|
return FormatConverter.formula(obj.smiles)
|
|
|
|
@staticmethod
|
|
def resolve_mass(obj: CompoundStructure):
|
|
return FormatConverter.mass(obj.smiles)
|
|
|
|
@staticmethod
|
|
def resolve_external_references(obj: CompoundStructure):
|
|
# TODO
|
|
return {}
|
|
|
|
@staticmethod
|
|
def resolve_halflifes(obj: CompoundStructure):
|
|
res = []
|
|
for scen, hls in obj.half_lifes().items():
|
|
for hl in hls:
|
|
res.append(
|
|
{
|
|
"hl": str(hl.dt50),
|
|
"hlComment": hl.comment,
|
|
"hlFit": hl.fit,
|
|
"hlModel": hl.model,
|
|
"scenarioId": scen.url,
|
|
"scenarioName": scen.name,
|
|
"scenarioType": scen.scenario_type,
|
|
"source": hl.source,
|
|
}
|
|
)
|
|
return res
|
|
|
|
@staticmethod
|
|
def resolve_pubchem_compound_references(obj: CompoundStructure):
|
|
return []
|
|
|
|
@staticmethod
|
|
def resolve_pathway_scenarios(obj: CompoundStructure):
|
|
res = []
|
|
for pw in obj.related_pathways:
|
|
for scen in pw.scenarios.all():
|
|
res.append(
|
|
{
|
|
"scenarioId": scen.url,
|
|
"scenarioName": scen.name,
|
|
"scenarioType": scen.scenario_type,
|
|
}
|
|
)
|
|
|
|
return res
|
|
|
|
|
|
class CompoundStructureWrapper(Schema):
|
|
structure: List["SimpleCompoundStructure"]
|
|
|
|
|
|
@router.get("/compound", response={200: CompoundWrapper, 403: Error})
|
|
def get_compounds(request):
|
|
return {
|
|
"compound": Compound.objects.filter(
|
|
package__in=PackageManager.get_reviewed_packages()
|
|
).prefetch_related("package")
|
|
}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/compound", response={200: CompoundWrapper, 403: Error})
|
|
def get_package_compounds(request, package_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"compound": Compound.objects.filter(package=p).prefetch_related("package")}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Compounds for Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}",
|
|
response={200: CompoundSchema, 403: Error},
|
|
)
|
|
def get_package_compound(request, package_uuid, compound_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return Compound.objects.get(package=p, uuid=compound_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Compound with id {compound_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure",
|
|
response={200: CompoundStructureWrapper, 403: Error},
|
|
)
|
|
def get_package_compound_structures(request, package_uuid, compound_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"structure": Compound.objects.get(package=p, uuid=compound_uuid).structures.all()}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting CompoundStructures for Compound with id {compound_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}",
|
|
response={200: CompoundStructureSchema, 403: Error},
|
|
)
|
|
def get_package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return CompoundStructure.objects.get(
|
|
uuid=structure_uuid, compound=Compound.objects.get(package=p, uuid=compound_uuid)
|
|
)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting CompoundStructure with id {structure_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
class CreateCompound(Schema):
|
|
compoundSmiles: str
|
|
compoundName: str | None = None
|
|
compoundDescription: str | None = None
|
|
inchi: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/compound")
|
|
def create_package_compound(
|
|
request,
|
|
package_uuid,
|
|
c: Form[CreateCompound],
|
|
):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
# inchi is not used atm
|
|
c = Compound.create(
|
|
p, c.compoundSmiles, c.compoundName, c.compoundDescription, inchi=c.inchi
|
|
)
|
|
return redirect(c.url)
|
|
except ValueError as e:
|
|
return 400, {"message": str(e)}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}")
|
|
def delete_compound(request, package_uuid, compound_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
c = Compound.objects.get(package=p, uuid=compound_uuid)
|
|
c.delete()
|
|
return redirect(f"{p.url}/compound")
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Compound with id {compound_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.delete(
|
|
"/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/{uuid:structure_uuid}"
|
|
)
|
|
def delete_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
c = Compound.objects.get(package=p, uuid=compound_uuid)
|
|
cs = CompoundStructure.objects.get(compound=c, uuid=structure_uuid)
|
|
|
|
# Check if we have to delete the compound as no structure is left
|
|
if len(cs.compound.structures.all()) == 1:
|
|
# This will delete the structure as well
|
|
c.delete()
|
|
return redirect(p.url + "/compound")
|
|
else:
|
|
if cs.normalized_structure:
|
|
c.delete()
|
|
return redirect(p.url + "/compound")
|
|
else:
|
|
if c.default_structure == cs:
|
|
cs.delete()
|
|
c.default_structure = c.structures.all().first()
|
|
return redirect(c.url + "/structure")
|
|
else:
|
|
cs.delete()
|
|
return redirect(c.url + "/structure")
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting CompoundStructure with id {compound_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
#########
|
|
# Rules #
|
|
#########
|
|
class RuleWrapper(Schema):
|
|
rule: List["SimpleRule"]
|
|
|
|
|
|
class SimpleRuleSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
description: str = Field(None, alias="description")
|
|
ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
|
|
engine: str = "ambit"
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = Field(None, alias="identifier")
|
|
isCompositeRule: bool = False
|
|
name: str = Field(None, alias="name")
|
|
pathways: List["SimplePathway"] = Field([], alias="related_pathways")
|
|
productFilterSmarts: str = Field("", alias="product_filter_smarts")
|
|
productSmarts: str = Field(None, alias="products_smarts")
|
|
reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
|
|
reactantSmarts: str = Field(None, alias="reactants_smarts")
|
|
reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
smirks: str = Field("", alias="smirks")
|
|
# TODO
|
|
transformations: str = Field("", alias="transformations")
|
|
|
|
@staticmethod
|
|
def resolve_url(obj: Rule):
|
|
return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-")
|
|
|
|
@staticmethod
|
|
def resolve_identifier(obj: Rule):
|
|
if "simple-rule" in obj.url:
|
|
return "simple-rule"
|
|
if "simple-ambit-rule" in obj.url:
|
|
return "simple-rule"
|
|
elif "parallel-rule" in obj.url:
|
|
return "parallel-rule"
|
|
elif "sequential-rule" in obj.url:
|
|
return "sequential-rule"
|
|
else:
|
|
return None
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: Rule):
|
|
return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
|
|
@staticmethod
|
|
def resolve_product_filter_smarts(obj: Rule):
|
|
return obj.product_filter_smarts if obj.product_filter_smarts else ""
|
|
|
|
@staticmethod
|
|
def resolve_reactant_filter_smarts(obj: Rule):
|
|
return obj.reactant_filter_smarts if obj.reactant_filter_smarts else ""
|
|
|
|
|
|
class CompositeRuleSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
description: str = Field(None, alias="description")
|
|
ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = Field(None, alias="identifier")
|
|
isCompositeRule: bool = True
|
|
name: str = Field(None, alias="name")
|
|
pathways: List["SimplePathway"] = Field([], alias="related_pathways")
|
|
productFilterSmarts: str = Field("", alias="product_filter_smarts")
|
|
reactantFilterSmarts: str = Field("", alias="reactant_filter_smarts")
|
|
reactions: List["SimpleReaction"] = Field([], alias="related_reactions")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
simpleRules: List["SimpleRule"] = Field([], alias="simple_rules")
|
|
|
|
@staticmethod
|
|
def resolve_ec_numbers(obj: Rule):
|
|
return []
|
|
|
|
@staticmethod
|
|
def resolve_url(obj: Rule):
|
|
return obj.url.replace("-ambit-", "-").replace("-rdkit-", "-")
|
|
|
|
@staticmethod
|
|
def resolve_identifier(obj: Rule):
|
|
if "simple-rule" in obj.url:
|
|
return "simple-rule"
|
|
if "simple-ambit-rule" in obj.url:
|
|
return "simple-rule"
|
|
elif "parallel-rule" in obj.url:
|
|
return "parallel-rule"
|
|
elif "sequential-rule" in obj.url:
|
|
return "sequential-rule"
|
|
else:
|
|
return None
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: Rule):
|
|
return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
|
|
@staticmethod
|
|
def resolve_product_filter_smarts(obj: Rule):
|
|
return obj.product_filter_smarts if obj.product_filter_smarts else ""
|
|
|
|
@staticmethod
|
|
def resolve_reactant_filter_smarts(obj: Rule):
|
|
return obj.reactant_filter_smarts if obj.reactant_filter_smarts else ""
|
|
|
|
|
|
@router.get("/rule", response={200: RuleWrapper, 403: Error})
|
|
def get_rules(request):
|
|
return {
|
|
"rule": Rule.objects.filter(
|
|
package__in=PackageManager.get_reviewed_packages()
|
|
).prefetch_related("package")
|
|
}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/rule", response={200: RuleWrapper, 403: Error})
|
|
def get_package_rules(request, package_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"rule": Rule.objects.filter(package=p).prefetch_related("package")}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Rules for Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}",
|
|
response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
|
|
)
|
|
def get_package_rule(request, package_uuid, rule_uuid):
|
|
return _get_package_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
|
|
response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
|
|
)
|
|
def get_package_simple_rule(request, package_uuid, rule_uuid):
|
|
return _get_package_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
|
|
response={200: SimpleRuleSchema | CompositeRuleSchema, 403: Error},
|
|
)
|
|
def get_package_parallel_rule(request, package_uuid, rule_uuid):
|
|
return _get_package_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
def _get_package_rule(request, package_uuid, rule_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return Rule.objects.get(package=p, uuid=rule_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
# POST
|
|
class CreateSimpleRule(Schema):
|
|
smirks: str
|
|
name: str | None = None
|
|
description: str | None = None
|
|
reactantFilterSmarts: str | None = None
|
|
productFilterSmarts: str | None = None
|
|
immediate: str | None = None
|
|
rdkitrule: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/simple-rule")
|
|
def create_package_simple_rule(
|
|
request,
|
|
package_uuid,
|
|
r: Form[CreateSimpleRule],
|
|
):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
if r.rdkitrule and r.rdkitrule.strip() == "true":
|
|
raise ValueError("Not yet implemented!")
|
|
else:
|
|
sr = SimpleAmbitRule.create(
|
|
p,
|
|
r.name,
|
|
r.description,
|
|
r.smirks,
|
|
r.reactantFilterSmarts,
|
|
r.productFilterSmarts,
|
|
)
|
|
|
|
return redirect(sr.url)
|
|
|
|
except ValueError as e:
|
|
return 400, {"message": str(e)}
|
|
|
|
|
|
class CreateParallelRule(Schema):
|
|
simpleRules: str
|
|
name: str | None = None
|
|
description: str | None = None
|
|
reactantFilterSmarts: str | None = None
|
|
productFilterSmarts: str | None = None
|
|
immediate: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/parallel-rule")
|
|
def create_package_parallel_rule(
|
|
request,
|
|
package_uuid,
|
|
r: Form[CreateParallelRule],
|
|
):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
srs = SimpleRule.objects.filter(package=p, url__in=r.simpleRules)
|
|
|
|
if srs.count() != len(r.simpleRules):
|
|
raise ValueError(
|
|
f"Not all SimpleRules could be found in Package with id {package_uuid}!"
|
|
)
|
|
|
|
sr = ParallelRule.create(
|
|
p, list(srs), r.name, r.description, r.reactantFilterSmarts, r.productFilterSmarts
|
|
)
|
|
|
|
return redirect(sr.url)
|
|
|
|
except ValueError as e:
|
|
return 400, {"message": str(e)}
|
|
|
|
|
|
@router.post(
|
|
"/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}", response={200: str | Any, 403: Error}
|
|
)
|
|
def post_package_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
|
|
return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
|
|
|
|
|
|
@router.post(
|
|
"/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
|
|
response={200: str | Any, 403: Error},
|
|
)
|
|
def post_package_simple_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
|
|
return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
|
|
|
|
|
|
@router.post(
|
|
"/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
|
|
response={200: str | Any, 403: Error},
|
|
)
|
|
def post_package_parallel_rule(request, package_uuid, rule_uuid, compound: Form[str] = None):
|
|
return _post_package_rule(request, package_uuid, rule_uuid, compound=compound)
|
|
|
|
|
|
def _post_package_rule(request, package_uuid, rule_uuid, compound: Form[str]):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
r = Rule.objects.get(package=p, uuid=rule_uuid)
|
|
|
|
if compound is not None:
|
|
if not compound.split():
|
|
return 400, {"message": "Compound is empty"}
|
|
|
|
product_sets = r.apply(compound)
|
|
|
|
res = []
|
|
for p_set in product_sets:
|
|
for product in p_set:
|
|
res.append(product)
|
|
|
|
return HttpResponse("\n".join(res), content_type="text/plain")
|
|
|
|
return r
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Rule with id {rule_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/rule/{uuid:rule_uuid}")
|
|
def delete_rule(request, package_uuid, rule_uuid):
|
|
return _delete_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
@router.delete(
|
|
"/package/{uuid:package_uuid}/simple-rule/{uuid:rule_uuid}",
|
|
)
|
|
def delete_simple_rule(request, package_uuid, rule_uuid):
|
|
return _delete_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
@router.delete(
|
|
"/package/{uuid:package_uuid}/parallel-rule/{uuid:rule_uuid}",
|
|
)
|
|
def delete_parallel_rule(request, package_uuid, rule_uuid):
|
|
return _delete_rule(request, package_uuid, rule_uuid)
|
|
|
|
|
|
def _delete_rule(request, package_uuid, rule_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
r = Rule.objects.get(package=p, uuid=rule_uuid)
|
|
r.delete()
|
|
return redirect(f"{p.url}/rule")
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Rule with id {rule_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
############
|
|
# Reaction #
|
|
############
|
|
class ReactionWrapper(Schema):
|
|
reaction: List["SimpleReaction"]
|
|
|
|
|
|
class ReactionCompoundStructure(Schema):
|
|
compoundName: str = Field(None, alias="name")
|
|
id: str = Field(None, alias="url")
|
|
smiles: str = Field(None, alias="smiles")
|
|
|
|
|
|
class ReactionSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
description: str = Field(None, alias="description")
|
|
ecNumbers: List[Dict[str, str]] = Field([], alias="ec_numbers")
|
|
educts: List["ReactionCompoundStructure"] = Field([], alias="educts")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "reaction"
|
|
medlineRefs: List[str] = Field([], alias="medline_references")
|
|
multistep: bool = Field(None, alias="multi_step")
|
|
name: str = Field(None, alias="name")
|
|
pathways: List["SimplePathway"] = Field([], alias="related_pathways")
|
|
products: List["ReactionCompoundStructure"] = Field([], alias="products")
|
|
references: Dict[str, List[str]] = Field({}, alias="references")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
smirks: str = Field("", alias="smirks")
|
|
|
|
@staticmethod
|
|
def resolve_smirks(obj: Reaction):
|
|
return obj.smirks()
|
|
|
|
@staticmethod
|
|
def resolve_ec_numbers(obj: Reaction):
|
|
# TODO fetch via scenario EnzymeAI
|
|
return []
|
|
|
|
@staticmethod
|
|
def resolve_references(obj: Reaction):
|
|
rhea_refs = []
|
|
for rhea in obj.get_rhea_identifiers():
|
|
rhea_refs.append(f"{rhea.identifier_value}")
|
|
|
|
# TODO UniProt
|
|
return {"rheaReferences": rhea_refs, "uniprotCount": []}
|
|
|
|
@staticmethod
|
|
def resolve_medline_references(obj: Reaction):
|
|
# TODO
|
|
return []
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: Rule):
|
|
return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
|
|
|
|
@router.get("/reaction", response={200: ReactionWrapper, 403: Error})
|
|
def get_reactions(request):
|
|
return {
|
|
"reaction": Reaction.objects.filter(
|
|
package__in=PackageManager.get_reviewed_packages()
|
|
).prefetch_related("package")
|
|
}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/reaction", response={200: ReactionWrapper, 403: Error})
|
|
def get_package_reactions(request, package_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"reaction": Reaction.objects.filter(package=p).prefetch_related("package")}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Reactions for Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}",
|
|
response={200: ReactionSchema, 403: Error},
|
|
)
|
|
def get_package_reaction(request, package_uuid, reaction_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return Reaction.objects.get(package=p, uuid=reaction_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Reaction with id {reaction_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
class CreateReaction(Schema):
|
|
reactionName: str | None = None
|
|
reactionDescription: str | None = None
|
|
smirks: str | None = None
|
|
educt: str | None = None
|
|
product: str | None = None
|
|
rule: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/reaction")
|
|
def create_package_reaction(
|
|
request,
|
|
package_uuid,
|
|
r: Form[CreateReaction],
|
|
):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
if r.smirks is None and (r.educt is None or r.product is None):
|
|
raise ValueError("Either SMIRKS or educt/product must be provided")
|
|
|
|
if r.smirks is not None and (r.educt is not None and r.product is not None):
|
|
raise ValueError("SMIRKS and educt/product provided!")
|
|
|
|
rule = None
|
|
if r.rule:
|
|
try:
|
|
rule = Rule.objects.get(package=p, url=r.rule)
|
|
except Rule.DoesNotExist:
|
|
raise ValueError(f"Rule with id {r.rule} does not exist!")
|
|
|
|
if r.educt is not None:
|
|
try:
|
|
educt_cs = CompoundStructure.objects.get(compound__package=p, url=r.educt)
|
|
except CompoundStructure.DoesNotExist:
|
|
raise ValueError(f"Compound with id {r.educt} does not exist!")
|
|
|
|
try:
|
|
product_cs = CompoundStructure.objects.get(compound__package=p, url=r.product)
|
|
except CompoundStructure.DoesNotExist:
|
|
raise ValueError(f"Compound with id {r.product} does not exist!")
|
|
|
|
new_r = Reaction.create(
|
|
p, r.reactionName, r.reactionDescription, [educt_cs], [product_cs], rule
|
|
)
|
|
else:
|
|
educts = r.smirks.split(">>")[0].split("\\.")
|
|
products = r.smirks.split(">>")[1].split("\\.")
|
|
|
|
new_r = Reaction.create(
|
|
p, r.reactionName, r.reactionDescription, educts, products, rule
|
|
)
|
|
|
|
return redirect(new_r.url)
|
|
|
|
except ValueError as e:
|
|
return 400, {"message": str(e)}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/reaction/{uuid:reaction_uuid}")
|
|
def delete_reaction(request, package_uuid, reaction_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
r = Reaction.objects.get(package=p, uuid=reaction_uuid)
|
|
r.delete()
|
|
return redirect(f"{p.url}/reaction")
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Reaction with id {reaction_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
############
|
|
# Scenario #
|
|
############
|
|
class ScenarioWrapper(Schema):
|
|
scenario: List["SimpleScenario"]
|
|
|
|
|
|
class ScenarioSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
collection: Dict["str", List[Dict[str, Any]]] = Field([], alias="collection")
|
|
collectionID: Optional[str] = None
|
|
description: str = Field(None, alias="description")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "scenario"
|
|
linkedTo: List[Dict[str, str]] = Field([], alias="linked_to")
|
|
name: str = Field(None, alias="name")
|
|
pathways: List["SimplePathway"] = Field([], alias="related_pathways")
|
|
relatedScenarios: List[Dict[str, str]] = Field([], alias="related_scenarios")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
type: str = Field(None, alias="scenario_type")
|
|
|
|
@staticmethod
|
|
def resolve_collection(obj: Scenario):
|
|
res = defaultdict(list)
|
|
|
|
for ai in obj.get_additional_information(direct_only=False):
|
|
data = ai.data
|
|
data["related"] = ai.content_object.simple_json() if ai.content_object else None
|
|
res[ai.type].append(data)
|
|
|
|
return res
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: Rule):
|
|
return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
|
|
|
|
@router.get("/scenario", response={200: ScenarioWrapper, 403: Error})
|
|
def get_scenarios(request):
|
|
return {
|
|
"scenario": Scenario.objects.filter(
|
|
package__in=PackageManager.get_reviewed_packages()
|
|
).prefetch_related("package")
|
|
}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/scenario", response={200: ScenarioWrapper, 403: Error})
|
|
def get_package_scenarios(request, package_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"scenario": Scenario.objects.filter(package=p).prefetch_related("package")}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Scenarios for Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}",
|
|
response={200: ScenarioSchema, 403: Error},
|
|
)
|
|
def get_package_scenario(request, package_uuid, scenario_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return Scenario.objects.get(package=p, uuid=scenario_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Scenario with id {scenario_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/scenario", response={200: str | Any, 403: Error})
|
|
def create_package_scenario(request, package_uuid):
|
|
from utilities.legacy import build_additional_information_from_request
|
|
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
scen_date = None
|
|
date_year = request.POST.get("dateYear")
|
|
date_month = request.POST.get("dateMonth")
|
|
date_day = request.POST.get("dateDay")
|
|
|
|
if date_year:
|
|
scen_date = date_year
|
|
if date_month:
|
|
scen_date += f"-{date_month}"
|
|
if date_day:
|
|
scen_date += f"-{date_day}"
|
|
|
|
name = request.POST.get("studyname")
|
|
description = request.POST.get("studydescription")
|
|
study_type = request.POST.get("type")
|
|
|
|
ais = []
|
|
types = request.POST.get("adInfoTypes[]", [])
|
|
|
|
if types:
|
|
types = types.split(",")
|
|
|
|
for t in types:
|
|
ais.append(build_additional_information_from_request(request, t))
|
|
|
|
new_s = Scenario.create(p, name, description, scen_date, study_type, ais)
|
|
|
|
return JsonResponse({"scenarioLocation": new_s.url})
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/scenario")
|
|
def delete_scenarios(request, package_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
scens = Scenario.objects.filter(package=p)
|
|
scens.delete()
|
|
return redirect(f"{p.url}/scenario")
|
|
|
|
except ValueError:
|
|
return 403, {"message": "Deleting Scenarios failed due to insufficient rights!"}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/scenario/{uuid:scenario_uuid}")
|
|
def delete_scenario(request, package_uuid, scenario_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
scen = Scenario.objects.get(package=p, uuid=scenario_uuid)
|
|
scen.delete()
|
|
return redirect(f"{p.url}/scenario")
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Scenario with id {scenario_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.post(
|
|
"/package/{uuid:package_uuid}/additional-information", response={200: str | Any, 403: Error}
|
|
)
|
|
def create_package_additional_information(request, package_uuid):
|
|
from utilities.legacy import build_additional_information_from_request
|
|
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
scen = request.POST.get("scenario")
|
|
scenario = Scenario.objects.get(package=p, url=scen)
|
|
|
|
url_parser = EPDBURLParser(request.POST.get("attach_obj"))
|
|
attach_obj = url_parser.get_object()
|
|
|
|
if not hasattr(attach_obj, "additional_information"):
|
|
raise ValueError("Can't attach additional information to this object!")
|
|
|
|
if not attach_obj.url.startswith(p.url):
|
|
raise ValueError(
|
|
"Additional Information can only be set to objects stored in the same package!"
|
|
)
|
|
|
|
types = request.POST.get("adInfoTypes[]", "").split(",")
|
|
|
|
for t in types:
|
|
ai = build_additional_information_from_request(request, t)
|
|
|
|
AdditionalInformation.create(
|
|
p,
|
|
ai,
|
|
scenario=scenario,
|
|
content_object=attach_obj,
|
|
)
|
|
|
|
# TODO implement additional information endpoint ?
|
|
return redirect(f"{scenario.url}")
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
###########
|
|
# Pathway #
|
|
###########
|
|
class PathwayWrapper(Schema):
|
|
pathway: List["SimplePathway"]
|
|
|
|
|
|
class PathwayEdge(Schema):
|
|
ecNumbers: List[str] = Field([], alias="ec_numbers")
|
|
id: str = Field(None, alias="url")
|
|
idreaction: str = Field(None, alias="edge_label.url")
|
|
multstep: bool = Field(None, alias="edge_label.multi_step")
|
|
name: str = Field(None, alias="name")
|
|
pseudo: bool = False
|
|
rule: Optional[str] = Field(None, alias="rule")
|
|
scenarios: List[SimpleScenario] = Field([], alias="scenarios")
|
|
source: int = Field(-1)
|
|
target: int = Field(-1)
|
|
|
|
@staticmethod
|
|
def resolve_rule(obj: Edge):
|
|
if obj.edge_label.rules.all().exists():
|
|
r = obj.edge_label.rules.all()[0]
|
|
if isinstance(r, SimpleAmbitRule):
|
|
return r.smirks
|
|
return None
|
|
|
|
|
|
class PathwayNode(Schema):
|
|
atomCount: int = Field(None, alias="atom_count")
|
|
depth: float = Field(None, alias="depth")
|
|
dt50s: List[Dict[str, str]] = Field([], alias="dt50s")
|
|
engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
|
|
id: str = Field(None, alias="url")
|
|
idcomp: str = Field(None, alias="default_node_label.url")
|
|
idreact: str = Field(None, alias="default_node_label.url")
|
|
image: str = Field(None, alias="image")
|
|
imageSize: int = Field(None, alias="image_size")
|
|
name: str = Field(None, alias="name")
|
|
proposed: List[Dict[str, str]] = Field([], alias="proposed_intermediate")
|
|
smiles: str = Field(None, alias="default_node_label.smiles")
|
|
|
|
@staticmethod
|
|
def resolve_atom_count(obj: Node):
|
|
from rdkit import Chem
|
|
|
|
return Chem.MolFromSmiles(obj.default_node_label.smiles).GetNumAtoms()
|
|
|
|
@staticmethod
|
|
def resolve_dt50s(obj: Node):
|
|
# TODO
|
|
return []
|
|
|
|
@staticmethod
|
|
def resolve_engineered_intermediate(obj: Node):
|
|
# TODO
|
|
return False
|
|
|
|
@staticmethod
|
|
def resolve_image(obj: Node):
|
|
return f"{obj.default_node_label.url}?image=svg"
|
|
|
|
@staticmethod
|
|
def resolve_image_size(obj: Node):
|
|
return 400
|
|
|
|
@staticmethod
|
|
def resolve_proposed_intermediate(obj: Node):
|
|
# TODO
|
|
return []
|
|
|
|
|
|
class PathwaySchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
completed: str = Field(None, alias="completed")
|
|
description: str = Field(None, alias="description")
|
|
id: str = Field(None, alias="url")
|
|
isIncremental: bool = Field(None, alias="is_incremental")
|
|
isPredicted: bool = Field(None, alias="is_predicted")
|
|
lastModified: int = Field(None, alias="last_modified")
|
|
links: List[PathwayEdge] = Field([])
|
|
name: str = Field(None, alias="name")
|
|
nodes: List[PathwayNode] = Field([])
|
|
pathwayName: str = Field(None, alias="name")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
upToDate: bool = Field(None, alias="up_to_date")
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: Pathway):
|
|
return "reviewed" if obj.package.reviewed else "unreviewed"
|
|
|
|
@staticmethod
|
|
def resolve_completed(obj: Pathway):
|
|
return str(obj.completed()).lower()
|
|
|
|
@staticmethod
|
|
def resolve_up_to_date(obj: Pathway):
|
|
return "true"
|
|
|
|
@staticmethod
|
|
def resolve_last_modified(obj: Pathway):
|
|
return int(obj.modified.timestamp())
|
|
|
|
@staticmethod
|
|
def resolve_links(obj: Pathway):
|
|
return obj.d3_json().get("links", [])
|
|
|
|
@staticmethod
|
|
def resolve_nodes(obj: Pathway):
|
|
return obj.d3_json().get("nodes", [])
|
|
|
|
|
|
@router.get("/pathway", response={200: PathwayWrapper, 403: Error})
|
|
def get_pathways(request):
|
|
return {
|
|
"pathway": Pathway.objects.filter(
|
|
package__in=PackageManager.get_reviewed_packages()
|
|
).prefetch_related("package")
|
|
}
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/pathway", response={200: PathwayWrapper, 403: Error})
|
|
def get_package_pathways(request, package_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"pathway": Pathway.objects.filter(package=p).prefetch_related("package")}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Pathways for Package with id {package_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}",
|
|
response={200: PathwaySchema, 403: Error},
|
|
)
|
|
def get_package_pathway(request, package_uuid, pathway_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Pathway with id {pathway_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
class CreatePathway(Schema):
|
|
smilesinput: str
|
|
name: str | None = None
|
|
description: str | None = None
|
|
rootOnly: str | None = None
|
|
selectedSetting: str | None = None
|
|
|
|
|
|
@router.post("/package/{uuid:package_uuid}/pathway", response={200: Any, 403: Error})
|
|
def create_package_pathway(
|
|
request,
|
|
package_uuid,
|
|
pw: Form[CreatePathway],
|
|
):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
stand_smiles = FormatConverter.standardize(pw.smilesinput.strip(), remove_stereo=True)
|
|
|
|
new_pw = Pathway.create(p, stand_smiles, name=pw.name, description=pw.description)
|
|
|
|
pw_mode = "predict"
|
|
if pw.rootOnly and pw.rootOnly.strip() == "true":
|
|
pw_mode = "build"
|
|
|
|
new_pw.kv.update({"mode": pw_mode})
|
|
new_pw.save()
|
|
|
|
if pw_mode == "predict":
|
|
setting = request.user.prediction_settings()
|
|
|
|
if pw.selectedSetting:
|
|
setting = SettingManager.get_setting_by_url(request.user, pw.selectedSetting)
|
|
|
|
new_pw.setting = setting
|
|
new_pw.kv.update({"status": "running"})
|
|
new_pw.save()
|
|
|
|
from .tasks import dispatch, predict
|
|
|
|
dispatch(request.user, predict, new_pw.pk, setting.pk, limit=None)
|
|
|
|
return redirect(new_pw.url)
|
|
except ValueError as e:
|
|
return 403, {"message": str(e)}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}")
|
|
def delete_pathway(request, package_uuid, pathway_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
pw.delete()
|
|
return redirect(f"{p.url}/pathway")
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Pathway with id {pathway_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
########
|
|
# Node #
|
|
########
|
|
class NodeWrapper(Schema):
|
|
node: List["SimpleNode"]
|
|
|
|
|
|
class NodeCompoundStructure(Schema):
|
|
id: str = Field(None, alias="url")
|
|
image: str = Field(None, alias="image")
|
|
smiles: str = Field(None, alias="smiles")
|
|
name: str = Field(None, alias="name")
|
|
|
|
@staticmethod
|
|
def resolve_image(obj: CompoundStructure):
|
|
return f"{obj.url}?image=svg"
|
|
|
|
|
|
class NodeSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
confidenceScenarios: List[SimpleScenario] = Field([], alias="confidence_scenarios")
|
|
defaultStructure: NodeCompoundStructure = Field(None, alias="default_node_label")
|
|
depth: int = Field(None, alias="depth")
|
|
description: str = Field(None, alias="description")
|
|
engineeredIntermediate: bool = Field(None, alias="engineered_intermediate")
|
|
halflifes: Dict[str, str] = Field({}, alias="halflife")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "node"
|
|
image: str = Field(None, alias="image")
|
|
name: str = Field(None, alias="name")
|
|
proposedValues: List[Dict[str, str]] = Field([], alias="proposed_values")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
smiles: str = Field(None, alias="default_node_label.smiles")
|
|
structures: List["CompoundStructureSchema"] = Field([], alias="node_labels")
|
|
|
|
@staticmethod
|
|
def resolve_engineered_intermediate(obj: Node):
|
|
# TODO
|
|
return False
|
|
|
|
@staticmethod
|
|
def resolve_image(obj: Node):
|
|
return f"{obj.default_node_label.url}?image=svg"
|
|
|
|
@staticmethod
|
|
def resolve_proposed_values(obj: Node):
|
|
# TODO
|
|
# {
|
|
# "scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/1f1a0b67-ce57-4f5a-929c-4fabf3a791fd",
|
|
# "scenarioName": "Annex IIA 7.1.1 a), Brumhard (2003) - (00000)"
|
|
# }
|
|
return []
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: Node):
|
|
return "reviewed" if obj.pathway.package.reviewed else "unreviewed"
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
|
|
response={200: NodeWrapper, 403: Error},
|
|
)
|
|
def get_package_pathway_nodes(request, package_uuid, pathway_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"node": Pathway.objects.get(package=p, uuid=pathway_uuid).nodes}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Nodes for Pathway with id {pathway_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}",
|
|
response={200: NodeSchema, 403: Error},
|
|
)
|
|
def get_package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
return Node.objects.get(pathway=pw, uuid=node_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Node with id {node_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
class CreateNode(Schema):
|
|
nodeAsSmiles: str
|
|
nodeName: str | None = None
|
|
nodeReason: str | None = None
|
|
nodeDepth: str | None = None
|
|
|
|
|
|
@router.post(
|
|
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
|
|
response={200: str | Any, 403: Error},
|
|
)
|
|
def add_pathway_node(request, package_uuid, pathway_uuid, n: Form[CreateNode]):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
|
|
if n.nodeDepth is not None and n.nodeDepth.strip() != "":
|
|
node_depth = int(n.nodeDepth)
|
|
else:
|
|
node_depth = -1
|
|
|
|
n = Node.create(pw, n.nodeAsSmiles, node_depth, n.nodeName, n.nodeReason)
|
|
|
|
return redirect(n.url)
|
|
except ValueError:
|
|
return 403, {"message": "Adding node failed!"}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node/{uuid:node_uuid}")
|
|
def delete_node(request, package_uuid, pathway_uuid, node_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
n = Node.objects.get(pathway=pw, uuid=node_uuid)
|
|
n.delete()
|
|
return redirect(f"{pw.url}/node")
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Node with id {node_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
########
|
|
# Edge #
|
|
########
|
|
class EdgeWrapper(Schema):
|
|
edge: List["SimpleEdge"]
|
|
|
|
|
|
class EdgeNode(SimpleNode):
|
|
image: str = Field(None, alias="image")
|
|
|
|
@staticmethod
|
|
def resolve_image(obj: Node):
|
|
return f"{obj.default_node_label.url}?image=svg"
|
|
|
|
|
|
class EdgeSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
description: str = Field(None, alias="description")
|
|
ecNumbers: List[str] = Field([], alias="ec_numbers")
|
|
endNodes: List["EdgeNode"] = Field([], alias="end_nodes")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "edge"
|
|
name: str = Field(None, alias="name")
|
|
reactionName: str = Field(None, alias="edge_label.get_name")
|
|
reactionURI: str = Field(None, alias="edge_label.url")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
startNodes: List["EdgeNode"] = Field([], alias="start_nodes")
|
|
|
|
@staticmethod
|
|
def resolve_review_status(obj: Edge):
|
|
return "reviewed" if obj.pathway.package.reviewed else "unreviewed"
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge",
|
|
response={200: EdgeWrapper, 403: Error},
|
|
)
|
|
def get_package_pathway_edges(request, package_uuid, pathway_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return {"edge": Pathway.objects.get(package=p, uuid=pathway_uuid).edges}
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Edges for Pathway with id {pathway_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}",
|
|
response={200: EdgeSchema, 403: Error},
|
|
)
|
|
def get_package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
return Edge.objects.get(pathway=pw, uuid=edge_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Edge with id {edge_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
class CreateEdge(Schema):
|
|
edgeAsSmirks: str | None = None
|
|
educts: str | None = None # Node URIs comma sep
|
|
products: str | None = None # Node URIs comma sep
|
|
multistep: str | None = None
|
|
edgeReason: str | None = None
|
|
|
|
|
|
@router.post(
|
|
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge",
|
|
response={200: str | Any, 403: Error},
|
|
)
|
|
def add_pathway_edge(request, package_uuid, pathway_uuid, e: Form[CreateEdge]):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
|
|
if e.edgeAsSmirks is None and (e.educts is None or e.products is None):
|
|
raise ValueError("Either SMIRKS or educt/product must be provided")
|
|
|
|
if e.edgeAsSmirks is not None and (e.educts is not None and e.products is not None):
|
|
raise ValueError("SMIRKS and educt/product provided!")
|
|
|
|
educts = []
|
|
products = []
|
|
|
|
subclasses = CompoundStructure.__subclasses__()
|
|
|
|
if e.edgeAsSmirks:
|
|
for ed in e.edgeAsSmirks.split(">>")[0].split("\\."):
|
|
stand_ed = FormatConverter.standardize(ed, remove_stereo=True)
|
|
educts.append(
|
|
Node.objects.get(
|
|
pathway=pw,
|
|
default_node_label=CompoundStructure.objects.not_instance_of(*subclasses).
|
|
get(
|
|
compound__package=p, smiles=stand_ed
|
|
).compound.default_structure,
|
|
)
|
|
)
|
|
|
|
for pr in e.edgeAsSmirks.split(">>")[1].split("\\."):
|
|
stand_pr = FormatConverter.standardize(pr, remove_stereo=True)
|
|
products.append(
|
|
Node.objects.get(
|
|
pathway=pw,
|
|
default_node_label=CompoundStructure.objects.not_instance_of(*subclasses).
|
|
get(
|
|
compound__package=p, smiles=stand_pr
|
|
).compound.default_structure,
|
|
)
|
|
)
|
|
else:
|
|
for ed in e.educts.split(","):
|
|
educts.append(Node.objects.get(pathway=pw, url=ed.strip()))
|
|
|
|
for pr in e.products.split(","):
|
|
products.append(Node.objects.get(pathway=pw, url=pr.strip()))
|
|
|
|
new_e = Edge.create(
|
|
pathway=pw,
|
|
start_nodes=educts,
|
|
end_nodes=products,
|
|
rule=None,
|
|
name=None,
|
|
description=e.edgeReason,
|
|
)
|
|
|
|
# Update depths as sideeffect of above operation
|
|
pw.update_depths()
|
|
|
|
return redirect(new_e.url)
|
|
except ValueError:
|
|
return 403, {"message": "Adding Edge failed!"}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/edge/{uuid:edge_uuid}")
|
|
def delete_edge(request, package_uuid, pathway_uuid, edge_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
|
e = Edge.objects.get(pathway=pw, uuid=edge_uuid)
|
|
e.delete()
|
|
return redirect(f"{pw.url}/edge")
|
|
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Edge with id {edge_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
#########
|
|
# Model #
|
|
#########
|
|
class ModelWrapper(Schema):
|
|
relative_reasoning: List["SimpleModel"] = Field(..., alias="relative-reasoning")
|
|
|
|
|
|
class ModelSchema(Schema):
|
|
aliases: List[str] = Field([], alias="aliases")
|
|
description: str = Field(None, alias="description")
|
|
evalPackages: List["SimplePackage"] = Field([], alias="eval_packages")
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "relative-reasoning"
|
|
info: dict = Field({}, alias="info")
|
|
name: str = Field(None, alias="name")
|
|
pathwayPackages: List["SimplePackage"] = Field([], alias="pathway_packages")
|
|
reviewStatus: str = Field(None, alias="review_status")
|
|
rulePackages: List["SimplePackage"] = Field([], alias="rule_packages")
|
|
scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
|
|
status: str = Field(None, alias="model_status")
|
|
statusMessage: str = Field(None, alias="status_message")
|
|
threshold: str = Field(None, alias="threshold")
|
|
type: str = Field(None, alias="model_type")
|
|
|
|
@staticmethod
|
|
def resolve_info(obj: EPModel):
|
|
return {}
|
|
|
|
@staticmethod
|
|
def resolve_status_message(obj: EPModel):
|
|
for k, v in PackageBasedModel.PROGRESS_STATUS_CHOICES.items():
|
|
if k == obj.model_status:
|
|
return v
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def resolve_threshold(obj: EPModel):
|
|
return f"{obj.threshold:.2f}"
|
|
|
|
@staticmethod
|
|
def resolve_model_type(obj: EPModel):
|
|
if isinstance(obj, RuleBasedRelativeReasoning):
|
|
return "RULEBASED"
|
|
elif isinstance(obj, MLRelativeReasoning):
|
|
return "ECC"
|
|
elif isinstance(obj, EnviFormer):
|
|
return "ENVIFORMER"
|
|
else:
|
|
return None
|
|
|
|
|
|
@router.get("/model", response={200: ModelWrapper, 403: Error})
|
|
def get_models(request):
|
|
pass
|
|
|
|
|
|
@router.get("/package/{uuid:package_uuid}/model", response={200: ModelWrapper, 403: Error})
|
|
def get_package_models(request, package_uuid, model_uuid):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
return EPModel.objects.filter(package=p)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Reaction with id {model_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
class Classify(Schema):
|
|
smiles: str | None = None
|
|
|
|
|
|
@router.get(
|
|
"/package/{uuid:package_uuid}/model/{uuid:model_uuid}",
|
|
response={200: ModelSchema | Any, 403: Error, 400: Error},
|
|
)
|
|
def get_model(request, package_uuid, model_uuid, c: Query[Classify]):
|
|
try:
|
|
p = PackageManager.get_package_by_id(request.user, package_uuid)
|
|
mod = EPModel.objects.get(package=p, uuid=model_uuid)
|
|
|
|
if c.smiles:
|
|
if c.smiles == "":
|
|
return 400, {"message": "Received empty SMILES"}
|
|
|
|
try:
|
|
stand_smiles = FormatConverter.standardize(c.smiles, remove_stereo=True)
|
|
except ValueError:
|
|
return 400, {"message": f'"{c.smiles}" is not a valid SMILES'}
|
|
|
|
from epdb.tasks import dispatch_eager, predict_simple
|
|
|
|
_, pred_res = dispatch_eager(request.user, predict_simple, mod.pk, stand_smiles)
|
|
|
|
result = []
|
|
|
|
for pr in pred_res:
|
|
if len(pr) > 0:
|
|
products = []
|
|
for prod_set in pr.product_sets:
|
|
products.append(tuple([x for x in prod_set]))
|
|
|
|
res = {
|
|
"probability": pr.probability,
|
|
"products": list(set(products)),
|
|
}
|
|
|
|
if pr.rule:
|
|
res["id"] = pr.rule.url
|
|
res["identifier"] = pr.rule.get_rule_identifier()
|
|
res["name"] = pr.rule.get_name()
|
|
res["reviewStatus"] = (
|
|
"reviewed" if pr.rule.package.reviewed else "unreviewed"
|
|
)
|
|
|
|
result.append(res)
|
|
|
|
return result
|
|
|
|
return mod
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Reaction with id {model_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
@router.delete("/package/{uuid:package_uuid}/model/{uuid:model_uuid}")
|
|
def delete_model(request, package_uuid, model_uuid):
|
|
try:
|
|
p = get_package_for_write(request.user, package_uuid)
|
|
|
|
m = EPModel.objects.get(package=p, uuid=model_uuid)
|
|
m.delete()
|
|
return redirect(f"{p.url}/model")
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Deleting Model with id {model_uuid} failed due to insufficient rights!"
|
|
}
|
|
|
|
|
|
###########
|
|
# Setting #
|
|
###########
|
|
class SettingWrapper(Schema):
|
|
setting: List["SimpleSetting"]
|
|
|
|
|
|
class SettingSchema(Schema):
|
|
duplicationHash: int = -1
|
|
id: str = Field(None, alias="url")
|
|
identifier: str = "setting"
|
|
includedPackages: List["SimplePackage"] = Field([], alias="rule_packages")
|
|
isPublic: bool = Field(None, alias="public")
|
|
name: str = Field(None, alias="name")
|
|
normalizationRules: List["SimpleRule"] = Field([], alias="normalization_rules")
|
|
propertyPlugins: List[str] = Field([], alias="property_plugins")
|
|
truncationstrategy: Optional[str] = Field(None, alias="truncation_strategy")
|
|
|
|
|
|
@router.get("/setting", response={200: SettingWrapper, 403: Error})
|
|
def get_settings(request):
|
|
return {"setting": SettingManager.get_all_settings(request.user)}
|
|
|
|
|
|
@router.get("/setting/{uuid:setting_uuid}", response={200: SettingSchema, 403: Error})
|
|
def get_setting(request, setting_uuid):
|
|
try:
|
|
return SettingManager.get_setting_by_id(request.user, setting_uuid)
|
|
except ValueError:
|
|
return 403, {
|
|
"message": f"Getting Setting with id {setting_uuid} failed due to insufficient rights!"
|
|
}
|