forked from enviPath/enviPy
2829 lines
101 KiB
Python
2829 lines
101 KiB
Python
import json
|
|
import logging
|
|
from typing import List, Dict, Any
|
|
|
|
from django.conf import settings as s
|
|
from django.contrib.auth import get_user_model
|
|
from django.http import JsonResponse, HttpResponse, HttpResponseNotAllowed, HttpResponseBadRequest
|
|
from django.shortcuts import render, redirect
|
|
from django.urls import reverse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from envipy_additional_information import NAME_MAPPING
|
|
from oauth2_provider.decorators import protected_resource
|
|
|
|
from utilities.chem import FormatConverter, IndigoUtils
|
|
from utilities.decorators import package_permission_required
|
|
from utilities.misc import HTMLGenerator
|
|
from .logic import (
|
|
GroupManager,
|
|
PackageManager,
|
|
UserManager,
|
|
SettingManager,
|
|
SearchManager,
|
|
EPDBURLParser,
|
|
)
|
|
from .models import (
|
|
Package,
|
|
GroupPackagePermission,
|
|
Group,
|
|
CompoundStructure,
|
|
Compound,
|
|
Reaction,
|
|
Rule,
|
|
Pathway,
|
|
Node,
|
|
EPModel,
|
|
EnviFormer,
|
|
MLRelativeReasoning,
|
|
RuleBasedRelativeReasoning,
|
|
Scenario,
|
|
SimpleAmbitRule,
|
|
APIToken,
|
|
UserPackagePermission,
|
|
Permission,
|
|
License,
|
|
User,
|
|
Edge,
|
|
ExternalDatabase,
|
|
ExternalIdentifier,
|
|
EnzymeLink,
|
|
JobLog,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def log_post_params(request):
|
|
if s.DEBUG:
|
|
for k, v in request.POST.items():
|
|
logger.debug(f"{k}\t{v}")
|
|
|
|
|
|
def error(request, message: str, detail: str, code: int = 400):
|
|
context = get_base_context(request)
|
|
error_context = {
|
|
"error_message": message,
|
|
"error_detail": detail,
|
|
}
|
|
|
|
if request.headers.get("Accept") == "application/json":
|
|
return JsonResponse(error_context, status=500)
|
|
|
|
context.update(**error_context)
|
|
return render(request, "errors/error.html", context, status=code)
|
|
|
|
|
|
def login(request):
|
|
context = get_base_context(request)
|
|
|
|
if request.method == "GET":
|
|
context["title"] = "enviPath"
|
|
context["next"] = request.GET.get("next", "")
|
|
return render(request, "static/login.html", context)
|
|
|
|
elif request.method == "POST":
|
|
from django.contrib.auth import authenticate
|
|
from django.contrib.auth import login
|
|
|
|
username = request.POST.get("username")
|
|
password = request.POST.get("password")
|
|
|
|
# Get email for username and check if the account is active
|
|
try:
|
|
temp_user = get_user_model().objects.get(username=username)
|
|
|
|
if not temp_user.is_active:
|
|
context["message"] = "User account is not activated yet!"
|
|
return render(request, "static/login.html", context)
|
|
|
|
email = temp_user.email
|
|
except get_user_model().DoesNotExist:
|
|
context["message"] = "Login failed!"
|
|
return render(request, "static/login.html", context)
|
|
try:
|
|
user = authenticate(username=email, password=password)
|
|
except Exception:
|
|
context["message"] = "Login failed!"
|
|
return render(request, "static/login.html", context)
|
|
|
|
if user is not None:
|
|
login(request, user)
|
|
|
|
if next := request.POST.get("next"):
|
|
return redirect(next)
|
|
|
|
return redirect(reverse("index"))
|
|
else:
|
|
context["message"] = "Login failed!"
|
|
return render(request, "static/login.html", context)
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def logout(request):
|
|
if request.method == "POST":
|
|
is_logout = bool(request.POST.get("logout", False))
|
|
|
|
if is_logout:
|
|
from django.contrib.auth import logout
|
|
|
|
logout(request)
|
|
return redirect(s.SERVER_URL)
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
def register(request):
|
|
context = get_base_context(request)
|
|
|
|
if request.method == "GET":
|
|
context["title"] = "enviPath"
|
|
context["next"] = request.GET.get("next", "")
|
|
return render(request, "static/register.html", context)
|
|
elif request.method == "POST":
|
|
context["title"] = "enviPath"
|
|
if next := request.POST.get("next"):
|
|
context["next"] = next
|
|
|
|
username = request.POST.get("username", "").strip()
|
|
email = request.POST.get("email", "").strip()
|
|
password = request.POST.get("password", "").strip()
|
|
rpassword = request.POST.get("rpassword", "").strip()
|
|
|
|
if not (username and email and password):
|
|
context["message"] = "Invalid username/email/password"
|
|
return render(request, "static/register.html", context)
|
|
|
|
if password != rpassword or password == "":
|
|
context["message"] = "Registration failed, provided passwords differ!"
|
|
return render(request, "static/register.html", context)
|
|
|
|
try:
|
|
u = UserManager.create_user(username, email, password)
|
|
logger.info(f"Created user {u.username} ({u.pk})")
|
|
except Exception:
|
|
context["message"] = "Registration failed! Couldn't create User Account."
|
|
return render(request, "static/register.html", context)
|
|
|
|
if s.ADMIN_APPROVAL_REQUIRED:
|
|
context["success_message"] = (
|
|
"Your account has been created! An admin will activate it soon!"
|
|
)
|
|
else:
|
|
context["success_message"] = (
|
|
"Account has been created! You'll receive a mail to activate your account shortly."
|
|
)
|
|
|
|
return render(request, "static/login.html", context)
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def editable(request, user):
|
|
if user.is_superuser:
|
|
return True
|
|
|
|
url = request.build_absolute_uri(request.path)
|
|
if PackageManager.is_package_url(url):
|
|
_package = PackageManager.get_package_lp(request.build_absolute_uri())
|
|
return PackageManager.writable(user, _package)
|
|
elif GroupManager.is_group_url(url):
|
|
_group = GroupManager.get_group_lp(request.build_absolute_uri())
|
|
return GroupManager.writable(user, _group)
|
|
elif UserManager.is_user_url(url):
|
|
_user = UserManager.get_user_lp(request.build_absolute_uri())
|
|
return UserManager.writable(user, _user)
|
|
elif url in [
|
|
s.SERVER_URL,
|
|
f"{s.SERVER_URL}/",
|
|
f"{s.SERVER_URL}/package",
|
|
f"{s.SERVER_URL}/user",
|
|
f"{s.SERVER_URL}/group",
|
|
f"{s.SERVER_URL}/search",
|
|
]:
|
|
return True
|
|
else:
|
|
logger.debug(f"Unknown url: {url}")
|
|
return False
|
|
|
|
|
|
def get_base_context(request, for_user=None) -> Dict[str, Any]:
|
|
current_user = _anonymous_or_real(request)
|
|
can_edit = editable(request, current_user)
|
|
|
|
parser = EPDBURLParser(request.build_absolute_uri(request.path))
|
|
|
|
url_contains_package = False
|
|
if parser.contains_package_url() or parser.is_package_url():
|
|
url_contains_package = True
|
|
|
|
if for_user:
|
|
current_user = for_user
|
|
|
|
ctx = {
|
|
"title": "enviPath",
|
|
"meta": {
|
|
"version": "0.0.1",
|
|
"server_url": s.SERVER_URL,
|
|
"user": current_user,
|
|
"can_edit": can_edit,
|
|
"url_contains_package": url_contains_package,
|
|
"readable_packages": PackageManager.get_all_readable_packages(
|
|
current_user, include_reviewed=True
|
|
),
|
|
"writeable_packages": PackageManager.get_all_writeable_packages(current_user),
|
|
"available_groups": GroupManager.get_groups(current_user),
|
|
"available_settings": SettingManager.get_all_settings(current_user),
|
|
"enabled_features": s.FLAGS,
|
|
"debug": s.DEBUG,
|
|
"external_databases": ExternalDatabase.get_databases(),
|
|
"site_id": s.MATOMO_SITE_ID,
|
|
},
|
|
}
|
|
|
|
return ctx
|
|
|
|
|
|
def _anonymous_or_real(request):
|
|
if request.user.is_authenticated and not request.user.is_anonymous:
|
|
return request.user
|
|
return get_user_model().objects.get(username="anonymous")
|
|
|
|
|
|
def breadcrumbs(
|
|
first_level_object=None,
|
|
second_level_namespace=None,
|
|
second_level_object=None,
|
|
third_level_namespace=None,
|
|
third_level_object=None,
|
|
) -> List[Dict[str, str]]:
|
|
bread = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Package": s.SERVER_URL + "/package"},
|
|
]
|
|
if first_level_object is not None:
|
|
bread.append({first_level_object.name: first_level_object.url})
|
|
|
|
if second_level_namespace is not None:
|
|
bread.append(
|
|
{
|
|
f"{second_level_namespace}".capitalize(): first_level_object.url
|
|
+ f"/{second_level_namespace}"
|
|
}
|
|
)
|
|
|
|
if second_level_object is not None:
|
|
bread.append({second_level_object.name: second_level_object.url})
|
|
|
|
if third_level_namespace is not None:
|
|
bread.append(
|
|
{
|
|
f"{third_level_namespace}".capitalize(): second_level_object.url
|
|
+ f"/{third_level_namespace}"
|
|
}
|
|
)
|
|
|
|
if third_level_object is not None:
|
|
bread.append({third_level_object.name: third_level_object.url})
|
|
|
|
return bread
|
|
|
|
|
|
def set_scenarios(current_user, attach_object, scenario_urls: List[str]):
|
|
scens = []
|
|
for scenario_url in scenario_urls:
|
|
# As empty lists will be removed in POST request well send ['']
|
|
if scenario_url == "":
|
|
continue
|
|
|
|
package = PackageManager.get_package_by_url(current_user, scenario_url)
|
|
scen = Scenario.objects.get(package=package, uuid=scenario_url.split("/")[-1])
|
|
scens.append(scen)
|
|
|
|
attach_object.set_scenarios(scens)
|
|
|
|
|
|
def set_aliases(current_user, attach_object, aliases: List[str]):
|
|
if aliases == [""]:
|
|
aliases = []
|
|
|
|
attach_object.aliases = aliases
|
|
attach_object.save()
|
|
|
|
|
|
def copy_object(current_user, target_package: "Package", source_object_url: str):
|
|
# Ensures that source is readable
|
|
source_package = PackageManager.get_package_by_url(current_user, source_object_url)
|
|
|
|
if source_package == target_package:
|
|
raise ValueError(f"Can't copy object {source_object_url} to the same package!")
|
|
|
|
parser = EPDBURLParser(source_object_url)
|
|
|
|
# if the url won't contain a package or is a plain package
|
|
if not parser.contains_package_url():
|
|
raise ValueError(f"Object {source_object_url} can't be copied!")
|
|
|
|
# Gets the most specific object
|
|
source_object = parser.get_object()
|
|
|
|
if hasattr(source_object, "copy"):
|
|
mapping = dict()
|
|
copy = source_object.copy(target_package, mapping)
|
|
|
|
if s.DEBUG:
|
|
for k, v in mapping.items():
|
|
logger.debug(f"Mapping {k.url} to {v.url}")
|
|
|
|
return copy
|
|
|
|
raise ValueError(f"Object {source_object} can't be copied!")
|
|
|
|
|
|
def index(request):
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Home"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
|
|
if request.GET.get("getMLServerPath", False):
|
|
return JsonResponse({"mlServerPath": s.SERVER_URL})
|
|
|
|
return render(request, "index/index.html", context)
|
|
|
|
|
|
def packages(request):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Packages"
|
|
|
|
context["object_type"] = "package"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
context["meta"]["can_edit"] = True
|
|
|
|
reviewed_package_qs = Package.objects.filter(reviewed=True).order_by("created")
|
|
unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user).order_by(
|
|
"name"
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_package_qs
|
|
context["unreviewed_objects"] = unreviewed_package_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden in ["import-legacy-package-json", "import-package-json"]:
|
|
f = request.FILES["file"]
|
|
|
|
try:
|
|
file_data = f.read().decode("utf-8")
|
|
data = json.loads(file_data)
|
|
|
|
if hidden == "import-legacy-package-json":
|
|
pack = PackageManager.import_legacy_package(data, current_user)
|
|
else:
|
|
pack = PackageManager.import_package(data, current_user)
|
|
|
|
return redirect(pack.url)
|
|
except UnicodeDecodeError:
|
|
return error(request, "Invalid encoding.", "Invalid encoding, must be UTF-8")
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
package_name = request.POST.get("package-name")
|
|
package_description = request.POST.get(
|
|
"package-description", s.DEFAULT_VALUES["description"]
|
|
)
|
|
|
|
created_package = PackageManager.create_package(
|
|
current_user, package_name, package_description
|
|
)
|
|
|
|
return redirect(created_package.url)
|
|
|
|
elif request.method == "OPTIONS":
|
|
response = HttpResponse()
|
|
response["allow"] = ",".join(["GET", "POST"])
|
|
return response
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def compounds(request):
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Compounds"
|
|
|
|
context["object_type"] = "compound"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
|
|
reviewed_compound_qs = Compound.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_compound_qs |= Compound.objects.filter(package=p)
|
|
|
|
reviewed_compound_qs = reviewed_compound_qs.order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_compound_qs
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_compound_qs
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_compounds(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def rules(request):
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Rules"
|
|
|
|
context["object_type"] = "rule"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Rule": s.SERVER_URL + "/rule"},
|
|
]
|
|
reviewed_rule_qs = Rule.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_rule_qs |= Rule.objects.filter(package=p)
|
|
|
|
reviewed_rule_qs = reviewed_rule_qs.order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_rule_qs
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_rule_qs
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_rules(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def reactions(request):
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Reactions"
|
|
|
|
context["object_type"] = "reaction"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Reaction": s.SERVER_URL + "/reaction"},
|
|
]
|
|
reviewed_reaction_qs = Reaction.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_reaction_qs |= Reaction.objects.filter(package=p).order_by("name")
|
|
|
|
reviewed_reaction_qs = reviewed_reaction_qs.order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_reaction_qs
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_reaction_qs
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_reactions(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def pathways(request):
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Pathways"
|
|
|
|
context["object_type"] = "pathway"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Pathway": s.SERVER_URL + "/pathway"},
|
|
]
|
|
|
|
reviewed_pathway_qs = Pathway.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_pathway_qs |= Pathway.objects.filter(package=p).order_by("name")
|
|
|
|
reviewed_pathway_qs = reviewed_pathway_qs.order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_pathway_qs
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_pathway_qs
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_pathways(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def scenarios(request):
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Scenarios"
|
|
|
|
context["object_type"] = "scenario"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Scenario": s.SERVER_URL + "/scenario"},
|
|
]
|
|
|
|
reviewed_scenario_qs = Scenario.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_scenario_qs |= Scenario.objects.filter(package=p).order_by("name")
|
|
|
|
reviewed_scenario_qs = reviewed_scenario_qs.order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": s.name, "url": s.url, "reviewed": True}
|
|
for s in reviewed_scenario_qs
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_scenario_qs
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
# delegate to default package
|
|
default_package = request.user.default_package
|
|
return package_scenarios(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def models(request):
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Models"
|
|
|
|
context["object_type"] = "model"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Model": s.SERVER_URL + "/model"},
|
|
]
|
|
|
|
context["model_types"] = {
|
|
"ML Relative Reasoning": "ml-relative-reasoning",
|
|
"Rule Based Relative Reasoning": "rule-based-relative-reasoning",
|
|
"EnviFormer": "enviformer",
|
|
}
|
|
|
|
for k, v in s.CLASSIFIER_PLUGINS.items():
|
|
context["model_types"][v.display()] = k
|
|
|
|
reviewed_model_qs = EPModel.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_model_qs |= EPModel.objects.filter(package=p).order_by("name")
|
|
|
|
reviewed_model_qs = reviewed_model_qs.order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_model_qs
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_model_qs
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_models(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def search(request):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == "GET":
|
|
package_urls = request.GET.getlist("packages")
|
|
searchterm = request.GET.get("search")
|
|
mode = request.GET.get("mode")
|
|
|
|
# add HTTP_ACCEPT check to differentiate between index and ajax call
|
|
if "application/json" in request.META.get("HTTP_ACCEPT") and all([searchterm, mode]):
|
|
if package_urls:
|
|
packages = [
|
|
PackageManager.get_package_by_url(current_user, p) for p in package_urls
|
|
]
|
|
else:
|
|
packages = PackageManager.get_reviewed_packages()
|
|
|
|
search_result = SearchManager.search(packages, searchterm, mode)
|
|
|
|
return JsonResponse(search_result, safe=False)
|
|
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Search"
|
|
|
|
context["object_type"] = "model"
|
|
context["meta"]["current_package"] = context["meta"]["user"].default_package
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Search": s.SERVER_URL + "/search"},
|
|
]
|
|
|
|
reviewed_package_qs = PackageManager.get_reviewed_packages()
|
|
unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user)
|
|
|
|
context["reviewed_objects"] = reviewed_package_qs
|
|
context["unreviewed_objects"] = unreviewed_package_qs
|
|
|
|
if all([searchterm, mode]):
|
|
if package_urls:
|
|
packages = [
|
|
PackageManager.get_package_by_url(current_user, p) for p in package_urls
|
|
]
|
|
else:
|
|
packages = PackageManager.get_reviewed_packages()
|
|
|
|
context["search_result"] = SearchManager.search(packages, searchterm, mode)
|
|
context["search_result"]["searchterm"] = searchterm
|
|
|
|
return render(request, "search.html", context)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_models(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - Models"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "model"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "model")
|
|
|
|
reviewed_model_qs = EPModel.objects.none()
|
|
unreviewed_model_qs = EPModel.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_model_qs = EPModel.objects.filter(package=current_package).order_by("name")
|
|
else:
|
|
unreviewed_model_qs = EPModel.objects.filter(package=current_package).order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_model_qs if current_package.reviewed else unreviewed_model_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_model_qs
|
|
context["unreviewed_objects"] = unreviewed_model_qs
|
|
|
|
context["model_types"] = {
|
|
"ML Relative Reasoning": "mlrr",
|
|
"Rule Based Relative Reasoning": "rbrr",
|
|
}
|
|
|
|
if s.FLAGS.get("ENVIFORMER", False):
|
|
context["model_types"]["EnviFormer"] = "enviformer"
|
|
|
|
if s.FLAGS.get("PLUGINS", False):
|
|
for k, v in s.CLASSIFIER_PLUGINS.items():
|
|
context["model_types"][v.display()] = k
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
name = request.POST.get("model-name")
|
|
description = request.POST.get("model-description")
|
|
|
|
model_type = request.POST.get("model-type")
|
|
|
|
# Generic fields for ML and Rule Based
|
|
rule_packages = request.POST.getlist("model-rule-packages")
|
|
data_packages = request.POST.getlist("model-data-packages")
|
|
|
|
# Generic params
|
|
params = {
|
|
"package": current_package,
|
|
"name": name,
|
|
"description": description,
|
|
"data_packages": [
|
|
PackageManager.get_package_by_url(current_user, p) for p in data_packages
|
|
],
|
|
}
|
|
|
|
if model_type == "enviformer":
|
|
threshold = float(request.POST.get("model-threshold", 0.5))
|
|
params["threshold"] = threshold
|
|
|
|
mod = EnviFormer.create(**params)
|
|
elif model_type == "mlrr":
|
|
# ML Specific
|
|
threshold = float(request.POST.get("model-threshold", 0.5))
|
|
# TODO handle additional fingerprinter
|
|
# fingerprinter = request.POST.get("model-fingerprinter")
|
|
|
|
params["rule_packages"] = [
|
|
PackageManager.get_package_by_url(current_user, p) for p in rule_packages
|
|
]
|
|
|
|
# App Domain related parameters
|
|
build_ad = request.POST.get("build-app-domain", False) == "on"
|
|
num_neighbors = request.POST.get("num-neighbors", 5)
|
|
reliability_threshold = request.POST.get("reliability-threshold", 0.5)
|
|
local_compatibility_threshold = request.POST.get("local-compatibility-threshold", 0.5)
|
|
|
|
params["threshold"] = threshold
|
|
# params['fingerprinter'] = fingerprinter
|
|
params["build_app_domain"] = build_ad
|
|
params["app_domain_num_neighbours"] = num_neighbors
|
|
params["app_domain_reliability_threshold"] = reliability_threshold
|
|
params["app_domain_local_compatibility_threshold"] = local_compatibility_threshold
|
|
|
|
mod = MLRelativeReasoning.create(**params)
|
|
elif model_type == "rbrr":
|
|
params["rule_packages"] = [
|
|
PackageManager.get_package_by_url(current_user, p) for p in rule_packages
|
|
]
|
|
|
|
mod = RuleBasedRelativeReasoning.create(**params)
|
|
elif s.FLAGS.get("PLUGINS", False) and model_type in s.CLASSIFIER_PLUGINS.values():
|
|
pass
|
|
else:
|
|
return error(
|
|
request, "Invalid model type.", f'Model type "{model_type}" is not supported."'
|
|
)
|
|
|
|
from .tasks import dispatch, build_model
|
|
|
|
dispatch(current_user, build_model, mod.pk)
|
|
|
|
return redirect(mod.url)
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_model(request, package_uuid, model_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_model = EPModel.objects.get(package=current_package, uuid=model_uuid)
|
|
|
|
if request.method == "GET":
|
|
classify = request.GET.get("classify", False)
|
|
ad_assessment = request.GET.get("app-domain-assessment", False)
|
|
|
|
if classify or ad_assessment:
|
|
smiles = request.GET.get("smiles", "").strip()
|
|
|
|
# Check if smiles is non empty and valid
|
|
if smiles == "":
|
|
return JsonResponse({"error": "Received empty SMILES"}, status=400)
|
|
|
|
try:
|
|
stand_smiles = FormatConverter.standardize(smiles)
|
|
except ValueError:
|
|
return JsonResponse({"error": f'"{smiles}" is not a valid SMILES'}, status=400)
|
|
|
|
if classify:
|
|
from epdb.tasks import dispatch_eager, predict_simple
|
|
|
|
res = dispatch_eager(current_user, predict_simple, current_model.pk, stand_smiles)
|
|
|
|
pred_res = current_model.predict(stand_smiles)
|
|
res = []
|
|
|
|
for pr in pred_res:
|
|
if len(pr) > 0:
|
|
products = []
|
|
for prod_set in pr.product_sets:
|
|
logger.debug(f"Checking {prod_set}")
|
|
products.append(tuple([x for x in prod_set]))
|
|
|
|
res.append(
|
|
{
|
|
"products": list(set(products)),
|
|
"probability": pr.probability,
|
|
"btrule": {k: getattr(pr.rule, k) for k in ["url", "name"]}
|
|
if pr.rule is not None
|
|
else None,
|
|
}
|
|
)
|
|
|
|
return JsonResponse(res, safe=False)
|
|
|
|
else:
|
|
app_domain_assessment = current_model.app_domain.assess(stand_smiles)
|
|
return JsonResponse(app_domain_assessment, safe=False)
|
|
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_model.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "model"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "model", current_model)
|
|
|
|
context["model"] = current_model
|
|
context["current_object"] = current_model
|
|
|
|
return render(request, "objects/model.html", context)
|
|
|
|
elif request.method == "POST":
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_model.delete()
|
|
return redirect(current_package.url + "/model")
|
|
elif hidden == "evaluate":
|
|
from .tasks import dispatch, evaluate_model
|
|
|
|
eval_type = request.POST.get("model-evaluation-type")
|
|
|
|
if eval_type not in ["sg", "mg"]:
|
|
return error(
|
|
request,
|
|
"Invalid evaluation type",
|
|
f'Evaluation type "{eval_type}" is not supported. Only "sg" and "mg" are supported.',
|
|
)
|
|
|
|
multigen = eval_type == "mg"
|
|
|
|
eval_packages = request.POST.getlist("model-evaluation-packages")
|
|
eval_package_ids = [
|
|
PackageManager.get_package_by_url(current_user, p).id for p in eval_packages
|
|
]
|
|
dispatch(current_user, evaluate_model, current_model.pk, multigen, eval_package_ids)
|
|
|
|
return redirect(current_model.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
name = request.POST.get("model-name", "").strip()
|
|
description = request.POST.get("model-description", "").strip()
|
|
|
|
if any([name, description]):
|
|
if name:
|
|
current_model.name = name
|
|
|
|
if description:
|
|
current_model.description = description
|
|
|
|
current_model.save()
|
|
return redirect(current_model.url)
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == "GET":
|
|
if request.GET.get("export", False) == "true":
|
|
filename = f"{current_package.name.replace(' ', '_')}_{current_package.uuid}.json"
|
|
pack_json = PackageManager.export_package(
|
|
current_package, include_models=False, include_external_identifiers=False
|
|
)
|
|
response = JsonResponse(pack_json, content_type="application/json")
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}"'
|
|
|
|
return response
|
|
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "package"
|
|
context["breadcrumbs"] = breadcrumbs(current_package)
|
|
|
|
context["package"] = current_package
|
|
|
|
user_perms = UserPackagePermission.objects.filter(package=current_package)
|
|
users = get_user_model().objects.exclude(
|
|
id__in=UserPackagePermission.objects.filter(package=current_package).values_list(
|
|
"user_id", flat=True
|
|
)
|
|
)
|
|
|
|
group_perms = GroupPackagePermission.objects.filter(package=current_package)
|
|
groups = Group.objects.exclude(
|
|
id__in=GroupPackagePermission.objects.filter(package=current_package).values_list(
|
|
"group_id", flat=True
|
|
)
|
|
)
|
|
|
|
context["users"] = users
|
|
context["groups"] = groups
|
|
context["user_permissions"] = user_perms
|
|
context["group_permissions"] = group_perms
|
|
|
|
return render(request, "objects/package.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
if current_user.default_package == current_package:
|
|
return error(
|
|
request,
|
|
f'Package "{current_package.name}" is the default and cannot be deleted!',
|
|
"You cannot delete the default package. If you want to delete this package you have to set another default package first.",
|
|
)
|
|
|
|
logger.debug(current_package.delete())
|
|
return redirect(s.SERVER_URL + "/package")
|
|
elif hidden == "publish-package":
|
|
for g in Group.objects.filter(public=True):
|
|
PackageManager.update_permissions(
|
|
current_user, current_package, g, Permission.READ[0]
|
|
)
|
|
return redirect(current_package.url)
|
|
elif hidden == "copy":
|
|
object_to_copy = request.POST.get("object_to_copy")
|
|
|
|
if not object_to_copy:
|
|
return error(request, "No object to copy", "There was no object to copy.")
|
|
|
|
try:
|
|
copied_object = copy_object(current_user, current_package, object_to_copy)
|
|
except ValueError:
|
|
return JsonResponse(
|
|
{"error": f"Can't copy object {object_to_copy} to the same package!"},
|
|
status=400,
|
|
)
|
|
|
|
return JsonResponse({"success": copied_object.url})
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
new_package_name = request.POST.get("package-name")
|
|
new_package_description = request.POST.get("package-description")
|
|
|
|
grantee_url = request.POST.get("grantee")
|
|
read = request.POST.get("read") == "on"
|
|
write = request.POST.get("write") == "on"
|
|
owner = request.POST.get("owner") == "on"
|
|
|
|
cc_string = request.POST.get("license")
|
|
|
|
if new_package_name:
|
|
current_package.name = new_package_name
|
|
|
|
if new_package_description:
|
|
current_package.description = new_package_description
|
|
|
|
if any([new_package_name, new_package_description]):
|
|
current_package.save()
|
|
return redirect(current_package.url)
|
|
|
|
elif any([grantee_url, read, write, owner]):
|
|
if "user" in grantee_url:
|
|
grantee = UserManager.get_user_lp(grantee_url)
|
|
else:
|
|
grantee = GroupManager.get_group_lp(grantee_url)
|
|
|
|
max_perm = None
|
|
if read:
|
|
max_perm = Permission.READ[0]
|
|
if write:
|
|
max_perm = Permission.WRITE[0]
|
|
if owner:
|
|
max_perm = Permission.ALL[0]
|
|
|
|
PackageManager.update_permissions(current_user, current_package, grantee, max_perm)
|
|
return redirect(current_package.url)
|
|
elif cc_string is not None:
|
|
if cc_string == "no-license":
|
|
current_package.license = None
|
|
current_package.save()
|
|
return redirect(current_package.url)
|
|
else:
|
|
current_package.license = License.objects.get(cc_string=cc_string)
|
|
current_package.save()
|
|
|
|
return redirect(current_package.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compounds(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - Compounds"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "compound"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "compound")
|
|
|
|
reviewed_compound_qs = Compound.objects.none()
|
|
unreviewed_compound_qs = Compound.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_compound_qs = Compound.objects.filter(package=current_package).order_by("name")
|
|
else:
|
|
unreviewed_compound_qs = Compound.objects.filter(package=current_package).order_by(
|
|
"name"
|
|
)
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_compound_qs
|
|
if current_package.reviewed
|
|
else unreviewed_compound_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_compound_qs
|
|
context["unreviewed_objects"] = unreviewed_compound_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
compound_name = request.POST.get("compound-name")
|
|
compound_smiles = request.POST.get("compound-smiles")
|
|
compound_description = request.POST.get("compound-description")
|
|
|
|
c = Compound.create(current_package, compound_smiles, compound_name, compound_description)
|
|
|
|
return redirect(c.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compound(request, package_uuid, compound_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_compound.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "compound"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "compound", current_compound)
|
|
|
|
context["compound"] = current_compound
|
|
context["current_object"] = current_compound
|
|
|
|
return render(request, "objects/compound.html", context)
|
|
|
|
elif request.method == "POST":
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_compound.delete()
|
|
return redirect(current_package.url + "/compound")
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
if "selected-scenarios" in request.POST:
|
|
selected_scenarios = request.POST.getlist("selected-scenarios")
|
|
set_scenarios(current_user, current_compound, selected_scenarios)
|
|
return redirect(current_compound.url)
|
|
|
|
if "aliases" in request.POST:
|
|
aliases = request.POST.getlist("aliases")
|
|
|
|
try:
|
|
set_aliases(current_user, current_compound, aliases)
|
|
except Exception as e:
|
|
return JsonResponse({"error": str(e)}, status=400)
|
|
|
|
return JsonResponse({"success": current_compound.url})
|
|
|
|
new_compound_name = request.POST.get("compound-name", "").strip()
|
|
new_compound_description = request.POST.get("compound-description", "").strip()
|
|
|
|
if new_compound_name:
|
|
current_compound.name = new_compound_name
|
|
|
|
if new_compound_description:
|
|
current_compound.description = new_compound_description
|
|
|
|
if any([new_compound_name, new_compound_description]):
|
|
current_compound.save()
|
|
return redirect(current_compound.url)
|
|
|
|
selected_database = request.POST.get("selected-database", "").strip()
|
|
external_identifier = request.POST.get("identifier", "").strip()
|
|
|
|
if selected_database and external_identifier:
|
|
db = ExternalDatabase.objects.get(id=int(selected_database))
|
|
ExternalIdentifier.objects.create(
|
|
content_object=current_compound,
|
|
database=db,
|
|
identifier_value=external_identifier,
|
|
url=db.url_pattern.format(id=external_identifier),
|
|
is_primary=False,
|
|
)
|
|
return redirect(current_compound.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compound_structures(request, package_uuid, compound_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = (
|
|
f"enviPath - {current_package.name} - {current_compound.name} - Structures"
|
|
)
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "structure"
|
|
context["breadcrumbs"] = breadcrumbs(
|
|
current_package, "compound", current_compound, "structure"
|
|
)
|
|
|
|
reviewed_compound_structure_qs = CompoundStructure.objects.none()
|
|
unreviewed_compound_structure_qs = CompoundStructure.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_compound_structure_qs = current_compound.structures.order_by("name")
|
|
else:
|
|
unreviewed_compound_structure_qs = current_compound.structures.order_by("name")
|
|
|
|
context["reviewed_objects"] = reviewed_compound_structure_qs
|
|
context["unreviewed_objects"] = unreviewed_compound_structure_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
structure_name = request.POST.get("structure-name")
|
|
structure_smiles = request.POST.get("structure-smiles")
|
|
structure_description = request.POST.get("structure-description")
|
|
|
|
try:
|
|
cs = current_compound.add_structure(
|
|
structure_smiles, structure_name, structure_description
|
|
)
|
|
except ValueError:
|
|
return error(
|
|
request,
|
|
"Adding structure failed!",
|
|
"The structure could not be added as normalized structures don't match!",
|
|
)
|
|
|
|
return redirect(cs.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
|
|
current_structure = CompoundStructure.objects.get(
|
|
compound=current_compound, uuid=structure_uuid
|
|
)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = (
|
|
f"enviPath - {current_package.name} - {current_compound.name} - {current_structure.name}"
|
|
)
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "structure"
|
|
|
|
context["compound_structure"] = current_structure
|
|
context["current_object"] = current_structure
|
|
context["breadcrumbs"] = breadcrumbs(
|
|
current_package, "compound", current_compound, "structure", current_structure
|
|
)
|
|
|
|
return render(request, "objects/compound_structure.html", context)
|
|
|
|
elif request.method == "POST":
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
# Check if we have to delete the compound as no structure is left
|
|
if len(current_structure.compound.structures.all()) == 1:
|
|
# This will delete the structure as well
|
|
current_compound.delete()
|
|
return redirect(current_package.url + "/compound")
|
|
else:
|
|
if current_structure.normalized_structure:
|
|
current_compound.delete()
|
|
return redirect(current_package.url + "/compound")
|
|
else:
|
|
if current_compound.default_structure == current_structure:
|
|
current_structure.delete()
|
|
current_compound.default_structure = (
|
|
current_compound.structures.all().first()
|
|
)
|
|
return redirect(current_compound.url + "/structure")
|
|
else:
|
|
current_structure.delete()
|
|
return redirect(current_compound.url + "/structure")
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
new_structure_name = request.POST.get("compound-structure-name", "").strip()
|
|
new_structure_description = request.POST.get("compound-structure-description", "").strip()
|
|
|
|
if new_structure_name:
|
|
current_structure.name = new_structure_name
|
|
|
|
if new_structure_description:
|
|
current_structure.description = new_structure_description
|
|
|
|
if any([new_structure_name, new_structure_description]):
|
|
current_structure.save()
|
|
return redirect(current_structure.url)
|
|
|
|
if "selected-scenarios" in request.POST:
|
|
selected_scenarios = request.POST.getlist("selected-scenarios")
|
|
|
|
set_scenarios(current_user, current_structure, selected_scenarios)
|
|
return redirect(current_structure.url)
|
|
|
|
if "aliases" in request.POST:
|
|
aliases = request.POST.getlist("aliases")
|
|
|
|
try:
|
|
set_aliases(current_user, current_structure, aliases)
|
|
except Exception as e:
|
|
return JsonResponse({"error": str(e)}, status=400)
|
|
|
|
return JsonResponse({"success": current_structure.url})
|
|
|
|
selected_database = request.POST.get("selected-database", "").strip()
|
|
external_identifier = request.POST.get("identifier", "").strip()
|
|
|
|
if selected_database and external_identifier:
|
|
db = ExternalDatabase.objects.get(id=int(selected_database))
|
|
ExternalIdentifier.objects.create(
|
|
content_object=current_structure,
|
|
database=db,
|
|
identifier_value=external_identifier,
|
|
url=db.url_pattern.format(id=external_identifier),
|
|
is_primary=False,
|
|
)
|
|
return redirect(current_structure.url)
|
|
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseNotAllowed(
|
|
[
|
|
"GET",
|
|
]
|
|
)
|
|
|
|
|
|
@package_permission_required()
|
|
def package_rules(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - Rules"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "rule"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "rule")
|
|
|
|
reviewed_rule_qs = Rule.objects.none()
|
|
unreviewed_rule_qs = Rule.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_rule_qs = Rule.objects.filter(package=current_package).order_by("name")
|
|
else:
|
|
unreviewed_rule_qs = Rule.objects.filter(package=current_package).order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_rule_qs if current_package.reviewed else unreviewed_rule_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_rule_qs
|
|
context["unreviewed_objects"] = unreviewed_rule_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
# Generic params
|
|
rule_name = request.POST.get("rule-name")
|
|
rule_description = request.POST.get("rule-description")
|
|
|
|
rule_type = request.POST.get("rule-type")
|
|
|
|
params = {}
|
|
|
|
# Obtain parameters as required by rule type
|
|
if rule_type == "SimpleAmbitRule":
|
|
params["smirks"] = request.POST.get("rule-smirks")
|
|
params["reactant_filter_smarts"] = request.POST.get("rule-reactant-smarts")
|
|
params["product_filter_smarts"] = request.POST.get("rule-product-smarts")
|
|
elif rule_type == "SimpleRDKitRule":
|
|
params["reaction_smarts"] = request.POST.get("rule-reaction-smarts")
|
|
elif rule_type == "ParallelRule":
|
|
pass
|
|
elif rule_type == "SequentialRule":
|
|
pass
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
r = Rule.create(
|
|
rule_type=rule_type,
|
|
package=current_package,
|
|
name=rule_name,
|
|
description=rule_description,
|
|
**params,
|
|
)
|
|
return redirect(r.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_rule(request, package_uuid, rule_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_rule = Rule.objects.get(package=current_package, uuid=rule_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
|
|
if smiles := request.GET.get("smiles", False):
|
|
stand_smiles = FormatConverter.standardize(smiles)
|
|
res = current_rule.apply(stand_smiles)
|
|
if len(res) > 1:
|
|
logger.info(
|
|
f"Rule {current_rule.uuid} returned multiple product sets on {smiles}, picking the first one."
|
|
)
|
|
# Some Rules are touching unrelated areas which might result in ~ indicating
|
|
# any bond (-, =, #). For drawing we need a concrete bond. -> use single bond
|
|
product_smiles = [x.replace("~", "-") for x in res[0]]
|
|
smirks = f"{stand_smiles}>>{'.'.join(sorted(product_smiles))}"
|
|
# Usually the functional groups are a mapping of fg -> count
|
|
# As we are doing it on the fly here fake a high count to ensure that its properly highlighted
|
|
|
|
if isinstance(current_rule, SimpleAmbitRule):
|
|
educt_functional_groups = {current_rule.reactants_smarts: 1000}
|
|
product_functional_groups = {current_rule.products_smarts: 1000}
|
|
else:
|
|
educt_functional_groups = {x: 1000 for x in current_rule.reactants_smarts}
|
|
product_functional_groups = {x: 1000 for x in current_rule.products_smarts}
|
|
|
|
return HttpResponse(
|
|
IndigoUtils.smirks_to_svg(
|
|
smirks,
|
|
False,
|
|
0,
|
|
0,
|
|
educt_functional_groups=educt_functional_groups,
|
|
product_functional_groups=product_functional_groups,
|
|
),
|
|
content_type="image/svg+xml",
|
|
)
|
|
|
|
context["title"] = f"enviPath - {current_package.name} - {current_rule.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "rule"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "rule", current_rule)
|
|
|
|
context["rule"] = current_rule
|
|
context["current_object"] = current_rule
|
|
|
|
if isinstance(current_rule, SimpleAmbitRule):
|
|
return render(request, "objects/simple_rule.html", context)
|
|
else: # isinstance(current_rule, ParallelRule) or isinstance(current_rule, SequentialRule):
|
|
return render(request, "objects/composite_rule.html", context)
|
|
|
|
elif request.method == "POST":
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_rule.delete()
|
|
return redirect(current_package.url + "/rule")
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
if "selected-scenarios" in request.POST:
|
|
selected_scenarios = request.POST.getlist("selected-scenarios")
|
|
|
|
set_scenarios(current_user, current_rule, selected_scenarios)
|
|
return redirect(current_rule.url)
|
|
|
|
if "aliases" in request.POST:
|
|
aliases = request.POST.getlist("aliases")
|
|
|
|
try:
|
|
set_aliases(current_user, current_rule, aliases)
|
|
except Exception as e:
|
|
return JsonResponse({"error": str(e)}, status=400)
|
|
|
|
return JsonResponse({"success": current_rule.url})
|
|
|
|
rule_name = request.POST.get("rule-name", "").strip()
|
|
rule_description = request.POST.get("rule-description", "").strip()
|
|
|
|
if rule_name:
|
|
current_rule.name = rule_name
|
|
|
|
if rule_description:
|
|
current_rule.description = rule_description
|
|
|
|
if any([rule_name, rule_description]):
|
|
current_rule.save()
|
|
return redirect(current_rule.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_rule_enzymelink(request, package_uuid, rule_uuid, enzymelink_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_rule = Rule.objects.get(package=current_package, uuid=rule_uuid)
|
|
current_enzymelink = EnzymeLink.objects.get(rule=current_rule, uuid=enzymelink_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
|
|
context["title"] = f"enviPath - {current_package.name} - {current_rule.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "enzyme"
|
|
context["breadcrumbs"] = breadcrumbs(
|
|
current_package, "rule", current_rule, "enzymelink", current_enzymelink
|
|
)
|
|
|
|
context["enzymelink"] = current_enzymelink
|
|
context["current_object"] = current_enzymelink
|
|
|
|
return render(request, "objects/enzymelink.html", context)
|
|
|
|
return HttpResponseNotAllowed(["GET"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_reactions(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_package.name} - Reactions"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "reaction"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "reaction")
|
|
|
|
reviewed_reaction_qs = Reaction.objects.none()
|
|
unreviewed_reaction_qs = Reaction.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by("name")
|
|
else:
|
|
unreviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by(
|
|
"name"
|
|
)
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_reaction_qs
|
|
if current_package.reviewed
|
|
else unreviewed_reaction_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_reaction_qs
|
|
context["unreviewed_objects"] = unreviewed_reaction_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
reaction_name = request.POST.get("reaction-name")
|
|
reaction_description = request.POST.get("reaction-description")
|
|
reactions_smirks = request.POST.get("reaction-smirks")
|
|
|
|
educts = reactions_smirks.split(">>")[0].split(".")
|
|
products = reactions_smirks.split(">>")[1].split(".")
|
|
|
|
r = Reaction.create(
|
|
current_package,
|
|
name=reaction_name,
|
|
description=reaction_description,
|
|
educts=educts,
|
|
products=products,
|
|
)
|
|
|
|
return redirect(r.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_reaction(request, package_uuid, reaction_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_reaction = Reaction.objects.get(package=current_package, uuid=reaction_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_reaction.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "reaction"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "reaction", current_reaction)
|
|
|
|
context["reaction"] = current_reaction
|
|
context["current_object"] = current_reaction
|
|
|
|
return render(request, "objects/reaction.html", context)
|
|
|
|
elif request.method == "POST":
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_reaction.delete()
|
|
return redirect(current_package.url + "/reaction")
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
if "selected-scenarios" in request.POST:
|
|
selected_scenarios = request.POST.getlist("selected-scenarios")
|
|
|
|
set_scenarios(current_user, current_reaction, selected_scenarios)
|
|
return redirect(current_reaction.url)
|
|
|
|
if "aliases" in request.POST:
|
|
aliases = request.POST.getlist("aliases")
|
|
|
|
try:
|
|
set_aliases(current_user, current_reaction, aliases)
|
|
except Exception as e:
|
|
return JsonResponse({"error": str(e)}, status=400)
|
|
|
|
return JsonResponse({"success": current_reaction.url})
|
|
|
|
new_reaction_name = request.POST.get("reaction-name", "").strip()
|
|
new_reaction_description = request.POST.get("reaction-description", "").strip()
|
|
|
|
if new_reaction_name:
|
|
current_reaction.name = new_reaction_name
|
|
|
|
if new_reaction_description:
|
|
current_reaction.description = new_reaction_description
|
|
|
|
if any([new_reaction_name, new_reaction_description]):
|
|
current_reaction.save()
|
|
return redirect(current_reaction.url)
|
|
|
|
selected_database = request.POST.get("selected-database", "").strip()
|
|
external_identifier = request.POST.get("identifier", "").strip()
|
|
|
|
if selected_database and external_identifier:
|
|
db = ExternalDatabase.objects.get(id=int(selected_database))
|
|
ExternalIdentifier.objects.create(
|
|
content_object=current_reaction,
|
|
database=db,
|
|
identifier_value=external_identifier,
|
|
url=db.url_pattern.format(id=external_identifier),
|
|
is_primary=False,
|
|
)
|
|
return redirect(current_reaction.url)
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathways(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - Pathways"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "pathway"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "pathway")
|
|
|
|
reviewed_pathway_qs = Pathway.objects.none()
|
|
unreviewed_pathway_qs = Pathway.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by("name")
|
|
else:
|
|
unreviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_pathway_qs
|
|
if current_package.reviewed
|
|
else unreviewed_pathway_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_pathway_qs
|
|
context["unreviewed_objects"] = unreviewed_pathway_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
name = request.POST.get("name")
|
|
description = request.POST.get("description")
|
|
pw_mode = request.POST.get("predict", "predict").strip()
|
|
smiles = request.POST.get("smiles", "").strip()
|
|
|
|
if "smiles" in request.POST and smiles == "":
|
|
return error(
|
|
request,
|
|
"Pathway prediction failed!",
|
|
"Pathway prediction failed due to missing or empty SMILES",
|
|
)
|
|
|
|
smiles = smiles.strip()
|
|
|
|
try:
|
|
stand_smiles = FormatConverter.standardize(smiles)
|
|
except ValueError:
|
|
return error(
|
|
request,
|
|
"Pathway prediction failed!",
|
|
f'Pathway prediction failed as standardization of SMILES "{smiles}" failed!',
|
|
)
|
|
|
|
modes = ["predict", "build", "incremental"]
|
|
if pw_mode not in modes:
|
|
return error(
|
|
request,
|
|
"Pathway prediction failed!",
|
|
f'Pathway prediction failed as received mode "{pw_mode}" is none of {modes}',
|
|
)
|
|
|
|
prediction_setting = request.POST.get("prediction-setting", None)
|
|
if prediction_setting:
|
|
prediction_setting = SettingManager.get_setting_by_url(current_user, prediction_setting)
|
|
else:
|
|
prediction_setting = current_user.prediction_settings()
|
|
|
|
pw = Pathway.create(current_package, stand_smiles, name=name, description=description)
|
|
|
|
# set mode
|
|
pw.kv.update({"mode": pw_mode})
|
|
pw.save()
|
|
|
|
if pw_mode == "predict" or pw_mode == "incremental":
|
|
# unlimited pred (will be handled by setting)
|
|
limit = -1
|
|
|
|
# For incremental predict first level and return
|
|
if pw_mode == "incremental":
|
|
limit = 1
|
|
|
|
pw.setting = prediction_setting
|
|
pw.save()
|
|
|
|
from .tasks import dispatch, predict
|
|
|
|
dispatch(current_user, predict, pw.pk, prediction_setting.pk, limit=limit)
|
|
|
|
return redirect(pw.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway(request, package_uuid, pathway_uuid):
|
|
current_user: User = _anonymous_or_real(request)
|
|
current_package: Package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway: Pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
|
|
if request.method == "GET":
|
|
if request.GET.get("last_modified", False):
|
|
return JsonResponse(
|
|
{"modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S")}
|
|
)
|
|
|
|
if request.GET.get("status", False):
|
|
return JsonResponse(
|
|
{
|
|
"status": current_pathway.status(),
|
|
"modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S"),
|
|
}
|
|
)
|
|
|
|
if request.GET.get("download", False) == "true":
|
|
filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv"
|
|
csv_pw = current_pathway.to_csv()
|
|
response = HttpResponse(csv_pw, content_type="text/csv")
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}"'
|
|
|
|
return response
|
|
|
|
if (
|
|
request.GET.get("identify-missing-rules", False) == "true"
|
|
and request.GET.get("rule-package") is not None
|
|
):
|
|
from .tasks import dispatch_eager, identify_missing_rules
|
|
|
|
rule_package = PackageManager.get_package_by_url(
|
|
current_user, request.GET.get("rule-package")
|
|
)
|
|
res = dispatch_eager(
|
|
current_user, identify_missing_rules, [current_pathway.pk], rule_package.pk
|
|
)
|
|
|
|
filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv"
|
|
response = HttpResponse(res, content_type="text/csv")
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}"'
|
|
|
|
return response
|
|
|
|
# Pathway d3_json() relies on a lot of related objects (Nodes, Structures, Edges, Reaction, Rules, ...)
|
|
# we will again fetch the current pathway identified by this url, but this time together with nearly all
|
|
# related objects
|
|
|
|
current_pathway = Pathway.objects.prefetch_related(
|
|
"node_set",
|
|
"node_set__out_edges",
|
|
"node_set__default_node_label",
|
|
"node_set__scenarios",
|
|
"edge_set",
|
|
"edge_set__start_nodes",
|
|
"edge_set__end_nodes",
|
|
"edge_set__edge_label",
|
|
"edge_set__scenarios",
|
|
).get(uuid=pathway_uuid)
|
|
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "pathway"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "pathway", current_pathway)
|
|
|
|
context["pathway"] = current_pathway
|
|
context["current_object"] = current_pathway
|
|
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Package": s.SERVER_URL + "/package"},
|
|
{current_package.name: current_package.url},
|
|
{"Pathway": current_package.url + "/pathway"},
|
|
{current_pathway.name: current_pathway.url},
|
|
]
|
|
|
|
return render(request, "objects/pathway.html", context)
|
|
# return render(request, 'pathway_playground2.html', context)
|
|
|
|
elif request.method == "POST":
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_pathway.delete()
|
|
return redirect(current_package.url + "/pathway")
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
if "selected-scenarios" in request.POST:
|
|
selected_scenarios = request.POST.getlist("selected-scenarios")
|
|
|
|
set_scenarios(current_user, current_pathway, selected_scenarios)
|
|
return redirect(current_pathway.url)
|
|
|
|
if "aliases" in request.POST:
|
|
aliases = request.POST.getlist("aliases")
|
|
|
|
try:
|
|
set_aliases(current_user, current_pathway, aliases)
|
|
except Exception as e:
|
|
return JsonResponse({"error": str(e)}, status=400)
|
|
|
|
return JsonResponse({"success": current_pathway.url})
|
|
|
|
pathway_name = request.POST.get("pathway-name")
|
|
pathway_description = request.POST.get("pathway-description")
|
|
|
|
if any([pathway_name, pathway_description]):
|
|
if pathway_name is not None and pathway_name.strip() != "":
|
|
pathway_name = pathway_name.strip()
|
|
|
|
current_pathway.name = pathway_name
|
|
|
|
if pathway_description is not None and pathway_description.strip() != "":
|
|
pathway_description = pathway_description.strip()
|
|
|
|
current_pathway.description = pathway_description
|
|
|
|
current_pathway.save()
|
|
return redirect(current_pathway.url)
|
|
|
|
node_url = request.POST.get("node")
|
|
|
|
if node_url:
|
|
n = current_pathway.get_node(node_url)
|
|
|
|
from .tasks import dispatch, predict
|
|
|
|
dispatch(
|
|
current_user,
|
|
predict,
|
|
current_pathway.pk,
|
|
current_pathway.setting.pk,
|
|
node_pk=n.pk,
|
|
)
|
|
|
|
return JsonResponse({"success": current_pathway.url})
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_nodes(request, package_uuid, pathway_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name} - Nodes"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "node"
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Package": s.SERVER_URL + "/package"},
|
|
{current_package.name: current_package.url},
|
|
{"Pathway": current_package.url + "/pathway"},
|
|
{current_pathway.name: current_pathway.url},
|
|
{"Node": current_pathway.url + "/node"},
|
|
]
|
|
|
|
reviewed_node_qs = Node.objects.none()
|
|
unreviewed_node_qs = Node.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by("name")
|
|
else:
|
|
unreviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_node_qs if current_package.reviewed else unreviewed_node_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_node_qs
|
|
context["unreviewed_objects"] = unreviewed_node_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
node_name = request.POST.get("node-name")
|
|
node_description = request.POST.get("node-description")
|
|
node_smiles = request.POST.get("node-smiles")
|
|
|
|
current_pathway.add_node(node_smiles, name=node_name, description=node_description)
|
|
|
|
return redirect(current_pathway.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
current_node = Node.objects.get(pathway=current_pathway, uuid=node_uuid)
|
|
|
|
if request.method == "GET":
|
|
is_image_request = request.GET.get("image")
|
|
is_highlight_request = request.GET.get("highlight", False)
|
|
is_highlight_reactivity = request.GET.get("highlightReactivity", False)
|
|
if is_image_request:
|
|
if is_image_request == "svg":
|
|
# TODO optimize this chain
|
|
if is_highlight_request:
|
|
# User functional groups covered by the model training data
|
|
fgs = {}
|
|
if current_pathway.setting:
|
|
if current_pathway.setting.model:
|
|
if current_pathway.setting.model.app_domain:
|
|
fgs = current_pathway.setting.model.app_domain.functional_groups
|
|
|
|
svg_data = IndigoUtils.mol_to_svg(
|
|
current_node.default_node_label.smiles, functional_groups=fgs
|
|
)
|
|
elif is_highlight_reactivity:
|
|
# Use reactant smarts to show all reaction sites
|
|
# set a high count to obtain a strong color
|
|
ad_data = current_node.get_app_domain_assessment_data()
|
|
fgs = {}
|
|
for t in ad_data.get("assessment", {}).get("transformations", []):
|
|
r = Rule.objects.get(url=t["rule"]["url"])
|
|
|
|
if isinstance(r, SimpleAmbitRule):
|
|
fgs[r.reactants_smarts] = 1000
|
|
else:
|
|
for sr in r.srs:
|
|
fgs[sr.reactants_smarts] = 1000
|
|
|
|
svg_data = IndigoUtils.mol_to_svg(
|
|
current_node.default_node_label.smiles, functional_groups=fgs
|
|
)
|
|
else:
|
|
svg_data = current_node.as_svg
|
|
|
|
return HttpResponse(svg_data, content_type="image/svg+xml")
|
|
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "pathway"
|
|
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Package": s.SERVER_URL + "/package"},
|
|
{current_package.name: current_package.url},
|
|
{"Pathway": current_package.url + "/pathway"},
|
|
{current_pathway.name: current_pathway.url},
|
|
{"Node": current_pathway.url + "/node"},
|
|
{current_node.name: current_node.url},
|
|
]
|
|
|
|
context["node"] = current_node
|
|
context["current_object"] = current_node
|
|
|
|
context["app_domain_assessment_data"] = json.dumps(
|
|
current_node.get_app_domain_assessment_data()
|
|
)
|
|
|
|
return render(request, "objects/node.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
# pre_delete signal will take care of edge deletion
|
|
current_node.delete()
|
|
|
|
return redirect(current_pathway.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
if "selected-scenarios" in request.POST:
|
|
selected_scenarios = request.POST.getlist("selected-scenarios")
|
|
|
|
set_scenarios(current_user, current_node, selected_scenarios)
|
|
return redirect(current_node.url)
|
|
|
|
if "aliases" in request.POST:
|
|
aliases = request.POST.getlist("aliases")
|
|
|
|
try:
|
|
set_aliases(current_user, current_node, aliases)
|
|
except Exception as e:
|
|
return JsonResponse({"error": str(e)}, status=400)
|
|
|
|
return JsonResponse({"success": current_node.url})
|
|
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_edges(request, package_uuid, pathway_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_pathway.name} - Edges"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "edge"
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Package": s.SERVER_URL + "/package"},
|
|
{current_package.name: current_package.url},
|
|
{"Pathway": current_package.url + "/pathway"},
|
|
{current_pathway.name: current_pathway.url},
|
|
{"Edge": current_pathway.url + "/edge"},
|
|
]
|
|
|
|
reviewed_edge_qs = Edge.objects.none()
|
|
unreviewed_edge_qs = Edge.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by("name")
|
|
else:
|
|
unreviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by("name")
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_edge_qs if current_package.reviewed else unreviewed_edge_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_edge_qs
|
|
context["unreviewed_objects"] = unreviewed_edge_qs
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
edge_name = request.POST.get("edge-name")
|
|
edge_description = request.POST.get("edge-description")
|
|
edge_substrates = request.POST.getlist("edge-substrates")
|
|
edge_products = request.POST.getlist("edge-products")
|
|
|
|
substrate_nodes = [current_pathway.get_node(url) for url in edge_substrates]
|
|
product_nodes = [current_pathway.get_node(url) for url in edge_products]
|
|
|
|
# TODO in the future consider Rules here?
|
|
current_pathway.add_edge(
|
|
substrate_nodes, product_nodes, name=edge_name, description=edge_description
|
|
)
|
|
|
|
return redirect(current_pathway.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
current_edge = Edge.objects.get(pathway=current_pathway, uuid=edge_uuid)
|
|
|
|
if request.method == "GET":
|
|
is_image_request = request.GET.get("image")
|
|
if is_image_request:
|
|
if is_image_request == "svg":
|
|
svg_data = current_edge.as_svg
|
|
return HttpResponse(svg_data, content_type="image/svg+xml")
|
|
|
|
context = get_base_context(request)
|
|
context["title"] = (
|
|
f"enviPath - {current_package.name} - {current_pathway.name} - {current_edge.edge_label.name}"
|
|
)
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "edge"
|
|
context["breadcrumbs"] = breadcrumbs(
|
|
current_package, "pathway", current_pathway, "edge", current_edge
|
|
)
|
|
context["edge"] = current_edge
|
|
context["current_object"] = current_edge
|
|
|
|
return render(request, "objects/edge.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_edge.delete()
|
|
return redirect(current_pathway.url)
|
|
|
|
if "selected-scenarios" in request.POST:
|
|
selected_scenarios = request.POST.getlist("selected-scenarios")
|
|
|
|
set_scenarios(current_user, current_edge, selected_scenarios)
|
|
return redirect(current_edge.url)
|
|
|
|
if "aliases" in request.POST:
|
|
aliases = request.POST.getlist("aliases")
|
|
|
|
try:
|
|
set_aliases(current_user, current_edge, aliases)
|
|
except Exception as e:
|
|
return JsonResponse({"error": str(e)}, status=400)
|
|
|
|
return JsonResponse({"success": current_edge.url})
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_scenarios(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == "GET":
|
|
if "application/json" in request.META.get("HTTP_ACCEPT") and not request.GET.get(
|
|
"all", False
|
|
):
|
|
scens = Scenario.objects.filter(package=current_package).order_by("name")
|
|
res = [{"name": s.name, "url": s.url, "uuid": s.uuid} for s in scens]
|
|
return JsonResponse(res, safe=False)
|
|
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - Scenarios"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "scenario"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "scenario")
|
|
|
|
reviewed_scenario_qs = Scenario.objects.none()
|
|
unreviewed_scenario_qs = Scenario.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by("name")
|
|
else:
|
|
unreviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by(
|
|
"name"
|
|
)
|
|
|
|
if request.GET.get("all"):
|
|
return JsonResponse(
|
|
{
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (
|
|
reviewed_scenario_qs
|
|
if current_package.reviewed
|
|
else unreviewed_scenario_qs
|
|
)
|
|
]
|
|
}
|
|
)
|
|
|
|
context["reviewed_objects"] = reviewed_scenario_qs
|
|
context["unreviewed_objects"] = unreviewed_scenario_qs
|
|
|
|
from envipy_additional_information import (
|
|
SLUDGE_ADDITIONAL_INFORMATION,
|
|
SOIL_ADDITIONAL_INFORMATION,
|
|
SEDIMENT_ADDITIONAL_INFORMATION,
|
|
)
|
|
|
|
context["scenario_types"] = {
|
|
"Soil Data": {
|
|
"name": "soil",
|
|
"widgets": [
|
|
HTMLGenerator.generate_html(ai, prefix=f"soil_{0}")
|
|
for ai in [x for s in SOIL_ADDITIONAL_INFORMATION.values() for x in s]
|
|
],
|
|
},
|
|
"Sludge Data": {
|
|
"name": "sludge",
|
|
"widgets": [
|
|
HTMLGenerator.generate_html(ai, prefix=f"sludge_{0}")
|
|
for ai in [x for s in SLUDGE_ADDITIONAL_INFORMATION.values() for x in s]
|
|
],
|
|
},
|
|
"Water-Sediment System Data": {
|
|
"name": "sediment",
|
|
"widgets": [
|
|
HTMLGenerator.generate_html(ai, prefix=f"sediment_{0}")
|
|
for ai in [x for s in SEDIMENT_ADDITIONAL_INFORMATION.values() for x in s]
|
|
],
|
|
},
|
|
}
|
|
|
|
context["sludge_additional_information"] = SLUDGE_ADDITIONAL_INFORMATION
|
|
context["soil_additional_information"] = SOIL_ADDITIONAL_INFORMATION
|
|
context["sediment_additional_information"] = SEDIMENT_ADDITIONAL_INFORMATION
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
scenario_name = request.POST.get("scenario-name")
|
|
scenario_description = request.POST.get("scenario-description")
|
|
scenario_date_year = request.POST.get("scenario-date-year")
|
|
scenario_date_month = request.POST.get("scenario-date-month")
|
|
scenario_date_day = request.POST.get("scenario-date-day")
|
|
|
|
scenario_date = scenario_date_year
|
|
if scenario_date_month is not None and scenario_date_month.strip() != "":
|
|
scenario_date += f"-{int(scenario_date_month):02d}"
|
|
if scenario_date_day is not None and scenario_date_day.strip() != "":
|
|
scenario_date += f"-{int(scenario_date_day):02d}"
|
|
|
|
scenario_type = request.POST.get("scenario-type")
|
|
|
|
additional_information = HTMLGenerator.build_models(request.POST.dict())
|
|
additional_information = [x for s in additional_information.values() for x in s]
|
|
|
|
s = Scenario.create(
|
|
current_package,
|
|
name=scenario_name,
|
|
description=scenario_description,
|
|
scenario_date=scenario_date,
|
|
scenario_type=scenario_type,
|
|
additional_information=additional_information,
|
|
)
|
|
|
|
return redirect(s.url)
|
|
else:
|
|
return HttpResponseNotAllowed(
|
|
[
|
|
"GET",
|
|
]
|
|
)
|
|
|
|
|
|
@package_permission_required()
|
|
def package_scenario(request, package_uuid, scenario_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_scenario = Scenario.objects.get(package=current_package, uuid=scenario_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_package.name} - {current_scenario.name}"
|
|
|
|
context["meta"]["current_package"] = current_package
|
|
context["object_type"] = "scenario"
|
|
context["breadcrumbs"] = breadcrumbs(current_package, "scenario", current_scenario)
|
|
|
|
context["scenario"] = current_scenario
|
|
|
|
available_add_infs = []
|
|
for add_inf in NAME_MAPPING.values():
|
|
available_add_infs.append(
|
|
{
|
|
"display_name": add_inf.property_name(None),
|
|
"name": add_inf.__name__,
|
|
"widget": HTMLGenerator.generate_html(add_inf, prefix=f"{0}"),
|
|
}
|
|
)
|
|
context["available_additional_information"] = available_add_infs
|
|
|
|
context["update_widgets"] = [
|
|
HTMLGenerator.generate_html(ai, prefix=f"{i}")
|
|
for i, ai in enumerate(current_scenario.get_additional_information())
|
|
]
|
|
|
|
return render(request, "objects/scenario.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_scenario.delete()
|
|
return redirect(current_package.url + "/scenario")
|
|
elif hidden == "delete-additional-information":
|
|
uuid = request.POST.get("uuid")
|
|
current_scenario.remove_additional_information(uuid)
|
|
return redirect(current_scenario.url)
|
|
elif hidden == "delete-all-additional-information":
|
|
current_scenario.additional_information = dict()
|
|
current_scenario.save()
|
|
return redirect(current_scenario.url)
|
|
elif hidden == "set-additional-information":
|
|
ais = HTMLGenerator.build_models(request.POST.dict())
|
|
|
|
if s.DEBUG:
|
|
logger.info(ais)
|
|
|
|
current_scenario.set_additional_information(ais)
|
|
return redirect(current_scenario.url)
|
|
elif hidden == "add-additional-information":
|
|
ais = HTMLGenerator.build_models(request.POST.dict())
|
|
|
|
if len(ais.keys()) != 1:
|
|
raise ValueError(
|
|
"Only one additional information field can be added at a time."
|
|
)
|
|
|
|
ai = list(ais.values())[0][0]
|
|
|
|
if s.DEBUG:
|
|
logger.info(ais)
|
|
|
|
current_scenario.add_additional_information(ai)
|
|
return redirect(current_scenario.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
##############
|
|
# User/Group #
|
|
##############
|
|
def users(request):
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Users"
|
|
|
|
context["object_type"] = "user"
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"User": s.SERVER_URL + "/user"},
|
|
]
|
|
|
|
context["objects"] = get_user_model().objects.all()
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET"])
|
|
|
|
|
|
def user(request, user_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == "GET":
|
|
# Check if current user is the one matching to the url
|
|
if str(current_user.uuid) != user_uuid and not current_user.is_superuser:
|
|
return HttpResponseBadRequest()
|
|
|
|
requested_user = UserManager.get_user_by_id(current_user, user_uuid)
|
|
|
|
context = get_base_context(request, for_user=requested_user)
|
|
context["title"] = "enviPath - User"
|
|
|
|
context["object_type"] = "user"
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"User": s.SERVER_URL + "/user"},
|
|
{current_user.username: requested_user.url},
|
|
]
|
|
|
|
context["user"] = requested_user
|
|
|
|
model_qs = EPModel.objects.none()
|
|
for p in PackageManager.get_all_readable_packages(requested_user, include_reviewed=True):
|
|
model_qs |= p.models
|
|
|
|
context["models"] = model_qs
|
|
|
|
context["tokens"] = APIToken.objects.filter(user=requested_user)
|
|
|
|
return render(request, "objects/user.html", context)
|
|
|
|
elif request.method == "POST":
|
|
is_hidden_method = bool(request.POST.get("hidden", False))
|
|
|
|
if is_hidden_method and request.POST["hidden"] == "request-api-token":
|
|
name = request.POST.get("name", "No Name")
|
|
valid_for = min(max(int(request.POST.get("valid-for", 90)), 1), 90)
|
|
|
|
token, raw_token = APIToken.create_token(request.user, name=name, valid_for=valid_for)
|
|
|
|
return JsonResponse(
|
|
{"raw_token": raw_token, "token": {"id": token.id, "name": token.name}}
|
|
)
|
|
|
|
if is_hidden_method and request.POST["hidden"] == "delete":
|
|
token_id = request.POST.get("token-id")
|
|
if token_id is None:
|
|
return HttpResponseBadRequest("Token ID missing!")
|
|
|
|
try:
|
|
APIToken.objects.get(user=current_user, id=token_id).delete()
|
|
except APIToken.DoesNotExist:
|
|
return HttpResponseBadRequest("User and Token ID combination invalid!")
|
|
|
|
return HttpResponse("success")
|
|
|
|
default_package = request.POST.get("default-package")
|
|
default_group = request.POST.get("default-group")
|
|
default_prediction_setting = request.POST.get("default-prediction-setting")
|
|
|
|
if any([default_package, default_group, default_prediction_setting]):
|
|
current_user.default_package = PackageManager.get_package_by_url(
|
|
current_user, default_package
|
|
)
|
|
current_user.default_group = GroupManager.get_group_by_url(current_user, default_group)
|
|
current_user.default_setting = SettingManager.get_setting_by_url(
|
|
current_user, default_prediction_setting
|
|
)
|
|
current_user.save()
|
|
|
|
return redirect(current_user.url)
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def groups(request):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = "enviPath - Groups"
|
|
|
|
context["object_type"] = "group"
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Group": s.SERVER_URL + "/group"},
|
|
]
|
|
|
|
context["objects"] = Group.objects.all()
|
|
|
|
return render(request, "collections/objects_list.html", context)
|
|
elif request.method == "POST":
|
|
group_name = request.POST.get("group-name")
|
|
group_description = request.POST.get("group-description", s.DEFAULT_VALUES["description"])
|
|
|
|
g = GroupManager.create_group(current_user, group_name, group_description)
|
|
|
|
return redirect(g.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def group(request, group_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_group = GroupManager.get_group_by_id(current_user, group_uuid)
|
|
|
|
if request.method == "GET":
|
|
context = get_base_context(request)
|
|
context["title"] = f"enviPath - {current_group.name}"
|
|
|
|
context["object_type"] = "group"
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Group": s.SERVER_URL + "/group"},
|
|
{current_group.name: current_group.url},
|
|
]
|
|
|
|
context["group"] = current_group
|
|
|
|
context["users"] = UserManager.get_users_lp().exclude(
|
|
id__in=current_group.user_member.all()
|
|
)
|
|
context["groups"] = (
|
|
GroupManager.get_groups_lp()
|
|
.exclude(id__in=current_group.group_member.all())
|
|
.exclude(id=current_group.pk)
|
|
)
|
|
|
|
context["packages"] = Package.objects.filter(
|
|
id__in=GroupPackagePermission.objects.filter(group=current_group)
|
|
.values("package")
|
|
.distinct()
|
|
)
|
|
|
|
return render(request, "objects/group.html", context)
|
|
|
|
elif request.method == "POST":
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get("hidden", None):
|
|
if hidden == "delete":
|
|
current_group.delete()
|
|
return redirect(s.SERVER_URL + "/group")
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
member_url = request.POST.get("member")
|
|
action = request.POST.get("action")
|
|
|
|
if all([member_url, action]) and action in ["add", "remove"]:
|
|
if "user" in member_url:
|
|
member = UserManager.get_user_lp(member_url)
|
|
else:
|
|
member = GroupManager.get_group_lp(member_url)
|
|
|
|
GroupManager.update_members(current_user, current_group, member, action)
|
|
|
|
return redirect(current_group.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def settings(request):
|
|
current_user = _anonymous_or_real(request)
|
|
context = get_base_context(request)
|
|
|
|
if request.method == "GET":
|
|
context["object_type"] = "setting"
|
|
# Even if settings are aready in "meta", for consistency add it on root level
|
|
context["settings"] = SettingManager.get_all_settings(current_user)
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Group": s.SERVER_URL + "/setting"},
|
|
]
|
|
return
|
|
elif request.method == "POST":
|
|
if s.DEBUG:
|
|
for k, v in request.POST.items():
|
|
logger.info("Parameters received:")
|
|
logger.info(f"{k}\t{v}")
|
|
|
|
name = request.POST.get("prediction-setting-name")
|
|
description = request.POST.get("prediction-setting-description")
|
|
new_default = request.POST.get("prediction-setting-new-default", "off") == "on"
|
|
|
|
max_nodes = min(
|
|
max(
|
|
int(request.POST.get("prediction-setting-max-nodes", 1)),
|
|
s.DEFAULT_MAX_NUMBER_OF_NODES,
|
|
),
|
|
s.DEFAULT_MAX_NUMBER_OF_NODES,
|
|
)
|
|
max_depth = min(
|
|
max(int(request.POST.get("prediction-setting-max-depth", 1)), s.DEFAULT_MAX_DEPTH),
|
|
s.DEFAULT_MAX_DEPTH,
|
|
)
|
|
|
|
tp_gen_method = request.POST.get("tp-generation-method")
|
|
|
|
params = {}
|
|
|
|
if tp_gen_method == "model-based-prediction-setting":
|
|
model_url = request.POST.get("model-based-prediction-setting-model")
|
|
|
|
model_uuid = model_url.split("/")[-1]
|
|
params["model"] = EPModel.objects.get(uuid=model_uuid)
|
|
params["model_threshold"] = request.POST.get(
|
|
"model-based-prediction-setting-threshold", s.DEFAULT_MODEL_THRESHOLD
|
|
)
|
|
|
|
if not PackageManager.readable(current_user, params["model"].package):
|
|
raise ValueError("")
|
|
|
|
elif tp_gen_method == "rule-based-prediction-setting":
|
|
rule_packages = request.POST.getlist("rule-based-prediction-setting-packages")
|
|
params["rule_packages"] = [
|
|
PackageManager.get_package_by_url(current_user, p) for p in rule_packages
|
|
]
|
|
else:
|
|
raise ValueError("")
|
|
|
|
created_setting = SettingManager.create_setting(
|
|
current_user,
|
|
name=name,
|
|
description=description,
|
|
max_nodes=max_nodes,
|
|
max_depth=max_depth,
|
|
**params,
|
|
)
|
|
|
|
if new_default:
|
|
current_user.default_setting = created_setting
|
|
current_user.save()
|
|
|
|
return HttpResponse("Success!")
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(["GET", "POST"])
|
|
|
|
|
|
def setting(request, setting_uuid):
|
|
pass
|
|
|
|
|
|
def jobs(request):
|
|
current_user = _anonymous_or_real(request)
|
|
context = get_base_context(request)
|
|
|
|
if request.method == "GET":
|
|
context["object_type"] = "joblog"
|
|
context["breadcrumbs"] = [
|
|
{"Home": s.SERVER_URL},
|
|
{"Jobs": s.SERVER_URL + "/jobs"},
|
|
]
|
|
if current_user.is_superuser:
|
|
context["jobs"] = JobLog.objects.all().order_by("-created")
|
|
else:
|
|
context["jobs"] = JobLog.objects.filter(user=current_user).order_by("-created")
|
|
|
|
return render(request, "collections/joblog.html", context)
|
|
|
|
|
|
###########
|
|
# KETCHER #
|
|
###########
|
|
|
|
|
|
def indigo(request):
|
|
from indigo import Indigo
|
|
|
|
return JsonResponse({"Indigo": {"version": Indigo().version()}})
|
|
|
|
|
|
@csrf_exempt
|
|
def aromatize(request):
|
|
if request.method == "POST":
|
|
data = json.loads(request.body)
|
|
mol_data = data.get("struct")
|
|
aromatized = IndigoUtils.aromatize(mol_data, False)
|
|
return JsonResponse({"struct": aromatized})
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
@csrf_exempt
|
|
def dearomatize(request):
|
|
if request.method == "POST":
|
|
data = json.loads(request.body)
|
|
mol_data = data.get("struct")
|
|
dearomatized = IndigoUtils.dearomatize(mol_data, False)
|
|
return JsonResponse({"struct": dearomatized})
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
@csrf_exempt
|
|
def layout(request):
|
|
if request.method == "POST":
|
|
data = json.loads(request.body)
|
|
mol_data = data.get("struct")
|
|
lay = IndigoUtils.layout(mol_data)
|
|
return JsonResponse({"struct": lay})
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
##########################
|
|
# Generic/Non-Persistent #
|
|
##########################
|
|
def depict(request):
|
|
if smiles := request.GET.get("smiles"):
|
|
return HttpResponse(IndigoUtils.mol_to_svg(smiles), content_type="image/svg+xml")
|
|
|
|
elif smirks := request.GET.get("smirks"):
|
|
query_smirks = request.GET.get("is_query_smirks", False) == "true"
|
|
return HttpResponse(
|
|
IndigoUtils.smirks_to_svg(smirks, query_smirks), content_type="image/svg+xml"
|
|
)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
@protected_resource()
|
|
def userinfo(request):
|
|
user = request.resource_owner
|
|
res = {
|
|
"sub": str(user.uuid),
|
|
"email": user.email,
|
|
"username": user.username,
|
|
"name": user.get_full_name() or user.username,
|
|
"email_verified": user.is_active,
|
|
}
|
|
return JsonResponse(res)
|