Files
enviPy-bayer/epdb/views.py
Tim Lorsbach 54056c654d
Some checks failed
API CI / api-tests (pull_request) Failing after 21s
CI / test (pull_request) Failing after 22s
adjusted migration
Initial bayer app

Show Pack Classification

Adjusted docker compose to bayer specifics

Adjusted Dockerfile for Bayer

Adding secret flags to group, add secret pools to packages

Adjusted View for Package creation

Prep configs, added Package Create Modal

wip

More on PES

wip

wip
2026-04-21 22:53:30 +02:00

3386 lines
124 KiB
Python

import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Iterable
import requests
import nh3
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.contrib.auth.validators import UnicodeUsernameValidator
from django.core.exceptions import BadRequest, PermissionDenied, ValidationError
from django.core.validators import validate_email
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseNotAllowed, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
from oauth2_provider.decorators import protected_resource
from sentry_sdk import capture_exception
from utilities.chem import FormatConverter, IndigoUtils
from utilities.decorators import package_permission_required
from .logic import (
EPDBURLParser,
GroupManager,
PackageManager,
SearchManager,
SettingManager,
UserManager,
)
from .models import (
AdditionalInformation,
APIToken,
Compound,
CompoundStructure,
ClassifierPluginModel,
Edge,
EnviFormer,
EnzymeLink,
EPModel,
ExpansionSchemeChoice,
ExternalDatabase,
ExternalIdentifier,
Group,
GroupPackagePermission,
JobLog,
License,
MLRelativeReasoning,
Node,
Pathway,
Permission,
PropertyPluginModel,
Reaction,
Rule,
RuleBasedRelativeReasoning,
Scenario,
SimpleAmbitRule,
User,
UserPackagePermission,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Package model class, obtained by calling the GET_PACKAGE_MODEL hook on the Django settings
# (presumably so a deployment can plug in its own Package model — confirm in settings).
Package = s.GET_PACKAGE_MODEL()
def log_post_params(request):
    """Write every POST parameter of ``request`` to the debug log.

    Nothing is logged unless ``settings.DEBUG`` is truthy.
    """
    if not s.DEBUG:
        return

    for key, value in request.POST.items():
        logger.debug(f"{key}\t{value}")
def get_error_handler_context(request, for_user=None) -> Dict[str, Any]:
    """Build the reduced template context used by the custom error handlers.

    Only settings values and the acting user are placed in the context.

    :param request: the incoming request.
    :param for_user: optional user that replaces the request's user in the context.
    :return: dict with ``title`` and a ``meta`` sub-dict.
    """
    # Always resolve the request user first, even when it ends up being overridden.
    request_user = _anonymous_or_real(request)

    meta = {
        "site_id": s.MATOMO_SITE_ID,
        "version": "0.0.1",
        "server_url": s.SERVER_URL,
        "user": for_user if for_user else request_user,
        "enabled_features": s.FLAGS,
        "debug": s.DEBUG,
    }
    return {"title": "enviPath", "meta": meta}
def error(request, message: str, detail: str, code: int = 400):
    """Return an error response as JSON or as the rendered error page.

    :param request: the incoming request.
    :param message: short error headline.
    :param detail: longer explanation of the error.
    :param code: HTTP status code of the response (defaults to 400).
    :return: ``JsonResponse`` when the ``Accept`` header is exactly
        ``application/json``, otherwise the rendered ``errors/error.html`` page.
    """
    error_context = {
        "error_message": message,
        "error_detail": detail,
    }

    if request.headers.get("Accept") == "application/json":
        # Honour the requested status code instead of always answering 500.
        return JsonResponse(error_context, status=code)

    # The base context is only needed for the HTML page, so build it late.
    context = get_base_context(request)
    context.update(**error_context)
    return render(request, "errors/error.html", context, status=code)
def handler400(request, exception):
    """Render the custom page for HTTP 400 (Bad Request)."""
    page_context = get_error_handler_context(request)
    page_context.update(public_mode=True)
    return render(request, "errors/400_bad_request.html", page_context, status=400)
def handler403(request, exception):
    """Render the custom page for HTTP 403 (Forbidden)."""
    page_context = get_error_handler_context(request)
    page_context.update(public_mode=True)
    return render(request, "errors/403_access_denied.html", page_context, status=403)
def handler404(request, exception):
    """Render the custom page for HTTP 404 (Not Found)."""
    page_context = get_error_handler_context(request)
    page_context.update(public_mode=True)
    return render(request, "errors/404_not_found.html", page_context, status=404)
def handler500(request):
    """Render the custom response for HTTP 500 (Internal Server Error).

    Clients whose ``Accept`` header is exactly ``application/json`` receive a JSON
    body; everybody else gets the generic error page.
    """
    context = get_error_handler_context(request)
    error_context = {
        "error_message": "Internal Server Error",
        "error_detail": "An unexpected error occurred. Please try again later.",
    }

    wants_json = request.headers.get("Accept") == "application/json"
    if wants_json:
        return JsonResponse(error_context, status=500)

    context.update(
        public_mode=True,
        error_code=500,
        error_description=(
            "We encountered an unexpected error while processing your request. Our team has been notified and is working to resolve the issue."
        ),
        **error_context,
    )
    return render(request, "errors/error.html", context, status=500)
def login(request):
    """Show the login page (GET) or authenticate a user (POST).

    The submitted ``username`` may be either a username or an email address. The
    account is looked up first so that inactive accounts get a dedicated message;
    authentication itself is then performed with the account's email address.

    :param request: the incoming request.
    :return: the rendered login page (with a ``message`` on failure), a redirect
        after a successful login, or ``HttpResponseNotAllowed`` for other methods.
    """
    context = get_base_context(request)

    if s.CAP_ENABLED:
        context["CAP_ENABLED"] = s.CAP_ENABLED
        context["CAP_API_BASE"] = s.CAP_API_BASE
        context["CAP_SITE_KEY"] = s.CAP_SITE_KEY

    if request.method == "GET":
        context["title"] = "enviPath"
        context["next"] = request.GET.get("next", "")
        return render(request, "static/login.html", context)

    if request.method != "POST":
        return HttpResponseNotAllowed(["GET", "POST"])

    # Imported locally and aliased: this view itself is named ``login``.
    from django.contrib.auth import authenticate
    from django.contrib.auth import login as auth_login
    from django.utils.http import url_has_allowed_host_and_scheme

    def login_failed(message="Login failed!"):
        context["message"] = message
        return render(request, "static/login.html", context)

    # A missing field must not crash the view, hence the "" default.
    raw_username = request.POST.get("username", "")
    username = raw_username.strip()
    # Reject empty names and names carrying leading/trailing whitespace.
    if not username or username != raw_username:
        return login_failed()

    password = request.POST.get("password")
    user_model = get_user_model()

    # Resolve the account up front so we can check whether it is active.
    try:
        try:
            account = user_model.objects.get(username=username)
        except user_model.DoesNotExist:
            # Not a known username: accept it only if it is a valid email address.
            # validate_email raises ValidationError for anything else.
            validate_email(username)
            account = user_model.objects.get(email=username)
    except (user_model.DoesNotExist, ValidationError):
        return login_failed()
    except Exception as e:
        logger.info(f"Uncaught exception while trying to login: {e}")
        return login_failed()

    if not account.is_active:
        return login_failed("User account is not activated yet!")

    try:
        user = authenticate(username=account.email, password=password)
    except Exception:
        return login_failed()

    if user is None:
        return login_failed()

    auth_login(request, user)

    # Only follow ``next`` when it points back to this host; otherwise an attacker
    # could craft a login link that forwards the user to an arbitrary site.
    next_url = request.POST.get("next")
    if next_url and url_has_allowed_host_and_scheme(
        next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        return redirect(next_url)

    return redirect(reverse("index"))
def logout(request):
    """Log the current user out and redirect to the server root.

    Only a POST request carrying a truthy ``logout`` field is honoured; anything
    else is answered with ``HttpResponseBadRequest``.
    """
    logout_requested = request.method == "POST" and bool(request.POST.get("logout", False))
    if not logout_requested:
        return HttpResponseBadRequest()

    # Aliased on import because this view shares the name ``logout``.
    from django.contrib.auth import logout as auth_logout

    auth_logout(request)
    return redirect(s.SERVER_URL)
def register(request):
    """Create a new user account.

    GET redirects to the signup tab of the unified login page. POST optionally
    verifies the captcha token, validates the submitted fields, creates the user
    and queues the registration mail.

    :param request: the incoming request.
    :return: a redirect (GET), the rendered login page carrying either ``message``
        (failure) or ``success_message`` (success) for POST, or
        ``HttpResponseNotAllowed`` for other methods.
    """
    context = get_base_context(request)

    if s.CAP_ENABLED:
        context["CAP_ENABLED"] = s.CAP_ENABLED
        context["CAP_API_BASE"] = s.CAP_API_BASE
        context["CAP_SITE_KEY"] = s.CAP_SITE_KEY

    if request.method == "GET":
        from urllib.parse import urlencode

        # Redirect to unified login page with signup tab.
        # The query string has to precede the fragment, otherwise "?next=..." becomes
        # part of the fragment and is never sent to the server.
        next_url = request.GET.get("next", "")
        redirect_url = reverse("login")
        if next_url:
            redirect_url += "?" + urlencode({"next": next_url})
        redirect_url += "#signup"
        return redirect(redirect_url)

    if request.method != "POST":
        return HttpResponseNotAllowed(["GET", "POST"])

    def fail(message):
        context["message"] = message
        return render(request, "static/login.html", context)

    context["title"] = "enviPath"
    if next_url := request.POST.get("next"):
        context["next"] = next_url

    # Captcha
    if s.CAP_ENABLED:
        cap_token = request.POST.get("cap-token")
        if not cap_token:
            return fail("Missing CAP Token.")

        verify_url = f"{s.CAP_API_BASE}/{s.CAP_SITE_KEY}/siteverify"
        payload = {
            "secret": s.CAP_SECRET_KEY,
            "response": cap_token,
        }
        try:
            resp = requests.post(verify_url, json=payload, timeout=10)
            resp.raise_for_status()
            verify_data = resp.json()
        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            return fail("Captcha verification failed.")

        if not verify_data.get("success"):
            return fail("Captcha check failed. Please try again.")
    # End Captcha

    username = request.POST.get("username", "").strip()
    email = request.POST.get("email", "").strip()
    password = request.POST.get("password", "").strip()
    rpassword = request.POST.get("rpassword", "").strip()

    if not (username and email and password):
        return fail("Invalid username/email/password")

    try:
        UnicodeUsernameValidator()(username)
    except ValidationError:
        return fail(
            "Enter a valid username. This value may contain only letters, "
            "numbers, and @/./+/-/_ characters."
        )

    if password != rpassword or password == "":
        return fail("Registration failed, provided passwords differ!")

    try:
        u = UserManager.create_user(username, email, password)
        logger.info(f"Created user {u.username} ({u.pk})")

        # A mail problem must not undo the registration: log it and carry on.
        try:
            from .tasks import send_registration_mail

            send_registration_mail.delay(u.pk)
        except Exception as e:
            logger.error(f"Failed to send registration mail to {u.email}: {e}")
            capture_exception(e)
    except Exception as e:
        # Keep the user-facing message generic, but leave a trace for operators.
        logger.warning(f"Could not create user account: {e}")
        return fail("Registration failed! Couldn't create User Account.")

    if s.ADMIN_APPROVAL_REQUIRED:
        context["success_message"] = (
            "Your account has been created! An admin will activate it soon!"
        )
    else:
        context["success_message"] = (
            "Account has been created! You'll receive a mail to activate your account shortly."
        )

    return render(request, "static/login.html", context)
def editable(request, user):
    """Tell whether ``user`` may edit the resource addressed by ``request``.

    Package, group and user URLs are delegated to the respective manager's
    ``writable`` check. A fixed set of top-level URLs is always editable; every
    other URL is not.

    NOTE: superusers get no shortcut here; they go through the same checks.
    """
    url = request.build_absolute_uri(request.path)

    # (URL matcher, object loader, permission check) per resource kind, in match order.
    resource_kinds = (
        (PackageManager.is_package_url, PackageManager.get_package_lp, PackageManager.writable),
        (GroupManager.is_group_url, GroupManager.get_group_lp, GroupManager.writable),
        (UserManager.is_user_url, UserManager.get_user_lp, UserManager.writable),
    )
    for matches, load, can_write in resource_kinds:
        if matches(url):
            # The loader receives the complete URI of the request, not just the path part.
            return can_write(user, load(request.build_absolute_uri()))

    always_editable = [
        s.SERVER_URL,
        f"{s.SERVER_URL}/",
        f"{s.SERVER_URL}/package",
        f"{s.SERVER_URL}/user",
        f"{s.SERVER_URL}/group",
        f"{s.SERVER_URL}/search",
    ]
    if url in always_editable:
        return True

    logger.debug(f"Unknown url: {url}")
    return False
def get_base_context(request, for_user=None) -> Dict[str, Any]:
    """Build the template context shared by the regular (non-error) views.

    :param request: the incoming request.
    :param for_user: optional user that replaces the request's user for everything
        placed in the context. ``can_edit`` is still computed for the request's user.
    :return: dict with ``title`` and a ``meta`` sub-dict.
    """
    request_user = _anonymous_or_real(request)
    can_edit = editable(request, request_user)

    parser = EPDBURLParser(request.build_absolute_uri(request.path))
    url_contains_package = bool(parser.contains_package_url() or parser.is_package_url())

    effective_user = for_user if for_user else request_user

    meta = {
        "version": "0.0.1",
        "server_url": s.SERVER_URL,
        "user": effective_user,
        "can_edit": can_edit,
        "url_contains_package": url_contains_package,
        "readable_packages": PackageManager.get_all_readable_packages(
            effective_user, include_reviewed=True
        ),
        "writeable_packages": PackageManager.get_all_writeable_packages(effective_user),
        "available_groups": GroupManager.get_groups(effective_user),
        "available_settings": SettingManager.get_all_settings(effective_user),
        "enabled_features": s.FLAGS,
        "debug": s.DEBUG,
        "external_databases": ExternalDatabase.get_databases(),
        "site_id": s.MATOMO_SITE_ID,
        # EDIT START
        "secret_groups": Group.objects.filter(secret=True),
        # EDIT END
    }
    return {"title": "enviPath", "meta": meta}
def _anonymous_or_real(request):
if request.user and (request.user.is_authenticated and not request.user.is_anonymous):
return request.user
return get_user_model().objects.get(username="anonymous")
def breadcrumbs(
    first_level_object=None,
    second_level_namespace=None,
    second_level_object=None,
    third_level_namespace=None,
    third_level_object=None,
) -> List[Dict[str, str]]:
    """Assemble the breadcrumb trail for a (possibly nested) object page.

    The trail always starts with "Home" and "Package". Each further level is only
    added when the level before it was given, so the trail stops at the first
    missing argument.

    :return: list of single-entry dicts mapping a label to its URL.
    """
    trail = [
        {"Home": s.SERVER_URL},
        {"Package": s.SERVER_URL + "/package"},
    ]

    if first_level_object is None:
        return trail
    trail.append({first_level_object.get_name(): first_level_object.url})

    if second_level_namespace is None:
        return trail
    second_label = f"{second_level_namespace}".capitalize()
    trail.append({second_label: first_level_object.url + f"/{second_level_namespace}"})

    if second_level_object is None:
        return trail
    trail.append({second_level_object.get_name(): second_level_object.url})

    if third_level_namespace is None:
        return trail
    third_label = f"{third_level_namespace}".capitalize()
    trail.append({third_label: second_level_object.url + f"/{third_level_namespace}"})

    if third_level_object is None:
        return trail
    trail.append({third_level_object.get_name(): third_level_object.url})

    return trail
def set_scenarios(current_user, attach_object, scenario_urls: List[str]):
    """Replace the scenarios of ``attach_object`` with those named by ``scenario_urls``.

    Each URL is resolved to its package on behalf of ``current_user``; the scenario
    is then looked up by the last path segment of the URL, used as its uuid.
    """

    def resolve(url):
        package = PackageManager.get_package_by_url(current_user, url)
        return Scenario.objects.get(package=package, uuid=url.split("/")[-1])

    # As empty lists will be removed in POST request we'll send [''] -> skip blanks.
    attach_object.set_scenarios([resolve(url) for url in scenario_urls if url != ""])
def set_aliases(current_user, attach_object, aliases: List[str]):
    """Store ``aliases`` on ``attach_object`` and save it.

    A list holding only the empty string is the POST encoding of "no aliases"
    (empty lists are dropped from POST requests) and is stored as ``[]``.
    """
    attach_object.aliases = [] if aliases == [""] else aliases
    attach_object.save()
def copy_object(current_user, target_package: "Package", source_object_url: str):
    """Copy the object behind ``source_object_url`` into ``target_package``.

    :param current_user: user on whose behalf the source package is resolved.
    :param target_package: package receiving the copy.
    :param source_object_url: URL of the object to copy.
    :return: the object returned by the source object's ``copy`` method.
    :raises ValueError: if source and target package are the same, if the URL does
        not address an object inside a package, or if the object has no ``copy``.
    """
    # Resolving the package on behalf of the user ensures that the source is readable.
    source_package = PackageManager.get_package_by_url(current_user, source_object_url)
    if source_package == target_package:
        raise ValueError(f"Can't copy object {source_object_url} to the same package!")

    parser = EPDBURLParser(source_object_url)
    # The URL must address something inside a package; a plain package URL
    # (or one without any package) cannot be copied.
    if not parser.contains_package_url():
        raise ValueError(f"Object {source_object_url} can't be copied!")

    # The most specific object addressed by the URL.
    source_object = parser.get_object()
    if not hasattr(source_object, "copy"):
        raise ValueError(f"Object {source_object} can't be copied!")

    mapping = {}
    duplicate = source_object.copy(target_package, mapping)

    if s.DEBUG:
        for original, clone in mapping.items():
            logger.debug(f"Mapping {original.url} to {clone.url}")

    return duplicate
def index(request):
    """Render the landing page.

    When the ``getMLServerPath`` query parameter is truthy, a JSON document with
    the server URL is returned instead of the page.
    """
    context = get_base_context(request)
    meta = context["meta"]
    context["title"] = "enviPath - Home"
    meta["current_package"] = meta["user"].default_package

    wants_server_path = request.GET.get("getMLServerPath", False)
    if wants_server_path:
        return JsonResponse({"mlServerPath": s.SERVER_URL})

    return render(request, "index/index.html", context)
def predict_pathway(request):
    """Render the predict-pathway page for the user's default package (GET only)."""
    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])

    context = get_base_context(request)
    meta = context["meta"]
    context["title"] = "enviPath - Predict Pathway"
    meta["current_package"] = meta["user"].default_package
    return render(request, "predict_pathway.html", context)
