import json import logging from typing import List, Dict, Any from django.conf import settings as s from django.contrib.auth import get_user_model from django.http import JsonResponse, HttpResponse, HttpResponseNotAllowed, HttpResponseBadRequest from django.shortcuts import render, redirect from django.urls import reverse from django.views.decorators.csrf import csrf_exempt from envipy_additional_information import NAME_MAPPING from oauth2_provider.decorators import protected_resource import nh3 from utilities.chem import FormatConverter, IndigoUtils from utilities.decorators import package_permission_required from utilities.misc import HTMLGenerator from .logic import ( GroupManager, PackageManager, UserManager, SettingManager, SearchManager, EPDBURLParser, ) from .models import ( Package, GroupPackagePermission, Group, CompoundStructure, Compound, Reaction, Rule, Pathway, Node, EPModel, EnviFormer, MLRelativeReasoning, RuleBasedRelativeReasoning, Scenario, SimpleAmbitRule, APIToken, UserPackagePermission, Permission, License, User, Edge, ExternalDatabase, ExternalIdentifier, ) logger = logging.getLogger(__name__) def log_post_params(request): if s.DEBUG: for k, v in request.POST.items(): logger.debug(f"{k}\t{v}") def clean_dict(bad_dict): """Check each value in a dictionary for XSS attempts""" clean = {} # TODO: I'm not sure if this is the best way to do this. for key, value in bad_dict.items(): if key != 'csrfmiddlewaretoken' and isinstance(value, str): value = nh3.clean(value, tags=s.ALLOWED_HTML_TAGS) clean[key] = value return clean def error(request, message: str, detail: str, code: int = 400): context = get_base_context(request) error_context = { "error_message": message, "error_detail": detail, } if request.headers.get("Accept") == "application/json": return JsonResponse(error_context, status=500) context.update(**error_context) return render(request, "errors/error.html", context, status=code) def login(request): context = get_base_context(request) if request.method == "GET": context["title"] = "enviPath" context["next"] = request.GET.get("next", "") return render(request, "static/login.html", context) elif request.method == "POST": from django.contrib.auth import authenticate from django.contrib.auth import login # Check if the cleaned username is equal to the unclean username, if not, invalid username username = nh3.clean(request.POST.get("username")).strip() if username != request.POST.get("username").strip(): context["message"] = "Login failed!" return render(request, "static/login.html", context) password = request.POST.get("password") # Get email for username and check if the account is active try: temp_user = get_user_model().objects.get(username=username) if not temp_user.is_active: context["message"] = "User account is not activated yet!" return render(request, "static/login.html", context) email = temp_user.email except get_user_model().DoesNotExist: context["message"] = "Login failed!" return render(request, "static/login.html", context) try: user = authenticate(username=email, password=password) except Exception: context["message"] = "Login failed!" return render(request, "static/login.html", context) if user is not None: login(request, user) if next := request.POST.get("next"): return redirect(next) return redirect(reverse("index")) else: context["message"] = "Login failed!" return render(request, "static/login.html", context) else: return HttpResponseNotAllowed(["GET", "POST"]) def logout(request): if request.method == "POST": is_logout = bool(request.POST.get("logout", False)) if is_logout: from django.contrib.auth import logout logout(request) return redirect(s.SERVER_URL) return HttpResponseBadRequest() def register(request): context = get_base_context(request) if request.method == "GET": context["title"] = "enviPath" context["next"] = request.GET.get("next", "") return render(request, "static/register.html", context) elif request.method == "POST": context["title"] = "enviPath" if next := request.POST.get("next"): context["next"] = next username = nh3.clean(request.POST.get("username", "")).strip() email = nh3.clean(request.POST.get("email", "")).strip() password = request.POST.get("password", "").strip() rpassword = request.POST.get("rpassword", "").strip() # Check if cleaned username and email are equal to the unclean, if not, invalid username or email if (not (username and email and password) or username != request.POST.get("username", "").strip() or email != request.POST.get("email", "").strip()): context["message"] = "Invalid username/email/password" return render(request, "static/register.html", context) if password != rpassword or password == "": context["message"] = "Registration failed, provided passwords differ!" return render(request, "static/register.html", context) try: u = UserManager.create_user(username, email, password) logger.info(f"Created user {u.username} ({u.pk})") except Exception: context["message"] = "Registration failed! Couldn't create User Account." return render(request, "static/register.html", context) if s.ADMIN_APPROVAL_REQUIRED: context["success_message"] = ( "Your account has been created! An admin will activate it soon!" ) else: context["success_message"] = ( "Account has been created! You'll receive a mail to activate your account shortly." ) return render(request, "static/login.html", context) else: return HttpResponseNotAllowed(["GET", "POST"]) def editable(request, user): if user.is_superuser: return True url = request.build_absolute_uri(request.path) if PackageManager.is_package_url(url): _package = PackageManager.get_package_lp(request.build_absolute_uri()) return PackageManager.writable(user, _package) elif GroupManager.is_group_url(url): _group = GroupManager.get_group_lp(request.build_absolute_uri()) return GroupManager.writable(user, _group) elif UserManager.is_user_url(url): _user = UserManager.get_user_lp(request.build_absolute_uri()) return UserManager.writable(user, _user) elif url in [ s.SERVER_URL, f"{s.SERVER_URL}/", f"{s.SERVER_URL}/package", f"{s.SERVER_URL}/user", f"{s.SERVER_URL}/group", f"{s.SERVER_URL}/search", ]: return True else: logger.debug(f"Unknown url: {url}") return False def get_base_context(request, for_user=None) -> Dict[str, Any]: current_user = _anonymous_or_real(request) can_edit = editable(request, current_user) parser = EPDBURLParser(request.build_absolute_uri(request.path)) url_contains_package = False if parser.contains_package_url() or parser.is_package_url(): url_contains_package = True if for_user: current_user = for_user ctx = { "title": "enviPath", "meta": { "version": "0.0.1", "server_url": s.SERVER_URL, "user": current_user, "can_edit": can_edit, "url_contains_package": url_contains_package, "readable_packages": PackageManager.get_all_readable_packages( current_user, include_reviewed=True ), "writeable_packages": PackageManager.get_all_writeable_packages(current_user), "available_groups": GroupManager.get_groups(current_user), "available_settings": SettingManager.get_all_settings(current_user), "enabled_features": s.FLAGS, "debug": s.DEBUG, "external_databases": ExternalDatabase.get_databases(), }, } return ctx def _anonymous_or_real(request): if request.user.is_authenticated and not request.user.is_anonymous: return request.user return get_user_model().objects.get(username="anonymous") def breadcrumbs( first_level_object=None, second_level_namespace=None, second_level_object=None, third_level_namespace=None, third_level_object=None, ) -> List[Dict[str, str]]: bread = [ {"Home": s.SERVER_URL}, {"Package": s.SERVER_URL + "/package"}, ] if first_level_object is not None: bread.append({first_level_object.name: first_level_object.url}) if second_level_namespace is not None: bread.append( { f"{second_level_namespace}".capitalize(): first_level_object.url + f"/{second_level_namespace}" } ) if second_level_object is not None: bread.append({second_level_object.name: second_level_object.url}) if third_level_namespace is not None: bread.append( { f"{third_level_namespace}".capitalize(): second_level_object.url + f"/{third_level_namespace}" } ) if third_level_object is not None: bread.append({third_level_object.name: third_level_object.url}) return bread def set_scenarios(current_user, attach_object, scenario_urls: List[str]): scens = [] for scenario_url in scenario_urls: # As empty lists will be removed in POST request well send [''] if scenario_url == "": continue package = PackageManager.get_package_by_url(current_user, scenario_url) scen = Scenario.objects.get(package=package, uuid=scenario_url.split("/")[-1]) scens.append(scen) attach_object.set_scenarios(scens) def set_aliases(current_user, attach_object, aliases: List[str]): if aliases == [""]: aliases = [] attach_object.aliases = aliases attach_object.save() def copy_object(current_user, target_package: "Package", source_object_url: str): # Ensures that source is readable source_package = PackageManager.get_package_by_url(current_user, source_object_url) if source_package == target_package: raise ValueError(f"Can't copy object {source_object_url} to the same package!") parser = EPDBURLParser(source_object_url) # if the url won't contain a package or is a plain package if not parser.contains_package_url(): raise ValueError(f"Object {source_object_url} can't be copied!") # Gets the most specific object source_object = parser.get_object() if hasattr(source_object, "copy"): mapping = dict() copy = source_object.copy(target_package, mapping) if s.DEBUG: for k, v in mapping.items(): logger.debug(f"Mapping {k.url} to {v.url}") return copy raise ValueError(f"Object {source_object} can't be copied!") def index(request): context = get_base_context(request) context["title"] = "enviPath - Home" context["meta"]["current_package"] = context["meta"]["user"].default_package if request.GET.get("getMLServerPath", False): return JsonResponse({"mlServerPath": s.SERVER_URL}) return render(request, "index/index.html", context) def packages(request): current_user = _anonymous_or_real(request) if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Packages" context["object_type"] = "package" context["meta"]["current_package"] = context["meta"]["user"].default_package context["meta"]["can_edit"] = True reviewed_package_qs = Package.objects.filter(reviewed=True).order_by("created") unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user).order_by( "name" ) context["reviewed_objects"] = reviewed_package_qs context["unreviewed_objects"] = unreviewed_package_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": if hidden := request.POST.get("hidden", None): if hidden in ["import-legacy-package-json", "import-package-json"]: f = request.FILES["file"] try: file_data = f.read().decode("utf-8") data = json.loads(file_data) if hidden == "import-legacy-package-json": pack = PackageManager.import_legacy_package(data, current_user) else: pack = PackageManager.import_package(data, current_user) return redirect(pack.url) except UnicodeDecodeError: return error(request, "Invalid encoding.", "Invalid encoding, must be UTF-8") else: return HttpResponseBadRequest() else: # Clean for potential XSS package_name = nh3.clean(request.POST.get("package-name"), tags=s.ALLOWED_HTML_TAGS).strip() package_description = nh3.clean(request.POST.get( "package-description", s.DEFAULT_VALUES["description"] ), tags=s.ALLOWED_HTML_TAGS).strip() created_package = PackageManager.create_package( current_user, package_name, package_description ) return redirect(created_package.url) elif request.method == "OPTIONS": response = HttpResponse() response["allow"] = ",".join(["GET", "POST"]) return response else: return HttpResponseNotAllowed(["GET", "POST"]) def compounds(request): if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Compounds" context["object_type"] = "compound" context["meta"]["current_package"] = context["meta"]["user"].default_package reviewed_compound_qs = Compound.objects.none() for p in PackageManager.get_reviewed_packages(): reviewed_compound_qs |= Compound.objects.filter(package=p) reviewed_compound_qs = reviewed_compound_qs.order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": True} for pw in reviewed_compound_qs ] } ) context["reviewed_objects"] = reviewed_compound_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # delegate to default package current_user = _anonymous_or_real(request) default_package = current_user.default_package return package_compounds(request, default_package.uuid) else: return HttpResponseNotAllowed(["GET", "POST"]) def rules(request): if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Rules" context["object_type"] = "rule" context["meta"]["current_package"] = context["meta"]["user"].default_package context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Rule": s.SERVER_URL + "/rule"}, ] reviewed_rule_qs = Rule.objects.none() for p in PackageManager.get_reviewed_packages(): reviewed_rule_qs |= Rule.objects.filter(package=p) reviewed_rule_qs = reviewed_rule_qs.order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": True} for pw in reviewed_rule_qs ] } ) context["reviewed_objects"] = reviewed_rule_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # delegate to default package current_user = _anonymous_or_real(request) default_package = current_user.default_package return package_rules(request, default_package.uuid) else: return HttpResponseNotAllowed(["GET", "POST"]) def reactions(request): if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Reactions" context["object_type"] = "reaction" context["meta"]["current_package"] = context["meta"]["user"].default_package context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Reaction": s.SERVER_URL + "/reaction"}, ] reviewed_reaction_qs = Reaction.objects.none() for p in PackageManager.get_reviewed_packages(): reviewed_reaction_qs |= Reaction.objects.filter(package=p).order_by("name") reviewed_reaction_qs = reviewed_reaction_qs.order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": True} for pw in reviewed_reaction_qs ] } ) context["reviewed_objects"] = reviewed_reaction_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # delegate to default package current_user = _anonymous_or_real(request) default_package = current_user.default_package return package_reactions(request, default_package.uuid) else: return HttpResponseNotAllowed(["GET", "POST"]) def pathways(request): if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Pathways" context["object_type"] = "pathway" context["meta"]["current_package"] = context["meta"]["user"].default_package context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Pathway": s.SERVER_URL + "/pathway"}, ] reviewed_pathway_qs = Pathway.objects.none() for p in PackageManager.get_reviewed_packages(): reviewed_pathway_qs |= Pathway.objects.filter(package=p).order_by("name") reviewed_pathway_qs = reviewed_pathway_qs.order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": True} for pw in reviewed_pathway_qs ] } ) context["reviewed_objects"] = reviewed_pathway_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # delegate to default package current_user = _anonymous_or_real(request) default_package = current_user.default_package return package_pathways(request, default_package.uuid) else: return HttpResponseNotAllowed(["GET", "POST"]) def scenarios(request): if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Scenarios" context["object_type"] = "scenario" context["meta"]["current_package"] = context["meta"]["user"].default_package context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Scenario": s.SERVER_URL + "/scenario"}, ] reviewed_scenario_qs = Scenario.objects.none() for p in PackageManager.get_reviewed_packages(): reviewed_scenario_qs |= Scenario.objects.filter(package=p).order_by("name") reviewed_scenario_qs = reviewed_scenario_qs.order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": s.name, "url": s.url, "reviewed": True} for s in reviewed_scenario_qs ] } ) context["reviewed_objects"] = reviewed_scenario_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # delegate to default package default_package = request.user.default_package return package_scenarios(request, default_package.uuid) else: return HttpResponseNotAllowed(["GET", "POST"]) def models(request): if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Models" context["object_type"] = "model" context["meta"]["current_package"] = context["meta"]["user"].default_package context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Model": s.SERVER_URL + "/model"}, ] context["model_types"] = { "ML Relative Reasoning": "ml-relative-reasoning", "Rule Based Relative Reasoning": "rule-based-relative-reasoning", "EnviFormer": "enviformer", } for k, v in s.CLASSIFIER_PLUGINS.items(): context["model_types"][v.display()] = k reviewed_model_qs = EPModel.objects.none() for p in PackageManager.get_reviewed_packages(): reviewed_model_qs |= EPModel.objects.filter(package=p).order_by("name") reviewed_model_qs = reviewed_model_qs.order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": True} for pw in reviewed_model_qs ] } ) context["reviewed_objects"] = reviewed_model_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": current_user = _anonymous_or_real(request) default_package = current_user.default_package return package_models(request, default_package.uuid) else: return HttpResponseNotAllowed(["GET", "POST"]) def search(request): current_user = _anonymous_or_real(request) if request.method == "GET": package_urls = request.GET.getlist("packages") # Clean for potential XSS searchterm = nh3.clean(request.GET.get("search"), tags=s.ALLOWED_HTML_TAGS).strip() mode = request.GET.get("mode") # add HTTP_ACCEPT check to differentiate between index and ajax call if "application/json" in request.META.get("HTTP_ACCEPT") and all([searchterm, mode]): if package_urls: packages = [ PackageManager.get_package_by_url(current_user, p) for p in package_urls ] else: packages = PackageManager.get_reviewed_packages() search_result = SearchManager.search(packages, searchterm, mode) return JsonResponse(search_result, safe=False) context = get_base_context(request) context["title"] = "enviPath - Search" context["object_type"] = "model" context["meta"]["current_package"] = context["meta"]["user"].default_package context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Search": s.SERVER_URL + "/search"}, ] reviewed_package_qs = PackageManager.get_reviewed_packages() unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user) context["reviewed_objects"] = reviewed_package_qs context["unreviewed_objects"] = unreviewed_package_qs if all([searchterm, mode]): if package_urls: packages = [ PackageManager.get_package_by_url(current_user, p) for p in package_urls ] else: packages = PackageManager.get_reviewed_packages() context["search_result"] = SearchManager.search(packages, searchterm, mode) context["search_result"]["searchterm"] = searchterm return render(request, "search.html", context) else: return HttpResponseNotAllowed(["GET"]) @package_permission_required() def package_models(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - Models" context["meta"]["current_package"] = current_package context["object_type"] = "model" context["breadcrumbs"] = breadcrumbs(current_package, "model") reviewed_model_qs = EPModel.objects.none() unreviewed_model_qs = EPModel.objects.none() if current_package.reviewed: reviewed_model_qs = EPModel.objects.filter(package=current_package).order_by("name") else: unreviewed_model_qs = EPModel.objects.filter(package=current_package).order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_model_qs if current_package.reviewed else unreviewed_model_qs ) ] } ) context["reviewed_objects"] = reviewed_model_qs context["unreviewed_objects"] = unreviewed_model_qs context["model_types"] = { "ML Relative Reasoning": "ml-relative-reasoning", "Rule Based Relative Reasoning": "rule-based-relative-reasoning", } if s.FLAGS.get("ENVIFORMER", False): context["model_types"]["EnviFormer"] = "enviformer" if s.FLAGS.get("PLUGINS", False): for k, v in s.CLASSIFIER_PLUGINS.items(): context["model_types"][v.display()] = k return render(request, "collections/objects_list.html", context) elif request.method == "POST": log_post_params(request) # Clean for potential XSS name = nh3.clean(request.POST.get("model-name"), tags=s.ALLOWED_HTML_TAGS).strip() description = nh3.clean(request.POST.get("model-description"), tags=s.ALLOWED_HTML_TAGS).strip() model_type = request.POST.get("model-type") if model_type == "enviformer": threshold = float(request.POST.get(f"{model_type}-threshold", 0.5)) mod = EnviFormer.create(current_package, name, description, threshold) elif model_type == "ml-relative-reasoning" or model_type == "rule-based-relative-reasoning": # Generic fields for ML and Rule Based rule_packages = request.POST.getlist("package-based-relative-reasoning-rule-packages") data_packages = request.POST.getlist("package-based-relative-reasoning-data-packages") eval_packages = request.POST.getlist( "package-based-relative-reasoning-evaluation-packages", [] ) # Generic params params = { "package": current_package, "name": name, "description": description, "rule_packages": [ PackageManager.get_package_by_url(current_user, p) for p in rule_packages ], "data_packages": [ PackageManager.get_package_by_url(current_user, p) for p in data_packages ], "eval_packages": [ PackageManager.get_package_by_url(current_user, p) for p in eval_packages ], } if model_type == "ml-relative-reasoning": # ML Specific threshold = float(request.POST.get(f"{model_type}-threshold", 0.5)) # TODO handle additional fingerprinter # fingerprinter = request.POST.get(f"{model_type}-fingerprinter") # App Domain related parameters build_ad = request.POST.get("build-app-domain", False) == "on" num_neighbors = request.POST.get("num-neighbors", 5) reliability_threshold = request.POST.get("reliability-threshold", 0.5) local_compatibility_threshold = request.POST.get( "local-compatibility-threshold", 0.5 ) params["threshold"] = threshold # params['fingerprinter'] = fingerprinter params["build_app_domain"] = build_ad params["app_domain_num_neighbours"] = num_neighbors params["app_domain_reliability_threshold"] = reliability_threshold params["app_domain_local_compatibility_threshold"] = local_compatibility_threshold mod = MLRelativeReasoning.create(**params) else: mod = RuleBasedRelativeReasoning.create(**params) from .tasks import build_model build_model.delay(mod.pk) else: return error( request, "Invalid model type.", f'Model type "{model_type}" is not supported."' ) return redirect(mod.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_model(request, package_uuid, model_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_model = EPModel.objects.get(package=current_package, uuid=model_uuid) if request.method == "GET": classify = request.GET.get("classify", False) ad_assessment = request.GET.get("app-domain-assessment", False) if classify or ad_assessment: smiles = request.GET.get("smiles", "").strip() # Check if smiles is non empty and valid if smiles == "": return JsonResponse({"error": "Received empty SMILES"}, status=400) try: stand_smiles = FormatConverter.standardize(smiles) except ValueError: return JsonResponse({"error": f'"{smiles}" is not a valid SMILES'}, status=400) if classify: pred_res = current_model.predict(stand_smiles) res = [] for pr in pred_res: if len(pr) > 0: products = [] for prod_set in pr.product_sets: logger.debug(f"Checking {prod_set}") products.append(tuple([x for x in prod_set])) res.append( { "products": list(set(products)), "probability": pr.probability, "btrule": {k: getattr(pr.rule, k) for k in ["url", "name"]} if pr.rule is not None else None, } ) return JsonResponse(res, safe=False) else: app_domain_assessment = current_model.app_domain.assess(stand_smiles)[0] return JsonResponse(app_domain_assessment, safe=False) context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_model.name}" context["meta"]["current_package"] = current_package context["object_type"] = "model" context["breadcrumbs"] = breadcrumbs(current_package, "model", current_model) context["model"] = current_model context["current_object"] = current_model return render(request, "objects/model.html", context) elif request.method == "POST": if hidden := request.POST.get("hidden", None): if hidden == "delete": current_model.delete() return redirect(current_package.url + "/model") elif hidden == "evaluate": from .tasks import evaluate_model evaluate_model.delay(current_model.pk) return redirect(current_model.url) else: return HttpResponseBadRequest() else: # Clean for potential XSS name = nh3.clean(request.POST.get("model-name", "").strip(), tags=s.ALLOWED_HTML_TAGS).strip() description = nh3.clean(request.POST.get("model-description", "").strip(), tags=s.ALLOWED_HTML_TAGS).strip() if any([name, description]): if name: current_model.name = name if description: current_model.description = description current_model.save() return redirect(current_model.url) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == "GET": if request.GET.get("export", False) == "true": filename = f"{current_package.name.replace(' ', '_')}_{current_package.uuid}.json" pack_json = PackageManager.export_package( current_package, include_models=False, include_external_identifiers=False ) response = JsonResponse(pack_json, content_type="application/json") response["Content-Disposition"] = f'attachment; filename="{filename}"' return response context = get_base_context(request) context["title"] = f"enviPath - {current_package.name}" context["meta"]["current_package"] = current_package context["object_type"] = "package" context["breadcrumbs"] = breadcrumbs(current_package) context["package"] = current_package user_perms = UserPackagePermission.objects.filter(package=current_package) users = get_user_model().objects.exclude( id__in=UserPackagePermission.objects.filter(package=current_package).values_list( "user_id", flat=True ) ) group_perms = GroupPackagePermission.objects.filter(package=current_package) groups = Group.objects.exclude( id__in=GroupPackagePermission.objects.filter(package=current_package).values_list( "group_id", flat=True ) ) context["users"] = users context["groups"] = groups context["user_permissions"] = user_perms context["group_permissions"] = group_perms return render(request, "objects/package.html", context) elif request.method == "POST": log_post_params(request) if hidden := request.POST.get("hidden", None): if hidden == "delete": if current_user.default_package == current_package: return error( request, f'Package "{current_package.name}" is the default and cannot be deleted!', "You cannot delete the default package. If you want to delete this package you have to set another default package first.", ) logger.debug(current_package.delete()) return redirect(s.SERVER_URL + "/package") elif hidden == "publish-package": for g in Group.objects.filter(public=True): PackageManager.update_permissions( current_user, current_package, g, Permission.READ[0] ) return redirect(current_package.url) elif hidden == "copy": object_to_copy = request.POST.get("object_to_copy") if not object_to_copy: return error(request, "No object to copy", "There was no object to copy.") try: copied_object = copy_object(current_user, current_package, object_to_copy) except ValueError: return JsonResponse( {"error": f"Can't copy object {object_to_copy} to the same package!"}, status=400, ) return JsonResponse({"success": copied_object.url}) else: return HttpResponseBadRequest() # Clean for potential XSS new_package_name = nh3.clean(request.POST.get("package-name"), tags=s.ALLOWED_HTML_TAGS).strip() new_package_description = nh3.clean(request.POST.get("package-description"), tags=s.ALLOWED_HTML_TAGS).strip() grantee_url = request.POST.get("grantee") read = request.POST.get("read") == "on" write = request.POST.get("write") == "on" owner = request.POST.get("owner") == "on" license = request.POST.get("license") license_link = request.POST.get("license-link") license_image_link = request.POST.get("license-image-link") if new_package_name: current_package.name = new_package_name if new_package_description: current_package.description = new_package_description if any([new_package_name, new_package_description]): current_package.save() return redirect(current_package.url) elif any([grantee_url, read, write, owner]): if "user" in grantee_url: grantee = UserManager.get_user_lp(grantee_url) else: grantee = GroupManager.get_group_lp(grantee_url) max_perm = None if read: max_perm = Permission.READ[0] if write: max_perm = Permission.WRITE[0] if owner: max_perm = Permission.ALL[0] PackageManager.update_permissions(current_user, current_package, grantee, max_perm) return redirect(current_package.url) elif license is not None: if license == "no-license": if current_package.license is not None: current_package.license.delete() current_package.license = None current_package.save() return redirect(current_package.url) else: if current_package.license is not None: current_package.license.delete() license = License() license.link = license_link license.image_link = license_image_link license.save() current_package.license = license current_package.save() return redirect(current_package.url) else: return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_compounds(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - Compounds" context["meta"]["current_package"] = current_package context["object_type"] = "compound" context["breadcrumbs"] = breadcrumbs(current_package, "compound") reviewed_compound_qs = Compound.objects.none() unreviewed_compound_qs = Compound.objects.none() if current_package.reviewed: reviewed_compound_qs = Compound.objects.filter(package=current_package).order_by("name") else: unreviewed_compound_qs = Compound.objects.filter(package=current_package).order_by( "name" ) if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_compound_qs if current_package.reviewed else unreviewed_compound_qs ) ] } ) context["reviewed_objects"] = reviewed_compound_qs context["unreviewed_objects"] = unreviewed_compound_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # Clean for potential XSS compound_name = nh3.clean(request.POST.get("compound-name"), tags=s.ALLOWED_HTML_TAGS).strip() compound_smiles = request.POST.get("compound-smiles").strip() compound_description = nh3.clean(request.POST.get("compound-description"), tags=s.ALLOWED_HTML_TAGS).strip() c = Compound.create(current_package, compound_smiles, compound_name, compound_description) return redirect(c.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_compound(request, package_uuid, compound_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_compound.name}" context["meta"]["current_package"] = current_package context["object_type"] = "compound" context["breadcrumbs"] = breadcrumbs(current_package, "compound", current_compound) context["compound"] = current_compound context["current_object"] = current_compound return render(request, "objects/compound.html", context) elif request.method == "POST": if hidden := request.POST.get("hidden", None): if hidden == "delete": current_compound.delete() return redirect(current_package.url + "/compound") else: return HttpResponseBadRequest() if "selected-scenarios" in request.POST: selected_scenarios = request.POST.getlist("selected-scenarios") set_scenarios(current_user, current_compound, selected_scenarios) return redirect(current_compound.url) if "aliases" in request.POST: aliases = request.POST.getlist("aliases") try: set_aliases(current_user, current_compound, aliases) except Exception as e: return JsonResponse({"error": str(e)}, status=400) return JsonResponse({"success": current_compound.url}) # Clean for potential XSS new_compound_name = nh3.clean(request.POST.get("compound-name", ""), tags=s.ALLOWED_HTML_TAGS).strip() new_compound_description = nh3.clean(request.POST.get("compound-description", ""), tags=s.ALLOWED_HTML_TAGS).strip() if new_compound_name: current_compound.name = new_compound_name if new_compound_description: current_compound.description = new_compound_description if any([new_compound_name, new_compound_description]): current_compound.save() return redirect(current_compound.url) selected_database = request.POST.get("selected-database", "").strip() external_identifier = request.POST.get("identifier", "").strip() if selected_database and external_identifier: db = ExternalDatabase.objects.get(id=int(selected_database)) ExternalIdentifier.objects.create( content_object=current_compound, database=db, identifier_value=external_identifier, url=db.url_pattern.format(id=external_identifier), is_primary=False, ) return redirect(current_compound.url) else: return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_compound_structures(request, package_uuid, compound_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = ( f"enviPath - {current_package.name} - {current_compound.name} - Structures" ) context["meta"]["current_package"] = current_package context["object_type"] = "structure" context["breadcrumbs"] = breadcrumbs( current_package, "compound", current_compound, "structure" ) reviewed_compound_structure_qs = CompoundStructure.objects.none() unreviewed_compound_structure_qs = CompoundStructure.objects.none() if current_package.reviewed: reviewed_compound_structure_qs = current_compound.structures.order_by("name") else: unreviewed_compound_structure_qs = current_compound.structures.order_by("name") context["reviewed_objects"] = reviewed_compound_structure_qs context["unreviewed_objects"] = unreviewed_compound_structure_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # Clean for potential XSS structure_name = nh3.clean(request.POST.get("structure-name"), tags=s.ALLOWED_HTML_TAGS).strip() structure_smiles = request.POST.get("structure-smiles").strip() structure_description = nh3.clean(request.POST.get("structure-description"), tags=s.ALLOWED_HTML_TAGS).strip() cs = current_compound.add_structure(structure_smiles, structure_name, structure_description) return redirect(cs.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_compound_structure(request, package_uuid, compound_uuid, structure_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid) current_structure = CompoundStructure.objects.get( compound=current_compound, uuid=structure_uuid ) if request.method == "GET": context = get_base_context(request) context["title"] = ( f"enviPath - {current_package.name} - {current_compound.name} - {current_structure.name}" ) context["meta"]["current_package"] = current_package context["object_type"] = "structure" context["compound_structure"] = current_structure context["current_object"] = current_structure context["breadcrumbs"] = breadcrumbs( current_package, "compound", current_compound, "structure", current_structure ) return render(request, "objects/compound_structure.html", context) elif request.method == "POST": if hidden := request.POST.get("hidden", None): if hidden == "delete": # Check if we have to delete the compound as no structure is left if len(current_structure.compound.structures.all()) == 1: # This will delete the structure as well current_compound.delete() return redirect(current_package.url + "/compound") else: if current_structure.normalized_structure: current_compound.delete() return redirect(current_package.url + "/compound") else: if current_compound.default_structure == current_structure: current_structure.delete() current_compound.default_structure = ( current_compound.structures.all().first() ) return redirect(current_compound.url + "/structure") else: current_structure.delete() return redirect(current_compound.url + "/structure") else: return HttpResponseBadRequest() # Clean for potential XSS new_structure_name = nh3.clean(request.POST.get("compound-structure-name", ""), tags=s.ALLOWED_HTML_TAGS).strip() new_structure_description = nh3.clean(request.POST.get("compound-structure-description", ""), tags=s.ALLOWED_HTML_TAGS).strip() if new_structure_name: current_structure.name = new_structure_name if new_structure_description: current_structure.description = new_structure_description if any([new_structure_name, new_structure_description]): current_structure.save() return redirect(current_structure.url) if "selected-scenarios" in request.POST: selected_scenarios = request.POST.getlist("selected-scenarios") set_scenarios(current_user, current_structure, selected_scenarios) return redirect(current_structure.url) if "aliases" in request.POST: aliases = request.POST.getlist("aliases") try: set_aliases(current_user, current_structure, aliases) except Exception as e: return JsonResponse({"error": str(e)}, status=400) return JsonResponse({"success": current_structure.url}) selected_database = request.POST.get("selected-database", "").strip() external_identifier = request.POST.get("identifier", "").strip() if selected_database and external_identifier: db = ExternalDatabase.objects.get(id=int(selected_database)) ExternalIdentifier.objects.create( content_object=current_structure, database=db, identifier_value=external_identifier, url=db.url_pattern.format(id=external_identifier), is_primary=False, ) return redirect(current_structure.url) return HttpResponseBadRequest() else: return HttpResponseNotAllowed( [ "GET", ] ) @package_permission_required() def package_rules(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - Rules" context["meta"]["current_package"] = current_package context["object_type"] = "rule" context["breadcrumbs"] = breadcrumbs(current_package, "rule") reviewed_rule_qs = Rule.objects.none() unreviewed_rule_qs = Rule.objects.none() if current_package.reviewed: reviewed_rule_qs = Rule.objects.filter(package=current_package).order_by("name") else: unreviewed_rule_qs = Rule.objects.filter(package=current_package).order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_rule_qs if current_package.reviewed else unreviewed_rule_qs ) ] } ) context["reviewed_objects"] = reviewed_rule_qs context["unreviewed_objects"] = unreviewed_rule_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": log_post_params(request) # Generic params # Clean for potential XSS rule_name = nh3.clean(request.POST.get("rule-name"), tags=s.ALLOWED_HTML_TAGS).strip() rule_description = nh3.clean(request.POST.get("rule-description"), tags=s.ALLOWED_HTML_TAGS).strip() rule_type = request.POST.get("rule-type") params = {} # Obtain parameters as required by rule type if rule_type == "SimpleAmbitRule": # Clean for potential XSS params["smirks"] = request.POST.get("rule-smirks").strip() params["reactant_filter_smarts"] = nh3.clean(request.POST.get("rule-reactant-smarts"), tags=s.ALLOWED_HTML_TAGS).strip() params["product_filter_smarts"] = nh3.clean(request.POST.get("rule-product-smarts"), tags=s.ALLOWED_HTML_TAGS).strip() elif rule_type == "SimpleRDKitRule": # Clean for potential XSS params["reaction_smarts"] = request.POST.get("rule-reaction-smarts").strip() elif rule_type == "ParallelRule": pass elif rule_type == "SequentialRule": pass else: return HttpResponseBadRequest() r = Rule.create( rule_type=rule_type, package=current_package, name=rule_name, description=rule_description, **params, ) return redirect(r.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_rule(request, package_uuid, rule_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_rule = Rule.objects.get(package=current_package, uuid=rule_uuid) if request.method == "GET": context = get_base_context(request) if smiles := request.GET.get("smiles", False): stand_smiles = FormatConverter.standardize(smiles) res = current_rule.apply(stand_smiles) if len(res) > 1: logger.info( f"Rule {current_rule.uuid} returned multiple product sets on {smiles}, picking the first one." ) smirks = f"{stand_smiles}>>{'.'.join(sorted(res[0]))}" # Usually the functional groups are a mapping of fg -> count # As we are doing it on the fly here fake a high count to ensure that its properly highlighted educt_functional_groups = {x: 1000 for x in current_rule.reactants_smarts} product_functional_groups = {x: 1000 for x in current_rule.products_smarts} return HttpResponse( IndigoUtils.smirks_to_svg( smirks, False, 0, 0, educt_functional_groups=educt_functional_groups, product_functional_groups=product_functional_groups, ), content_type="image/svg+xml", ) context["title"] = f"enviPath - {current_package.name} - {current_rule.name}" context["meta"]["current_package"] = current_package context["object_type"] = "rule" context["breadcrumbs"] = breadcrumbs(current_package, "rule", current_rule) context["rule"] = current_rule context["current_object"] = current_rule if isinstance(current_rule, SimpleAmbitRule): return render(request, "objects/simple_rule.html", context) else: # isinstance(current_rule, ParallelRule) or isinstance(current_rule, SequentialRule): return render(request, "objects/composite_rule.html", context) elif request.method == "POST": if hidden := request.POST.get("hidden", None): if hidden == "delete": current_rule.delete() return redirect(current_package.url + "/rule") else: return HttpResponseBadRequest() if "selected-scenarios" in request.POST: selected_scenarios = request.POST.getlist("selected-scenarios") set_scenarios(current_user, current_rule, selected_scenarios) return redirect(current_rule.url) if "aliases" in request.POST: aliases = request.POST.getlist("aliases") try: set_aliases(current_user, current_rule, aliases) except Exception as e: return JsonResponse({"error": str(e)}, status=400) return JsonResponse({"success": current_rule.url}) # Clean for potential XSS rule_name = nh3.clean(request.POST.get("rule-name", ""), tags=s.ALLOWED_HTML_TAGS).strip() rule_description = nh3.clean(request.POST.get("rule-description", "").strip(), tags=s.ALLOWED_HTML_TAGS).strip() if rule_name: current_rule.name = rule_name if rule_description: current_rule.description = rule_description if any([rule_name, rule_description]): current_rule.save() return redirect(current_rule.url) else: return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_reactions(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_package.name} - Reactions" context["meta"]["current_package"] = current_package context["object_type"] = "reaction" context["breadcrumbs"] = breadcrumbs(current_package, "reaction") reviewed_reaction_qs = Reaction.objects.none() unreviewed_reaction_qs = Reaction.objects.none() if current_package.reviewed: reviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by("name") else: unreviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by( "name" ) if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_reaction_qs if current_package.reviewed else unreviewed_reaction_qs ) ] } ) context["reviewed_objects"] = reviewed_reaction_qs context["unreviewed_objects"] = unreviewed_reaction_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # Clean for potential XSS reaction_name = nh3.clean(request.POST.get("reaction-name"), tags=s.ALLOWED_HTML_TAGS).strip() reaction_description = nh3.clean(request.POST.get("reaction-description"), tags=s.ALLOWED_HTML_TAGS).strip() reactions_smirks = request.POST.get("reaction-smirks").strip() educts = reactions_smirks.split(">>")[0].split(".") products = reactions_smirks.split(">>")[1].split(".") r = Reaction.create( current_package, name=reaction_name, description=reaction_description, educts=educts, products=products, ) return redirect(r.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_reaction(request, package_uuid, reaction_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_reaction = Reaction.objects.get(package=current_package, uuid=reaction_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_reaction.name}" context["meta"]["current_package"] = current_package context["object_type"] = "reaction" context["breadcrumbs"] = breadcrumbs(current_package, "reaction", current_reaction) context["reaction"] = current_reaction context["current_object"] = current_reaction return render(request, "objects/reaction.html", context) elif request.method == "POST": if hidden := request.POST.get("hidden", None): if hidden == "delete": current_reaction.delete() return redirect(current_package.url + "/reaction") else: return HttpResponseBadRequest() if "selected-scenarios" in request.POST: selected_scenarios = request.POST.getlist("selected-scenarios") set_scenarios(current_user, current_reaction, selected_scenarios) return redirect(current_reaction.url) if "aliases" in request.POST: aliases = request.POST.getlist("aliases") try: set_aliases(current_user, current_reaction, aliases) except Exception as e: return JsonResponse({"error": str(e)}, status=400) return JsonResponse({"success": current_reaction.url}) # Clean for potential XSS new_reaction_name = nh3.clean(request.POST.get("reaction-name", ""), tags=s.ALLOWED_HTML_TAGS).strip() new_reaction_description = nh3.clean(request.POST.get("reaction-description", ""), tags=s.ALLOWED_HTML_TAGS).strip() if new_reaction_name: current_reaction.name = new_reaction_name if new_reaction_description: current_reaction.description = new_reaction_description if any([new_reaction_name, new_reaction_description]): current_reaction.save() return redirect(current_reaction.url) selected_database = request.POST.get("selected-database", "").strip() external_identifier = request.POST.get("identifier", "").strip() if selected_database and external_identifier: db = ExternalDatabase.objects.get(id=int(selected_database)) ExternalIdentifier.objects.create( content_object=current_reaction, database=db, identifier_value=external_identifier, url=db.url_pattern.format(id=external_identifier), is_primary=False, ) return redirect(current_reaction.url) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_pathways(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - Pathways" context["meta"]["current_package"] = current_package context["object_type"] = "pathway" context["breadcrumbs"] = breadcrumbs(current_package, "pathway") reviewed_pathway_qs = Pathway.objects.none() unreviewed_pathway_qs = Pathway.objects.none() if current_package.reviewed: reviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by("name") else: unreviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_pathway_qs if current_package.reviewed else unreviewed_pathway_qs ) ] } ) context["reviewed_objects"] = reviewed_pathway_qs context["unreviewed_objects"] = unreviewed_pathway_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": log_post_params(request) # Clean for potential XSS name = nh3.clean(request.POST.get("name"), tags=s.ALLOWED_HTML_TAGS).strip() description = nh3.clean(request.POST.get("description"), tags=s.ALLOWED_HTML_TAGS).strip() smiles = request.POST.get("smiles", "").strip() pw_mode = request.POST.get("predict", "predict").strip() if "smiles" in request.POST and smiles == "": return error( request, "Pathway prediction failed!", "Pathway prediction failed due to missing or empty SMILES", ) try: stand_smiles = FormatConverter.standardize(smiles) except ValueError: return error( request, "Pathway prediction failed!", f'Pathway prediction failed as standardization of SMILES "{smiles}" failed!', ) modes = ["predict", "build", "incremental"] if pw_mode not in modes: return error( request, "Pathway prediction failed!", f'Pathway prediction failed as received mode "{pw_mode}" is none of {modes}', ) prediction_setting = request.POST.get("prediction-setting", None) if prediction_setting: prediction_setting = SettingManager.get_setting_by_url(current_user, prediction_setting) else: prediction_setting = current_user.prediction_settings() pw = Pathway.create(current_package, stand_smiles, name=name, description=description) # set mode pw.kv.update({"mode": pw_mode}) pw.save() if pw_mode == "predict" or pw_mode == "incremental": # unlimited pred (will be handled by setting) limit = -1 # For incremental predict first level and return if pw_mode == "incremental": limit = 1 pw.setting = prediction_setting pw.save() from .tasks import predict predict.delay(pw.pk, prediction_setting.pk, limit=limit) return redirect(pw.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_pathway(request, package_uuid, pathway_uuid): current_user: User = _anonymous_or_real(request) current_package: Package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway: Pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) if request.method == "GET": if request.GET.get("last_modified", False): return JsonResponse( {"modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S")} ) if request.GET.get("status", False): return JsonResponse( { "status": current_pathway.status(), "modified": current_pathway.modified.strftime("%Y-%m-%d %H:%M:%S"), } ) if request.GET.get("download", False) == "true": filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv" csv_pw = current_pathway.to_csv() response = HttpResponse(csv_pw, content_type="text/csv") response["Content-Disposition"] = f'attachment; filename="{filename}"' return response # Pathway d3_json() relies on a lot of related objects (Nodes, Structures, Edges, Reaction, Rules, ...) # we will again fetch the current pathway identified by this url, but this time together with nearly all # related objects current_pathway = Pathway.objects.prefetch_related( "node_set", "node_set__out_edges", "node_set__default_node_label", "node_set__scenarios", "edge_set", "edge_set__start_nodes", "edge_set__end_nodes", "edge_set__edge_label", "edge_set__scenarios", ).get(uuid=pathway_uuid) context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_pathway.name}" context["meta"]["current_package"] = current_package context["object_type"] = "pathway" context["breadcrumbs"] = breadcrumbs(current_package, "pathway", current_pathway) context["pathway"] = current_pathway context["current_object"] = current_pathway context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Package": s.SERVER_URL + "/package"}, {current_package.name: current_package.url}, {"Pathway": current_package.url + "/pathway"}, {current_pathway.name: current_pathway.url}, ] return render(request, "objects/pathway.html", context) # return render(request, 'pathway_playground2.html', context) elif request.method == "POST": if hidden := request.POST.get("hidden", None): if hidden == "delete": current_pathway.delete() return redirect(current_package.url + "/pathway") else: return HttpResponseBadRequest() if "selected-scenarios" in request.POST: selected_scenarios = request.POST.getlist("selected-scenarios") set_scenarios(current_user, current_pathway, selected_scenarios) return redirect(current_pathway.url) if "aliases" in request.POST: aliases = request.POST.getlist("aliases") try: set_aliases(current_user, current_pathway, aliases) except Exception as e: return JsonResponse({"error": str(e)}, status=400) return JsonResponse({"success": current_pathway.url}) # Clean for potential XSS pathway_name = nh3.clean(request.POST.get("pathway-name"), tags=s.ALLOWED_HTML_TAGS).strip() pathway_description = nh3.clean(request.POST.get("pathway-description"), tags=s.ALLOWED_HTML_TAGS).strip() if any([pathway_name, pathway_description]): if pathway_name is not None and pathway_name.strip() != "": pathway_name = pathway_name.strip() current_pathway.name = pathway_name if pathway_description is not None and pathway_description.strip() != "": pathway_description = pathway_description.strip() current_pathway.description = pathway_description current_pathway.save() return redirect(current_pathway.url) node_url = request.POST.get("node") if node_url: n = current_pathway.get_node(node_url) from .tasks import predict # Dont delay? predict(current_pathway.pk, current_pathway.setting.pk, node_pk=n.pk) return JsonResponse({"success": current_pathway.url}) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_pathway_nodes(request, package_uuid, pathway_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_pathway.name} - Nodes" context["meta"]["current_package"] = current_package context["object_type"] = "node" context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Package": s.SERVER_URL + "/package"}, {current_package.name: current_package.url}, {"Pathway": current_package.url + "/pathway"}, {current_pathway.name: current_pathway.url}, {"Node": current_pathway.url + "/node"}, ] reviewed_node_qs = Node.objects.none() unreviewed_node_qs = Node.objects.none() if current_package.reviewed: reviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by("name") else: unreviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_node_qs if current_package.reviewed else unreviewed_node_qs ) ] } ) context["reviewed_objects"] = reviewed_node_qs context["unreviewed_objects"] = unreviewed_node_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": # Clean for potential XSS node_name = nh3.clean(request.POST.get("node-name"), tags=s.ALLOWED_HTML_TAGS).strip() node_description = nh3.clean(request.POST.get("node-description"), tags=s.ALLOWED_HTML_TAGS).strip() node_smiles = request.POST.get("node-smiles").strip() current_pathway.add_node(node_smiles, name=node_name, description=node_description) return redirect(current_pathway.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_pathway_node(request, package_uuid, pathway_uuid, node_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) current_node = Node.objects.get(pathway=current_pathway, uuid=node_uuid) if request.method == "GET": is_image_request = request.GET.get("image") if is_image_request: if is_image_request == "svg": svg_data = current_node.as_svg return HttpResponse(svg_data, content_type="image/svg+xml") context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_pathway.name}" context["meta"]["current_package"] = current_package context["object_type"] = "pathway" context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Package": s.SERVER_URL + "/package"}, {current_package.name: current_package.url}, {"Pathway": current_package.url + "/pathway"}, {current_pathway.name: current_pathway.url}, {"Node": current_pathway.url + "/node"}, {current_node.name: current_node.url}, ] context["node"] = current_node context["current_object"] = current_node context["app_domain_assessment_data"] = json.dumps( current_node.get_app_domain_assessment_data() ) return render(request, "objects/node.html", context) elif request.method == "POST": log_post_params(request) if hidden := request.POST.get("hidden", None): if hidden == "delete": # pre_delete signal will take care of edge deletion current_node.delete() return redirect(current_pathway.url) else: return HttpResponseBadRequest() if "selected-scenarios" in request.POST: selected_scenarios = request.POST.getlist("selected-scenarios") set_scenarios(current_user, current_node, selected_scenarios) return redirect(current_node.url) if "aliases" in request.POST: aliases = request.POST.getlist("aliases") try: set_aliases(current_user, current_node, aliases) except Exception as e: return JsonResponse({"error": str(e)}, status=400) return JsonResponse({"success": current_node.url}) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_pathway_edges(request, package_uuid, pathway_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_pathway.name} - Edges" context["meta"]["current_package"] = current_package context["object_type"] = "edge" context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Package": s.SERVER_URL + "/package"}, {current_package.name: current_package.url}, {"Pathway": current_package.url + "/pathway"}, {current_pathway.name: current_pathway.url}, {"Edge": current_pathway.url + "/edge"}, ] reviewed_edge_qs = Edge.objects.none() unreviewed_edge_qs = Edge.objects.none() if current_package.reviewed: reviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by("name") else: unreviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by("name") if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_edge_qs if current_package.reviewed else unreviewed_edge_qs ) ] } ) context["reviewed_objects"] = reviewed_edge_qs context["unreviewed_objects"] = unreviewed_edge_qs return render(request, "collections/objects_list.html", context) elif request.method == "POST": log_post_params(request) # Clean for potential XSS edge_name = nh3.clean(request.POST.get("edge-name"), tags=s.ALLOWED_HTML_TAGS).strip() edge_description = nh3.clean(request.POST.get("edge-description"), tags=s.ALLOWED_HTML_TAGS).strip() edge_substrates = request.POST.getlist("edge-substrates") edge_products = request.POST.getlist("edge-products") substrate_nodes = [current_pathway.get_node(url) for url in edge_substrates] product_nodes = [current_pathway.get_node(url) for url in edge_products] # TODO in the future consider Rules here? current_pathway.add_edge( substrate_nodes, product_nodes, name=edge_name, description=edge_description ) return redirect(current_pathway.url) else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) current_edge = Edge.objects.get(pathway=current_pathway, uuid=edge_uuid) if request.method == "GET": is_image_request = request.GET.get("image") if is_image_request: if is_image_request == "svg": svg_data = current_edge.as_svg return HttpResponse(svg_data, content_type="image/svg+xml") context = get_base_context(request) context["title"] = ( f"enviPath - {current_package.name} - {current_pathway.name} - {current_edge.edge_label.name}" ) context["meta"]["current_package"] = current_package context["object_type"] = "edge" context["breadcrumbs"] = breadcrumbs( current_package, "pathway", current_pathway, "edge", current_edge ) context["edge"] = current_edge context["current_object"] = current_edge return render(request, "objects/edge.html", context) elif request.method == "POST": log_post_params(request) if hidden := request.POST.get("hidden", None): if hidden == "delete": current_edge.delete() return redirect(current_pathway.url) if "selected-scenarios" in request.POST: selected_scenarios = request.POST.getlist("selected-scenarios") set_scenarios(current_user, current_edge, selected_scenarios) return redirect(current_edge.url) if "aliases" in request.POST: aliases = request.POST.getlist("aliases") try: set_aliases(current_user, current_edge, aliases) except Exception as e: return JsonResponse({"error": str(e)}, status=400) return JsonResponse({"success": current_edge.url}) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) @package_permission_required() def package_scenarios(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == "GET": if "application/json" in request.META.get("HTTP_ACCEPT") and not request.GET.get( "all", False ): scens = Scenario.objects.filter(package=current_package).order_by("name") res = [{"name": s_.name, "url": s_.url, "uuid": s_.uuid} for s_ in scens] return JsonResponse(res, safe=False) context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - Scenarios" context["meta"]["current_package"] = current_package context["object_type"] = "scenario" context["breadcrumbs"] = breadcrumbs(current_package, "scenario") reviewed_scenario_qs = Scenario.objects.none() unreviewed_scenario_qs = Scenario.objects.none() if current_package.reviewed: reviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by("name") else: unreviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by( "name" ) if request.GET.get("all"): return JsonResponse( { "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in ( reviewed_scenario_qs if current_package.reviewed else unreviewed_scenario_qs ) ] } ) context["reviewed_objects"] = reviewed_scenario_qs context["unreviewed_objects"] = unreviewed_scenario_qs from envipy_additional_information import ( SLUDGE_ADDITIONAL_INFORMATION, SOIL_ADDITIONAL_INFORMATION, SEDIMENT_ADDITIONAL_INFORMATION, ) context["scenario_types"] = { "Soil Data": { "name": "soil", "widgets": [ HTMLGenerator.generate_html(ai, prefix=f"soil_{0}") for ai in [x for sv in SOIL_ADDITIONAL_INFORMATION.values() for x in sv] ], }, "Sludge Data": { "name": "sludge", "widgets": [ HTMLGenerator.generate_html(ai, prefix=f"sludge_{0}") for ai in [x for sv in SLUDGE_ADDITIONAL_INFORMATION.values() for x in sv] ], }, "Water-Sediment System Data": { "name": "sediment", "widgets": [ HTMLGenerator.generate_html(ai, prefix=f"sediment_{0}") for ai in [x for sv in SEDIMENT_ADDITIONAL_INFORMATION.values() for x in sv] ], }, } context["sludge_additional_information"] = SLUDGE_ADDITIONAL_INFORMATION context["soil_additional_information"] = SOIL_ADDITIONAL_INFORMATION context["sediment_additional_information"] = SEDIMENT_ADDITIONAL_INFORMATION return render(request, "collections/objects_list.html", context) elif request.method == "POST": log_post_params(request) # Clean for potential XSS scenario_name = nh3.clean(request.POST.get("scenario-name"), tags=s.ALLOWED_HTML_TAGS).strip() scenario_description = nh3.clean(request.POST.get("scenario-description"), tags=s.ALLOWED_HTML_TAGS).strip() scenario_date_year = request.POST.get("scenario-date-year") scenario_date_month = request.POST.get("scenario-date-month") scenario_date_day = request.POST.get("scenario-date-day") scenario_date = scenario_date_year if scenario_date_month is not None and scenario_date_month.strip() != "": scenario_date += f"-{int(scenario_date_month):02d}" if scenario_date_day is not None and scenario_date_day.strip() != "": scenario_date += f"-{int(scenario_date_day):02d}" scenario_type = request.POST.get("scenario-type") additional_information = HTMLGenerator.build_models(clean_dict(request.POST.dict())) additional_information = [x for sv in additional_information.values() for x in sv] new_scen = Scenario.create( current_package, name=scenario_name, description=scenario_description, scenario_date=scenario_date, scenario_type=scenario_type, additional_information=additional_information, ) return redirect(new_scen.url) else: return HttpResponseNotAllowed( [ "GET", ] ) @package_permission_required() def package_scenario(request, package_uuid, scenario_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_scenario = Scenario.objects.get(package=current_package, uuid=scenario_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_package.name} - {current_scenario.name}" context["meta"]["current_package"] = current_package context["object_type"] = "scenario" context["breadcrumbs"] = breadcrumbs(current_package, "scenario", current_scenario) context["scenario"] = current_scenario available_add_infs = [] for add_inf in NAME_MAPPING.values(): available_add_infs.append( { "display_name": add_inf.property_name(None), "name": add_inf.__name__, "widget": HTMLGenerator.generate_html(add_inf, prefix=f"{0}"), } ) context["available_additional_information"] = available_add_infs context["update_widgets"] = [ HTMLGenerator.generate_html(ai, prefix=f"{i}") for i, ai in enumerate(current_scenario.get_additional_information()) ] return render(request, "objects/scenario.html", context) elif request.method == "POST": log_post_params(request) if hidden := request.POST.get("hidden", None): if hidden == "delete": current_scenario.delete() return redirect(current_package.url + "/scenario") elif hidden == "delete-additional-information": uuid = request.POST.get("uuid") current_scenario.remove_additional_information(uuid) return redirect(current_scenario.url) elif hidden == "delete-all-additional-information": current_scenario.additional_information = dict() current_scenario.save() return redirect(current_scenario.url) elif hidden == "set-additional-information": post_dict = clean_dict(request.POST.dict()) # Clean post dict inputs for potential XSS ais = HTMLGenerator.build_models(post_dict) if s.DEBUG: logger.info(ais) current_scenario.set_additional_information(ais) return redirect(current_scenario.url) elif hidden == "add-additional-information": post_dict = clean_dict(request.POST.dict()) # Clean post dict inputs for potential XSS ais = HTMLGenerator.build_models(post_dict) if len(ais.keys()) != 1: raise ValueError( "Only one additional information field can be added at a time." ) ai = list(ais.values())[0][0] if s.DEBUG: logger.info(ais) current_scenario.add_additional_information(ai) return redirect(current_scenario.url) else: return HttpResponseBadRequest() else: return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) ############## # User/Group # ############## def users(request): if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Users" context["object_type"] = "user" context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"User": s.SERVER_URL + "/user"}, ] context["objects"] = get_user_model().objects.all() return render(request, "collections/objects_list.html", context) else: return HttpResponseNotAllowed(["GET"]) def user(request, user_uuid): current_user = _anonymous_or_real(request) if request.method == "GET": # Check if current user is the one matching to the url if str(current_user.uuid) != user_uuid and not current_user.is_superuser: return HttpResponseBadRequest() requested_user = UserManager.get_user_by_id(current_user, user_uuid) context = get_base_context(request, for_user=requested_user) context["title"] = "enviPath - User" context["object_type"] = "user" context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"User": s.SERVER_URL + "/user"}, {current_user.username: requested_user.url}, ] context["user"] = requested_user model_qs = EPModel.objects.none() for p in PackageManager.get_all_readable_packages(requested_user, include_reviewed=True): model_qs |= p.models context["models"] = model_qs context["tokens"] = APIToken.objects.filter(user=requested_user) return render(request, "objects/user.html", context) elif request.method == "POST": is_hidden_method = bool(request.POST.get("hidden", False)) if is_hidden_method and request.POST["hidden"] == "request-api-token": name = request.POST.get("name", "No Name") valid_for = min(max(int(request.POST.get("valid-for", 90)), 1), 90) token, raw_token = APIToken.create_token(request.user, name=name, valid_for=valid_for) return JsonResponse( {"raw_token": raw_token, "token": {"id": token.id, "name": token.name}} ) if is_hidden_method and request.POST["hidden"] == "delete": token_id = request.POST.get("token-id") if token_id is None: return HttpResponseBadRequest("Token ID missing!") try: APIToken.objects.get(user=current_user, id=token_id).delete() except APIToken.DoesNotExist: return HttpResponseBadRequest("User and Token ID combination invalid!") return HttpResponse("success") default_package = request.POST.get("default-package") default_group = request.POST.get("default-group") default_prediction_setting = request.POST.get("default-prediction-setting") if any([default_package, default_group, default_prediction_setting]): current_user.default_package = PackageManager.get_package_by_url( current_user, default_package ) current_user.default_group = GroupManager.get_group_by_url(current_user, default_group) current_user.default_setting = SettingManager.get_setting_by_url( current_user, default_prediction_setting ) current_user.save() return redirect(current_user.url) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(["GET", "POST"]) def groups(request): current_user = _anonymous_or_real(request) if request.method == "GET": context = get_base_context(request) context["title"] = "enviPath - Groups" context["object_type"] = "group" context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Group": s.SERVER_URL + "/group"}, ] context["objects"] = Group.objects.all() return render(request, "collections/objects_list.html", context) elif request.method == "POST": # Clean for potential XSS group_name = nh3.clean(request.POST.get("group-name"), tags=s.ALLOWED_HTML_TAGS).strip() group_description = nh3.clean(request.POST.get("group-description", s.DEFAULT_VALUES["description"]), tags=s.ALLOWED_HTML_TAGS).strip() g = GroupManager.create_group(current_user, group_name, group_description) return redirect(g.url) else: return HttpResponseNotAllowed(["GET", "POST"]) def group(request, group_uuid): current_user = _anonymous_or_real(request) current_group = GroupManager.get_group_by_id(current_user, group_uuid) if request.method == "GET": context = get_base_context(request) context["title"] = f"enviPath - {current_group.name}" context["object_type"] = "group" context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Group": s.SERVER_URL + "/group"}, {current_group.name: current_group.url}, ] context["group"] = current_group context["users"] = UserManager.get_users_lp().exclude( id__in=current_group.user_member.all() ) context["groups"] = ( GroupManager.get_groups_lp() .exclude(id__in=current_group.group_member.all()) .exclude(id=current_group.pk) ) context["packages"] = Package.objects.filter( id__in=GroupPackagePermission.objects.filter(group=current_group) .values("package") .distinct() ) return render(request, "objects/group.html", context) elif request.method == "POST": log_post_params(request) if hidden := request.POST.get("hidden", None): if hidden == "delete": current_group.delete() return redirect(s.SERVER_URL + "/group") else: return HttpResponseBadRequest() member_url = request.POST.get("member") action = request.POST.get("action") if all([member_url, action]) and action in ["add", "remove"]: if "user" in member_url: member = UserManager.get_user_lp(member_url) else: member = GroupManager.get_group_lp(member_url) GroupManager.update_members(current_user, current_group, member, action) return redirect(current_group.url) else: return HttpResponseNotAllowed(["GET", "POST"]) def settings(request): current_user = _anonymous_or_real(request) context = get_base_context(request) if request.method == "GET": context["object_type"] = "setting" # Even if settings are aready in "meta", for consistency add it on root level context["settings"] = SettingManager.get_all_settings(current_user) context["breadcrumbs"] = [ {"Home": s.SERVER_URL}, {"Group": s.SERVER_URL + "/setting"}, ] return elif request.method == "POST": if s.DEBUG: for k, v in request.POST.items(): logger.info("Parameters received:") logger.info(f"{k}\t{v}") # Clean for potential XSS name = nh3.clean(request.POST.get("prediction-setting-name"), tags=s.ALLOWED_HTML_TAGS).strip() description = nh3.clean(request.POST.get("prediction-setting-description"), tags=s.ALLOWED_HTML_TAGS).strip() new_default = request.POST.get("prediction-setting-new-default", "off") == "on" max_nodes = min( max( int(request.POST.get("prediction-setting-max-nodes", 1)), s.DEFAULT_MAX_NUMBER_OF_NODES, ), s.DEFAULT_MAX_NUMBER_OF_NODES, ) max_depth = min( max(int(request.POST.get("prediction-setting-max-depth", 1)), s.DEFAULT_MAX_DEPTH), s.DEFAULT_MAX_DEPTH, ) tp_gen_method = request.POST.get("tp-generation-method") params = {} if tp_gen_method == "model-based-prediction-setting": model_url = request.POST.get("model-based-prediction-setting-model") model_uuid = model_url.split("/")[-1] params["model"] = EPModel.objects.get(uuid=model_uuid) params["model_threshold"] = request.POST.get( "model-based-prediction-setting-threshold", s.DEFAULT_MODEL_THRESHOLD ) if not PackageManager.readable(current_user, params["model"].package): raise ValueError("") elif tp_gen_method == "rule-based-prediction-setting": rule_packages = request.POST.getlist("rule-based-prediction-setting-packages") params["rule_packages"] = [ PackageManager.get_package_by_url(current_user, p) for p in rule_packages ] else: raise ValueError("") created_setting = SettingManager.create_setting( current_user, name=name, description=description, max_nodes=max_nodes, max_depth=max_depth, **params, ) if new_default: current_user.default_setting = created_setting current_user.save() return HttpResponse("Success!") else: return HttpResponseNotAllowed(["GET", "POST"]) def setting(request, setting_uuid): pass ########### # KETCHER # ########### def indigo(request): from indigo import Indigo return JsonResponse({"Indigo": {"version": Indigo().version()}}) @csrf_exempt def aromatize(request): if request.method == "POST": data = json.loads(request.body) mol_data = data.get("struct") aromatized = IndigoUtils.aromatize(mol_data, False) return JsonResponse({"struct": aromatized}) else: return HttpResponseBadRequest() @csrf_exempt def dearomatize(request): if request.method == "POST": data = json.loads(request.body) mol_data = data.get("struct") dearomatized = IndigoUtils.dearomatize(mol_data, False) return JsonResponse({"struct": dearomatized}) else: return HttpResponseBadRequest() @csrf_exempt def layout(request): if request.method == "POST": data = json.loads(request.body) mol_data = data.get("struct") lay = IndigoUtils.layout(mol_data) return JsonResponse({"struct": lay}) else: return HttpResponseBadRequest() ########################## # Generic/Non-Persistent # ########################## def depict(request): if smiles := request.GET.get("smiles"): return HttpResponse(IndigoUtils.mol_to_svg(smiles), content_type="image/svg+xml") elif smirks := request.GET.get("smirks"): query_smirks = request.GET.get("is_query_smirks", False) == "true" return HttpResponse( IndigoUtils.smirks_to_svg(smirks, query_smirks), content_type="image/svg+xml" ) else: return HttpResponseBadRequest() @protected_resource() def userinfo(request): user = request.resource_owner res = { "sub": str(user.uuid), "email": user.email, "username": user.username, "name": user.get_full_name() or user.username, "email_verified": user.is_active, } return JsonResponse(res)