forked from enviPath/enviPy
adjusted migration
Initial bayer app Show Pack Classification Adjusted docker compose to bayer specifics Adjusted Dockerfile for Bayer Adding secret flags to group, add secret pools to packages Adjusted View for Package creation Prep configs, added Package Create Modal wip More on PES wip wip Wip minor PW interactions API PES wip Make Select Widget reflect required make required generallay available Update UI if pathway mode is set to build Added ais circle adjustments Initial Zoom, fix AD Creation wip
This commit is contained in:
@ -1,17 +1,20 @@
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import jwt
|
||||
import nh3
|
||||
import requests
|
||||
from django.conf import settings as s
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.cache import cache
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import redirect
|
||||
from jwt import InvalidIssuerError
|
||||
from ninja import Field, Form, Query, Router, Schema
|
||||
from ninja.security import SessionAuth
|
||||
from ninja.security import HttpBearer
|
||||
|
||||
from utilities.chem import FormatConverter
|
||||
from utilities.misc import PackageExporter
|
||||
|
||||
from .logic import (
|
||||
EPDBURLParser,
|
||||
GroupManager,
|
||||
@ -46,6 +49,26 @@ from .models import (
|
||||
Package = s.GET_PACKAGE_MODEL()
|
||||
|
||||
|
||||
def get_cached_jwks(tenant_id: str, force=False) -> Dict:
|
||||
"""Get JWKS using Django cache"""
|
||||
cache_key = f"jwks_{tenant_id}"
|
||||
|
||||
jwks = cache.get(cache_key)
|
||||
|
||||
if jwks is None or force:
|
||||
# Cache miss, fetch new keys
|
||||
jwks_uri = f"https://login.microsoftonline.com/{tenant_id}/discovery/v2.0/keys"
|
||||
response = requests.get(jwks_uri)
|
||||
response.raise_for_status()
|
||||
|
||||
jwks = response.json()
|
||||
|
||||
# Cache for 1 hour (3600 seconds)
|
||||
cache.set(cache_key, jwks, 3600)
|
||||
|
||||
return jwks
|
||||
|
||||
|
||||
def get_package_for_write(user, package_uuid):
|
||||
p = PackageManager.get_package_by_id(user, package_uuid)
|
||||
if not PackageManager.writable(user, p):
|
||||
@ -59,7 +82,52 @@ def _anonymous_or_real(request):
|
||||
return get_user_model().objects.get(username="anonymous")
|
||||
|
||||
|
||||
router = Router(auth=SessionAuth(csrf=False))
|
||||
def validate_token(token: str) -> dict:
|
||||
TENANT_ID = s.MS_ENTRA_TENANT_ID
|
||||
CLIENT_ID = s.MS_ENTRA_CLIENT_ID
|
||||
|
||||
jwks = get_cached_jwks(TENANT_ID)
|
||||
|
||||
header = jwt.get_unverified_header(token)
|
||||
|
||||
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(
|
||||
next(k for k in jwks["keys"] if k["kid"] == header["kid"])
|
||||
)
|
||||
|
||||
# Handle V1 and V2 tokens
|
||||
try:
|
||||
claims = jwt.decode(
|
||||
token,
|
||||
public_key,
|
||||
algorithms=["RS256"],
|
||||
audience=[CLIENT_ID, f"api://{CLIENT_ID}"],
|
||||
issuer=[
|
||||
f"https://sts.windows.net/{TENANT_ID}/",
|
||||
f"https://login.microsoftonline.com/{TENANT_ID}/v2.0"
|
||||
]
|
||||
)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Token verification failed! - {e}")
|
||||
|
||||
return claims
|
||||
|
||||
|
||||
class MSBearerTokenAuth(HttpBearer):
|
||||
|
||||
def authenticate(self, request, token):
|
||||
if token is None:
|
||||
return None
|
||||
|
||||
claims = validate_token(token)
|
||||
|
||||
if not User.objects.filter(uuid=claims['oid']).exists():
|
||||
return None
|
||||
|
||||
request.user = User.objects.get(uuid=claims['oid'])
|
||||
return request.user
|
||||
|
||||
|
||||
router = Router(auth=MSBearerTokenAuth())
|
||||
|
||||
|
||||
class Error(Schema):
|
||||
@ -153,21 +221,6 @@ class SimpleModel(SimpleObject):
|
||||
identifier: str = "relative-reasoning"
|
||||
|
||||
|
||||
################
|
||||
# Login/Logout #
|
||||
################
|
||||
@router.post("/", response={200: SimpleUser, 403: Error}, auth=None)
|
||||
def login(request, loginusername: Form[str], loginpassword: Form[str]):
|
||||
from django.contrib.auth import authenticate, login
|
||||
|
||||
email = User.objects.get(username=loginusername).email
|
||||
user = authenticate(username=email, password=loginpassword)
|
||||
if user:
|
||||
login(request, user)
|
||||
return user
|
||||
else:
|
||||
return 403, {"message": "Invalid username and/or password"}
|
||||
|
||||
|
||||
########
|
||||
# User #
|
||||
@ -794,6 +847,7 @@ class CreateCompound(Schema):
|
||||
compoundName: str | None = None
|
||||
compoundDescription: str | None = None
|
||||
inchi: str | None = None
|
||||
pesLink: str | None = None
|
||||
|
||||
|
||||
@router.post("/package/{uuid:package_uuid}/compound")
|
||||
@ -805,9 +859,32 @@ def create_package_compound(
|
||||
try:
|
||||
p = get_package_for_write(request.user, package_uuid)
|
||||
# inchi is not used atm
|
||||
c = Compound.create(
|
||||
p, c.compoundSmiles, c.compoundName, c.compoundDescription, inchi=c.inchi
|
||||
)
|
||||
|
||||
if c.pesLink is not None:
|
||||
from bayer.views import fetch_pes
|
||||
from bayer.models import PESCompound
|
||||
|
||||
try:
|
||||
pes_data = fetch_pes(request, c.pesLink)
|
||||
except ValueError as e:
|
||||
return 400, {"message": f"Could not fetch PES data for {c.pesLink}"}
|
||||
|
||||
classification = pes_data.get("classificationLevel", "")
|
||||
if "secret" == classification.lower():
|
||||
|
||||
if p.classification_level != Package.Classification.SECRET:
|
||||
return 400, {"Cannot create PESs for non-secret packages."}
|
||||
|
||||
data_pools = pes_data.get("dataPools")
|
||||
if data_pools:
|
||||
if s.DATA_POOL_MAPPING[p.data_pool.name] not in data_pools:
|
||||
return 400, { "messsage": f"PES data pool {s.DATA_POOL_MAPPING[p.data_pool.name]} not found in PES data"}
|
||||
|
||||
c = PESCompound.create(p, pes_data, c.compoundName, c.compoundDescription)
|
||||
else:
|
||||
c = Compound.create(
|
||||
p, c.compoundSmiles, c.compoundName, c.compoundDescription, inchi=c.inchi
|
||||
)
|
||||
return redirect(c.url)
|
||||
except ValueError as e:
|
||||
return 400, {"message": str(e)}
|
||||
@ -1902,25 +1979,67 @@ class CreateNode(Schema):
|
||||
nodeName: str | None = None
|
||||
nodeReason: str | None = None
|
||||
nodeDepth: str | None = None
|
||||
pesLink: str | None = None
|
||||
|
||||
|
||||
@router.post(
|
||||
"/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}/node",
|
||||
response={200: str | Any, 403: Error},
|
||||
response={200: str | Any, 400: Error, 403: Error},
|
||||
)
|
||||
def add_pathway_node(request, package_uuid, pathway_uuid, n: Form[CreateNode]):
|
||||
try:
|
||||
p = get_package_for_write(request.user, package_uuid)
|
||||
pw = Pathway.objects.get(package=p, uuid=pathway_uuid)
|
||||
|
||||
if n.nodeDepth is not None and n.nodeDepth.strip() != "":
|
||||
node_depth = int(float(n.nodeDepth))
|
||||
# TODO Code Dup from bayer.views
|
||||
|
||||
if n.pesLink:
|
||||
from bayer.views import fetch_pes
|
||||
from bayer.models import PESCompound
|
||||
|
||||
try:
|
||||
pes_data = fetch_pes(request, n.pesLink)
|
||||
except ValueError as e:
|
||||
return 400, {"message": f"Could not fetch PES data for {n.pesLink}"}
|
||||
|
||||
classification = pes_data.get("classificationLevel", "")
|
||||
if "secret" == classification.lower():
|
||||
|
||||
if p.classification_level != Package.Classification.SECRET:
|
||||
return 400, "Cannot create PESs for non-secret packages."
|
||||
|
||||
data_pools = pes_data.get("dataPools")
|
||||
if data_pools:
|
||||
if s.DATA_POOL_MAPPING[p.data_pool.name] not in data_pools:
|
||||
return 400, {
|
||||
"messsage": f"PES data pool {s.DATA_POOL_MAPPING[p.data_pool.name]} not found in PES data"
|
||||
}
|
||||
|
||||
c = PESCompound.create(p, pes_data, n.nodeName, n.nodeReason)
|
||||
|
||||
node_qs = Node.objects.filter(pathway=pw, default_node_label=c.default_structure)
|
||||
if node_qs.exists():
|
||||
return redirect(pw.url)
|
||||
|
||||
node = Node()
|
||||
node.stereo_removed = False
|
||||
node.pathway = pw
|
||||
node.depth = 0
|
||||
|
||||
node.default_node_label = c.default_structure
|
||||
node.save()
|
||||
|
||||
node.node_labels.add(c.default_structure)
|
||||
node.save()
|
||||
else:
|
||||
node_depth = -1
|
||||
if n.nodeDepth is not None and n.nodeDepth.strip() != "":
|
||||
node_depth = int(n.nodeDepth)
|
||||
else:
|
||||
node_depth = -1
|
||||
|
||||
n = Node.create(pw, n.nodeAsSmiles, node_depth, n.nodeName, n.nodeReason)
|
||||
node = Node.create(pw, n.nodeAsSmiles, node_depth, n.nodeName, n.nodeReason)
|
||||
|
||||
return redirect(n.url)
|
||||
return redirect(node.url)
|
||||
except ValueError:
|
||||
return 403, {"message": "Adding node failed!"}
|
||||
|
||||
@ -2030,13 +2149,16 @@ def add_pathway_edge(request, package_uuid, pathway_uuid, e: Form[CreateEdge]):
|
||||
educts = []
|
||||
products = []
|
||||
|
||||
subclasses = CompoundStructure.__subclasses__()
|
||||
|
||||
if e.edgeAsSmirks:
|
||||
for ed in e.edgeAsSmirks.split(">>")[0].split("\\."):
|
||||
stand_ed = FormatConverter.standardize(ed, remove_stereo=True)
|
||||
educts.append(
|
||||
Node.objects.get(
|
||||
pathway=pw,
|
||||
default_node_label=CompoundStructure.objects.get(
|
||||
default_node_label=CompoundStructure.objects.not_instance_of(*subclasses).
|
||||
get(
|
||||
compound__package=p, smiles=stand_ed
|
||||
).compound.default_structure,
|
||||
)
|
||||
@ -2047,7 +2169,8 @@ def add_pathway_edge(request, package_uuid, pathway_uuid, e: Form[CreateEdge]):
|
||||
products.append(
|
||||
Node.objects.get(
|
||||
pathway=pw,
|
||||
default_node_label=CompoundStructure.objects.get(
|
||||
default_node_label=CompoundStructure.objects.not_instance_of(*subclasses).
|
||||
get(
|
||||
compound__package=p, smiles=stand_pr
|
||||
).compound.default_structure,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user