def batch_predict_pathway(request):
    """Render the batch predict-pathway page for the user's default package (GET only)."""
    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])

    context = get_base_context(request)
    meta = context["meta"]
    context["title"] = "enviPath - Batch Predict Pathway"
    meta["current_package"] = meta["user"].default_package
    return render(request, "batch_predict_pathway.html", context)
@package_permission_required()
def package_predict_pathway(request, package_uuid):
    """Render the predict-pathway page bound to one specific package (GET only)."""
    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])

    package = PackageManager.get_package_by_id(_anonymous_or_real(request), package_uuid)

    context = get_base_context(request)
    context.update(title=f"enviPath - {package.get_name()} - Predict Pathway")
    context["meta"]["current_package"] = package
    return render(request, "predict_pathway.html", context)
def packages(request):
    """List packages (GET), import or create a package (POST), or answer OPTIONS.

    POST with a ``hidden`` field of ``import-legacy-package-json`` or
    ``import-package-json`` imports the uploaded JSON file; any other ``hidden``
    value is a bad request. POST without ``hidden`` creates a new package from
    the form fields, including its classification and, for SECRET packages,
    the data pool group.

    :param request: the incoming request.
    :return: the rendered list page, a redirect to the new/imported package,
        an error response for invalid input, or ``HttpResponseNotAllowed``.
    """
    current_user = _anonymous_or_real(request)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = "enviPath - Packages"
        context["meta"]["current_package"] = context["meta"]["user"].default_package
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Package": s.SERVER_URL + "/package"},
        ]

        # Context for paginated template
        context["entity_type"] = "package"
        context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/packages/"
        context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
        context["list_title"] = "packages"

        return render(request, "collections/packages_paginated.html", context)

    elif request.method == "POST":
        if hidden := request.POST.get("hidden", None):
            if hidden not in ["import-legacy-package-json", "import-package-json"]:
                return HttpResponseBadRequest()

            # A missing upload is a client error, not a server crash.
            upload = request.FILES.get("file")
            if upload is None:
                return error(request, "Missing file.", "No package file was uploaded.")

            try:
                data = json.loads(upload.read().decode("utf-8"))
            except UnicodeDecodeError:
                return error(request, "Invalid encoding.", "Invalid encoding, must be UTF-8")
            except json.JSONDecodeError:
                return error(request, "Invalid JSON.", "The uploaded file is not valid JSON.")

            if hidden == "import-legacy-package-json":
                pack = PackageManager.import_legacy_package(data, current_user)
            else:
                pack = PackageManager.import_package(data, current_user)

            return redirect(pack.url)
        else:
            package_name = request.POST.get("package-name")
            package_description = request.POST.get(
                "package-description", s.DEFAULT_VALUES["description"]
            )

            # EDIT START
            data_pool = None
            package_classification = request.POST.get("package-classification")
            # TypeError: field missing (None); ValueError: not an integer, or
            # (assuming Classification is an enum-style choices class) an unknown value.
            try:
                classification = Package.Classification(int(package_classification))
            except (TypeError, ValueError):
                return error(
                    request,
                    "Invalid classification.",
                    "A valid package classification is required!",
                )

            # For SECRET we'll need a data pool which will be an additional perm check later
            if classification == Package.Classification.SECRET:
                package_data_pool = request.POST.get("package-data-pool")

                if not package_data_pool:
                    return error(request, "Invalid data pool.", "Data Pool is required!")

                data_pool = GroupManager.get_group_by_url(current_user, package_data_pool)

                if data_pool is None:
                    return error(
                        request, "Invalid data pool.", "Data Pool does not exist or no access!"
                    )

                if not data_pool.secret:
                    return error(request, "Invalid data pool.", "Data Pool is not a secret group!")

            created_package = PackageManager.create_package(
                current_user, package_name, package_description
            )
            created_package.classification_level = classification

            # Set previously determined data pool
            if classification == Package.Classification.SECRET:
                created_package.data_pool = data_pool

            created_package.save()
            # EDIT END

            return redirect(created_package.url)

    elif request.method == "OPTIONS":
        response = HttpResponse()
        response["allow"] = ",".join(["GET", "POST"])
        return response

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
def compounds(request):
    """Top-level compound collection: list page on GET, creation on POST.

    POST is forwarded to ``package_compounds`` for the user's default package.
    """
    if request.method == "POST":
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_compounds(request, default_package.uuid)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    context = get_base_context(request)
    context["meta"]["current_package"] = context["meta"]["user"].default_package
    context.update(
        {
            "title": "enviPath - Compounds",
            "breadcrumbs": [
                {"Home": s.SERVER_URL},
                {"Compound": s.SERVER_URL + "/compound"},
            ],
            # Context for paginated template
            "entity_type": "compound",
            "api_endpoint": f"{s.SERVER_PATH}/api/v1/compounds/",
            "per_page": s.API_PAGINATION_DEFAULT_PAGE_SIZE,
            "list_mode": "tabbed",
            "list_title": "compounds",
        }
    )
    return render(request, "collections/compounds_paginated.html", context)
def rules(request):
    """Top-level rule collection: list page on GET, creation on POST.

    POST is forwarded to ``package_rules`` for the user's default package.
    """
    if request.method == "POST":
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_rules(request, default_package.uuid)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    context = get_base_context(request)
    context["meta"]["current_package"] = context["meta"]["user"].default_package
    context.update(
        {
            "title": "enviPath - Rules",
            "breadcrumbs": [
                {"Home": s.SERVER_URL},
                {"Rule": s.SERVER_URL + "/rule"},
            ],
            # Context for paginated template
            "entity_type": "rule",
            "api_endpoint": f"{s.SERVER_PATH}/api/v1/rules/",
            "per_page": s.API_PAGINATION_DEFAULT_PAGE_SIZE,
            "list_title": "rules",
        }
    )
    return render(request, "collections/rules_paginated.html", context)
def reactions(request):
    """Top-level reaction collection: list page on GET, creation on POST.

    POST is forwarded to ``package_reactions`` for the user's default package.
    """
    if request.method == "POST":
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_reactions(request, default_package.uuid)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    context = get_base_context(request)
    context["meta"]["current_package"] = context["meta"]["user"].default_package
    context.update(
        {
            "title": "enviPath - Reactions",
            "breadcrumbs": [
                {"Home": s.SERVER_URL},
                {"Reaction": s.SERVER_URL + "/reaction"},
            ],
            # Context for paginated template
            "entity_type": "reaction",
            "api_endpoint": f"{s.SERVER_PATH}/api/v1/reactions/",
            "per_page": s.API_PAGINATION_DEFAULT_PAGE_SIZE,
            "list_title": "reactions",
        }
    )
    return render(request, "collections/reactions_paginated.html", context)
def pathways(request):
    """Top-level pathway collection: list page on GET, creation on POST.

    POST is forwarded to ``package_pathways`` for the user's default package.
    """
    if request.method == "POST":
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_pathways(request, default_package.uuid)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    context = get_base_context(request)
    context["meta"]["current_package"] = context["meta"]["user"].default_package
    context.update(
        {
            "title": "enviPath - Pathways",
            "breadcrumbs": [
                {"Home": s.SERVER_URL},
                {"Pathway": s.SERVER_URL + "/pathway"},
            ],
            # Context for paginated template
            "entity_type": "pathway",
            "api_endpoint": f"{s.SERVER_PATH}/api/v1/pathways/",
            "per_page": s.API_PAGINATION_DEFAULT_PAGE_SIZE,
            "list_title": "pathways",
        }
    )
    return render(request, "collections/pathways_paginated.html", context)
def scenarios(request):
    """Top-level scenario collection: list page on GET, creation on POST.

    POST is forwarded to ``package_scenarios`` for the user's default package.

    :param request: the incoming request.
    :return: the rendered list page, the delegated view's response, or
        ``HttpResponseNotAllowed`` for other methods.
    """
    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = "enviPath - Scenarios"
        context["object_type"] = "scenario"
        context["meta"]["current_package"] = context["meta"]["user"].default_package
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Scenario": s.SERVER_URL + "/scenario"},
        ]

        # Context for paginated template
        context["entity_type"] = "scenario"
        context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/scenarios/"
        context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
        context["list_title"] = "scenarios"

        return render(request, "collections/scenarios_paginated.html", context)

    elif request.method == "POST":
        # delegate to default package
        # Resolve the user like the sibling collection views do: for an
        # unauthenticated request, request.user may not carry a default_package.
        current_user = _anonymous_or_real(request)
        default_package = current_user.default_package
        return package_scenarios(request, default_package.uuid)

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
def models(request):
    """Top-level model collection: list page on GET, creation on POST.

    The GET context carries ``model_types``, the built-in model kinds plus (when the
    ``PLUGINS`` feature flag is on) every registered classifier and property plugin.
    POST is forwarded to ``package_models`` for the user's default package.
    """
    if request.method == "POST":
        default_package = _anonymous_or_real(request).default_package
        return package_models(request, default_package.uuid)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    context = get_base_context(request)
    context["title"] = "enviPath - Models"
    context["meta"]["current_package"] = context["meta"]["user"].default_package
    context["breadcrumbs"] = [
        {"Home": s.SERVER_URL},
        {"Model": s.SERVER_URL + "/model"},
    ]

    # Keep model_types for potential modal/action use
    model_types = {
        "ML Relative Reasoning": {
            "type": "ml-relative-reasoning",
            "requires_rule_packages": True,
            "requires_data_packages": True,
        },
        "Rule Based Relative Reasoning": {
            "type": "rule-based-relative-reasoning",
            "requires_rule_packages": True,
            "requires_data_packages": True,
        },
        "EnviFormer": {
            "type": "enviformer",
            "requires_rule_packages": False,
            "requires_data_packages": True,
        },
    }
    context["model_types"] = model_types

    def plugin_entry(identifier, plugin):
        return {
            "type": identifier,
            "requires_rule_packages": plugin.requires_rule_packages(),
            "requires_data_packages": plugin.requires_data_packages(),
        }

    if s.FLAGS.get("PLUGINS", False):
        # Classifier plugins first, then property plugins (later entries win on equal labels).
        for registry in (s.CLASSIFIER_PLUGINS, s.PROPERTY_PLUGINS):
            for identifier, plugin in registry.items():
                model_types[plugin.display()] = plugin_entry(identifier, plugin)

    # Context for paginated template
    context.update(
        {
            "entity_type": "model",
            "api_endpoint": f"{s.SERVER_PATH}/api/v1/models/",
            "per_page": s.API_PAGINATION_DEFAULT_PAGE_SIZE,
            "list_title": "models",
        }
    )
    return render(request, "collections/models_paginated.html", context)
