Files
enviPy-bayer/epdb/views.py
Tim Lorsbach b9ac713cb2 minor
2025-11-11 11:41:48 +01:00

2961 lines
106 KiB
Python

import json
import logging
from typing import List, Dict, Any
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.http import JsonResponse, HttpResponse, HttpResponseNotAllowed, HttpResponseBadRequest
from django.shortcuts import render, redirect
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
from envipy_additional_information import NAME_MAPPING
from oauth2_provider.decorators import protected_resource
import nh3
from utilities.chem import FormatConverter, IndigoUtils
from utilities.decorators import package_permission_required
from utilities.misc import HTMLGenerator
from .logic import (
GroupManager,
PackageManager,
UserManager,
SettingManager,
SearchManager,
EPDBURLParser,
)
from .models import (
Package,
GroupPackagePermission,
Group,
CompoundStructure,
Compound,
Reaction,
Rule,
Pathway,
Node,
EPModel,
EnviFormer,
MLRelativeReasoning,
RuleBasedRelativeReasoning,
Scenario,
SimpleAmbitRule,
APIToken,
UserPackagePermission,
Permission,
License,
User,
Edge,
ExternalDatabase,
ExternalIdentifier,
EnzymeLink,
JobLog,
)
logger = logging.getLogger(__name__)
def log_post_params(request):
if s.DEBUG:
for k, v in request.POST.items():
logger.debug(f"{k}\t{v}")
def error(request, message: str, detail: str, code: int = 400):
context = get_base_context(request)
error_context = {
"error_message": message,
"error_detail": detail,
}
if request.headers.get("Accept") == "application/json":
return JsonResponse(error_context, status=500)
context.update(**error_context)
return render(request, "errors/error.html", context, status=code)
def login(request):
context = get_base_context(request)
if request.method == "GET":
context["title"] = "enviPath"
context["next"] = request.GET.get("next", "")
return render(request, "static/login.html", context)
elif request.method == "POST":
from django.contrib.auth import authenticate
from django.contrib.auth import login
username = request.POST.get("username").strip()
if username != request.POST.get("username"):
context["message"] = "Login failed!"
return render(request, "static/login.html", context)
password = request.POST.get("password")
# Get email for username and check if the account is active
try:
temp_user = get_user_model().objects.get(username=username)
if not temp_user.is_active:
context["message"] = "User account is not activated yet!"
return render(request, "static/login.html", context)
email = temp_user.email
except get_user_model().DoesNotExist:
context["message"] = "Login failed!"
return render(request, "static/login.html", context)
try:
user = authenticate(username=email, password=password)
except Exception:
context["message"] = "Login failed!"
return render(request, "static/login.html", context)
if user is not None:
login(request, user)
if next := request.POST.get("next"):
return redirect(next)
return redirect(reverse("index"))
else:
context["message"] = "Login failed!"
return render(request, "static/login.html", context)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def logout(request):
if request.method == "POST":
is_logout = bool(request.POST.get("logout", False))
if is_logout:
from django.contrib.auth import logout
logout(request)
return redirect(s.SERVER_URL)
return HttpResponseBadRequest()
def register(request):
context = get_base_context(request)
if request.method == "GET":
# Redirect to unified login page with signup tab
next_url = request.GET.get("next", "")
redirect_url = reverse("login") + "#signup"
if next_url:
redirect_url += f"?next={next_url}"
return redirect(redirect_url)
elif request.method == "POST":
context["title"] = "enviPath"
if next := request.POST.get("next"):
context["next"] = next
username = request.POST.get("username", "").strip()
email = request.POST.get("email", "").strip()
password = request.POST.get("password", "").strip()
rpassword = request.POST.get("rpassword", "").strip()
if not (username and email and password):
context["message"] = "Invalid username/email/password"
return render(request, "static/login.html", context)
if password != rpassword or password == "":
context["message"] = "Registration failed, provided passwords differ!"
return render(request, "static/login.html", context)
try:
u = UserManager.create_user(username, email, password)
logger.info(f"Created user {u.username} ({u.pk})")
except Exception:
context["message"] = "Registration failed! Couldn't create User Account."
return render(request, "static/login.html", context)
if s.ADMIN_APPROVAL_REQUIRED:
context["success_message"] = (
"Your account has been created! An admin will activate it soon!"
)
else:
context["success_message"] = (
"Account has been created! You'll receive a mail to activate your account shortly."
)
return render(request, "static/login.html", context)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def editable(request, user):
if user.is_superuser:
return True
url = request.build_absolute_uri(request.path)
if PackageManager.is_package_url(url):
_package = PackageManager.get_package_lp(request.build_absolute_uri())
return PackageManager.writable(user, _package)
elif GroupManager.is_group_url(url):
_group = GroupManager.get_group_lp(request.build_absolute_uri())
return GroupManager.writable(user, _group)
elif UserManager.is_user_url(url):
_user = UserManager.get_user_lp(request.build_absolute_uri())
return UserManager.writable(user, _user)
elif url in [
s.SERVER_URL,
f"{s.SERVER_URL}/",
f"{s.SERVER_URL}/package",
f"{s.SERVER_URL}/user",
f"{s.SERVER_URL}/group",
f"{s.SERVER_URL}/search",
]:
return True
else:
logger.debug(f"Unknown url: {url}")
return False
def get_base_context(request, for_user=None) -> Dict[str, Any]:
current_user = _anonymous_or_real(request)
can_edit = editable(request, current_user)
parser = EPDBURLParser(request.build_absolute_uri(request.path))
url_contains_package = False
if parser.contains_package_url() or parser.is_package_url():
url_contains_package = True
if for_user:
current_user = for_user
ctx = {
"title": "enviPath",
"meta": {
"version": "0.0.1",
"server_url": s.SERVER_URL,
"user": current_user,
"can_edit": can_edit,
"url_contains_package": url_contains_package,
"readable_packages": PackageManager.get_all_readable_packages(
current_user, include_reviewed=True
),
"writeable_packages": PackageManager.get_all_writeable_packages(current_user),
"available_groups": GroupManager.get_groups(current_user),
"available_settings": SettingManager.get_all_settings(current_user),
"enabled_features": s.FLAGS,
"debug": s.DEBUG,
"external_databases": ExternalDatabase.get_databases(),
"site_id": s.MATOMO_SITE_ID,
},
}
return ctx
def _anonymous_or_real(request):
if request.user.is_authenticated and not request.user.is_anonymous:
return request.user
return get_user_model().objects.get(username="anonymous")
def breadcrumbs(
first_level_object=None,
second_level_namespace=None,
second_level_object=None,
third_level_namespace=None,
third_level_object=None,
) -> List[Dict[str, str]]:
bread = [
{"Home": s.SERVER_URL},
{"Package": s.SERVER_URL + "/package"},
]
if first_level_object is not None:
bread.append({first_level_object.name: first_level_object.url})
if second_level_namespace is not None:
bread.append(
{
f"{second_level_namespace}".capitalize(): first_level_object.url
+ f"/{second_level_namespace}"
}
)
if second_level_object is not None:
bread.append({second_level_object.name: second_level_object.url})
if third_level_namespace is not None:
bread.append(
{
f"{third_level_namespace}".capitalize(): second_level_object.url
+ f"/{third_level_namespace}"
}
)
if third_level_object is not None:
bread.append({third_level_object.name: third_level_object.url})
return bread
def set_scenarios(current_user, attach_object, scenario_urls: List[str]):
scens = []
for scenario_url in scenario_urls:
# As empty lists will be removed in POST request well send ['']
if scenario_url == "":
continue
package = PackageManager.get_package_by_url(current_user, scenario_url)
scen = Scenario.objects.get(package=package, uuid=scenario_url.split("/")[-1])
scens.append(scen)
attach_object.set_scenarios(scens)
def set_aliases(current_user, attach_object, aliases: List[str]):
if aliases == [""]:
aliases = []
attach_object.aliases = aliases
attach_object.save()
def copy_object(current_user, target_package: "Package", source_object_url: str):
# Ensures that source is readable
source_package = PackageManager.get_package_by_url(current_user, source_object_url)
if source_package == target_package:
raise ValueError(f"Can't copy object {source_object_url} to the same package!")
parser = EPDBURLParser(source_object_url)
# if the url won't contain a package or is a plain package
if not parser.contains_package_url():
raise ValueError(f"Object {source_object_url} can't be copied!")
# Gets the most specific object
source_object = parser.get_object()
if hasattr(source_object, "copy"):
mapping = dict()
copy = source_object.copy(target_package, mapping)
if s.DEBUG:
for k, v in mapping.items():
logger.debug(f"Mapping {k.url} to {v.url}")
return copy
raise ValueError(f"Object {source_object} can't be copied!")
def index(request):
context = get_base_context(request)
context["title"] = "enviPath - Home"
context["meta"]["current_package"] = context["meta"]["user"].default_package
if request.GET.get("getMLServerPath", False):
return JsonResponse({"mlServerPath": s.SERVER_URL})
return render(request, "index/index.html", context)
def packages(request):
current_user = _anonymous_or_real(request)
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Packages"
context["object_type"] = "package"
context["meta"]["current_package"] = context["meta"]["user"].default_package
context["meta"]["can_edit"] = True
reviewed_package_qs = Package.objects.filter(reviewed=True).order_by("created")
unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user).order_by(
"name"
)
context["reviewed_objects"] = reviewed_package_qs
context["unreviewed_objects"] = unreviewed_package_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
if hidden := request.POST.get("hidden", None):
if hidden in ["import-legacy-package-json", "import-package-json"]:
f = request.FILES["file"]
try:
file_data = f.read().decode("utf-8")
data = json.loads(file_data)
if hidden == "import-legacy-package-json":
pack = PackageManager.import_legacy_package(data, current_user)
else:
pack = PackageManager.import_package(data, current_user)
return redirect(pack.url)
except UnicodeDecodeError:
return error(request, "Invalid encoding.", "Invalid encoding, must be UTF-8")
else:
return HttpResponseBadRequest()
else:
package_name = request.POST.get("package-name")
package_description = request.POST.get(
"package-description", s.DEFAULT_VALUES["description"]
)
created_package = PackageManager.create_package(
current_user, package_name, package_description
)
return redirect(created_package.url)
elif request.method == "OPTIONS":
response = HttpResponse()
response["allow"] = ",".join(["GET", "POST"])
return response
else:
return HttpResponseNotAllowed(["GET", "POST"])
def compounds(request):
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Compounds"
context["object_type"] = "compound"
context["meta"]["current_package"] = context["meta"]["user"].default_package
reviewed_compound_qs = Compound.objects.none()
for p in PackageManager.get_reviewed_packages():
reviewed_compound_qs |= Compound.objects.filter(package=p)
reviewed_compound_qs = reviewed_compound_qs.order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": True}
for pw in reviewed_compound_qs
]
}
)
context["reviewed_objects"] = reviewed_compound_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
# delegate to default package
current_user = _anonymous_or_real(request)
default_package = current_user.default_package
return package_compounds(request, default_package.uuid)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def rules(request):
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Rules"
context["object_type"] = "rule"
context["meta"]["current_package"] = context["meta"]["user"].default_package
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Rule": s.SERVER_URL + "/rule"},
]
reviewed_rule_qs = Rule.objects.none()
for p in PackageManager.get_reviewed_packages():
reviewed_rule_qs |= Rule.objects.filter(package=p)
reviewed_rule_qs = reviewed_rule_qs.order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": True}
for pw in reviewed_rule_qs
]
}
)
context["reviewed_objects"] = reviewed_rule_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
# delegate to default package
current_user = _anonymous_or_real(request)
default_package = current_user.default_package
return package_rules(request, default_package.uuid)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def reactions(request):
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Reactions"
context["object_type"] = "reaction"
context["meta"]["current_package"] = context["meta"]["user"].default_package
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Reaction": s.SERVER_URL + "/reaction"},
]
reviewed_reaction_qs = Reaction.objects.none()
for p in PackageManager.get_reviewed_packages():
reviewed_reaction_qs |= Reaction.objects.filter(package=p).order_by("name")
reviewed_reaction_qs = reviewed_reaction_qs.order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": True}
for pw in reviewed_reaction_qs
]
}
)
context["reviewed_objects"] = reviewed_reaction_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
# delegate to default package
current_user = _anonymous_or_real(request)
default_package = current_user.default_package
return package_reactions(request, default_package.uuid)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def pathways(request):
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Pathways"
context["object_type"] = "pathway"
context["meta"]["current_package"] = context["meta"]["user"].default_package
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Pathway": s.SERVER_URL + "/pathway"},
]
reviewed_pathway_qs = Pathway.objects.none()
for p in PackageManager.get_reviewed_packages():
reviewed_pathway_qs |= Pathway.objects.filter(package=p).order_by("name")
reviewed_pathway_qs = reviewed_pathway_qs.order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": True}
for pw in reviewed_pathway_qs
]
}
)
context["reviewed_objects"] = reviewed_pathway_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
# delegate to default package
current_user = _anonymous_or_real(request)
default_package = current_user.default_package
return package_pathways(request, default_package.uuid)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def scenarios(request):
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Scenarios"
context["object_type"] = "scenario"
context["meta"]["current_package"] = context["meta"]["user"].default_package
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Scenario": s.SERVER_URL + "/scenario"},
]
reviewed_scenario_qs = Scenario.objects.none()
for p in PackageManager.get_reviewed_packages():
reviewed_scenario_qs |= Scenario.objects.filter(package=p).order_by("name")
reviewed_scenario_qs = reviewed_scenario_qs.order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": s.name, "url": s.url, "reviewed": True}
for s in reviewed_scenario_qs
]
}
)
context["reviewed_objects"] = reviewed_scenario_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
# delegate to default package
default_package = request.user.default_package
return package_scenarios(request, default_package.uuid)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def models(request):
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Models"
context["object_type"] = "model"
context["meta"]["current_package"] = context["meta"]["user"].default_package
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Model": s.SERVER_URL + "/model"},
]
context["model_types"] = {
"ML Relative Reasoning": "ml-relative-reasoning",
"Rule Based Relative Reasoning": "rule-based-relative-reasoning",
"EnviFormer": "enviformer",
}
for k, v in s.CLASSIFIER_PLUGINS.items():
context["model_types"][v.display()] = k
reviewed_model_qs = EPModel.objects.none()
for p in PackageManager.get_reviewed_packages():
reviewed_model_qs |= EPModel.objects.filter(package=p).order_by("name")
reviewed_model_qs = reviewed_model_qs.order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": True}
for pw in reviewed_model_qs
]
}
)
context["reviewed_objects"] = reviewed_model_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
current_user = _anonymous_or_real(request)
default_package = current_user.default_package
return package_models(request, default_package.uuid)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def search(request):
current_user = _anonymous_or_real(request)
if request.method == "GET":
package_urls = request.GET.getlist("packages")
searchterm = request.GET.get("search").strip()
mode = request.GET.get("mode")
# add HTTP_ACCEPT check to differentiate between index and ajax call
if "application/json" in request.META.get("HTTP_ACCEPT") and all([searchterm, mode]):
if package_urls:
packages = [
PackageManager.get_package_by_url(current_user, p) for p in package_urls
]
else:
packages = PackageManager.get_reviewed_packages()
search_result = SearchManager.search(packages, searchterm, mode)
return JsonResponse(search_result, safe=False)
context = get_base_context(request)
context["title"] = "enviPath - Search"
context["object_type"] = "model"
context["meta"]["current_package"] = context["meta"]["user"].default_package
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Search": s.SERVER_URL + "/search"},
]
reviewed_package_qs = PackageManager.get_reviewed_packages()
unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user)
context["reviewed_objects"] = reviewed_package_qs
context["unreviewed_objects"] = unreviewed_package_qs
if all([searchterm, mode]):
if package_urls:
packages = [
PackageManager.get_package_by_url(current_user, p) for p in package_urls
]
else:
packages = PackageManager.get_reviewed_packages()
context["search_result"] = SearchManager.search(packages, searchterm, mode)
context["search_result"]["searchterm"] = searchterm
return render(request, "search.html", context)
else:
return HttpResponseNotAllowed(["GET"])
@package_permission_required()
def package_models(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - Models"
context["meta"]["current_package"] = current_package
context["object_type"] = "model"
context["breadcrumbs"] = breadcrumbs(current_package, "model")
reviewed_model_qs = EPModel.objects.none()
unreviewed_model_qs = EPModel.objects.none()
if current_package.reviewed:
reviewed_model_qs = EPModel.objects.filter(package=current_package).order_by("name")
else:
unreviewed_model_qs = EPModel.objects.filter(package=current_package).order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_model_qs if current_package.reviewed else unreviewed_model_qs
)
]
}
)
context["reviewed_objects"] = reviewed_model_qs
context["unreviewed_objects"] = unreviewed_model_qs
context["model_types"] = {
"ML Relative Reasoning": "mlrr",
"Rule Based Relative Reasoning": "rbrr",
}
if s.FLAGS.get("ENVIFORMER", False):
context["model_types"]["EnviFormer"] = "enviformer"
if s.FLAGS.get("PLUGINS", False):
for k, v in s.CLASSIFIER_PLUGINS.items():
context["model_types"][v.display()] = k
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
log_post_params(request)
name = request.POST.get("model-name")
description = request.POST.get("model-description")
model_type = request.POST.get("model-type")
# Generic fields for ML and Rule Based
rule_packages = request.POST.getlist("model-rule-packages")
data_packages = request.POST.getlist("model-data-packages")
# Generic params
params = {
"package": current_package,
"name": name,
"description": description,
"data_packages": [
PackageManager.get_package_by_url(current_user, p) for p in data_packages
],
}
if model_type == "enviformer":
threshold = float(request.POST.get("model-threshold", 0.5))
params["threshold"] = threshold
mod = EnviFormer.create(**params)
elif model_type == "mlrr":
# ML Specific
threshold = float(request.POST.get("model-threshold", 0.5))
# TODO handle additional fingerprinter
# fingerprinter = request.POST.get("model-fingerprinter")
params["rule_packages"] = [
PackageManager.get_package_by_url(current_user, p) for p in rule_packages
]
# App Domain related parameters
build_ad = request.POST.get("build-app-domain", False) == "on"
num_neighbors = request.POST.get("num-neighbors", 5)
reliability_threshold = request.POST.get("reliability-threshold", 0.5)
local_compatibility_threshold = request.POST.get("local-compatibility-threshold", 0.5)
params["threshold"] = threshold
# params['fingerprinter'] = fingerprinter
params["build_app_domain"] = build_ad
params["app_domain_num_neighbours"] = num_neighbors
params["app_domain_reliability_threshold"] = reliability_threshold
params["app_domain_local_compatibility_threshold"] = local_compatibility_threshold
mod = MLRelativeReasoning.create(**params)
elif model_type == "rbrr":
params["rule_packages"] = [
PackageManager.get_package_by_url(current_user, p) for p in rule_packages
]
mod = RuleBasedRelativeReasoning.create(**params)
elif s.FLAGS.get("PLUGINS", False) and model_type in s.CLASSIFIER_PLUGINS.values():
pass
else:
return error(
request, "Invalid model type.", f'Model type "{model_type}" is not supported."'
)
from .tasks import dispatch, build_model
dispatch(current_user, build_model, mod.pk)
return redirect(mod.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_model(request, package_uuid, model_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_model = EPModel.objects.get(package=current_package, uuid=model_uuid)
if request.method == "GET":
classify = request.GET.get("classify", False)
ad_assessment = request.GET.get("app-domain-assessment", False)
if classify or ad_assessment:
smiles = request.GET.get("smiles", "").strip()
# Check if smiles is non empty and valid
if smiles == "":
return JsonResponse({"error": "Received empty SMILES"}, status=400)
try:
stand_smiles = FormatConverter.standardize(smiles)
except ValueError:
return JsonResponse({"error": f'"{smiles}" is not a valid SMILES'}, status=400)
if classify:
from epdb.tasks import dispatch_eager, predict_simple
res = dispatch_eager(current_user, predict_simple, current_model.pk, stand_smiles)
pred_res = current_model.predict(stand_smiles)
res = []
for pr in pred_res:
if len(pr) > 0:
products = []
for prod_set in pr.product_sets:
logger.debug(f"Checking {prod_set}")
products.append(tuple([x for x in prod_set]))
res.append(
{
"products": list(set(products)),
"probability": pr.probability,
"btrule": {k: getattr(pr.rule, k) for k in ["url", "name"]}
if pr.rule is not None
else None,
}
)
return JsonResponse(res, safe=False)
else:
app_domain_assessment = current_model.app_domain.assess(stand_smiles)
return JsonResponse(app_domain_assessment, safe=False)
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_model.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "model"
context["breadcrumbs"] = breadcrumbs(current_package, "model", current_model)
context["model"] = current_model
context["current_object"] = current_model
return render(request, "objects/model.html", context)
elif request.method == "POST":
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_model.delete()
return redirect(current_package.url + "/model")
elif hidden == "evaluate":
from .tasks import dispatch, evaluate_model
eval_type = request.POST.get("model-evaluation-type")
if eval_type not in ["sg", "mg"]:
return error(
request,
"Invalid evaluation type",
f'Evaluation type "{eval_type}" is not supported. Only "sg" and "mg" are supported.',
)
multigen = eval_type == "mg"
eval_packages = request.POST.getlist("model-evaluation-packages")
eval_package_ids = [
PackageManager.get_package_by_url(current_user, p).id for p in eval_packages
]
dispatch(current_user, evaluate_model, current_model.pk, multigen, eval_package_ids)
return redirect(current_model.url)
else:
return HttpResponseBadRequest()
else:
# TODO: Move cleaning to property updater
name = request.POST.get("model-name")
if name is not None:
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
description = request.POST.get("model-description")
if description is not None:
description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
if any([name, description]):
if name:
current_model.name = name
if description:
current_model.description = description
current_model.save()
return redirect(current_model.url)
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "GET":
if request.GET.get("export", False) == "true":
filename = f"{current_package.name.replace(' ', '_')}_{current_package.uuid}.json"
pack_json = PackageManager.export_package(
current_package, include_models=False, include_external_identifiers=False
)
response = JsonResponse(pack_json, content_type="application/json")
response["Content-Disposition"] = f'attachment; filename="{filename}"'
return response
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "package"
context["breadcrumbs"] = breadcrumbs(current_package)
context["package"] = current_package
user_perms = UserPackagePermission.objects.filter(package=current_package)
users = get_user_model().objects.exclude(
id__in=UserPackagePermission.objects.filter(package=current_package).values_list(
"user_id", flat=True
)
)
group_perms = GroupPackagePermission.objects.filter(package=current_package)
groups = Group.objects.exclude(
id__in=GroupPackagePermission.objects.filter(package=current_package).values_list(
"group_id", flat=True
)
)
context["users"] = users
context["groups"] = groups
context["user_permissions"] = user_perms
context["group_permissions"] = group_perms
return render(request, "objects/package.html", context)
elif request.method == "POST":
log_post_params(request)
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
if current_user.default_package == current_package:
return error(
request,
f'Package "{current_package.name}" is the default and cannot be deleted!',
"You cannot delete the default package. If you want to delete this package you have to set another default package first.",
)
logger.debug(current_package.delete())
return redirect(s.SERVER_URL + "/package")
elif hidden == "publish-package":
for g in Group.objects.filter(public=True):
PackageManager.update_permissions(
current_user, current_package, g, Permission.READ[0]
)
return redirect(current_package.url)
elif hidden == "copy":
object_to_copy = request.POST.get("object_to_copy")
if not object_to_copy:
return error(request, "No object to copy", "There was no object to copy.")
try:
copied_object = copy_object(current_user, current_package, object_to_copy)
except ValueError:
return JsonResponse(
{"error": f"Can't copy object {object_to_copy} to the same package!"},
status=400,
)
return JsonResponse({"success": copied_object.url})
else:
return HttpResponseBadRequest()
# TODO: Move cleaning to property updater
new_package_name = request.POST.get("package-name")
if new_package_name is not None:
new_package_name = nh3.clean(new_package_name, tags=s.ALLOWED_HTML_TAGS).strip()
new_package_description = request.POST.get("package-description")
if new_package_description is not None:
new_package_description = nh3.clean(
new_package_description, tags=s.ALLOWED_HTML_TAGS
).strip()
grantee_url = request.POST.get("grantee")
read = request.POST.get("read") == "on"
write = request.POST.get("write") == "on"
owner = request.POST.get("owner") == "on"
license = request.POST.get("license")
license_link = request.POST.get("license-link")
license_image_link = request.POST.get("license-image-link")
if new_package_name:
current_package.name = new_package_name
if new_package_description:
current_package.description = new_package_description
if any([new_package_name, new_package_description]):
current_package.save()
return redirect(current_package.url)
elif any([grantee_url, read, write, owner]):
if "user" in grantee_url:
grantee = UserManager.get_user_lp(grantee_url)
else:
grantee = GroupManager.get_group_lp(grantee_url)
max_perm = None
if read:
max_perm = Permission.READ[0]
if write:
max_perm = Permission.WRITE[0]
if owner:
max_perm = Permission.ALL[0]
PackageManager.update_permissions(current_user, current_package, grantee, max_perm)
return redirect(current_package.url)
elif license is not None:
if license == "no-license":
if current_package.license is not None:
current_package.license.delete()
current_package.license = None
current_package.save()
return redirect(current_package.url)
else:
if current_package.license is not None:
current_package.license.delete()
license = License()
license.link = license_link
license.image_link = license_image_link
license.save()
current_package.license = license
current_package.save()
return redirect(current_package.url)
else:
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_compounds(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - Compounds"
context["meta"]["current_package"] = current_package
context["object_type"] = "compound"
context["breadcrumbs"] = breadcrumbs(current_package, "compound")
reviewed_compound_qs = Compound.objects.none()
unreviewed_compound_qs = Compound.objects.none()
if current_package.reviewed:
reviewed_compound_qs = Compound.objects.filter(package=current_package).order_by("name")
else:
unreviewed_compound_qs = Compound.objects.filter(package=current_package).order_by(
"name"
)
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_compound_qs
if current_package.reviewed
else unreviewed_compound_qs
)
]
}
)
context["reviewed_objects"] = reviewed_compound_qs
context["unreviewed_objects"] = unreviewed_compound_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
compound_name = request.POST.get("compound-name")
compound_smiles = request.POST.get("compound-smiles").strip()
compound_description = request.POST.get("compound-description")
c = Compound.create(current_package, compound_smiles, compound_name, compound_description)
return redirect(c.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_compound(request, package_uuid, compound_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_compound.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "compound"
context["breadcrumbs"] = breadcrumbs(current_package, "compound", current_compound)
context["compound"] = current_compound
context["current_object"] = current_compound
return render(request, "objects/compound.html", context)
elif request.method == "POST":
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_compound.delete()
return redirect(current_package.url + "/compound")
else:
return HttpResponseBadRequest()
if "selected-scenarios" in request.POST:
selected_scenarios = request.POST.getlist("selected-scenarios")
set_scenarios(current_user, current_compound, selected_scenarios)
return redirect(current_compound.url)
if "aliases" in request.POST:
aliases = request.POST.getlist("aliases")
try:
set_aliases(current_user, current_compound, aliases)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
return JsonResponse({"success": current_compound.url})
# TODO: Move cleaning to property updater
new_compound_name = request.POST.get("compound-name")
if new_compound_name is not None:
new_compound_name = nh3.clean(new_compound_name, tags=s.ALLOWED_HTML_TAGS).strip()
new_compound_description = request.POST.get("compound-description")
if new_compound_description is not None:
new_compound_description = nh3.clean(
new_compound_description, tags=s.ALLOWED_HTML_TAGS
).strip()
if new_compound_name:
current_compound.name = new_compound_name
if new_compound_description:
current_compound.description = new_compound_description
if any([new_compound_name, new_compound_description]):
current_compound.save()
return redirect(current_compound.url)
selected_database = request.POST.get("selected-database", "").strip()
external_identifier = request.POST.get("identifier", "").strip()
if selected_database and external_identifier:
db = ExternalDatabase.objects.get(id=int(selected_database))
ExternalIdentifier.objects.create(
content_object=current_compound,
database=db,
identifier_value=external_identifier,
url=db.url_pattern.format(id=external_identifier),
is_primary=False,
)
return redirect(current_compound.url)
else:
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_compound_structures(request, package_uuid, compound_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = (
f"enviPath - {current_package.name} - {current_compound.name} - Structures"
)
context["meta"]["current_package"] = current_package
context["object_type"] = "structure"
context["breadcrumbs"] = breadcrumbs(
current_package, "compound", current_compound, "structure"
)
reviewed_compound_structure_qs = CompoundStructure.objects.none()
unreviewed_compound_structure_qs = CompoundStructure.objects.none()
if current_package.reviewed:
reviewed_compound_structure_qs = current_compound.structures.order_by("name")
else:
unreviewed_compound_structure_qs = current_compound.structures.order_by("name")
context["reviewed_objects"] = reviewed_compound_structure_qs
context["unreviewed_objects"] = unreviewed_compound_structure_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
structure_name = request.POST.get("structure-name")
structure_smiles = request.POST.get("structure-smiles").strip()
structure_description = request.POST.get("structure-description")
try:
cs = current_compound.add_structure(
structure_smiles, structure_name, structure_description
)
except ValueError:
return error(
request,
"Adding structure failed!",
"The structure could not be added as normalized structures don't match!",
)
return redirect(cs.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
current_structure = CompoundStructure.objects.get(
compound=current_compound, uuid=structure_uuid
)
if request.method == "GET":
context = get_base_context(request)
context["title"] = (
f"enviPath - {current_package.name} - {current_compound.name} - {current_structure.name}"
)
context["meta"]["current_package"] = current_package
context["object_type"] = "structure"
context["compound_structure"] = current_structure
context["current_object"] = current_structure
context["breadcrumbs"] = breadcrumbs(
current_package, "compound", current_compound, "structure", current_structure
)
return render(request, "objects/compound_structure.html", context)
elif request.method == "POST":
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
# Check if we have to delete the compound as no structure is left
if len(current_structure.compound.structures.all()) == 1:
# This will delete the structure as well
current_compound.delete()
return redirect(current_package.url + "/compound")
else:
if current_structure.normalized_structure:
current_compound.delete()
return redirect(current_package.url + "/compound")
else:
if current_compound.default_structure == current_structure:
current_structure.delete()
current_compound.default_structure = (
current_compound.structures.all().first()
)
return redirect(current_compound.url + "/structure")
else:
current_structure.delete()
return redirect(current_compound.url + "/structure")
else:
return HttpResponseBadRequest()
# TODO: Move cleaning to property updater
new_structure_name = request.POST.get("compound-structure-name")
if new_structure_name is not None:
new_structure_name = nh3.clean(new_structure_name, tags=s.ALLOWED_HTML_TAGS).strip()
new_structure_description = request.POST.get("compound-structure-description")
if new_structure_description is not None:
new_structure_description = nh3.clean(
new_structure_description, tags=s.ALLOWED_HTML_TAGS
).strip()
if new_structure_name:
current_structure.name = new_structure_name
if new_structure_description:
current_structure.description = new_structure_description
if any([new_structure_name, new_structure_description]):
current_structure.save()
return redirect(current_structure.url)
if "selected-scenarios" in request.POST:
selected_scenarios = request.POST.getlist("selected-scenarios")
set_scenarios(current_user, current_structure, selected_scenarios)
return redirect(current_structure.url)
if "aliases" in request.POST:
aliases = request.POST.getlist("aliases")
try:
set_aliases(current_user, current_structure, aliases)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
return JsonResponse({"success": current_structure.url})
selected_database = request.POST.get("selected-database", "").strip()
external_identifier = request.POST.get("identifier", "").strip()
if selected_database and external_identifier:
db = ExternalDatabase.objects.get(id=int(selected_database))
ExternalIdentifier.objects.create(
content_object=current_structure,
database=db,
identifier_value=external_identifier,
url=db.url_pattern.format(id=external_identifier),
is_primary=False,
)
return redirect(current_structure.url)
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(
[
"GET",
]
)
@package_permission_required()
def package_rules(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - Rules"
context["meta"]["current_package"] = current_package
context["object_type"] = "rule"
context["breadcrumbs"] = breadcrumbs(current_package, "rule")
reviewed_rule_qs = Rule.objects.none()
unreviewed_rule_qs = Rule.objects.none()
if current_package.reviewed:
reviewed_rule_qs = Rule.objects.filter(package=current_package).order_by("name")
else:
unreviewed_rule_qs = Rule.objects.filter(package=current_package).order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_rule_qs if current_package.reviewed else unreviewed_rule_qs
)
]
}
)
context["reviewed_objects"] = reviewed_rule_qs
context["unreviewed_objects"] = unreviewed_rule_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
log_post_params(request)
# Generic params
rule_name = request.POST.get("rule-name")
rule_description = request.POST.get("rule-description")
rule_type = request.POST.get("rule-type")
params = {}
# Obtain parameters as required by rule type
if rule_type == "SimpleAmbitRule":
params["smirks"] = request.POST.get("rule-smirks").strip()
params["reactant_filter_smarts"] = request.POST.get("rule-reactant-smarts")
params["product_filter_smarts"] = request.POST.get("rule-product-smarts")
elif rule_type == "SimpleRDKitRule":
params["reaction_smarts"] = request.POST.get("rule-reaction-smarts").strip()
elif rule_type == "ParallelRule":
pass
elif rule_type == "SequentialRule":
pass
else:
return HttpResponseBadRequest()
r = Rule.create(
rule_type=rule_type,
package=current_package,
name=rule_name,
description=rule_description,
**params,
)
return redirect(r.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_rule(request, package_uuid, rule_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_rule = Rule.objects.get(package=current_package, uuid=rule_uuid)
if request.method == "GET":
context = get_base_context(request)
if smiles := request.GET.get("smiles", False):
stand_smiles = FormatConverter.standardize(smiles)
res = current_rule.apply(stand_smiles)
if len(res) > 1:
logger.info(
f"Rule {current_rule.uuid} returned multiple product sets on {smiles}, picking the first one."
)
# Some Rules are touching unrelated areas which might result in ~ indicating
# any bond (-, =, #). For drawing we need a concrete bond. -> use single bond
product_smiles = [x.replace("~", "-") for x in res[0]]
smirks = f"{stand_smiles}>>{'.'.join(sorted(product_smiles))}"
# Usually the functional groups are a mapping of fg -> count
# As we are doing it on the fly here fake a high count to ensure that its properly highlighted
if isinstance(current_rule, SimpleAmbitRule):
educt_functional_groups = {current_rule.reactants_smarts: 1000}
product_functional_groups = {current_rule.products_smarts: 1000}
else:
educt_functional_groups = {x: 1000 for x in current_rule.reactants_smarts}
product_functional_groups = {x: 1000 for x in current_rule.products_smarts}
return HttpResponse(
IndigoUtils.smirks_to_svg(
smirks,
False,
0,
0,
educt_functional_groups=educt_functional_groups,
product_functional_groups=product_functional_groups,
),
content_type="image/svg+xml",
)
context["title"] = f"enviPath - {current_package.name} - {current_rule.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "rule"
context["breadcrumbs"] = breadcrumbs(current_package, "rule", current_rule)
context["rule"] = current_rule
context["current_object"] = current_rule
if isinstance(current_rule, SimpleAmbitRule):
return render(request, "objects/simple_rule.html", context)
else: # isinstance(current_rule, ParallelRule) or isinstance(current_rule, SequentialRule):
return render(request, "objects/composite_rule.html", context)
elif request.method == "POST":
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_rule.delete()
return redirect(current_package.url + "/rule")
else:
return HttpResponseBadRequest()
if "selected-scenarios" in request.POST:
selected_scenarios = request.POST.getlist("selected-scenarios")
set_scenarios(current_user, current_rule, selected_scenarios)
return redirect(current_rule.url)
if "aliases" in request.POST:
aliases = request.POST.getlist("aliases")
try:
set_aliases(current_user, current_rule, aliases)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
return JsonResponse({"success": current_rule.url})
# TODO: Move cleaning to property updater
rule_name = request.POST.get("rule-name")
if rule_name is not None:
rule_name = nh3.clean(rule_name, tags=s.ALLOWED_HTML_TAGS).strip()
rule_description = request.POST.get("rule-description")
if rule_description is not None:
rule_description = nh3.clean(rule_description, tags=s.ALLOWED_HTML_TAGS).strip()
if rule_name:
current_rule.name = rule_name
if rule_description:
current_rule.description = rule_description
if any([rule_name, rule_description]):
current_rule.save()
return redirect(current_rule.url)
else:
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_rule_enzymelink(request, package_uuid, rule_uuid, enzymelink_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_rule = Rule.objects.get(package=current_package, uuid=rule_uuid)
current_enzymelink = EnzymeLink.objects.get(rule=current_rule, uuid=enzymelink_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_rule.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "enzyme"
context["breadcrumbs"] = breadcrumbs(
current_package, "rule", current_rule, "enzymelink", current_enzymelink
)
context["enzymelink"] = current_enzymelink
context["current_object"] = current_enzymelink
return render(request, "objects/enzymelink.html", context)
return HttpResponseNotAllowed(["GET"])
@package_permission_required()
def package_reactions(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_package.name} - Reactions"
context["meta"]["current_package"] = current_package
context["object_type"] = "reaction"
context["breadcrumbs"] = breadcrumbs(current_package, "reaction")
reviewed_reaction_qs = Reaction.objects.none()
unreviewed_reaction_qs = Reaction.objects.none()
if current_package.reviewed:
reviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by("name")
else:
unreviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by(
"name"
)
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_reaction_qs
if current_package.reviewed
else unreviewed_reaction_qs
)
]
}
)
context["reviewed_objects"] = reviewed_reaction_qs
context["unreviewed_objects"] = unreviewed_reaction_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
reaction_name = request.POST.get("reaction-name")
reaction_description = request.POST.get("reaction-description")
reactions_smirks = request.POST.get("reaction-smirks").strip()
educts = reactions_smirks.split(">>")[0].split(".")
products = reactions_smirks.split(">>")[1].split(".")
r = Reaction.create(
current_package,
name=reaction_name,
description=reaction_description,
educts=educts,
products=products,
)
return redirect(r.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_reaction(request, package_uuid, reaction_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_reaction = Reaction.objects.get(package=current_package, uuid=reaction_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_reaction.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "reaction"
context["breadcrumbs"] = breadcrumbs(current_package, "reaction", current_reaction)
context["reaction"] = current_reaction
context["current_object"] = current_reaction
return render(request, "objects/reaction.html", context)
elif request.method == "POST":
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_reaction.delete()
return redirect(current_package.url + "/reaction")
else:
return HttpResponseBadRequest()
if "selected-scenarios" in request.POST:
selected_scenarios = request.POST.getlist("selected-scenarios")
set_scenarios(current_user, current_reaction, selected_scenarios)
return redirect(current_reaction.url)
if "aliases" in request.POST:
aliases = request.POST.getlist("aliases")
try:
set_aliases(current_user, current_reaction, aliases)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
return JsonResponse({"success": current_reaction.url})
# TODO: Move cleaning to property updater
new_reaction_name = request.POST.get("reaction-name")
if new_reaction_name is not None:
new_reaction_name = nh3.clean(new_reaction_name, tags=s.ALLOWED_HTML_TAGS).strip()
new_reaction_description = request.POST.get("reaction-description")
if new_reaction_description is not None:
new_reaction_description = nh3.clean(
new_reaction_description, tags=s.ALLOWED_HTML_TAGS
).strip()
if new_reaction_name:
current_reaction.name = new_reaction_name
if new_reaction_description:
current_reaction.description = new_reaction_description
if any([new_reaction_name, new_reaction_description]):
current_reaction.save()
return redirect(current_reaction.url)
selected_database = request.POST.get("selected-database", "").strip()
external_identifier = request.POST.get("identifier", "").strip()
if selected_database and external_identifier:
db = ExternalDatabase.objects.get(id=int(selected_database))
ExternalIdentifier.objects.create(
content_object=current_reaction,
database=db,
identifier_value=external_identifier,
url=db.url_pattern.format(id=external_identifier),
is_primary=False,
)
return redirect(current_reaction.url)
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathways(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - Pathways"
context["meta"]["current_package"] = current_package
context["object_type"] = "pathway"
context["breadcrumbs"] = breadcrumbs(current_package, "pathway")
reviewed_pathway_qs = Pathway.objects.none()
unreviewed_pathway_qs = Pathway.objects.none()
if current_package.reviewed:
reviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by("name")
else:
unreviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_pathway_qs
if current_package.reviewed
else unreviewed_pathway_qs
)
]
}
)
context["reviewed_objects"] = reviewed_pathway_qs
context["unreviewed_objects"] = unreviewed_pathway_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
log_post_params(request)
name = request.POST.get("name")
description = request.POST.get("description")
smiles = request.POST.get("smiles", "").strip()
pw_mode = request.POST.get("predict", "predict").strip()
if "smiles" in request.POST and smiles == "":
return error(
request,
"Pathway prediction failed!",
"Pathway prediction failed due to missing or empty SMILES",
)
try:
stand_smiles = FormatConverter.standardize(smiles)
except ValueError:
return error(
request,
"Pathway prediction failed!",
f'Pathway prediction failed as standardization of SMILES "{smiles}" failed!',
)
modes = ["predict", "build", "incremental"]
if pw_mode not in modes:
return error(
request,
"Pathway prediction failed!",
f'Pathway prediction failed as received mode "{pw_mode}" is none of {modes}',
)
prediction_setting = request.POST.get("prediction-setting", None)
if prediction_setting:
prediction_setting = SettingManager.get_setting_by_url(current_user, prediction_setting)
else:
prediction_setting = current_user.prediction_settings()
pw = Pathway.create(current_package, stand_smiles, name=name, description=description)
# set mode
pw.kv.update({"mode": pw_mode})
pw.save()
if pw_mode == "predict" or pw_mode == "incremental":
# unlimited pred (will be handled by setting)
limit = -1
# For incremental predict first level and return
if pw_mode == "incremental":
limit = 1
pw.setting = prediction_setting
pw.save()
from .tasks import dispatch, predict
dispatch(current_user, predict, pw.pk, prediction_setting.pk, limit=limit)
return redirect(pw.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway(request, package_uuid, pathway_uuid):
current_user: User = _anonymous_or_real(request)
current_package: Package = PackageManager.get_package_by_id(current_user, package_uuid)
current_pathway: Pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
if request.method == "GET":
if request.GET.get("last_modified", False):
return JsonResponse(
{"modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S")}
)
if request.GET.get("status", False):
return JsonResponse(
{
"status": current_pathway.status(),
"modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S"),
}
)
if request.GET.get("download", False) == "true":
filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv"
csv_pw = current_pathway.to_csv()
response = HttpResponse(csv_pw, content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="{filename}"'
return response
if (
request.GET.get("identify-missing-rules", False) == "true"
and request.GET.get("rule-package") is not None
):
from .tasks import dispatch_eager, identify_missing_rules
rule_package = PackageManager.get_package_by_url(
current_user, request.GET.get("rule-package")
)
res = dispatch_eager(
current_user, identify_missing_rules, [current_pathway.pk], rule_package.pk
)
filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv"
response = HttpResponse(res, content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="{filename}"'
return response
# Pathway d3_json() relies on a lot of related objects (Nodes, Structures, Edges, Reaction, Rules, ...)
# we will again fetch the current pathway identified by this url, but this time together with nearly all
# related objects
current_pathway = Pathway.objects.prefetch_related(
"node_set",
"node_set__out_edges",
"node_set__default_node_label",
"node_set__scenarios",
"edge_set",
"edge_set__start_nodes",
"edge_set__end_nodes",
"edge_set__edge_label",
"edge_set__scenarios",
).get(uuid=pathway_uuid)
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "pathway"
context["breadcrumbs"] = breadcrumbs(current_package, "pathway", current_pathway)
context["pathway"] = current_pathway
context["current_object"] = current_pathway
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Package": s.SERVER_URL + "/package"},
{current_package.name: current_package.url},
{"Pathway": current_package.url + "/pathway"},
{current_pathway.name: current_pathway.url},
]
return render(request, "objects/pathway.html", context)
# return render(request, 'pathway_playground2.html', context)
elif request.method == "POST":
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_pathway.delete()
return redirect(current_package.url + "/pathway")
else:
return HttpResponseBadRequest()
if "selected-scenarios" in request.POST:
selected_scenarios = request.POST.getlist("selected-scenarios")
set_scenarios(current_user, current_pathway, selected_scenarios)
return redirect(current_pathway.url)
if "aliases" in request.POST:
aliases = request.POST.getlist("aliases")
try:
set_aliases(current_user, current_pathway, aliases)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
return JsonResponse({"success": current_pathway.url})
# TODO: Move cleaning to property updater
pathway_name = request.POST.get("pathway-name")
if pathway_name is not None:
pathway_name = nh3.clean(pathway_name, tags=s.ALLOWED_HTML_TAGS).strip()
pathway_description = request.POST.get("pathway-description")
if pathway_description is not None:
pathway_description = nh3.clean(pathway_description, tags=s.ALLOWED_HTML_TAGS).strip()
if any([pathway_name, pathway_description]):
if pathway_name is not None and pathway_name.strip() != "":
pathway_name = pathway_name.strip()
current_pathway.name = pathway_name
if pathway_description is not None and pathway_description.strip() != "":
pathway_description = pathway_description.strip()
current_pathway.description = pathway_description
current_pathway.save()
return redirect(current_pathway.url)
node_url = request.POST.get("node")
if node_url:
n = current_pathway.get_node(node_url)
from .tasks import dispatch, predict
dispatch(
current_user,
predict,
current_pathway.pk,
current_pathway.setting.pk,
node_pk=n.pk,
)
return JsonResponse({"success": current_pathway.url})
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_nodes(request, package_uuid, pathway_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name} - Nodes"
context["meta"]["current_package"] = current_package
context["object_type"] = "node"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Package": s.SERVER_URL + "/package"},
{current_package.name: current_package.url},
{"Pathway": current_package.url + "/pathway"},
{current_pathway.name: current_pathway.url},
{"Node": current_pathway.url + "/node"},
]
reviewed_node_qs = Node.objects.none()
unreviewed_node_qs = Node.objects.none()
if current_package.reviewed:
reviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by("name")
else:
unreviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_node_qs if current_package.reviewed else unreviewed_node_qs
)
]
}
)
context["reviewed_objects"] = reviewed_node_qs
context["unreviewed_objects"] = unreviewed_node_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
node_name = request.POST.get("node-name")
node_description = request.POST.get("node-description")
node_smiles = request.POST.get("node-smiles").strip()
current_pathway.add_node(node_smiles, name=node_name, description=node_description)
return redirect(current_pathway.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
current_node = Node.objects.get(pathway=current_pathway, uuid=node_uuid)
if request.method == "GET":
is_image_request = request.GET.get("image")
is_highlight_request = request.GET.get("highlight", False)
is_highlight_reactivity = request.GET.get("highlightReactivity", False)
if is_image_request:
if is_image_request == "svg":
# TODO optimize this chain
if is_highlight_request:
# User functional groups covered by the model training data
fgs = {}
if current_pathway.setting:
if current_pathway.setting.model:
if current_pathway.setting.model.app_domain:
fgs = current_pathway.setting.model.app_domain.functional_groups
svg_data = IndigoUtils.mol_to_svg(
current_node.default_node_label.smiles, functional_groups=fgs
)
elif is_highlight_reactivity:
# Use reactant smarts to show all reaction sites
# set a high count to obtain a strong color
ad_data = current_node.get_app_domain_assessment_data()
fgs = {}
for t in ad_data.get("assessment", {}).get("transformations", []):
r = Rule.objects.get(url=t["rule"]["url"])
if isinstance(r, SimpleAmbitRule):
fgs[r.reactants_smarts] = 1000
else:
for sr in r.srs:
fgs[sr.reactants_smarts] = 1000
svg_data = IndigoUtils.mol_to_svg(
current_node.default_node_label.smiles, functional_groups=fgs
)
else:
svg_data = current_node.as_svg
return HttpResponse(svg_data, content_type="image/svg+xml")
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "pathway"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Package": s.SERVER_URL + "/package"},
{current_package.name: current_package.url},
{"Pathway": current_package.url + "/pathway"},
{current_pathway.name: current_pathway.url},
{"Node": current_pathway.url + "/node"},
{current_node.name: current_node.url},
]
context["node"] = current_node
context["current_object"] = current_node
context["app_domain_assessment_data"] = json.dumps(
current_node.get_app_domain_assessment_data()
)
return render(request, "objects/node.html", context)
elif request.method == "POST":
log_post_params(request)
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
# pre_delete signal will take care of edge deletion
current_node.delete()
return redirect(current_pathway.url)
else:
return HttpResponseBadRequest()
if "selected-scenarios" in request.POST:
selected_scenarios = request.POST.getlist("selected-scenarios")
set_scenarios(current_user, current_node, selected_scenarios)
return redirect(current_node.url)
if "aliases" in request.POST:
aliases = request.POST.getlist("aliases")
try:
set_aliases(current_user, current_node, aliases)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
return JsonResponse({"success": current_node.url})
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_edges(request, package_uuid, pathway_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name} - Edges"
context["meta"]["current_package"] = current_package
context["object_type"] = "edge"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Package": s.SERVER_URL + "/package"},
{current_package.name: current_package.url},
{"Pathway": current_package.url + "/pathway"},
{current_pathway.name: current_pathway.url},
{"Edge": current_pathway.url + "/edge"},
]
reviewed_edge_qs = Edge.objects.none()
unreviewed_edge_qs = Edge.objects.none()
if current_package.reviewed:
reviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by("name")
else:
unreviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by("name")
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_edge_qs if current_package.reviewed else unreviewed_edge_qs
)
]
}
)
context["reviewed_objects"] = reviewed_edge_qs
context["unreviewed_objects"] = unreviewed_edge_qs
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
log_post_params(request)
edge_name = request.POST.get("edge-name")
edge_description = request.POST.get("edge-description")
edge_substrates = request.POST.getlist("edge-substrates")
edge_products = request.POST.getlist("edge-products")
substrate_nodes = [current_pathway.get_node(url) for url in edge_substrates]
product_nodes = [current_pathway.get_node(url) for url in edge_products]
# TODO in the future consider Rules here?
current_pathway.add_edge(
substrate_nodes, product_nodes, name=edge_name, description=edge_description
)
return redirect(current_pathway.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
current_edge = Edge.objects.get(pathway=current_pathway, uuid=edge_uuid)
if request.method == "GET":
is_image_request = request.GET.get("image")
if is_image_request:
if is_image_request == "svg":
svg_data = current_edge.as_svg
return HttpResponse(svg_data, content_type="image/svg+xml")
context = get_base_context(request)
context["title"] = (
f"enviPath - {current_package.name} - {current_pathway.name} - {current_edge.edge_label.name}"
)
context["meta"]["current_package"] = current_package
context["object_type"] = "edge"
context["breadcrumbs"] = breadcrumbs(
current_package, "pathway", current_pathway, "edge", current_edge
)
context["edge"] = current_edge
context["current_object"] = current_edge
return render(request, "objects/edge.html", context)
elif request.method == "POST":
log_post_params(request)
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_edge.delete()
return redirect(current_pathway.url)
if "selected-scenarios" in request.POST:
selected_scenarios = request.POST.getlist("selected-scenarios")
set_scenarios(current_user, current_edge, selected_scenarios)
return redirect(current_edge.url)
if "aliases" in request.POST:
aliases = request.POST.getlist("aliases")
try:
set_aliases(current_user, current_edge, aliases)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
return JsonResponse({"success": current_edge.url})
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
@package_permission_required()
def package_scenarios(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "GET":
if "application/json" in request.META.get("HTTP_ACCEPT") and not request.GET.get(
"all", False
):
scens = Scenario.objects.filter(package=current_package).order_by("name")
res = [{"name": s_.name, "url": s_.url, "uuid": s_.uuid} for s_ in scens]
return JsonResponse(res, safe=False)
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - Scenarios"
context["meta"]["current_package"] = current_package
context["object_type"] = "scenario"
context["breadcrumbs"] = breadcrumbs(current_package, "scenario")
reviewed_scenario_qs = Scenario.objects.none()
unreviewed_scenario_qs = Scenario.objects.none()
if current_package.reviewed:
reviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by("name")
else:
unreviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by(
"name"
)
if request.GET.get("all"):
return JsonResponse(
{
"objects": [
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
for pw in (
reviewed_scenario_qs
if current_package.reviewed
else unreviewed_scenario_qs
)
]
}
)
context["reviewed_objects"] = reviewed_scenario_qs
context["unreviewed_objects"] = unreviewed_scenario_qs
from envipy_additional_information import (
SLUDGE_ADDITIONAL_INFORMATION,
SOIL_ADDITIONAL_INFORMATION,
SEDIMENT_ADDITIONAL_INFORMATION,
)
context["scenario_types"] = {
"Soil Data": {
"name": "soil",
"widgets": [
HTMLGenerator.generate_html(ai, prefix=f"soil_{0}")
for ai in [x for sv in SOIL_ADDITIONAL_INFORMATION.values() for x in sv]
],
},
"Sludge Data": {
"name": "sludge",
"widgets": [
HTMLGenerator.generate_html(ai, prefix=f"sludge_{0}")
for ai in [x for sv in SLUDGE_ADDITIONAL_INFORMATION.values() for x in sv]
],
},
"Water-Sediment System Data": {
"name": "sediment",
"widgets": [
HTMLGenerator.generate_html(ai, prefix=f"sediment_{0}")
for ai in [x for sv in SEDIMENT_ADDITIONAL_INFORMATION.values() for x in sv]
],
},
}
context["sludge_additional_information"] = SLUDGE_ADDITIONAL_INFORMATION
context["soil_additional_information"] = SOIL_ADDITIONAL_INFORMATION
context["sediment_additional_information"] = SEDIMENT_ADDITIONAL_INFORMATION
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
log_post_params(request)
scenario_name = request.POST.get("scenario-name")
scenario_description = request.POST.get("scenario-description")
scenario_date_year = request.POST.get("scenario-date-year")
scenario_date_month = request.POST.get("scenario-date-month")
scenario_date_day = request.POST.get("scenario-date-day")
scenario_date = scenario_date_year
if scenario_date_month is not None and scenario_date_month.strip() != "":
scenario_date += f"-{int(scenario_date_month):02d}"
if scenario_date_day is not None and scenario_date_day.strip() != "":
scenario_date += f"-{int(scenario_date_day):02d}"
scenario_type = request.POST.get("scenario-type")
additional_information = HTMLGenerator.build_models(request.POST.dict())
additional_information = [x for sv in additional_information.values() for x in sv]
new_scen = Scenario.create(
current_package,
name=scenario_name,
description=scenario_description,
scenario_date=scenario_date,
scenario_type=scenario_type,
additional_information=additional_information,
)
return redirect(new_scen.url)
else:
return HttpResponseNotAllowed(
[
"GET",
]
)
@package_permission_required()
def package_scenario(request, package_uuid, scenario_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
current_scenario = Scenario.objects.get(package=current_package, uuid=scenario_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_package.name} - {current_scenario.name}"
context["meta"]["current_package"] = current_package
context["object_type"] = "scenario"
context["breadcrumbs"] = breadcrumbs(current_package, "scenario", current_scenario)
context["scenario"] = current_scenario
available_add_infs = []
for add_inf in NAME_MAPPING.values():
available_add_infs.append(
{
"display_name": add_inf.property_name(None),
"name": add_inf.__name__,
"widget": HTMLGenerator.generate_html(add_inf, prefix=f"{0}"),
}
)
context["available_additional_information"] = available_add_infs
context["update_widgets"] = [
HTMLGenerator.generate_html(ai, prefix=f"{i}")
for i, ai in enumerate(current_scenario.get_additional_information())
]
return render(request, "objects/scenario.html", context)
elif request.method == "POST":
log_post_params(request)
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_scenario.delete()
return redirect(current_package.url + "/scenario")
elif hidden == "delete-additional-information":
uuid = request.POST.get("uuid")
current_scenario.remove_additional_information(uuid)
return redirect(current_scenario.url)
elif hidden == "delete-all-additional-information":
current_scenario.additional_information = dict()
current_scenario.save()
return redirect(current_scenario.url)
elif hidden == "set-additional-information":
ais = HTMLGenerator.build_models(request.POST.dict())
if s.DEBUG:
logger.info(ais)
current_scenario.set_additional_information(ais)
return redirect(current_scenario.url)
elif hidden == "add-additional-information":
ais = HTMLGenerator.build_models(request.POST.dict())
if len(ais.keys()) != 1:
raise ValueError(
"Only one additional information field can be added at a time."
)
ai = list(ais.values())[0][0]
if s.DEBUG:
logger.info(ais)
current_scenario.add_additional_information(ai)
return redirect(current_scenario.url)
else:
return HttpResponseBadRequest()
else:
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
##############
# User/Group #
##############
def users(request):
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Users"
context["object_type"] = "user"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"User": s.SERVER_URL + "/user"},
]
context["objects"] = get_user_model().objects.all()
return render(request, "collections/objects_list.html", context)
else:
return HttpResponseNotAllowed(["GET"])
def user(request, user_uuid):
current_user = _anonymous_or_real(request)
if request.method == "GET":
# Check if current user is the one matching to the url
if str(current_user.uuid) != user_uuid and not current_user.is_superuser:
return HttpResponseBadRequest()
requested_user = UserManager.get_user_by_id(current_user, user_uuid)
context = get_base_context(request, for_user=requested_user)
context["title"] = "enviPath - User"
context["object_type"] = "user"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"User": s.SERVER_URL + "/user"},
{current_user.username: requested_user.url},
]
context["user"] = requested_user
model_qs = EPModel.objects.none()
for p in PackageManager.get_all_readable_packages(requested_user, include_reviewed=True):
model_qs |= p.models
context["models"] = model_qs
context["tokens"] = APIToken.objects.filter(user=requested_user)
return render(request, "objects/user.html", context)
elif request.method == "POST":
is_hidden_method = bool(request.POST.get("hidden", False))
if is_hidden_method and request.POST["hidden"] == "request-api-token":
name = request.POST.get("name", "No Name")
valid_for = min(max(int(request.POST.get("valid-for", 90)), 1), 90)
token, raw_token = APIToken.create_token(request.user, name=name, valid_for=valid_for)
return JsonResponse(
{"raw_token": raw_token, "token": {"id": token.id, "name": token.name}}
)
if is_hidden_method and request.POST["hidden"] == "delete":
token_id = request.POST.get("token-id")
if token_id is None:
return HttpResponseBadRequest("Token ID missing!")
try:
APIToken.objects.get(user=current_user, id=token_id).delete()
except APIToken.DoesNotExist:
return HttpResponseBadRequest("User and Token ID combination invalid!")
return HttpResponse("success")
default_package = request.POST.get("default-package")
default_group = request.POST.get("default-group")
default_prediction_setting = request.POST.get("default-prediction-setting")
if any([default_package, default_group, default_prediction_setting]):
current_user.default_package = PackageManager.get_package_by_url(
current_user, default_package
)
current_user.default_group = GroupManager.get_group_by_url(current_user, default_group)
current_user.default_setting = SettingManager.get_setting_by_url(
current_user, default_prediction_setting
)
current_user.save()
return redirect(current_user.url)
return HttpResponseBadRequest()
else:
return HttpResponseNotAllowed(["GET", "POST"])
def groups(request):
current_user = _anonymous_or_real(request)
if request.method == "GET":
context = get_base_context(request)
context["title"] = "enviPath - Groups"
context["object_type"] = "group"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Group": s.SERVER_URL + "/group"},
]
context["objects"] = Group.objects.all()
return render(request, "collections/objects_list.html", context)
elif request.method == "POST":
group_name = request.POST.get("group-name")
group_description = request.POST.get("group-description", s.DEFAULT_VALUES["description"])
g = GroupManager.create_group(current_user, group_name, group_description)
return redirect(g.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def group(request, group_uuid):
current_user = _anonymous_or_real(request)
current_group = GroupManager.get_group_by_id(current_user, group_uuid)
if request.method == "GET":
context = get_base_context(request)
context["title"] = f"enviPath - {current_group.name}"
context["object_type"] = "group"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Group": s.SERVER_URL + "/group"},
{current_group.name: current_group.url},
]
context["group"] = current_group
context["users"] = UserManager.get_users_lp().exclude(
id__in=current_group.user_member.all()
)
context["groups"] = (
GroupManager.get_groups_lp()
.exclude(id__in=current_group.group_member.all())
.exclude(id=current_group.pk)
)
context["packages"] = Package.objects.filter(
id__in=GroupPackagePermission.objects.filter(group=current_group)
.values("package")
.distinct()
)
return render(request, "objects/group.html", context)
elif request.method == "POST":
log_post_params(request)
if hidden := request.POST.get("hidden", None):
if hidden == "delete":
current_group.delete()
return redirect(s.SERVER_URL + "/group")
else:
return HttpResponseBadRequest()
member_url = request.POST.get("member")
action = request.POST.get("action")
if all([member_url, action]) and action in ["add", "remove"]:
if "user" in member_url:
member = UserManager.get_user_lp(member_url)
else:
member = GroupManager.get_group_lp(member_url)
GroupManager.update_members(current_user, current_group, member, action)
return redirect(current_group.url)
else:
return HttpResponseNotAllowed(["GET", "POST"])
def settings(request):
current_user = _anonymous_or_real(request)
context = get_base_context(request)
if request.method == "GET":
context["object_type"] = "setting"
# Even if settings are aready in "meta", for consistency add it on root level
context["settings"] = SettingManager.get_all_settings(current_user)
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Group": s.SERVER_URL + "/setting"},
]
return
elif request.method == "POST":
if s.DEBUG:
for k, v in request.POST.items():
logger.info("Parameters received:")
logger.info(f"{k}\t{v}")
name = request.POST.get("prediction-setting-name")
description = request.POST.get("prediction-setting-description")
new_default = request.POST.get("prediction-setting-new-default", "off") == "on"
max_nodes = min(
max(
int(request.POST.get("prediction-setting-max-nodes", 1)),
s.DEFAULT_MAX_NUMBER_OF_NODES,
),
s.DEFAULT_MAX_NUMBER_OF_NODES,
)
max_depth = min(
max(int(request.POST.get("prediction-setting-max-depth", 1)), s.DEFAULT_MAX_DEPTH),
s.DEFAULT_MAX_DEPTH,
)
tp_gen_method = request.POST.get("tp-generation-method")
params = {}
if tp_gen_method == "model-based-prediction-setting":
model_url = request.POST.get("model-based-prediction-setting-model")
model_uuid = model_url.split("/")[-1]
params["model"] = EPModel.objects.get(uuid=model_uuid)
params["model_threshold"] = request.POST.get(
"model-based-prediction-setting-threshold", s.DEFAULT_MODEL_THRESHOLD
)
if not PackageManager.readable(current_user, params["model"].package):
raise ValueError("")
elif tp_gen_method == "rule-based-prediction-setting":
rule_packages = request.POST.getlist("rule-based-prediction-setting-packages")
params["rule_packages"] = [
PackageManager.get_package_by_url(current_user, p) for p in rule_packages
]
else:
raise ValueError("")
created_setting = SettingManager.create_setting(
current_user,
name=name,
description=description,
max_nodes=max_nodes,
max_depth=max_depth,
**params,
)
if new_default:
current_user.default_setting = created_setting
current_user.save()
return HttpResponse("Success!")
else:
return HttpResponseNotAllowed(["GET", "POST"])
def setting(request, setting_uuid):
pass
def jobs(request):
current_user = _anonymous_or_real(request)
context = get_base_context(request)
if request.method == "GET":
context["object_type"] = "joblog"
context["breadcrumbs"] = [
{"Home": s.SERVER_URL},
{"Jobs": s.SERVER_URL + "/jobs"},
]
if current_user.is_superuser:
context["jobs"] = JobLog.objects.all().order_by("-created")
else:
context["jobs"] = JobLog.objects.filter(user=current_user).order_by("-created")
return render(request, "collections/joblog.html", context)
###########
# KETCHER #
###########
def indigo(request):
from indigo import Indigo
return JsonResponse({"Indigo": {"version": Indigo().version()}})
@csrf_exempt
def aromatize(request):
if request.method == "POST":
data = json.loads(request.body)
mol_data = data.get("struct")
aromatized = IndigoUtils.aromatize(mol_data, False)
return JsonResponse({"struct": aromatized})
else:
return HttpResponseBadRequest()
@csrf_exempt
def dearomatize(request):
if request.method == "POST":
data = json.loads(request.body)
mol_data = data.get("struct")
dearomatized = IndigoUtils.dearomatize(mol_data, False)
return JsonResponse({"struct": dearomatized})
else:
return HttpResponseBadRequest()
@csrf_exempt
def layout(request):
if request.method == "POST":
data = json.loads(request.body)
mol_data = data.get("struct")
lay = IndigoUtils.layout(mol_data)
return JsonResponse({"struct": lay})
else:
return HttpResponseBadRequest()
##########################
# Generic/Non-Persistent #
##########################
def depict(request):
if smiles := request.GET.get("smiles"):
return HttpResponse(IndigoUtils.mol_to_svg(smiles), content_type="image/svg+xml")
elif smirks := request.GET.get("smirks"):
query_smirks = request.GET.get("is_query_smirks", False) == "true"
return HttpResponse(
IndigoUtils.smirks_to_svg(smirks, query_smirks), content_type="image/svg+xml"
)
else:
return HttpResponseBadRequest()
@protected_resource()
def userinfo(request):
user = request.resource_owner
res = {
"sub": str(user.uuid),
"email": user.email,
"username": user.username,
"name": user.get_full_name() or user.username,
"email_verified": user.is_active,
}
return JsonResponse(res)
# Static Pages
def static_terms_of_use(request):
context = get_base_context(request)
context["title"] = "enviPath - Terms of Use"
context["public_mode"] = True
return render(request, "static/terms_of_use.html", context)
def static_privacy_policy(request):
context = get_base_context(request)
context["title"] = "enviPath - Privacy Policy"
context["public_mode"] = True
return render(request, "static/privacy_policy.html", context)
def static_cookie_policy(request):
context = get_base_context(request)
context["title"] = "enviPath - Cookie Policy"
context["public_mode"] = True
return render(request, "static/cookie_policy.html", context)
def static_about_us(request):
context = get_base_context(request)
context["title"] = "enviPath - About Us"
context["public_mode"] = True
return render(request, "static/about_us.html", context)
def static_contact_support(request):
context = get_base_context(request)
context["title"] = "enviPath - Contact & Support"
context["public_mode"] = True
return render(request, "static/contact.html", context)
def static_careers(request):
context = get_base_context(request)
context["title"] = "enviPath - Careers"
context["public_mode"] = True
return render(request, "static/careers.html", context)
def static_cite(request):
context = get_base_context(request)
context["title"] = "enviPath - How to Cite"
context["public_mode"] = True
return render(request, "static/cite.html", context)
def static_legal(request):
context = get_base_context(request)
context["title"] = "enviPath - Legal Information"
context["public_mode"] = True
return render(request, "static/legal.html", context)