def search(request):
    """Search page and search endpoint (GET only).

    With both ``search`` and ``mode`` query parameters and an ``Accept`` header
    containing ``application/json``, the raw search result is returned as JSON.
    Otherwise the search page is rendered, including the result when a search
    term and mode were supplied.

    The ``packages`` query parameter restricts the search to the given package
    URLs (resolved on behalf of the current user); without it the reviewed
    packages are searched.

    :param request: the incoming request.
    :return: ``JsonResponse``, the rendered page, or ``HttpResponseNotAllowed``.
    """
    current_user = _anonymous_or_real(request)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])

    package_urls = request.GET.getlist("packages")
    searchterm = request.GET.get("search", "").strip()
    mode = request.GET.get("mode")
    has_query = all([searchterm, mode])

    def run_search():
        if package_urls:
            packages = [
                PackageManager.get_package_by_url(current_user, p) for p in package_urls
            ]
        else:
            packages = PackageManager.get_reviewed_packages()
        return SearchManager.search(packages, searchterm, mode)

    # The Accept header tells the ajax call apart from a regular page load.
    # Default to "" so a request without an Accept header does not raise TypeError.
    accept = request.META.get("HTTP_ACCEPT", "")
    if has_query and "application/json" in accept:
        return JsonResponse(run_search(), safe=False)

    context = get_base_context(request)
    context["title"] = "enviPath - Search"
    context["object_type"] = "model"
    context["meta"]["current_package"] = context["meta"]["user"].default_package
    context["breadcrumbs"] = [
        {"Home": s.SERVER_URL},
        {"Search": s.SERVER_URL + "/search"},
    ]

    context["reviewed_objects"] = PackageManager.get_reviewed_packages()
    context["unreviewed_objects"] = PackageManager.get_all_readable_packages(current_user)

    if has_query:
        context["search_result"] = run_search()
        context["search_result"]["searchterm"] = searchterm

    return render(request, "search.html", context)
@package_permission_required()
def package_models(request, package_uuid):
    """List the models of a package (GET) or create and build a new one (POST).

    GET renders the paginated model list; with a truthy ``all`` query parameter
    it returns every model of the package as JSON instead. POST creates a model
    of the type named by ``model-type``, dispatches its build task and redirects
    to the new model.

    :param request: the incoming request.
    :param package_uuid: uuid of the package the models belong to.
    :return: rendered page, ``JsonResponse``, redirect, an error response for
        invalid input, or ``HttpResponseNotAllowed`` for other methods.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = f"enviPath - {current_package.get_name()} - Models"
        context["meta"]["current_package"] = current_package
        context["object_type"] = "model"
        context["breadcrumbs"] = breadcrumbs(current_package, "model")
        context["entity_type"] = "model"
        context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/package/{current_package.uuid}/model/"
        context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
        context["list_title"] = "models"

        if request.GET.get("all"):
            # All models of a package share the package's review state.
            model_qs = EPModel.objects.filter(package=current_package).order_by("name")
            return JsonResponse(
                {
                    "objects": [
                        {
                            "name": model.get_name(),
                            "url": model.url,
                            "reviewed": current_package.reviewed,
                        }
                        for model in model_qs
                    ]
                }
            )

        context["model_types"] = {
            "ML Relative Reasoning": {
                "type": "ml-relative-reasoning",
                "requires_rule_packages": True,
                "requires_data_packages": True,
            },
            "Rule Based Relative Reasoning": {
                "type": "rule-based-relative-reasoning",
                "requires_rule_packages": True,
                "requires_data_packages": True,
            },
            "EnviFormer": {
                "type": "enviformer",
                "requires_rule_packages": False,
                "requires_data_packages": True,
            },
        }

        if s.FLAGS.get("PLUGINS", False):
            for k, v in s.CLASSIFIER_PLUGINS.items():
                context["model_types"][v.display()] = {
                    "type": k,
                    "requires_rule_packages": v.requires_rule_packages(),
                    "requires_data_packages": v.requires_data_packages(),
                    "additional_parameters": v.Config.__name__.lower()
                    if v.Config.__name__ != ""
                    else None,
                }

            for k, v in s.PROPERTY_PLUGINS.items():
                context["model_types"][v.display()] = {
                    "type": k,
                    "requires_rule_packages": v.requires_rule_packages(),
                    "requires_data_packages": v.requires_data_packages(),
                }

        return render(request, "collections/models_paginated.html", context)

    elif request.method == "POST":
        log_post_params(request)

        name = request.POST.get("model-name")
        description = request.POST.get("model-description")
        model_type = request.POST.get("model-type")

        # Generic fields for ML and Rule Based
        rule_packages = request.POST.getlist("model-rule-packages")
        data_packages = request.POST.getlist("model-data-packages")

        def resolve_packages(urls):
            """Resolve package URLs on behalf of the current user."""
            return [PackageManager.get_package_by_url(current_user, p) for p in urls]

        # Generic params
        params = {
            "package": current_package,
            "name": name,
            "description": description,
            "data_packages": resolve_packages(data_packages),
        }

        # Only these model types take a threshold; reject a non-numeric value
        # with an error page instead of failing with an unhandled ValueError.
        if model_type in ("enviformer", "ml-relative-reasoning"):
            try:
                params["threshold"] = float(request.POST.get("model-threshold", 0.5))
            except ValueError:
                return error(
                    request, "Invalid threshold.", "Model threshold must be a number."
                )

        if model_type == "enviformer":
            mod = EnviFormer.create(**params)

        elif model_type == "ml-relative-reasoning":
            # TODO handle additional fingerprinter
            # fingerprinter = request.POST.get("model-fingerprinter")
            params["rule_packages"] = resolve_packages(rule_packages)

            # App Domain related parameters
            # NOTE(review): submitted values arrive as strings while the defaults are
            # numbers; they are passed through unchanged — confirm create() converts them.
            params["build_app_domain"] = request.POST.get("build-app-domain", False) == "on"
            params["app_domain_num_neighbours"] = request.POST.get("num-neighbors", 5)
            params["app_domain_reliability_threshold"] = request.POST.get(
                "reliability-threshold", 0.5
            )
            params["app_domain_local_compatibility_threshold"] = request.POST.get(
                "local-compatibility-threshold", 0.5
            )

            mod = MLRelativeReasoning.create(**params)

        elif model_type == "rule-based-relative-reasoning":
            params["rule_packages"] = resolve_packages(rule_packages)
            mod = RuleBasedRelativeReasoning.create(**params)

        elif s.FLAGS.get("PLUGINS", False) and model_type in s.CLASSIFIER_PLUGINS:
            params["plugin_identifier"] = model_type
            impl = s.CLASSIFIER_PLUGINS[model_type]

            if impl.requires_rule_packages():
                params["rule_packages"] = resolve_packages(rule_packages)
            else:
                params["rule_packages"] = []

            if not impl.requires_data_packages():
                params["data_packages"] = []

            params["config"] = impl.parse_config(request.POST.dict())
            mod = ClassifierPluginModel.create(**params)

        elif s.FLAGS.get("PLUGINS", False) and model_type in s.PROPERTY_PLUGINS:
            params["plugin_identifier"] = model_type
            impl = s.PROPERTY_PLUGINS[model_type]

            if impl.requires_rule_packages():
                params["rule_packages"] = resolve_packages(rule_packages)
            else:
                params["rule_packages"] = []

            # Unlike the classifier branch, the key is removed rather than emptied.
            if not impl.requires_data_packages():
                del params["data_packages"]

            mod = PropertyPluginModel.create(**params)

        else:
            return error(
                request, "Invalid model type.", f'Model type "{model_type}" is not supported.'
            )

        from .tasks import build_model, dispatch

        dispatch(current_user, build_model, mod.pk)

        return redirect(mod.url)

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_model(request, package_uuid, model_uuid):
    """Serve a single model of a package.

    GET:
        * With ``classify``, ``half_life`` or ``app-domain-assessment`` set,
          the ``smiles`` query parameter is standardized and a JSON answer
          for the requested operation is returned (JSON error with status
          400 for an empty or invalid SMILES).
        * Otherwise the model detail page is rendered.
    POST:
        * ``hidden`` = ``delete`` / ``evaluate`` / ``retrain`` triggers the
          respective action; any other ``hidden`` value yields a 400.
        * Without ``hidden``, ``model-name`` / ``model-description`` update
          the model; a 400 is returned if neither carries a value.

    Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_model = get_object_or_404(EPModel, package=current_package, uuid=model_uuid)

    method = request.method
    if method not in ("GET", "POST"):
        return HttpResponseNotAllowed(["GET", "POST"])

    if method == "POST":
        action = request.POST.get("hidden", None)

        if not action:
            # No action requested -> treat the request as a property update.
            # TODO: Move cleaning to property updater
            name = request.POST.get("model-name")
            if name is not None:
                name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()

            description = request.POST.get("model-description")
            if description is not None:
                description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()

            if not (name or description):
                return HttpResponseBadRequest()

            if name:
                current_model.name = name
            if description:
                current_model.description = description
            current_model.save()
            return redirect(current_model.url)

        if action == "delete":
            current_model.delete()
            return redirect(current_package.url + "/model")

        if action == "evaluate":
            from .tasks import dispatch, evaluate_model

            eval_type = request.POST.get("model-evaluation-type")
            if eval_type not in ["sg", "mg"]:
                return error(
                    request,
                    "Invalid evaluation type",
                    f'Evaluation type "{eval_type}" is not supported. Only "sg" and "mg" are supported.',
                )

            # "mg" selects the multi generation evaluation
            use_multigen = eval_type == "mg"
            package_ids = []
            for package_url in request.POST.getlist("model-evaluation-packages"):
                package_ids.append(PackageManager.get_package_by_url(current_user, package_url).id)

            dispatch(current_user, evaluate_model, current_model.pk, use_multigen, package_ids)
            return redirect(current_model.url)

        if action == "retrain":
            from .tasks import dispatch, retrain

            dispatch(current_user, retrain, current_model.pk)
            return redirect(current_model.url)

        return HttpResponseBadRequest()

    # ---- GET ----
    wants_classification = request.GET.get("classify", False)
    wants_ad_assessment = request.GET.get("app-domain-assessment", False)
    # TODO this needs to be generic
    wants_half_life = request.GET.get("half_life", False)

    if wants_classification or wants_ad_assessment or wants_half_life:
        smiles = request.GET.get("smiles", "").strip()

        # Reject empty input before touching the chemistry utilities
        if smiles == "":
            return JsonResponse({"error": "Received empty SMILES"}, status=400)

        stereo = FormatConverter.has_stereo(smiles)

        try:
            stand_smiles = FormatConverter.standardize(smiles, remove_stereo=True)
        except ValueError:
            return JsonResponse({"error": f'"{smiles}" is not a valid SMILES'}, status=400)

        if wants_classification:
            from epdb.tasks import dispatch_eager, predict_simple

            _, pred_res = dispatch_eager(
                current_user, predict_simple, current_model.pk, stand_smiles
            )

            predictions = []
            for pr in pred_res:
                if len(pr) == 0:
                    continue

                product_tuples = []
                for prod_set in pr.product_sets:
                    logger.debug(f"Checking {prod_set}")
                    product_tuples.append(tuple(prod_set))

                entry = {
                    # set() drops duplicated product sets
                    "products": list(set(product_tuples)),
                    "probability": pr.probability,
                }
                if pr.rule is not None:
                    entry["btrule"] = {"url": pr.rule.url, "name": pr.rule.name}
                else:
                    entry["btrule"] = None
                predictions.append(entry)

            return JsonResponse({"pred": predictions, "stereo": stereo}, safe=False)

        if wants_half_life:
            from epdb.tasks import dispatch_eager, predict_simple

            _, run_res = dispatch_eager(
                current_user, predict_simple, current_model.pk, stand_smiles, include_svg=True
            )

            # Here we expect a single result; unwrap it if a collection came back
            outcome = run_res.result
            if isinstance(outcome, Iterable):
                outcome = outcome[0]
            return JsonResponse(outcome.model_dump(mode="json"), safe=False)

        # Only the applicability domain assessment is left
        return JsonResponse(current_model.app_domain.assess(stand_smiles), safe=False)

    context = get_base_context(request)
    context["title"] = f"enviPath - {current_package.get_name()} - {current_model.get_name()}"

    context["meta"]["current_package"] = current_package
    context["object_type"] = "model"
    context["breadcrumbs"] = breadcrumbs(current_package, "model", current_model)

    context["model"] = current_model
    context["current_object"] = current_model

    if isinstance(current_model, PropertyPluginModel):
        context["plugin_identifier"] = current_model.plugin_identifier
        template = "objects/model/property_model.html"
    else:
        template = "objects/model/classification_model.html"
    return render(request, template, context)
@package_permission_required()
def package(request, package_uuid):
    """Show, export, or modify a single package.

    GET:
        * ``export=true`` returns the package as a JSON attachment.
        * Otherwise the package page is rendered, including the current
          user/group permissions and the users/groups that have none yet.
    POST:
        * ``hidden`` = ``delete`` / ``publish-package`` / ``copy`` triggers
          the respective action; any other ``hidden`` value yields a 400.
        * ``package-name`` / ``package-description`` update the package.
        * ``grantee`` together with ``read`` / ``write`` / ``owner`` updates
          the permissions of a user or group.
        * ``license`` sets or (``no-license``) resets the package license.

    Returns a 400 if a POST matches none of the above, if permission flags
    are sent without a grantee, or if the requested license does not exist.
    Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == "GET":
        if request.GET.get("export", False) == "true":
            filename = f"{current_package.get_name().replace(' ', '_')}_{current_package.uuid}.json"
            pack_json = PackageManager.export_package(
                current_package, include_models=False, include_external_identifiers=False
            )
            response = JsonResponse(pack_json, content_type="application/json")
            response["Content-Disposition"] = f'attachment; filename="{filename}"'
            return response

        context = get_base_context(request)
        context["title"] = f"enviPath - {current_package.get_name()}"

        context["meta"]["current_package"] = current_package
        context["object_type"] = "package"
        context["breadcrumbs"] = breadcrumbs(current_package)

        context["package"] = current_package

        user_perms = UserPackagePermission.objects.filter(package=current_package)
        group_perms = GroupPackagePermission.objects.filter(package=current_package)

        # Active users / groups that do not hold a permission entry for this package yet
        users = (
            get_user_model()
            .objects.exclude(id__in=user_perms.values_list("user_id", flat=True))
            .filter(is_active=True)
        )
        groups = Group.objects.exclude(id__in=group_perms.values_list("group_id", flat=True))

        context["users"] = users
        context["groups"] = groups
        context["user_permissions"] = user_perms
        context["group_permissions"] = group_perms

        return render(request, "objects/package.html", context)

    elif request.method == "POST":
        log_post_params(request)

        if hidden := request.POST.get("hidden", None):
            if hidden == "delete":
                if current_user.default_package == current_package:
                    return error(
                        request,
                        f'Package "{current_package.get_name()}" is the default and cannot be deleted!',
                        "You cannot delete the default package. If you want to delete this package you have to set another default package first.",
                    )

                logger.debug(current_package.delete())
                return redirect(s.SERVER_URL + "/package")
            elif hidden == "publish-package":
                for g in Group.objects.filter(public=True):
                    PackageManager.grant_read(current_user, current_package, g)
                return redirect(current_package.url)
            elif hidden == "copy":
                object_to_copy = request.POST.get("object_to_copy")

                if not object_to_copy:
                    return error(request, "No object to copy", "There was no object to copy.")

                try:
                    copied_object = copy_object(current_user, current_package, object_to_copy)
                except ValueError:
                    return JsonResponse(
                        {"error": f"Can't copy object {object_to_copy} to the same package!"},
                        status=400,
                    )

                return JsonResponse({"success": copied_object.url})
            else:
                return HttpResponseBadRequest()

        # TODO: Move cleaning to property updater
        new_package_name = request.POST.get("package-name")
        if new_package_name is not None:
            new_package_name = nh3.clean(new_package_name, tags=s.ALLOWED_HTML_TAGS).strip()

        new_package_description = request.POST.get("package-description")
        if new_package_description is not None:
            new_package_description = nh3.clean(
                new_package_description, tags=s.ALLOWED_HTML_TAGS
            ).strip()

        grantee_url = request.POST.get("grantee")
        read = request.POST.get("read") == "on"
        write = request.POST.get("write") == "on"
        owner = request.POST.get("owner") == "on"

        cc_string = request.POST.get("license")

        if new_package_name:
            current_package.name = new_package_name

        if new_package_description:
            current_package.description = new_package_description

        if any([new_package_name, new_package_description]):
            current_package.save()
            return redirect(current_package.url)

        elif any([grantee_url, read, write, owner]):
            if not grantee_url:
                # Permission flags without a grantee cannot be applied. Without
                # this guard the substring test below fails on None.
                return HttpResponseBadRequest()

            if "user" in grantee_url:
                grantee = UserManager.get_user_lp(grantee_url)
            else:
                grantee = GroupManager.get_group_lp(grantee_url)

            # Later flags win, i.e. the strongest requested permission is kept
            max_perm = None
            if read:
                max_perm = Permission.READ[0]
            if write:
                max_perm = Permission.WRITE[0]
            if owner:
                max_perm = Permission.ALL[0]

            PackageManager.update_permissions(current_user, current_package, grantee, max_perm)
            return redirect(current_package.url)

        elif cc_string is not None:
            cc_string = cc_string.strip()

            if cc_string == "no-license":  # Reset the package's license
                current_package.license = None
            else:  # Get the license and assign it to the package
                try:
                    current_package.license = License.objects.get(cc_string=cc_string)
                except License.DoesNotExist:
                    # Unknown license identifier sent by the client
                    return HttpResponseBadRequest()

            current_package.save()
            return redirect(current_package.url)
        else:
            return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_compounds(request, package_uuid):
    """List the compounds of a package or create a new one.

    GET renders the paginated compound list; with a truthy ``all`` query
    parameter all compounds of the package are returned as JSON instead,
    ordered by name. POST creates a compound from ``compound-name``,
    ``compound-smiles`` and ``compound-description`` and redirects to it;
    a ``ValueError`` raised during creation is re-raised as ``BadRequest``.
    Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == "POST":
        name = request.POST.get("compound-name")
        smiles = request.POST.get("compound-smiles")
        description = request.POST.get("compound-description")

        try:
            created = Compound.create(current_package, smiles, name, description)
        except ValueError as e:
            raise BadRequest(str(e))

        return redirect(created.url)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    if request.GET.get("all"):
        # All compounds of a package share the package's review state
        is_reviewed = current_package.reviewed
        compounds = Compound.objects.filter(package=current_package).order_by("name")
        listing = []
        for compound in compounds:
            listing.append(
                {"name": compound.get_name(), "url": compound.url, "reviewed": is_reviewed}
            )
        return JsonResponse({"objects": listing})

    context = get_base_context(request)
    context["title"] = f"enviPath - {current_package.get_name()} - Compounds"

    context["meta"]["current_package"] = current_package
    context["object_type"] = "compound"
    context["breadcrumbs"] = breadcrumbs(current_package, "compound")

    # Settings consumed by the paginated list template
    context["entity_type"] = "compound"
    context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/package/{current_package.uuid}/compound/"
    context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
    context["list_mode"] = "tabbed"
    context["list_title"] = "compounds"

    return render(request, "collections/compounds_paginated.html", context)
@package_permission_required()
def package_compound(request, package_uuid, compound_uuid):
    """Show or modify a single compound of a package.

    GET renders the compound page. POST handles, in this order:
    ``hidden=delete``, ``selected-scenarios``, ``aliases``,
    ``compound-name`` / ``compound-description`` updates, and attaching an
    external identifier (``selected-database`` + ``identifier``).

    Returns a 400 if the POST matches none of these, if ``hidden`` has an
    unsupported value, or if ``selected-database`` is not the id of an
    existing external database. Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_compound = get_object_or_404(Compound, package=current_package, uuid=compound_uuid)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = (
            f"enviPath - {current_package.get_name()} - {current_compound.get_name()}"
        )

        context["meta"]["current_package"] = current_package
        context["object_type"] = "compound"
        context["breadcrumbs"] = breadcrumbs(current_package, "compound", current_compound)

        context["compound"] = current_compound
        context["current_object"] = current_compound

        return render(request, "objects/compound.html", context)

    elif request.method == "POST":
        if hidden := request.POST.get("hidden", None):
            if hidden == "delete":
                current_compound.delete()
                return redirect(current_package.url + "/compound")
            else:
                return HttpResponseBadRequest()

        if "selected-scenarios" in request.POST:
            selected_scenarios = request.POST.getlist("selected-scenarios")
            set_scenarios(current_user, current_compound, selected_scenarios)
            return redirect(current_compound.url)

        if "aliases" in request.POST:
            aliases = request.POST.getlist("aliases")
            try:
                set_aliases(current_user, current_compound, aliases)
            except Exception as e:
                return JsonResponse({"error": str(e)}, status=400)

            return JsonResponse({"success": current_compound.url})

        # TODO: Move cleaning to property updater
        new_compound_name = request.POST.get("compound-name")
        if new_compound_name is not None:
            new_compound_name = nh3.clean(new_compound_name, tags=s.ALLOWED_HTML_TAGS).strip()

        new_compound_description = request.POST.get("compound-description")
        if new_compound_description is not None:
            new_compound_description = nh3.clean(
                new_compound_description, tags=s.ALLOWED_HTML_TAGS
            ).strip()

        if new_compound_name:
            current_compound.name = new_compound_name

        if new_compound_description:
            current_compound.description = new_compound_description

        if any([new_compound_name, new_compound_description]):
            current_compound.save()
            return redirect(current_compound.url)

        selected_database = request.POST.get("selected-database", "").strip()
        external_identifier = request.POST.get("identifier", "").strip()

        if selected_database and external_identifier:
            try:
                db = ExternalDatabase.objects.get(id=int(selected_database))
            except (ValueError, ExternalDatabase.DoesNotExist):
                # Non-numeric or unknown database id supplied by the client
                return HttpResponseBadRequest()

            ExternalIdentifier.objects.create(
                content_object=current_compound,
                database=db,
                identifier_value=external_identifier,
                url=db.url_pattern.format(id=external_identifier),
                is_primary=False,
            )

            return redirect(current_compound.url)

        return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_compound_structures(request, package_uuid, compound_uuid):
    """List the structures of a compound or add a new structure.

    GET renders the paginated structure list. POST adds a structure built
    from ``structure-name``, ``structure-smiles`` and
    ``structure-description`` and redirects to it; if adding raises a
    ``ValueError`` an error page is returned. An unknown compound yields a
    404, any other HTTP method a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    # 404 for unknown compounds, like the other detail views in this module
    current_compound = get_object_or_404(Compound, package=current_package, uuid=compound_uuid)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = (
            f"enviPath - {current_package.get_name()} - {current_compound.get_name()} - Structures"
        )

        context["meta"]["current_package"] = current_package
        context["object_type"] = "structure"
        context["breadcrumbs"] = breadcrumbs(
            current_package, "compound", current_compound, "structure"
        )

        # Settings consumed by the paginated list template
        context["entity_type"] = "structure"
        context["page_title"] = f"{current_compound.get_name()} - Structures"
        context["api_endpoint"] = (
            f"{s.SERVER_PATH}/api/v1/package/{current_package.uuid}/compound/{current_compound.uuid}/structure/"
        )
        context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
        context["compound"] = current_compound
        context["list_mode"] = "combined"
        context["list_title"] = "structures"

        return render(request, "collections/structures_paginated.html", context)

    elif request.method == "POST":
        structure_name = request.POST.get("structure-name")
        structure_smiles = request.POST.get("structure-smiles")
        structure_description = request.POST.get("structure-description")

        try:
            cs = current_compound.add_structure(
                structure_smiles, structure_name, structure_description
            )
        except ValueError:
            return error(
                request,
                "Adding structure failed!",
                "The structure could not be added as normalized structures don't match!",
            )

        return redirect(cs.url)

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
    """Show or modify a single structure of a compound.

    GET renders the structure page, or returns the structure as SVG for
    ``image=svg`` (any other ``image`` value yields a 400). POST handles,
    in this order: ``hidden=delete``, ``compound-structure-name`` /
    ``compound-structure-description`` updates, ``selected-scenarios``,
    ``aliases``, and attaching an external identifier
    (``selected-database`` + ``identifier``).

    Returns a 400 if the POST matches none of these, if ``hidden`` has an
    unsupported value, or if ``selected-database`` is not the id of an
    existing external database. Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_compound = get_object_or_404(Compound, package=current_package, uuid=compound_uuid)
    current_structure = get_object_or_404(
        CompoundStructure, compound=current_compound, uuid=structure_uuid
    )

    if request.method == "GET":
        is_image_request = request.GET.get("image")
        if is_image_request:
            if is_image_request == "svg":
                return HttpResponse(current_structure.as_svg, content_type="image/svg+xml")
            else:
                return HttpResponseBadRequest("Currently only SVG as image formate supported!")

        context = get_base_context(request)
        context["title"] = (
            f"enviPath - {current_package.get_name()} - {current_compound.get_name()} - {current_structure.get_name()}"
        )

        context["meta"]["current_package"] = current_package
        context["object_type"] = "structure"

        context["compound_structure"] = current_structure
        context["current_object"] = current_structure
        context["breadcrumbs"] = breadcrumbs(
            current_package, "compound", current_compound, "structure", current_structure
        )

        return render(request, "objects/compound_structure.html", context)

    elif request.method == "POST":
        if hidden := request.POST.get("hidden", None):
            if hidden != "delete":
                return HttpResponseBadRequest()

            # The whole compound goes away if this is its last structure
            # (deleting the compound deletes the structure as well) or if the
            # structure is the compound's normalized structure.
            if (
                current_structure.compound.structures.count() == 1
                or current_structure.normalized_structure
            ):
                current_compound.delete()
                return redirect(current_package.url + "/compound")

            if current_compound.default_structure == current_structure:
                current_structure.delete()
                # NOTE(review): the new default is only assigned in memory and
                # never saved -- confirm whether a save() is intended here and
                # how it interacts with the on_delete rule of default_structure.
                current_compound.default_structure = current_compound.structures.all().first()
                return redirect(current_compound.url + "/structure")

            current_structure.delete()
            return redirect(current_compound.url + "/structure")

        # TODO: Move cleaning to property updater
        new_structure_name = request.POST.get("compound-structure-name")
        if new_structure_name is not None:
            new_structure_name = nh3.clean(new_structure_name, tags=s.ALLOWED_HTML_TAGS).strip()

        new_structure_description = request.POST.get("compound-structure-description")
        if new_structure_description is not None:
            new_structure_description = nh3.clean(
                new_structure_description, tags=s.ALLOWED_HTML_TAGS
            ).strip()

        if new_structure_name:
            current_structure.name = new_structure_name

        if new_structure_description:
            current_structure.description = new_structure_description

        if any([new_structure_name, new_structure_description]):
            current_structure.save()
            return redirect(current_structure.url)

        if "selected-scenarios" in request.POST:
            selected_scenarios = request.POST.getlist("selected-scenarios")
            set_scenarios(current_user, current_structure, selected_scenarios)
            return redirect(current_structure.url)

        if "aliases" in request.POST:
            aliases = request.POST.getlist("aliases")
            try:
                set_aliases(current_user, current_structure, aliases)
            except Exception as e:
                return JsonResponse({"error": str(e)}, status=400)

            return JsonResponse({"success": current_structure.url})

        selected_database = request.POST.get("selected-database", "").strip()
        external_identifier = request.POST.get("identifier", "").strip()

        if selected_database and external_identifier:
            try:
                db = ExternalDatabase.objects.get(id=int(selected_database))
            except (ValueError, ExternalDatabase.DoesNotExist):
                # Non-numeric or unknown database id supplied by the client
                return HttpResponseBadRequest()

            ExternalIdentifier.objects.create(
                content_object=current_structure,
                database=db,
                identifier_value=external_identifier,
                url=db.url_pattern.format(id=external_identifier),
                is_primary=False,
            )

            return redirect(current_structure.url)

        return HttpResponseBadRequest()

    else:
        # Both GET and POST are handled above, so advertise both
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_rules(request, package_uuid):
    """List the rules of a package or create a new one.

    GET renders the paginated rule list; with a truthy ``all`` query
    parameter all rules of the package are returned as JSON instead,
    ordered by name. POST creates a rule of the type given in ``rule-type``
    (``SimpleAmbitRule``, ``SimpleRDKitRule``, ``ParallelRule`` or
    ``SequentialRule``) and redirects to it; any other type yields a 400.
    Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == "POST":
        log_post_params(request)

        # Parameters shared by all rule types
        name = request.POST.get("rule-name")
        description = request.POST.get("rule-description")
        rule_type = request.POST.get("rule-type")

        # Parameters specific to the requested rule type
        if rule_type == "SimpleAmbitRule":
            type_params = {
                "smirks": request.POST.get("rule-smirks"),
                "reactant_filter_smarts": request.POST.get("rule-reactant-smarts"),
                "product_filter_smarts": request.POST.get("rule-product-smarts"),
            }
        elif rule_type == "SimpleRDKitRule":
            type_params = {"reaction_smarts": request.POST.get("rule-reaction-smarts")}
        elif rule_type in ("ParallelRule", "SequentialRule"):
            # Composite rules take no additional parameters here
            type_params = {}
        else:
            return HttpResponseBadRequest()

        created = Rule.create(
            rule_type=rule_type,
            package=current_package,
            name=name,
            description=description,
            **type_params,
        )
        return redirect(created.url)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    if request.GET.get("all"):
        # All rules of a package share the package's review state
        is_reviewed = current_package.reviewed
        rules = Rule.objects.filter(package=current_package).order_by("name")
        listing = []
        for rule in rules:
            listing.append({"name": rule.get_name(), "url": rule.url, "reviewed": is_reviewed})
        return JsonResponse({"objects": listing})

    context = get_base_context(request)
    context["title"] = f"enviPath - {current_package.get_name()} - Rules"

    context["meta"]["current_package"] = current_package
    context["object_type"] = "rule"
    context["breadcrumbs"] = breadcrumbs(current_package, "rule")

    # Settings consumed by the paginated list template
    context["entity_type"] = "rule"
    context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/package/{current_package.uuid}/rule/"
    context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
    context["list_title"] = "rules"

    return render(request, "collections/rules_paginated.html", context)
@package_permission_required()
def package_rule(request, package_uuid, rule_uuid):
    """Show, apply, or modify a single rule of a package.

    GET renders the rule page. With a ``smiles`` query parameter the rule is
    applied to the standardized SMILES and the resulting reaction is
    returned as SVG; a 400 is returned if the SMILES cannot be standardized
    or the rule yields no products for it.

    POST handles, in this order: ``hidden=delete``, ``selected-scenarios``,
    ``aliases`` and ``rule-name`` / ``rule-description`` updates; a 400 is
    returned if none of them applies. Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_rule = get_object_or_404(Rule, package=current_package, uuid=rule_uuid)

    if request.method == "GET":
        context = get_base_context(request)

        if smiles := request.GET.get("smiles", False):
            try:
                stand_smiles = FormatConverter.standardize(smiles, remove_stereo=True)
            except ValueError:
                # Same failure mode the other views guard against
                return HttpResponseBadRequest("The given SMILES is not valid.")

            res = current_rule.apply(stand_smiles)

            if len(res) == 0:
                # Nothing to draw; indexing res[0] below would fail
                return HttpResponseBadRequest("The rule is not applicable to the given SMILES.")

            if len(res) > 1:
                logger.info(
                    f"Rule {current_rule.uuid} returned multiple product sets on {smiles}, picking the first one."
                )

            # Some Rules are touching unrelated areas which might result in ~ indicating
            # any bond (-, =, #). For drawing we need a concrete bond. -> use single bond
            product_smiles = [x.replace("~", "-") for x in res[0]]
            smirks = f"{stand_smiles}>>{'.'.join(sorted(product_smiles))}"

            # Usually the functional groups are a mapping of fg -> count
            # As we are doing it on the fly here fake a high count to ensure that its properly highlighted
            if isinstance(current_rule, SimpleAmbitRule):
                educt_functional_groups = {current_rule.reactants_smarts: 1000}
                product_functional_groups = {current_rule.products_smarts: 1000}
            else:
                educt_functional_groups = {x: 1000 for x in current_rule.reactants_smarts}
                product_functional_groups = {x: 1000 for x in current_rule.products_smarts}

            return HttpResponse(
                IndigoUtils.smirks_to_svg(
                    smirks,
                    False,
                    0,
                    0,
                    educt_functional_groups=educt_functional_groups,
                    product_functional_groups=product_functional_groups,
                ),
                content_type="image/svg+xml",
            )

        context["title"] = f"enviPath - {current_package.get_name()} - {current_rule.get_name()}"

        context["meta"]["current_package"] = current_package
        context["object_type"] = "rule"
        context["breadcrumbs"] = breadcrumbs(current_package, "rule", current_rule)

        context["rule"] = current_rule
        context["current_object"] = current_rule

        if isinstance(current_rule, SimpleAmbitRule):
            return render(request, "objects/simple_rule.html", context)
        else:  # isinstance(current_rule, ParallelRule) or isinstance(current_rule, SequentialRule):
            return render(request, "objects/composite_rule.html", context)

    elif request.method == "POST":
        if hidden := request.POST.get("hidden", None):
            if hidden == "delete":
                current_rule.delete()
                return redirect(current_package.url + "/rule")
            else:
                return HttpResponseBadRequest()

        if "selected-scenarios" in request.POST:
            selected_scenarios = request.POST.getlist("selected-scenarios")
            set_scenarios(current_user, current_rule, selected_scenarios)
            return redirect(current_rule.url)

        if "aliases" in request.POST:
            aliases = request.POST.getlist("aliases")
            try:
                set_aliases(current_user, current_rule, aliases)
            except Exception as e:
                return JsonResponse({"error": str(e)}, status=400)

            return JsonResponse({"success": current_rule.url})

        # TODO: Move cleaning to property updater
        rule_name = request.POST.get("rule-name")
        if rule_name is not None:
            rule_name = nh3.clean(rule_name, tags=s.ALLOWED_HTML_TAGS).strip()

        rule_description = request.POST.get("rule-description")
        if rule_description is not None:
            rule_description = nh3.clean(rule_description, tags=s.ALLOWED_HTML_TAGS).strip()

        if rule_name:
            current_rule.name = rule_name

        if rule_description:
            current_rule.description = rule_description

        if any([rule_name, rule_description]):
            current_rule.save()
            return redirect(current_rule.url)

        return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_rule_enzymelink(request, package_uuid, rule_uuid, enzymelink_uuid):
    """Render the page of a single enzyme link of a rule.

    Only GET is supported; any other HTTP method yields a 405. An unknown
    rule or enzyme link yields a 404.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    # 404 for unknown objects, like the other detail views in this module
    current_rule = get_object_or_404(Rule, package=current_package, uuid=rule_uuid)
    current_enzymelink = get_object_or_404(EnzymeLink, rule=current_rule, uuid=enzymelink_uuid)

    if request.method == "GET":
        context = get_base_context(request)

        context["title"] = f"enviPath - {current_package.get_name()} - {current_rule.get_name()}"

        context["meta"]["current_package"] = current_package
        context["object_type"] = "enzyme"
        context["breadcrumbs"] = breadcrumbs(
            current_package, "rule", current_rule, "enzymelink", current_enzymelink
        )

        context["enzymelink"] = current_enzymelink
        context["current_object"] = current_enzymelink

        return render(request, "objects/enzymelink.html", context)

    return HttpResponseNotAllowed(["GET"])
@package_permission_required()
def package_reactions(request, package_uuid):
    """List the reactions of a package or create a new one.

    GET renders the paginated reaction list; with a truthy ``all`` query
    parameter all reactions of the package are returned as JSON instead,
    ordered by name. POST creates a reaction from ``reaction-name``,
    ``reaction-description`` and ``reaction-smirks`` (``educts>>products``,
    molecules separated by ``.``) and redirects to it; a missing SMIRKS or
    one without ``>>`` yields a 400. Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == "GET":
        if request.GET.get("all"):
            # All reactions of a package share the package's review state
            reactions = Reaction.objects.filter(package=current_package).order_by("name")
            return JsonResponse(
                {
                    "objects": [
                        {"name": r.get_name(), "url": r.url, "reviewed": current_package.reviewed}
                        for r in reactions
                    ]
                }
            )

        context = get_base_context(request)
        context["title"] = f"enviPath - {current_package.get_name()} - Reactions"

        context["meta"]["current_package"] = current_package
        context["object_type"] = "reaction"
        context["breadcrumbs"] = breadcrumbs(current_package, "reaction")

        # Settings consumed by the paginated list template
        context["entity_type"] = "reaction"
        context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/package/{current_package.uuid}/reaction/"
        context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
        context["list_title"] = "reactions"

        return render(request, "collections/reactions_paginated.html", context)

    elif request.method == "POST":
        reaction_name = request.POST.get("reaction-name")
        reaction_description = request.POST.get("reaction-description")
        reactions_smirks = request.POST.get("reaction-smirks")

        # Without this check a missing parameter or missing ">>" separator
        # fails on .split() / the [1] index below.
        if not reactions_smirks or ">>" not in reactions_smirks:
            return HttpResponseBadRequest()

        sides = reactions_smirks.split(">>")
        educts = sides[0].split(".")
        products = sides[1].split(".")

        r = Reaction.create(
            current_package,
            name=reaction_name,
            description=reaction_description,
            educts=educts,
            products=products,
        )

        return redirect(r.url)

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_reaction(request, package_uuid, reaction_uuid):
    """Show or modify a single reaction of a package.

    GET renders the reaction page. POST handles, in this order:
    ``hidden=delete``, ``selected-scenarios``, ``aliases``,
    ``reaction-name`` / ``reaction-description`` updates, and attaching an
    external identifier (``selected-database`` + ``identifier``).

    Returns a 400 if the POST matches none of these, if ``hidden`` has an
    unsupported value, or if ``selected-database`` is not the id of an
    existing external database. Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_reaction = get_object_or_404(Reaction, package=current_package, uuid=reaction_uuid)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = (
            f"enviPath - {current_package.get_name()} - {current_reaction.get_name()}"
        )

        context["meta"]["current_package"] = current_package
        context["object_type"] = "reaction"
        context["breadcrumbs"] = breadcrumbs(current_package, "reaction", current_reaction)

        context["reaction"] = current_reaction
        context["current_object"] = current_reaction

        return render(request, "objects/reaction.html", context)

    elif request.method == "POST":
        if hidden := request.POST.get("hidden", None):
            if hidden == "delete":
                current_reaction.delete()
                return redirect(current_package.url + "/reaction")
            else:
                return HttpResponseBadRequest()

        if "selected-scenarios" in request.POST:
            selected_scenarios = request.POST.getlist("selected-scenarios")
            set_scenarios(current_user, current_reaction, selected_scenarios)
            return redirect(current_reaction.url)

        if "aliases" in request.POST:
            aliases = request.POST.getlist("aliases")
            try:
                set_aliases(current_user, current_reaction, aliases)
            except Exception as e:
                return JsonResponse({"error": str(e)}, status=400)

            return JsonResponse({"success": current_reaction.url})

        # TODO: Move cleaning to property updater
        new_reaction_name = request.POST.get("reaction-name")
        if new_reaction_name is not None:
            new_reaction_name = nh3.clean(new_reaction_name, tags=s.ALLOWED_HTML_TAGS).strip()

        new_reaction_description = request.POST.get("reaction-description")
        if new_reaction_description is not None:
            new_reaction_description = nh3.clean(
                new_reaction_description, tags=s.ALLOWED_HTML_TAGS
            ).strip()

        if new_reaction_name:
            current_reaction.name = new_reaction_name

        if new_reaction_description:
            current_reaction.description = new_reaction_description

        if any([new_reaction_name, new_reaction_description]):
            current_reaction.save()
            return redirect(current_reaction.url)

        selected_database = request.POST.get("selected-database", "").strip()
        external_identifier = request.POST.get("identifier", "").strip()

        if selected_database and external_identifier:
            try:
                db = ExternalDatabase.objects.get(id=int(selected_database))
            except (ValueError, ExternalDatabase.DoesNotExist):
                # Non-numeric or unknown database id supplied by the client
                return HttpResponseBadRequest()

            ExternalIdentifier.objects.create(
                content_object=current_reaction,
                database=db,
                identifier_value=external_identifier,
                url=db.url_pattern.format(id=external_identifier),
                is_primary=False,
            )

            return redirect(current_reaction.url)

        return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathways(request, package_uuid):
    """List the pathways of a package or create (and optionally predict) one.

    GET renders the paginated pathway list; with a truthy ``all`` query
    parameter all pathways of the package are returned as JSON instead,
    ordered by name.

    POST creates a pathway from ``name``, ``description`` and ``smiles``.
    The ``predict`` field selects the mode (``predict``, ``build`` or
    ``incremental``; defaults to ``predict``). For ``predict`` and
    ``incremental`` a prediction task is dispatched, the latter limited to
    one level. An error page is returned for an empty/invalid SMILES or an
    unknown mode. Any other HTTP method yields a 405.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == "POST":
        log_post_params(request)

        name = request.POST.get("name")
        description = request.POST.get("description")
        smiles = request.POST.get("smiles", "").strip()
        pw_mode = request.POST.get("predict", "predict").strip()

        if smiles == "" and "smiles" in request.POST:
            return error(
                request,
                "Pathway prediction failed!",
                "Pathway prediction failed due to missing or empty SMILES",
            )

        try:
            stand_smiles = FormatConverter.standardize(smiles, remove_stereo=True)
        except ValueError:
            return error(
                request,
                "Pathway prediction failed!",
                f'Pathway prediction failed as standardization of SMILES "{smiles}" failed!',
            )

        modes = ["predict", "build", "incremental"]
        if pw_mode not in modes:
            return error(
                request,
                "Pathway prediction failed!",
                f'Pathway prediction failed as received mode "{pw_mode}" is none of {modes}',
            )

        # Explicitly chosen setting wins, otherwise fall back to the user's own
        setting_url = request.POST.get("prediction-setting", None)
        if setting_url:
            prediction_setting = SettingManager.get_setting_by_url(current_user, setting_url)
        else:
            prediction_setting = current_user.prediction_settings()

        runs_prediction = pw_mode in {"predict", "incremental"}

        pw = Pathway.create(
            current_package,
            stand_smiles,
            name=name,
            description=description,
            predicted=runs_prediction,
        )

        # Remember the mode the pathway was created with
        pw.kv.update({"mode": pw_mode})
        pw.save()

        if runs_prediction:
            # "incremental" predicts the first level only; otherwise the
            # prediction is unlimited (will be handled by setting)
            limit = 1 if pw_mode == "incremental" else None

            pw.setting = prediction_setting
            pw.kv.update({"status": "running"})
            pw.save()

            from .tasks import dispatch, predict

            dispatch(current_user, predict, pw.pk, prediction_setting.pk, limit=limit)

        return redirect(pw.url)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    if request.GET.get("all"):
        # All pathways of a package share the package's review state
        is_reviewed = current_package.reviewed
        pathways = Pathway.objects.filter(package=current_package).order_by("name")
        listing = []
        for pathway in pathways:
            listing.append(
                {"name": pathway.get_name(), "url": pathway.url, "reviewed": is_reviewed}
            )
        return JsonResponse({"objects": listing})

    context = get_base_context(request)
    context["title"] = f"enviPath - {current_package.get_name()} - Pathways"

    context["meta"]["current_package"] = current_package
    context["object_type"] = "pathway"
    context["breadcrumbs"] = breadcrumbs(current_package, "pathway")

    # Settings consumed by the paginated list template
    context["entity_type"] = "pathway"
    context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/package/{current_package.uuid}/pathway/"
    context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
    context["list_title"] = "pathways"

    return render(request, "collections/pathways_paginated.html", context)
def _pathway_csv_response(pathway, payload):
    """Return ``payload`` as a CSV attachment whose filename is derived from ``pathway``."""
    filename = f"{pathway.get_name().replace(' ', '_')}_{pathway.uuid}.csv"
    response = HttpResponse(payload, content_type="text/csv")
    response["Content-Disposition"] = f'attachment; filename="{filename}"'
    return response


@package_permission_required()
def package_pathway(request, package_uuid, pathway_uuid):
    """Detail view for a single pathway inside a package.

    GET, depending on the query string, returns:
      * ``last_modified`` -> JSON with the modification timestamp,
      * ``status`` -> JSON with status, timestamp and the threshold flag,
      * ``download=true`` -> the pathway as a CSV attachment,
      * ``identify-missing-rules=true`` plus ``rule-package`` -> CSV result of
        the ``identify_missing_rules`` task,
      * otherwise the rendered ``objects/pathway.html`` page.

    POST handles, in this order: deletion (``hidden=delete``), scenario
    assignment, alias assignment, name/description edits and the expansion
    of a single node (``node``). Anything else yields a 400 response.

    Any other HTTP method yields a 405 response.
    """
    current_user: User = _anonymous_or_real(request)
    current_package: Package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_pathway: Pathway = get_object_or_404(
        Pathway, package=current_package, uuid=pathway_uuid
    )

    if request.method == "GET":
        if request.GET.get("last_modified", False):
            return JsonResponse(
                {"modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S")}
            )

        if request.GET.get("status", False):
            return JsonResponse(
                {
                    "status": current_pathway.status(),
                    "modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S"),
                    "emptyDueToThreshold": current_pathway.empty_due_to_threshold(),
                }
            )

        if request.GET.get("download", False) == "true":
            return _pathway_csv_response(current_pathway, current_pathway.to_csv())

        if (
            request.GET.get("identify-missing-rules", False) == "true"
            and request.GET.get("rule-package") is not None
        ):
            from .tasks import dispatch_eager, identify_missing_rules

            rule_package = PackageManager.get_package_by_url(
                current_user, request.GET.get("rule-package")
            )
            _, res = dispatch_eager(
                current_user, identify_missing_rules, [current_pathway.pk], rule_package.pk
            )
            return _pathway_csv_response(current_pathway, res)

        # Pathway d3_json() relies on a lot of related objects (Nodes, Structures,
        # Edges, Reactions, Rules, ...). Fetch the pathway again, this time together
        # with nearly all related objects.
        current_pathway = Pathway.objects.prefetch_related(
            "node_set",
            "node_set__out_edges",
            "node_set__default_node_label",
            "node_set__scenarios",
            "edge_set",
            "edge_set__start_nodes",
            "edge_set__end_nodes",
            "edge_set__edge_label",
            "edge_set__scenarios",
        ).get(uuid=pathway_uuid)

        context = get_base_context(request)
        context["title"] = f"enviPath - {current_package.get_name()} - {current_pathway.get_name()}"
        context["meta"]["current_package"] = current_package
        context["object_type"] = "pathway"
        context["pathway"] = current_pathway
        context["current_object"] = current_pathway
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Package": s.SERVER_URL + "/package"},
            {current_package.get_name(): current_package.url},
            {"Pathway": current_package.url + "/pathway"},
            {current_pathway.get_name(): current_pathway.url},
        ]
        return render(request, "objects/pathway.html", context)

    elif request.method == "POST":
        if hidden := request.POST.get("hidden", None):
            if hidden == "delete":
                current_pathway.delete()
                return redirect(current_package.url + "/pathway")
            else:
                return HttpResponseBadRequest()

        if "selected-scenarios" in request.POST:
            selected_scenarios = request.POST.getlist("selected-scenarios")
            set_scenarios(current_user, current_pathway, selected_scenarios)
            return redirect(current_pathway.url)

        if "aliases" in request.POST:
            aliases = request.POST.getlist("aliases")
            try:
                set_aliases(current_user, current_pathway, aliases)
            except Exception as e:
                return JsonResponse({"error": str(e)}, status=400)
            return JsonResponse({"success": current_pathway.url})

        # TODO: Move cleaning to property updater
        pathway_name = request.POST.get("pathway-name")
        if pathway_name is not None:
            pathway_name = nh3.clean(pathway_name, tags=s.ALLOWED_HTML_TAGS).strip()

        pathway_description = request.POST.get("pathway-description")
        if pathway_description is not None:
            pathway_description = nh3.clean(pathway_description, tags=s.ALLOWED_HTML_TAGS).strip()

        # Both values are already stripped; only non-empty ones overwrite the stored fields.
        if pathway_name or pathway_description:
            if pathway_name:
                current_pathway.name = pathway_name
            if pathway_description:
                current_pathway.description = pathway_description
            current_pathway.save()
            return redirect(current_pathway.url)

        node_url = request.POST.get("node")
        if node_url:
            n = current_pathway.get_node(node_url)
            # NOTE(review): assumes get_node may return None for an unknown URL - confirm.
            if n is None:
                return HttpResponseBadRequest("Node does not belong to this pathway!")

            # Pathways created without a prediction run carry no setting, so
            # there is nothing to hand over to the predict task.
            if current_pathway.setting is None:
                return HttpResponseBadRequest(
                    "Pathway has no prediction setting, unable to expand node!"
                )

            current_pathway.kv.update({"status": "running"})
            current_pathway.save()

            from .tasks import dispatch, predict

            dispatch(
                current_user,
                predict,
                current_pathway.pk,
                current_pathway.setting.pk,
                node_pk=n.pk,
            )
            return JsonResponse({"success": current_pathway.url})

        return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_nodes(request, package_uuid, pathway_uuid):
    """List the nodes of a pathway (GET) or add a new node to it (POST).

    GET renders ``collections/objects_list.html``; with the ``all`` query
    parameter it returns every node as JSON instead.
    POST requires a non-empty ``node-smiles`` parameter and answers with a
    400 response when it is missing or blank; ``node-name`` and
    ``node-description`` are optional.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_pathway = get_object_or_404(Pathway, package=current_package, uuid=pathway_uuid)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = (
            f"enviPath - {current_package.get_name()} - {current_pathway.get_name()} - Nodes"
        )
        context["meta"]["current_package"] = current_package
        context["object_type"] = "node"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Package": s.SERVER_URL + "/package"},
            {current_package.get_name(): current_package.url},
            {"Pathway": current_package.url + "/pathway"},
            {current_pathway.get_name(): current_pathway.url},
            {"Node": current_pathway.url + "/node"},
        ]

        # All nodes share the review state of their package, so exactly one
        # of the two lists handed to the template is populated.
        node_qs = Node.objects.filter(pathway=current_pathway).order_by("name")
        if current_package.reviewed:
            reviewed_node_qs, unreviewed_node_qs = node_qs, Node.objects.none()
        else:
            reviewed_node_qs, unreviewed_node_qs = Node.objects.none(), node_qs

        if request.GET.get("all"):
            return JsonResponse(
                {
                    "objects": [
                        {
                            "name": node.get_name(),
                            "url": node.url,
                            "reviewed": current_package.reviewed,
                        }
                        for node in node_qs
                    ]
                }
            )

        context["reviewed_objects"] = reviewed_node_qs
        context["unreviewed_objects"] = unreviewed_node_qs
        return render(request, "collections/objects_list.html", context)

    elif request.method == "POST":
        node_name = request.POST.get("node-name")
        node_description = request.POST.get("node-description")

        # A missing parameter used to crash on ``None.strip()``; reject it explicitly.
        node_smiles = request.POST.get("node-smiles", "").strip()
        if not node_smiles:
            return HttpResponseBadRequest("Parameter 'node-smiles' is missing or empty!")

        current_pathway.add_node(node_smiles, name=node_name, description=node_description)
        return redirect(current_pathway.url)

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
    """Detail view for a single node of a pathway.

    GET with ``image=svg`` returns an SVG depiction of the node, optionally
    highlighted (``highlight`` or ``highlightReactivity``); any other GET
    renders ``objects/node.html``.
    POST handles deletion (``hidden=delete``), scenario assignment and alias
    assignment; everything else is answered with a 400 response.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_pathway = get_object_or_404(Pathway, package=current_package, uuid=pathway_uuid)
    current_node = get_object_or_404(Node, pathway=current_pathway, uuid=node_uuid)

    if request.method == "GET":
        image_format = request.GET.get("image")
        wants_highlight = request.GET.get("highlight", False)
        wants_reactivity = request.GET.get("highlightReactivity", False)

        # Only the "svg" image format is served; other values fall through to the HTML page.
        if image_format == "svg":
            # TODO optimize this chain
            if wants_highlight:
                # Use functional groups covered by the model training data
                setting = current_pathway.setting
                app_domain = setting.model.app_domain if setting and setting.model else None
                groups_to_mark = app_domain.functional_groups if app_domain else {}

                svg_data = IndigoUtils.mol_to_svg(
                    current_node.default_node_label.smiles, functional_groups=groups_to_mark
                )
            elif wants_reactivity:
                # Use reactant smarts to show all reaction sites;
                # a high count is set to obtain a strong color
                assessment = current_node.get_app_domain_assessment_data()
                groups_to_mark = {}
                for transformation in assessment.get("assessment", {}).get(
                    "transformations", []
                ):
                    rule = Rule.objects.get(url=transformation["rule"]["url"])
                    if isinstance(rule, SimpleAmbitRule):
                        groups_to_mark[rule.reactants_smarts] = 1000
                    else:
                        groups_to_mark.update(
                            {simple_rule.reactants_smarts: 1000 for simple_rule in rule.srs}
                        )

                svg_data = IndigoUtils.mol_to_svg(
                    current_node.default_node_label.smiles, functional_groups=groups_to_mark
                )
            else:
                svg_data = current_node.as_svg

            return HttpResponse(svg_data, content_type="image/svg+xml")

        package_name = current_package.get_name()
        pathway_name = current_pathway.get_name()

        context = get_base_context(request)
        context["title"] = f"enviPath - {package_name} - {pathway_name}"
        context["meta"]["current_package"] = current_package
        context["object_type"] = "pathway"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Package": s.SERVER_URL + "/package"},
            {package_name: current_package.url},
            {"Pathway": current_package.url + "/pathway"},
            {pathway_name: current_pathway.url},
            {"Node": current_pathway.url + "/node"},
            {current_node.get_name(): current_node.url},
        ]
        context["node"] = current_node
        context["current_object"] = current_node
        context["app_domain_assessment_data"] = json.dumps(
            current_node.get_app_domain_assessment_data()
        )
        return render(request, "objects/node.html", context)

    if request.method == "POST":
        log_post_params(request)

        hidden_action = request.POST.get("hidden", None)
        if hidden_action:
            if hidden_action != "delete":
                return HttpResponseBadRequest()
            # pre_delete signal will take care of edge deletion
            current_node.delete()
            return redirect(current_pathway.url)

        if "selected-scenarios" in request.POST:
            set_scenarios(
                current_user, current_node, request.POST.getlist("selected-scenarios")
            )
            return redirect(current_node.url)

        if "aliases" in request.POST:
            try:
                set_aliases(current_user, current_node, request.POST.getlist("aliases"))
            except Exception as e:
                return JsonResponse({"error": str(e)}, status=400)
            return JsonResponse({"success": current_node.url})

        return HttpResponseBadRequest()

    return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_edges(request, package_uuid, pathway_uuid):
    """List the edges of a pathway (GET) or add a new edge to it (POST).

    GET renders ``collections/objects_list.html``; with the ``all`` query
    parameter it returns every edge as JSON instead.
    POST resolves the posted ``edge-substrates`` / ``edge-products`` URLs to
    nodes of this pathway and creates an edge between them.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_pathway = get_object_or_404(Pathway, package=current_package, uuid=pathway_uuid)

    if request.method == "GET":
        package_name = current_package.get_name()
        pathway_name = current_pathway.get_name()

        context = get_base_context(request)
        context["title"] = f"enviPath - {package_name} - {pathway_name} - Edges"
        context["meta"]["current_package"] = current_package
        context["object_type"] = "edge"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Package": s.SERVER_URL + "/package"},
            {package_name: current_package.url},
            {"Pathway": current_package.url + "/pathway"},
            {pathway_name: current_pathway.url},
            {"Edge": current_pathway.url + "/edge"},
        ]

        # Edges inherit the review state of the package: one list holds all
        # edges of the pathway, the other stays empty.
        is_reviewed = current_package.reviewed
        reviewed_edge_qs = (
            Edge.objects.filter(pathway=current_pathway).order_by("name")
            if is_reviewed
            else Edge.objects.none()
        )
        unreviewed_edge_qs = (
            Edge.objects.none()
            if is_reviewed
            else Edge.objects.filter(pathway=current_pathway).order_by("name")
        )

        if request.GET.get("all"):
            populated_qs = reviewed_edge_qs if current_package.reviewed else unreviewed_edge_qs
            listing = []
            for edge in populated_qs:
                listing.append(
                    {
                        "name": edge.get_name(),
                        "url": edge.url,
                        "reviewed": current_package.reviewed,
                    }
                )
            return JsonResponse({"objects": listing})

        context["reviewed_objects"] = reviewed_edge_qs
        context["unreviewed_objects"] = unreviewed_edge_qs
        return render(request, "collections/objects_list.html", context)

    if request.method == "POST":
        log_post_params(request)

        edge_name = request.POST.get("edge-name")
        edge_description = request.POST.get("edge-description")
        substrate_urls = request.POST.getlist("edge-substrates")
        product_urls = request.POST.getlist("edge-products")

        substrate_nodes = [current_pathway.get_node(url) for url in substrate_urls]
        product_nodes = [current_pathway.get_node(url) for url in product_urls]

        # TODO in the future consider Rules here?
        current_pathway.add_edge(
            substrate_nodes, product_nodes, name=edge_name, description=edge_description
        )
        return redirect(current_pathway.url)

    return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
    """Detail view for a single edge of a pathway.

    GET with ``image=svg`` returns the edge depiction as SVG, otherwise the
    ``objects/edge.html`` page is rendered.
    POST handles deletion (``hidden=delete``), scenario assignment and alias
    assignment; any other payload is answered with a 400 response.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_pathway = get_object_or_404(Pathway, package=current_package, uuid=pathway_uuid)
    current_edge = get_object_or_404(Edge, pathway=current_pathway, uuid=edge_uuid)

    if request.method == "GET":
        # Only "svg" is a supported image format; other values render the HTML page.
        if request.GET.get("image") == "svg":
            return HttpResponse(current_edge.as_svg, content_type="image/svg+xml")

        context = get_base_context(request)
        context["title"] = (
            f"enviPath - {current_package.get_name()} - {current_pathway.get_name()} - {current_edge.edge_label.get_name()}"
        )
        context["meta"]["current_package"] = current_package
        context["object_type"] = "edge"
        context["breadcrumbs"] = breadcrumbs(
            current_package, "pathway", current_pathway, "edge", current_edge
        )
        context["edge"] = current_edge
        context["current_object"] = current_edge
        return render(request, "objects/edge.html", context)

    if request.method == "POST":
        log_post_params(request)

        # Unlike the sibling views, an unknown "hidden" value is not rejected
        # here but falls through to the remaining handlers.
        if request.POST.get("hidden", None) == "delete":
            current_edge.delete()
            return redirect(current_pathway.url)

        if "selected-scenarios" in request.POST:
            scenario_urls = request.POST.getlist("selected-scenarios")
            set_scenarios(current_user, current_edge, scenario_urls)
            return redirect(current_edge.url)

        if "aliases" in request.POST:
            alias_list = request.POST.getlist("aliases")
            try:
                set_aliases(current_user, current_edge, alias_list)
            except Exception as e:
                return JsonResponse({"error": str(e)}, status=400)
            return JsonResponse({"success": current_edge.url})

        return HttpResponseBadRequest()

    return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_scenarios(request, package_uuid):
    """List the scenarios of a package (GET only).

    * Clients accepting ``application/json`` (and not passing ``all``) get a
      plain JSON list of ``name`` / ``url`` / ``uuid`` entries.
    * With the ``all`` query parameter a JSON object with an ``objects``
      list (``name`` / ``url`` / ``reviewed``) is returned.
    * Otherwise ``collections/scenarios_paginated.html`` is rendered.

    Any other HTTP method yields a 405 response.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == "GET":
        scenario_qs = Scenario.objects.filter(package=current_package).order_by("name")
        wants_all = request.GET.get("all", False)

        # The Accept header is optional; default to "" so the membership
        # test cannot fail on ``None``.
        accept_header = request.META.get("HTTP_ACCEPT") or ""
        if "application/json" in accept_header and not wants_all:
            res = [{"name": s_.get_name(), "url": s_.url, "uuid": s_.uuid} for s_ in scenario_qs]
            return JsonResponse(res, safe=False)

        context = get_base_context(request)
        context["title"] = f"enviPath - {current_package.get_name()} - Scenarios"
        context["meta"]["current_package"] = current_package
        context["object_type"] = "scenario"
        context["breadcrumbs"] = breadcrumbs(current_package, "scenario")
        context["entity_type"] = "scenario"
        context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/package/{current_package.uuid}/scenario/"
        context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
        context["list_title"] = "scenarios"

        if wants_all:
            # Every scenario shares the review state of its package.
            return JsonResponse(
                {
                    "objects": [
                        {
                            "name": scenario.get_name(),
                            "url": scenario.url,
                            "reviewed": current_package.reviewed,
                        }
                        for scenario in scenario_qs
                    ]
                }
            )

        return render(request, "collections/scenarios_paginated.html", context)

    else:
        return HttpResponseNotAllowed(
            [
                "GET",
            ]
        )
@package_permission_required()
def package_scenario(request, package_uuid, scenario_uuid):
    """Detail view for a single scenario of a package.

    GET renders ``objects/scenario.html``.
    POST either executes a ``hidden`` action (delete the scenario, delete one
    or all additional-information entries; the legacy set/add actions are
    rejected) or updates the scenario name and/or description. A POST that
    changes nothing is answered with a 400 response.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_scenario = get_object_or_404(Scenario, package=current_package, uuid=scenario_uuid)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = (
            f"enviPath - {current_package.get_name()} - {current_scenario.get_name()}"
        )
        context["meta"]["current_package"] = current_package
        context["object_type"] = "scenario"
        context["breadcrumbs"] = breadcrumbs(current_package, "scenario", current_scenario)
        context["scenario"] = current_scenario
        # Modals fetch their schemas and data from the API endpoints; only the
        # associated entries are handed to the template here.
        context["associated_additional_information"] = AdditionalInformation.objects.filter(
            scenario=current_scenario
        )
        return render(request, "objects/scenario.html", context)

    if request.method != "POST":
        return HttpResponseNotAllowed(["GET", "POST"])

    log_post_params(request)

    hidden_action = request.POST.get("hidden", None)
    if hidden_action:
        if hidden_action == "delete":
            current_scenario.delete()
            return redirect(current_package.url + "/scenario")

        if hidden_action == "delete-additional-information":
            current_scenario.remove_additional_information(request.POST.get("uuid"))
            return redirect(current_scenario.url)

        if hidden_action == "delete-all-additional-information":
            current_scenario.additional_information = {}
            current_scenario.save()
            return redirect(current_scenario.url)

        if hidden_action in ("set-additional-information", "add-additional-information"):
            # Legacy POST handlers - no longer used, modals use API endpoints
            return HttpResponseBadRequest(
                "This endpoint is deprecated. Please use the API endpoints."
            )

        return HttpResponseBadRequest()

    # Sanitize the posted name/description; only non-empty results are applied.
    something_changed = False
    for post_key, attribute in (
        ("scenario-name", "name"),
        ("scenario-description", "description"),
    ):
        submitted = request.POST.get(post_key)
        if submitted is None:
            continue
        sanitized = nh3.clean(submitted, tags=s.ALLOWED_HTML_TAGS).strip()
        if sanitized:
            setattr(current_scenario, attribute, sanitized)
            something_changed = True

    if not something_changed:
        return HttpResponseBadRequest()

    current_scenario.save()
    return redirect(current_scenario.url)
##############
# User/Group #
##############
def users(request):
    """Render the list of all users (GET only); other methods get a 405 response."""
    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])

    user_model = get_user_model()
    user_overview_url = s.SERVER_URL + "/user"

    context = get_base_context(request)
    context.update(
        {
            "title": "enviPath - Users",
            "object_type": "user",
            "breadcrumbs": [
                {"Home": s.SERVER_URL},
                {"User": user_overview_url},
            ],
            "objects": user_model.objects.all(),
        }
    )
    return render(request, "collections/objects_list.html", context)
def user(request, user_uuid):
    """Profile page of a user (GET) and profile-related actions (POST).

    GET is only answered for the user identified by ``user_uuid`` themselves
    or for superusers; everybody else receives a 400 response.

    POST always operates on the requesting user and supports:
      * ``hidden=request-api-token`` - create an API token valid for
        ``valid-for`` days (clamped to 1..90, default 90),
      * ``hidden=delete`` - delete the API token given by ``token-id``,
      * ``default-package`` / ``default-group`` /
        ``default-prediction-setting`` - update the user's defaults,
      * ``change_default`` - set the default prediction setting by id.

    Any other POST yields a 400, any other method a 405 response.
    """
    current_user = _anonymous_or_real(request)

    if request.method == "GET":
        # Check if current user is the one matching to the url. Both sides are
        # stringified so the comparison also works if the URL converter hands
        # in a UUID object.
        if str(current_user.uuid) != str(user_uuid) and not current_user.is_superuser:
            return HttpResponseBadRequest()

        requested_user = UserManager.get_user_by_id(current_user, user_uuid)

        context = get_base_context(request, for_user=requested_user)
        context["title"] = "enviPath - User"
        context["object_type"] = "user"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"User": s.SERVER_URL + "/user"},
            # Label and link must describe the same (requested) user, which
            # differs from the requesting user when a superuser looks at
            # somebody else's profile.
            {requested_user.username: requested_user.url},
        ]
        context["user"] = requested_user

        accessible_packages = PackageManager.get_all_readable_packages(
            requested_user, include_reviewed=True
        )
        property_models = PropertyPluginModel.objects.filter(
            package__in=accessible_packages
        ).order_by("name")
        # Everything that is not a property model is listed as prediction model.
        tp_prediction_models = (
            EPModel.objects.filter(package__in=accessible_packages)
            .exclude(id__in=[pm.id for pm in property_models])
            .order_by("name")
        )

        context["models"] = tp_prediction_models
        context["property_models"] = property_models
        context["tokens"] = APIToken.objects.filter(user=requested_user)
        return render(request, "objects/user.html", context)

    elif request.method == "POST":
        hidden = request.POST.get("hidden")

        if hidden == "request-api-token":
            name = request.POST.get("name", "No Name")
            try:
                requested_days = int(request.POST.get("valid-for", 90))
            except (TypeError, ValueError):
                return HttpResponseBadRequest("Parameter 'valid-for' must be an integer!")
            # Token lifetime is clamped to 1..90 days.
            expires_days = min(max(requested_days, 1), 90)

            token, raw_token = APIToken.create_token(
                request.user, name=name, expires_days=expires_days
            )
            return JsonResponse(
                {"raw_token": raw_token, "token": {"id": token.id, "name": token.name}}
            )

        if hidden == "delete":
            token_id = request.POST.get("token-id")
            if token_id is None:
                return HttpResponseBadRequest("Token ID missing!")
            try:
                APIToken.objects.get(user=current_user, id=token_id).delete()
            except APIToken.DoesNotExist:
                return HttpResponseBadRequest("User and Token ID combination invalid!")
            return HttpResponse("success")

        default_package = request.POST.get("default-package")
        default_group = request.POST.get("default-group")
        default_prediction_setting = request.POST.get("default-prediction-setting")

        if any([default_package, default_group, default_prediction_setting]):
            # NOTE(review): all three defaults are resolved even if only one
            # was submitted - confirm the managers cope with a ``None`` URL.
            current_user.default_package = PackageManager.get_package_by_url(
                current_user, default_package
            )
            current_user.default_group = GroupManager.get_group_by_url(current_user, default_group)
            current_user.default_setting = SettingManager.get_setting_by_url(
                current_user, default_prediction_setting
            )
            current_user.save()
            return redirect(current_user.url)

        if "change_default" in request.POST:
            new_default_uuid = request.POST["change_default"]
            current_user.default_setting = SettingManager.get_setting_by_id(
                current_user, new_default_uuid
            )
            current_user.save()
            return redirect(current_user.url)

        return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
def groups(request):
    """Render the paginated group list (GET) or create a new group (POST).

    POST reads ``group-name`` and ``group-description`` (falling back to the
    configured default description) and redirects to the created group.
    """
    current_user = _anonymous_or_real(request)

    if request.method == "POST":
        new_name = request.POST.get("group-name")
        new_description = request.POST.get(
            "group-description", s.DEFAULT_VALUES["description"]
        )
        created_group = GroupManager.create_group(current_user, new_name, new_description)
        return redirect(created_group.url)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET", "POST"])

    context = get_base_context(request)
    context.update(
        {
            "title": "enviPath - Groups",
            "object_type": "group",
            "breadcrumbs": [
                {"Home": s.SERVER_URL},
                {"Group": s.SERVER_URL + "/group"},
            ],
            # Context for paginated template
            "entity_type": "group",
            "api_endpoint": f"{s.SERVER_PATH}/api/v1/groups/",
            "per_page": s.API_PAGINATION_DEFAULT_PAGE_SIZE,
            "list_title": "groups",
            "list_mode": "combined",
        }
    )
    return render(request, "collections/groups_paginated.html", context)
def group(request, group_uuid):
    """Detail view for a group: show it (GET) or delete it / change its members (POST).

    POST with ``hidden=delete`` removes the group; any other ``hidden`` value
    is rejected with a 400 response. Otherwise ``member`` (URL of a user or
    group) together with ``action`` (``add`` or ``remove``) updates the
    membership, and the client is redirected back to the group page.
    """
    current_user = _anonymous_or_real(request)
    current_group = GroupManager.get_group_by_id(current_user, group_uuid)

    if request.method == "GET":
        group_name = current_group.get_name()

        context = get_base_context(request)
        context["title"] = f"enviPath - {group_name}"
        context["object_type"] = "group"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Group": s.SERVER_URL + "/group"},
            {group_name: current_group.url},
        ]
        context["group"] = current_group

        # Candidates that can still be added: active users / groups which are
        # not members yet (and, for groups, not the group itself).
        candidate_users = UserManager.get_users_lp().exclude(
            id__in=current_group.user_member.all()
        )
        context["users"] = candidate_users.filter(is_active=True)

        candidate_groups = GroupManager.get_groups_lp().exclude(
            id__in=current_group.group_member.all()
        )
        context["groups"] = candidate_groups.exclude(id=current_group.pk)

        permitted_package_ids = (
            GroupPackagePermission.objects.filter(group=current_group)
            .values("package")
            .distinct()
        )
        context["packages"] = Package.objects.filter(id__in=permitted_package_ids)

        return render(request, "objects/group.html", context)

    if request.method != "POST":
        return HttpResponseNotAllowed(["GET", "POST"])

    log_post_params(request)

    hidden_action = request.POST.get("hidden", None)
    if hidden_action:
        if hidden_action != "delete":
            return HttpResponseBadRequest()
        current_group.delete()
        return redirect(s.SERVER_URL + "/group")

    member_url = request.POST.get("member")
    action = request.POST.get("action")

    if member_url and action in ["add", "remove"]:
        # The member URL decides whether a user or a group is looked up.
        if "user" in member_url:
            member = UserManager.get_user_lp(member_url)
        else:
            member = GroupManager.get_group_lp(member_url)

        GroupManager.update_members(current_user, current_group, member, action)

    # Always answer with a redirect back to the group page.
    return redirect(current_group.url)
def settings(request):
    """Render the paginated settings list (GET) or create a prediction setting (POST).

    POST parameters:
      * ``prediction-setting-name`` / ``prediction-setting-description``
      * ``prediction-setting-new-default`` - ``"on"`` makes the new setting
        the user's default
      * ``prediction-setting-max-nodes`` - clamped to
        ``2..s.DEFAULT_MAX_NUMBER_OF_NODES``
      * ``prediction-setting-max-depth`` - clamped to ``1..s.DEFAULT_MAX_DEPTH``
      * ``tp-generation-method`` - ``model-based-prediction-setting`` or
        ``rule-based-prediction-setting`` plus the method specific parameters
      * ``prediction-setting-property-models`` - optional list of model URLs

    Raises:
        BadRequest: on non-integer limits, a missing model URL, an unknown
            expansion scheme or an unknown generation method.
        PermissionDenied: if the selected model's package is not readable.
    """
    current_user = _anonymous_or_real(request)

    if request.method == "GET":
        context = get_base_context(request)
        context["title"] = "enviPath - Settings"
        context["object_type"] = "setting"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Setting": s.SERVER_URL + "/setting"},
        ]

        # Context for paginated template
        context["entity_type"] = "setting"
        context["api_endpoint"] = f"{s.SERVER_PATH}/api/v1/settings/"
        context["per_page"] = s.API_PAGINATION_DEFAULT_PAGE_SIZE
        context["list_title"] = "settings"
        context["list_mode"] = "combined"

        return render(request, "collections/settings_paginated.html", context)

    elif request.method == "POST":
        if s.DEBUG:
            # Header once, then one line per parameter.
            logger.info("Parameters received:")
            for k, v in request.POST.items():
                logger.info(f"{k}\t{v}")

        name = request.POST.get("prediction-setting-name")
        description = request.POST.get("prediction-setting-description")
        new_default = request.POST.get("prediction-setting-new-default", "off") == "on"

        try:
            requested_nodes = int(request.POST.get("prediction-setting-max-nodes", 1))
            requested_depth = int(request.POST.get("prediction-setting-max-depth", 1))
        except (TypeError, ValueError):
            raise BadRequest(
                "Parameters 'prediction-setting-max-nodes' and "
                "'prediction-setting-max-depth' must be integers!"
            ) from None

        # min 2, max s.DEFAULT_MAX_NUMBER_OF_NODES
        max_nodes = min(max(requested_nodes, 2), s.DEFAULT_MAX_NUMBER_OF_NODES)
        # min 1, max s.DEFAULT_MAX_DEPTH
        max_depth = min(max(requested_depth, 1), s.DEFAULT_MAX_DEPTH)

        tp_gen_method = request.POST.get("tp-generation-method")

        params = {}
        if tp_gen_method == "model-based-prediction-setting":
            model_url = request.POST.get("model-based-prediction-setting-model")
            if not model_url:
                raise BadRequest("No model provided for the model-based prediction setting!")

            # The model UUID is the last path segment of the model URL.
            model_uuid = model_url.split("/")[-1]
            params["model"] = EPModel.objects.get(uuid=model_uuid)
            params["model_threshold"] = request.POST.get(
                "model-based-prediction-setting-threshold", s.DEFAULT_MODEL_THRESHOLD
            )

            if not PackageManager.readable(current_user, params["model"].package):
                raise PermissionDenied("You're not allowed to access this model!")

            expansion_scheme = request.POST.get(
                "model-based-prediction-setting-expansion-scheme", "BFS"
            )
            if expansion_scheme not in ExpansionSchemeChoice.values:
                raise BadRequest(f"Unknown expansion scheme: {expansion_scheme}")
            params["expansion_scheme"] = ExpansionSchemeChoice(expansion_scheme)

        elif tp_gen_method == "rule-based-prediction-setting":
            rule_packages = request.POST.getlist("rule-based-prediction-setting-packages")
            params["rule_packages"] = [
                PackageManager.get_package_by_url(current_user, p) for p in rule_packages
            ]
        else:
            raise BadRequest("Neither Model-Based nor Rule-Based as Method selected!")

        property_model_urls = request.POST.getlist("prediction-setting-property-models")
        if property_model_urls:
            mods = []
            for pm_url in property_model_urls:
                model = PropertyPluginModel.objects.get(url=pm_url)
                # Models from packages the user cannot read are skipped.
                if PackageManager.readable(current_user, model.package):
                    mods.append(model)
            params["property_models"] = mods

        created_setting = SettingManager.create_setting(
            current_user,
            name=name,
            description=description,
            max_nodes=max_nodes,
            max_depth=max_depth,
            **params,
        )

        if new_default:
            current_user.default_setting = created_setting
            current_user.save()

        return HttpResponse("Success!")

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
def setting(request, setting_uuid):
    """Render the detail page of a single prediction setting (GET only)."""
    current_user = _anonymous_or_real(request)
    current_setting = SettingManager.get_setting_by_id(current_user, setting_uuid)

    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])

    setting_name = current_setting.get_name()

    context = get_base_context(request)
    context["title"] = f"enviPath - {setting_name}"
    context["object_type"] = "setting"
    context["breadcrumbs"] = [
        {"Home": s.SERVER_URL},
        {"Setting": s.SERVER_URL + "/setting"},
        {f"{setting_name}": current_setting.url},
    ]
    context["setting"] = current_setting
    context["current_object"] = current_setting

    return render(request, "objects/setting.html", context)
def jobs(request):
    """List job logs (GET) or start a background job (POST).

    GET renders ``collections/joblog.html`` with all jobs for superusers and
    the user's own jobs otherwise.

    POST dispatches on ``job-name``:
      * ``engineer-pathway`` - needs ``pathway-to-engineer`` and
        ``engineer-setting`` (both URLs),
      * ``batch-predict`` - needs ``substrates`` (one ``SMILES[,name]`` entry
        per line) and ``prediction-setting``; ``num-tps`` is optional and
        clamped to 1..50 (default 50).
    Both jobs run in a freshly created package and redirect to the job page.

    Raises:
        BadRequest: for missing/invalid parameters or an unknown job name.
    """
    current_user = _anonymous_or_real(request)
    context = get_base_context(request)

    if request.method == "GET":
        context["object_type"] = "joblog"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Jobs": s.SERVER_URL + "/jobs"},
        ]

        if current_user.is_superuser:
            context["jobs"] = JobLog.objects.all().order_by("-created")
        else:
            context["jobs"] = JobLog.objects.filter(user=current_user).order_by("-created")

        return render(request, "collections/joblog.html", context)

    elif request.method == "POST":
        job_name = request.POST.get("job-name")

        if job_name == "engineer-pathway":
            pathway_to_engineer = request.POST.get("pathway-to-engineer")
            engineer_setting = request.POST.get("engineer-setting")

            if not all([pathway_to_engineer, engineer_setting]):
                raise BadRequest(
                    f"Unable to run {job_name} as it requires 'pathway-to-engineer' and 'engineer-setting' parameters."
                )

            pathway_package = PackageManager.get_package_by_url(current_user, pathway_to_engineer)
            pathway_to_engineer = Pathway.objects.get(
                url=pathway_to_engineer, package=pathway_package
            )
            engineer_setting = SettingManager.get_setting_by_url(current_user, engineer_setting)

            target_package = PackageManager.create_package(
                current_user,
                f"Autogenerated Package for Pathway Engineering of {pathway_to_engineer.get_name()}",
                f"This Package was generated automatically for the engineering Task of {pathway_to_engineer.get_name()}.",
            )

            from .tasks import dispatch, engineer_pathways

            res = dispatch(
                current_user,
                engineer_pathways,
                [pathway_to_engineer.pk],
                engineer_setting.pk,
                target_package.pk,
            )
            return redirect(f"{s.SERVER_URL}/jobs/{res.task_id}")

        elif job_name == "batch-predict":
            substrates = request.POST.get("substrates")
            prediction_setting_url = request.POST.get("prediction-setting")
            num_tps = request.POST.get("num-tps")

            if substrates is None or substrates.strip() == "":
                raise BadRequest("No substrates provided.")

            pred_data = []
            # splitlines() copes with "\r\n" as sent by HTML forms; blank
            # lines (e.g. a trailing newline) are skipped instead of being
            # handed to the standardizer.
            for raw_line in substrates.splitlines():
                line = raw_line.strip()
                if not line:
                    continue

                # Everything after the first comma is the (optional) name.
                raw_smiles, separator, raw_name = line.partition(",")
                raw_smiles = raw_smiles.strip()

                try:
                    smiles = FormatConverter.standardize(raw_smiles, remove_stereo=True)
                except ValueError as e:
                    raise BadRequest(f"Couldn't standardize SMILES {raw_smiles}!") from e

                name = (raw_name.strip() or None) if separator else None
                pred_data.append([smiles, name])

            max_tps = 50
            if num_tps is not None and num_tps.strip() != "":
                try:
                    max_tps = max(min(int(num_tps), 50), 1)
                except ValueError as e:
                    raise BadRequest(
                        f"Parameter for num-tps {num_tps} is not a valid integer."
                    ) from e

            batch_predict_setting = SettingManager.get_setting_by_url(
                current_user, prediction_setting_url
            )

            target_package = PackageManager.create_package(
                current_user,
                f"Autogenerated Package for Batch Prediction {datetime.now()}",
                "This Package was generated automatically for the batch prediction task.",
            )

            from .tasks import batch_predict, dispatch

            res = dispatch(
                current_user,
                batch_predict,
                pred_data,
                batch_predict_setting.pk,
                target_package.pk,
                num_tps=max_tps,
            )
            return redirect(f"{s.SERVER_URL}/jobs/{res.task_id}")

        else:
            raise BadRequest(f"Job {job_name} is not supported!")

    else:
        return HttpResponseNotAllowed(["GET", "POST"])
def job(request, job_uuid):
    """Show a single job log or download its result (GET only).

    Superusers may access every job, other users only their own; an unknown
    or inaccessible job yields a 404 response. With ``download=true`` the
    task result of ``batch_predict`` / ``identify_missing_rules`` jobs is
    returned as a CSV attachment.

    Raises:
        BadRequest: if a download is requested but the result is not
            downloadable.
    """
    current_user = _anonymous_or_real(request)
    context = get_base_context(request)

    if request.method == "GET":
        lookup = {"task_id": job_uuid}
        if not current_user.is_superuser:
            # Regular users only ever see their own jobs.
            lookup["user"] = current_user
        job_log = get_object_or_404(JobLog, **lookup)

        # No op if status is already in a terminal state
        job_log.check_for_update()

        if request.GET.get("download", False) == "true":
            if not job_log.is_result_downloadable():
                raise BadRequest("Result is not downloadable!")

            # Only these job types produce a CSV result.
            if job_log.job_name not in ("batch_predict", "identify_missing_rules"):
                raise BadRequest("Result is not downloadable!")

            filename = f"{job_log.job_name.replace(' ', '_')}_{job_log.task_id}.csv"
            response = HttpResponse(job_log.task_result, content_type="text/csv")
            response["Content-Disposition"] = f'attachment; filename="{filename}"'
            return response

        context["object_type"] = "joblog"
        context["breadcrumbs"] = [
            {"Home": s.SERVER_URL},
            {"Jobs": s.SERVER_URL + "/jobs"},
            {job_log.job_name: f"{s.SERVER_URL}/jobs/{job_log.task_id}"},
        ]
        context["job"] = job_log

        return render(request, "objects/joblog.html", context)

    else:
        return HttpResponseNotAllowed(["GET"])
###########
# KETCHER #
###########
def indigo(request):
    """Return the version reported by the Indigo toolkit as a JSON document."""
    # Imported lazily, as in the rest of this view, so the module loads without Indigo.
    from indigo import Indigo

    toolkit_version = Indigo().version()
    payload = {"Indigo": {"version": toolkit_version}}
    return JsonResponse(payload)
@csrf_exempt
def aromatize(request):
    """Aromatize the structure sent in a JSON POST body (Ketcher endpoint).

    The body must be a JSON object; its ``"struct"`` entry is passed to
    ``IndigoUtils.aromatize`` and the result is returned as
    ``{"struct": <aromatized>}``.

    Returns:
        A ``JsonResponse`` on success, or ``HttpResponseBadRequest`` when the
        request is not a POST or the body is not a JSON object.
    """
    if request.method != "POST":
        return HttpResponseBadRequest()

    try:
        payload = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A malformed body is a client error, not a server fault.
        return HttpResponseBadRequest()

    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, number, ...) has no "struct".
        return HttpResponseBadRequest()

    aromatized = IndigoUtils.aromatize(payload.get("struct"), False)
    return JsonResponse({"struct": aromatized})
@csrf_exempt
def dearomatize(request):
    """Dearomatize the structure sent in a JSON POST body (Ketcher endpoint).

    The body must be a JSON object; its ``"struct"`` entry is passed to
    ``IndigoUtils.dearomatize`` and the result is returned as
    ``{"struct": <dearomatized>}``.

    Returns:
        A ``JsonResponse`` on success, or ``HttpResponseBadRequest`` when the
        request is not a POST or the body is not a JSON object.
    """
    if request.method != "POST":
        return HttpResponseBadRequest()

    try:
        payload = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A malformed body is a client error, not a server fault.
        return HttpResponseBadRequest()

    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, number, ...) has no "struct".
        return HttpResponseBadRequest()

    dearomatized = IndigoUtils.dearomatize(payload.get("struct"), False)
    return JsonResponse({"struct": dearomatized})
@csrf_exempt
def layout(request):
    """Compute a layout for the structure sent in a JSON POST body (Ketcher endpoint).

    The body must be a JSON object; its ``"struct"`` entry is passed to
    ``IndigoUtils.layout`` and the result is returned as
    ``{"struct": <layout>}``.

    Returns:
        A ``JsonResponse`` on success, or ``HttpResponseBadRequest`` when the
        request is not a POST or the body is not a JSON object.
    """
    if request.method != "POST":
        return HttpResponseBadRequest()

    try:
        payload = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A malformed body is a client error, not a server fault.
        return HttpResponseBadRequest()

    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, number, ...) has no "struct".
        return HttpResponseBadRequest()

    laid_out = IndigoUtils.layout(payload.get("struct"))
    return JsonResponse({"struct": laid_out})
##########################
# Generic/Non-Persistent #
##########################
def depict(request):
    """Render a molecule or a reaction rule from the query string as SVG.

    ``smiles`` takes precedence over ``smirks``. For ``smirks`` the optional
    ``is_query_smirks=true`` flag is forwarded to ``IndigoUtils.smirks_to_svg``.
    If neither parameter carries a value the request is rejected with HTTP 400.
    """
    svg_mime = "image/svg+xml"

    smiles = request.GET.get("smiles")
    if smiles:
        return HttpResponse(IndigoUtils.mol_to_svg(smiles), content_type=svg_mime)

    smirks = request.GET.get("smirks")
    if not smirks:
        return HttpResponseBadRequest()

    # Only the literal string "true" switches on query-SMIRKS rendering.
    as_query = request.GET.get("is_query_smirks", False) == "true"
    svg = IndigoUtils.smirks_to_svg(smirks, as_query)
    return HttpResponse(svg, content_type=svg_mime)
@protected_resource()
def userinfo(request):
    """Return identity claims for the owner of the presented OAuth2 token as JSON.

    ``name`` falls back to the username when the user has no full name, and
    ``email_verified`` mirrors the account's ``is_active`` flag.
    """
    owner = request.resource_owner
    return JsonResponse(
        {
            "sub": str(owner.uuid),
            "email": owner.email,
            "username": owner.username,
            "name": owner.get_full_name() or owner.username,
            "email_verified": owner.is_active,
        }
    )
# Static Pages
def static_terms_of_use(request):
    """Render the static "Terms of Use" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - Terms of Use",
            "public_mode": True,
        }
    )
    return render(request, "static/terms_of_use.html", page_context)
def static_privacy_policy(request):
    """Render the static "Privacy Policy" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - Privacy Policy",
            "public_mode": True,
        }
    )
    return render(request, "static/privacy_policy.html", page_context)
def static_cookie_policy(request):
    """Render the static "Cookie Policy" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - Cookie Policy",
            "public_mode": True,
        }
    )
    return render(request, "static/cookie_policy.html", page_context)
def static_about_us(request):
    """Render the static "About Us" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - About Us",
            "public_mode": True,
        }
    )
    return render(request, "static/about_us.html", page_context)
def static_contact_support(request):
    """Render the static "Contact & Support" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - Contact & Support",
            "public_mode": True,
        }
    )
    return render(request, "static/contact.html", page_context)
def static_careers(request):
    """Render the static "Careers" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - Careers",
            "public_mode": True,
        }
    )
    return render(request, "static/careers.html", page_context)
def static_cite(request):
    """Render the static "How to Cite" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - How to Cite",
            "public_mode": True,
        }
    )
    return render(request, "static/cite.html", page_context)
def static_legal(request):
    """Render the static "Legal Information" page in public mode."""
    page_context = get_base_context(request)
    page_context.update(
        {
            "title": "enviPath - Legal Information",
            "public_mode": True,
        }
    )
    return render(request, "static/legal.html", page_context)