import json
import logging
from typing import List, Dict, Any

from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.http import JsonResponse, HttpResponse, HttpResponseNotAllowed, HttpResponseBadRequest
from django.shortcuts import render, redirect
from django.views.decorators.csrf import csrf_exempt
from envipy_additional_information import NAME_MAPPING

from utilities.chem import FormatConverter, IndigoUtils
from utilities.decorators import package_permission_required
from utilities.misc import HTMLGenerator
from .logic import GroupManager, PackageManager, UserManager, SettingManager, SearchManager, EPDBURLParser
from .models import Package, GroupPackagePermission, Group, CompoundStructure, Compound, Reaction, Rule, Pathway, Node, \
    EPModel, EnviFormer, MLRelativeReasoning, RuleBaseRelativeReasoning, Scenario, SimpleAmbitRule, APIToken, \
    UserPackagePermission, Permission, License, User, Edge

logger = logging.getLogger(__name__)


def log_post_params(request):
    """Log every POST parameter of ``request`` at debug level when ``settings.DEBUG`` is on."""
    if s.DEBUG:
        for k, v in request.POST.items():
            logger.debug(f"{k}\t{v}")


def error(request, message: str, detail: str, code: int = 400):
    """Return an error response carrying ``message`` and ``detail``.

    Clients whose ``Accept`` header is exactly ``application/json`` receive a JSON body;
    everyone else gets the rendered ``errors/error.html`` template. Both variants use
    ``code`` as the HTTP status.
    """
    error_context = {
        'error_message': message,
        'error_detail': detail,
    }

    if request.headers.get('Accept') == 'application/json':
        # Previously hard-coded to 500, which ignored the caller-supplied status.
        return JsonResponse(error_context, status=code)

    context = get_base_context(request)
    context.update(**error_context)
    return render(request, "errors/error.html", context, status=code)


def login(request):
    """Render the login page (GET) or process a login / registration form (POST).

    A POST must carry either a truthy ``login`` or ``register`` field, otherwise a
    400 response is returned. Methods other than GET/POST yield a 405.
    """
    context = get_base_context(request)

    if request.method == 'GET':
        context['title'] = 'enviPath'
        context['next'] = request.GET.get('next', '')
        return render(request, 'login.html', context)

    elif request.method == 'POST':
        is_login = bool(request.POST.get('login', False))
        is_register = bool(request.POST.get('register', False))

        if is_login:
            from urllib.parse import urlparse

            from django.contrib.auth import authenticate
            from django.contrib.auth import login as auth_login
            from django.utils.http import url_has_allowed_host_and_scheme

            username = request.POST.get('username')
            password = request.POST.get('password')

            # Get email for username and check if the account is active
            try:
                temp_user = get_user_model().objects.get(username=username)

                if not temp_user.is_active:
                    context['message'] = "User account is not activated yet!"
                    return render(request, 'login.html', context)

                email = temp_user.email
            except get_user_model().DoesNotExist:
                context['message'] = "Login failed!"
                return render(request, 'login.html', context)

            try:
                user = authenticate(username=email, password=password)
            except Exception:
                # Keep the generic user-facing message, but do not lose the cause.
                logger.exception("Authentication backend raised an error")
                context['message'] = "Login failed!"
                return render(request, 'login.html', context)

            if user is not None:
                auth_login(request, user)

                # ``next`` is client-controlled: only follow it when it stays on this
                # site, otherwise the login form is an open redirect.
                next_url = request.POST.get('next')
                allowed_hosts = {
                    host for host in (request.get_host(), urlparse(s.SERVER_URL).netloc) if host
                }
                if next_url and url_has_allowed_host_and_scheme(next_url, allowed_hosts=allowed_hosts):
                    return redirect(next_url)

                return redirect(s.SERVER_URL)
            else:
                context['message'] = "Login failed!"
                return render(request, 'login.html', context)

        elif is_register:
            username = request.POST.get('username')
            email = request.POST.get('email')
            password = request.POST.get('password', '').strip()
            rpassword = request.POST.get('rpassword', '').strip()

            if password != rpassword or password == '':
                context['message'] = "Registration failed, provided passwords differ!"
                return render(request, 'login.html', context)

            try:
                UserManager.create_user(username, email, password)
            except Exception:
                logger.exception("User registration failed")
                context['message'] = "Registration failed! Couldn't create User Account."
                return render(request, 'login.html', context)

            if s.ADMIN_APPROVAL_REQUIRED:
                context['message'] = "Your account has been created! An admin will activate it soon!"
            else:
                context['message'] = "Account has been created! You'll receive a mail to activate your account shortly."

            return render(request, 'login.html', context)
        else:
            return HttpResponseBadRequest()
    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def logout(request):
    """Log the user out on a POST carrying a truthy ``logout`` field; otherwise answer 400."""
    if request.method == 'POST':
        is_logout = bool(request.POST.get('logout', False))
        if is_logout:
            from django.contrib.auth import logout as auth_logout
            auth_logout(request)
            return redirect(s.SERVER_URL)

    return HttpResponseBadRequest()


def editable(request, user):
    """Tell whether ``user`` may edit the resource addressed by ``request``.

    Superusers may always edit. Package, group and user URLs are delegated to the
    respective manager's ``writable`` check; a fixed set of top-level URLs is always
    editable; anything else is not.
    """
    if user.is_superuser:
        return True

    url = request.build_absolute_uri(request.path)

    if PackageManager.is_package_url(url):
        _package = PackageManager.get_package_lp(request.build_absolute_uri())
        return PackageManager.writable(user, _package)
    elif GroupManager.is_group_url(url):
        _group = GroupManager.get_group_lp(request.build_absolute_uri())
        return GroupManager.writable(user, _group)
    elif UserManager.is_user_url(url):
        _user = UserManager.get_user_lp(request.build_absolute_uri())
        return UserManager.writable(user, _user)
    elif url in [s.SERVER_URL, f"{s.SERVER_URL}/", f"{s.SERVER_URL}/package", f"{s.SERVER_URL}/user",
                 f"{s.SERVER_URL}/group", f"{s.SERVER_URL}/search"]:
        return True
    else:
        logger.debug(f"Unknown url: {url}")
        return False


def get_base_context(request, for_user=None) -> Dict[str, Any]:
    """Build the template context shared by all pages.

    ``can_edit`` is always evaluated for the requesting user. When ``for_user`` is
    given, it replaces the requesting user for the ``user`` entry and for the
    package / group / setting listings.
    """
    current_user = _anonymous_or_real(request)
    can_edit = editable(request, current_user)

    parser = EPDBURLParser(request.build_absolute_uri(request.path))
    url_contains_package = bool(parser.contains_package_url() or parser.is_package_url())

    if for_user:
        current_user = for_user

    ctx = {
        'title': 'enviPath',
        'meta': {
            'version': '0.0.1',
            'server_url': s.SERVER_URL,
            'user': current_user,
            'can_edit': can_edit,
            'url_contains_package': url_contains_package,
            'readable_packages': PackageManager.get_all_readable_packages(current_user, include_reviewed=True),
            'writeable_packages': PackageManager.get_all_writeable_packages(current_user),
            'available_groups': GroupManager.get_groups(current_user),
            'available_settings': SettingManager.get_all_settings(current_user),
            'enabled_features': s.FLAGS,
            'debug': s.DEBUG,
        },
    }

    return ctx


def _anonymous_or_real(request):
    """Return the authenticated request user, or the stored user named ``anonymous``."""
    if request.user.is_authenticated and not request.user.is_anonymous:
        return request.user
    return get_user_model().objects.get(username='anonymous')


def breadcrumbs(first_level_object=None, second_level_namespace=None, second_level_object=None,
                third_level_namespace=None, third_level_object=None) -> List[Dict[str, str]]:
    """Build the breadcrumb trail as a list of single-entry ``{label: url}`` dicts.

    The trail always starts with Home and Package. Each deeper level is only added
    when all levels above it are present, because its URL is derived from its parent.
    """
    bread = [
        {'Home': s.SERVER_URL},
        {'Package': s.SERVER_URL + '/package'},
    ]

    if first_level_object is not None:
        bread.append({first_level_object.name: first_level_object.url})

        if second_level_namespace is not None:
            bread.append({
                f'{second_level_namespace}'.capitalize(): first_level_object.url + f'/{second_level_namespace}'
            })

            if second_level_object is not None:
                bread.append({second_level_object.name: second_level_object.url})

                if third_level_namespace is not None:
                    bread.append({
                        f'{third_level_namespace}'.capitalize():
                            second_level_object.url + f'/{third_level_namespace}'
                    })

                    if third_level_object is not None:
                        bread.append({third_level_object.name: third_level_object.url})

    return bread


def _reviewed_objects(model):
    """Return all ``model`` rows that belong to a reviewed package, ordered by name."""
    # One ``IN`` filter instead of OR-ing one queryset per reviewed package.
    return model.objects.filter(package__in=PackageManager.get_reviewed_packages()).order_by('name')


def _split_by_review_state(package, objects):
    """Return ``(reviewed, unreviewed)`` querysets; the one not matching ``package`` is empty."""
    if package.reviewed:
        return objects, objects.none()
    return objects.none(), objects


def _objects_json(objects, reviewed) -> JsonResponse:
    """Serialize ``objects`` to the ``{"objects": [{name, url, reviewed}, ...]}`` JSON listing."""
    return JsonResponse({
        "objects": [
            {"name": obj.name, "url": obj.url, "reviewed": reviewed}
            for obj in objects
        ]
    })


def _parse_number(raw, cast, default):
    """Convert a submitted form value with ``cast``; a missing or blank value yields ``default``.

    Raises ``ValueError`` when a non-blank value cannot be converted.
    """
    if raw is None or str(raw).strip() == '':
        return default
    return cast(raw)


def set_scenarios(current_user, attach_object, scenario_urls: List[str]):
    """Resolve ``scenario_urls`` to ``Scenario`` rows and attach them to ``attach_object``.

    Each URL is resolved through ``PackageManager.get_package_by_url`` for
    ``current_user``; the scenario uuid is taken from the last path segment.
    """
    scens = []
    for scenario_url in scenario_urls:
        package = PackageManager.get_package_by_url(current_user, scenario_url)
        scens.append(Scenario.objects.get(package=package, uuid=scenario_url.split('/')[-1]))

    attach_object.set_scenarios(scens)


def copy_object(current_user, target_package: 'Package', source_object_url: str):
    """Copy the object addressed by ``source_object_url`` into ``target_package``.

    Returns the copy. Raises ``ValueError`` when the URL does not point below a
    package or the resolved object offers no ``copy`` method.
    """
    # Ensures that source is readable (the lookup itself is the check)
    PackageManager.get_package_by_url(current_user, source_object_url)

    parser = EPDBURLParser(source_object_url)

    # if the url won't contain a package or is a plain package
    if not parser.contains_package_url():
        raise ValueError(f"Object {source_object_url} can't be copied!")

    # Gets the most specific object
    source_object = parser.get_object()

    if hasattr(source_object, 'copy'):
        mapping = dict()
        copy = source_object.copy(target_package, mapping)

        if s.DEBUG:
            for k, v in mapping.items():
                logger.debug(f"Mapping {k.url} to {v.url}")

        return copy

    raise ValueError(f"Object {source_object} can't be copied!")


def index(request):
    """Render the landing page, or answer the ``getMLServerPath`` JSON probe."""
    # Answer the probe first; it needs none of the page context.
    if request.GET.get('getMLServerPath', False):
        return JsonResponse({"mlServerPath": s.SERVER_URL})

    context = get_base_context(request)
    context['title'] = 'enviPath - Home'
    context['meta']['current_package'] = context['meta']['user'].default_package

    return render(request, 'index/index.html', context)


def packages(request):
    """List packages (GET), create a package (POST) or advertise allowed methods (OPTIONS)."""
    current_user = _anonymous_or_real(request)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = 'enviPath - Packages'
        context['object_type'] = 'package'
        context['meta']['current_package'] = context['meta']['user'].default_package
        context['meta']['can_edit'] = True

        context['reviewed_objects'] = Package.objects.filter(reviewed=True).order_by('created')
        context['unreviewed_objects'] = PackageManager.get_all_readable_packages(current_user).order_by('name')

        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        if request.POST.get('hidden', None) is not None:
            # No hidden action is implemented here; previously this fell through and
            # the view returned None, which Django turns into a server error.
            return HttpResponseBadRequest()

        package_name = request.POST.get('package-name')
        package_description = request.POST.get('package-description', s.DEFAULT_VALUES['description'])

        created_package = PackageManager.create_package(current_user, package_name, package_description)

        return redirect(created_package.url)

    elif request.method == 'OPTIONS':
        response = HttpResponse()
        response['allow'] = ','.join(['GET', 'POST'])
        return response
    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def compounds(request):
    """List compounds of reviewed packages (GET) or create one in the default package (POST)."""
    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = 'enviPath - Compounds'
        context['object_type'] = 'compound'
        context['meta']['current_package'] = context['meta']['user'].default_package

        reviewed_compound_qs = _reviewed_objects(Compound)

        if request.GET.get('all'):
            return _objects_json(reviewed_compound_qs, True)

        context['reviewed_objects'] = reviewed_compound_qs
        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_compounds(request, default_package.uuid)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def rules(request):
    """List rules of reviewed packages (GET) or create one in the default package (POST)."""
    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = 'enviPath - Rules'
        context['object_type'] = 'rule'
        context['meta']['current_package'] = context['meta']['user'].default_package
        context['breadcrumbs'] = [
            {'Home': s.SERVER_URL},
            {'Rule': s.SERVER_URL + '/rule'},
        ]

        reviewed_rule_qs = _reviewed_objects(Rule)

        if request.GET.get('all'):
            return _objects_json(reviewed_rule_qs, True)

        context['reviewed_objects'] = reviewed_rule_qs
        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_rules(request, default_package.uuid)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def reactions(request):
    """List reactions of reviewed packages (GET) or create one in the default package (POST)."""
    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = 'enviPath - Reactions'
        context['object_type'] = 'reaction'
        context['meta']['current_package'] = context['meta']['user'].default_package
        context['breadcrumbs'] = [
            {'Home': s.SERVER_URL},
            {'Reaction': s.SERVER_URL + '/reaction'},
        ]

        reviewed_reaction_qs = _reviewed_objects(Reaction)

        if request.GET.get('all'):
            return _objects_json(reviewed_reaction_qs, True)

        context['reviewed_objects'] = reviewed_reaction_qs
        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_reactions(request, default_package.uuid)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def pathways(request):
    """List pathways of reviewed packages (GET) or create one in the default package (POST)."""
    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = 'enviPath - Pathways'
        context['object_type'] = 'pathway'
        context['meta']['current_package'] = context['meta']['user'].default_package
        context['breadcrumbs'] = [
            {'Home': s.SERVER_URL},
            {'Pathway': s.SERVER_URL + '/pathway'},
        ]

        reviewed_pathway_qs = _reviewed_objects(Pathway)

        if request.GET.get('all'):
            return _objects_json(reviewed_pathway_qs, True)

        context['reviewed_objects'] = reviewed_pathway_qs
        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        # delegate to default package
        default_package = _anonymous_or_real(request).default_package
        return package_pathways(request, default_package.uuid)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def scenarios(request):
    """List scenarios of reviewed packages (GET) or create one in the default package (POST)."""
    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = 'enviPath - Scenarios'
        context['object_type'] = 'scenario'
        context['meta']['current_package'] = context['meta']['user'].default_package
        context['breadcrumbs'] = [
            {'Home': s.SERVER_URL},
            {'Scenario': s.SERVER_URL + '/scenario'},
        ]

        reviewed_scenario_qs = _reviewed_objects(Scenario)

        if request.GET.get('all'):
            return _objects_json(reviewed_scenario_qs, True)

        context['reviewed_objects'] = reviewed_scenario_qs
        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        # delegate to default package; resolved like the sibling views so that
        # unauthenticated requests map to the stored anonymous user
        default_package = _anonymous_or_real(request).default_package
        return package_scenarios(request, default_package.uuid)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def models(request):
    """List models of reviewed packages (GET) or create one in the default package (POST)."""
    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = 'enviPath - Models'
        context['object_type'] = 'model'
        context['meta']['current_package'] = context['meta']['user'].default_package
        context['breadcrumbs'] = [
            {'Home': s.SERVER_URL},
            {'Model': s.SERVER_URL + '/model'},
        ]

        # NOTE(review): unlike package_models, EnviFormer and the plugins are offered
        # here regardless of s.FLAGS - confirm whether that is intended.
        context['model_types'] = {
            'ML Relative Reasoning': 'ml-relative-reasoning',
            'Rule Based Relative Reasoning': 'rule-based-relative-reasoning',
            'EnviFormer': 'enviformer',
        }
        for k, v in s.CLASSIFIER_PLUGINS.items():
            context['model_types'][v.display()] = k

        reviewed_model_qs = _reviewed_objects(EPModel)

        if request.GET.get('all'):
            return _objects_json(reviewed_model_qs, True)

        context['reviewed_objects'] = reviewed_model_qs
        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        default_package = _anonymous_or_real(request).default_package
        return package_models(request, default_package.uuid)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


def search(request):
    """Run a search over packages and return JSON or the rendered search page (GET only).

    The search runs when both ``search`` and ``mode`` are supplied. It covers the
    packages named by the ``packages`` query parameter, or all reviewed packages
    when none are given. Clients accepting ``application/json`` get the raw result.
    """
    current_user = _anonymous_or_real(request)

    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    package_urls = request.GET.getlist('packages')
    searchterm = request.GET.get('search')
    mode = request.GET.get('mode')
    has_query = all([searchterm, mode])

    def run_search():
        if package_urls:
            search_packages = [PackageManager.get_package_by_url(current_user, p) for p in package_urls]
        else:
            search_packages = PackageManager.get_reviewed_packages()
        return SearchManager.search(search_packages, searchterm, mode)

    # add HTTP_ACCEPT check to differentiate between index and ajax call;
    # the header may be absent, so default to '' instead of testing ``in None``
    if 'application/json' in request.META.get('HTTP_ACCEPT', '') and has_query:
        return JsonResponse(run_search(), safe=False)

    context = get_base_context(request)
    context['title'] = 'enviPath - Search'
    context['object_type'] = 'model'
    context['meta']['current_package'] = context['meta']['user'].default_package
    context['breadcrumbs'] = [
        {'Home': s.SERVER_URL},
        {'Search': s.SERVER_URL + '/search'},
    ]

    context['reviewed_objects'] = PackageManager.get_reviewed_packages()
    context['unreviewed_objects'] = PackageManager.get_all_readable_packages(current_user)

    if has_query:
        context['search_result'] = run_search()
        context['search_result']['searchterm'] = searchterm

    return render(request, 'search.html', context)


@package_permission_required()
def package_models(request, package_uuid):
    """List the models of a package (GET) or create a new model in it (POST).

    On POST, ``model-type`` selects the kind of model. Unknown types and
    non-numeric numeric parameters are answered with an error page.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name} - Models'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'model'
        context['breadcrumbs'] = breadcrumbs(current_package, 'model')

        package_models_qs = EPModel.objects.filter(package=current_package).order_by('name')
        reviewed_model_qs, unreviewed_model_qs = _split_by_review_state(current_package, package_models_qs)

        if request.GET.get('all'):
            return _objects_json(package_models_qs, current_package.reviewed)

        context['reviewed_objects'] = reviewed_model_qs
        context['unreviewed_objects'] = unreviewed_model_qs

        context['model_types'] = {
            'ML Relative Reasoning': 'ml-relative-reasoning',
            'Rule Based Relative Reasoning': 'rule-based-relative-reasoning',
        }

        if s.FLAGS.get('ENVIFORMER', False):
            context['model_types']['EnviFormer'] = 'enviformer'

        if s.FLAGS.get('PLUGINS', False):
            for k, v in s.CLASSIFIER_PLUGINS.items():
                context['model_types'][v.display()] = k

        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        log_post_params(request)

        name = request.POST.get('model-name')
        description = request.POST.get('model-description')
        model_type = request.POST.get('model-type')

        if model_type == 'enviformer':
            try:
                threshold = _parse_number(request.POST.get(f'{model_type}-threshold'), float, 0.5)
            except ValueError:
                return error(request, 'Invalid model parameter.', 'The threshold must be a number.')

            mod = EnviFormer.create(current_package, name, description, threshold)

        elif model_type == 'ml-relative-reasoning':
            fingerprinter = request.POST.get(f'{model_type}-fingerprinter')
            rule_packages = request.POST.getlist(f'{model_type}-rule-packages')
            data_packages = request.POST.getlist(f'{model_type}-data-packages')
            eval_packages = request.POST.getlist(f'{model_type}-evaluation-packages', [])

            # App Domain related parameters
            build_ad = request.POST.get('build-app-domain', False) == 'on'

            # Form values arrive as strings; convert them so the model receives the
            # same numeric types as the defaults.
            try:
                threshold = _parse_number(request.POST.get(f'{model_type}-threshold'), float, 0.5)
                num_neighbors = _parse_number(request.POST.get('num-neighbors'), int, 5)
                reliability_threshold = _parse_number(request.POST.get('reliability-threshold'), float, 0.5)
                local_compatibility_threshold = _parse_number(
                    request.POST.get('local-compatibility-threshold'), float, 0.5)
            except ValueError:
                return error(request, 'Invalid model parameter.',
                             'Thresholds and the number of neighbors must be numeric.')

            # get Package objects from urls
            rule_package_objs = [PackageManager.get_package_by_url(current_user, p) for p in rule_packages]
            data_package_objs = [PackageManager.get_package_by_url(current_user, p) for p in data_packages]
            eval_packages_objs = [PackageManager.get_package_by_url(current_user, p) for p in eval_packages]

            mod = MLRelativeReasoning.create(
                package=current_package,
                name=name,
                description=description,
                rule_packages=rule_package_objs,
                data_packages=data_package_objs,
                eval_packages=eval_packages_objs,
                threshold=threshold,
                # fingerprinter=fingerprinter,
                build_app_domain=build_ad,
                app_domain_num_neighbours=num_neighbors,
                app_domain_reliability_threshold=reliability_threshold,
                app_domain_local_compatibility_threshold=local_compatibility_threshold,
            )

            from .tasks import build_model
            build_model.delay(mod.pk)

        # The form offers 'rule-based-relative-reasoning'; the legacy spelling without
        # the 'd' is still accepted for existing clients.
        elif model_type in ('rule-based-relative-reasoning', 'rule-base-relative-reasoning'):
            mod = RuleBaseRelativeReasoning()
            mod.save()

        else:
            return error(request, 'Invalid model type.', f'Model type "{model_type}" is not supported.')

        return redirect(mod.url)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


@package_permission_required()
def package_model(request, package_uuid, model_uuid):
    """Show, query or delete a single model.

    GET with ``classify`` returns the model's predictions for ``smiles`` as JSON,
    GET with ``app-domain-assessment`` returns the applicability-domain assessment,
    a plain GET renders the model page. POST with ``hidden=delete`` deletes the model.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_model = EPModel.objects.get(package=current_package, uuid=model_uuid)

    if request.method == 'GET':
        classify = request.GET.get('classify', False)
        assess = request.GET.get('app-domain-assessment', False)

        if classify or assess:
            smiles = request.GET.get('smiles')
            if not smiles:
                # Both JSON endpoints need a structure to work on.
                return HttpResponseBadRequest()

            stand_smiles = FormatConverter.standardize(smiles)

            if classify:
                pred_res = current_model.predict(stand_smiles)
                res = []

                for pr in pred_res:
                    if len(pr) > 0:
                        products = []
                        for prod_set in pr.product_sets:
                            logger.debug(f"Checking {prod_set}")
                            products.append(tuple(prod_set))

                        res.append({
                            # de-duplicate identical product sets
                            'products': list(set(products)),
                            'probability': pr.probability,
                            'btrule': {k: getattr(pr.rule, k) for k in ['url', 'name']}
                            if pr.rule is not None else None
                        })

                return JsonResponse(res, safe=False)

            app_domain_assessment = current_model.app_domain.assess(stand_smiles)[0]
            return JsonResponse(app_domain_assessment, safe=False)

        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name} - {current_model.name}'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'model'
        context['breadcrumbs'] = breadcrumbs(current_package, 'model', current_model)
        context['model'] = current_model
        context['current_object'] = current_model

        return render(request, 'objects/model.html', context)

    elif request.method == 'POST':
        if request.POST.get('hidden', None) == 'delete':
            current_model.delete()
            return redirect(current_package.url + '/model')

        return HttpResponseBadRequest()
    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


@package_permission_required()
def package(request, package_uuid):
    """Show a package (GET) or modify it (POST).

    POST actions, checked in this order: ``hidden`` = ``delete`` / ``publish-package``
    / ``copy``; rename or re-describe; grant permissions to a user or group; set or
    remove the license. A POST matching none of these yields a 400.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name}'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'package'
        context['breadcrumbs'] = breadcrumbs(current_package)
        context['package'] = current_package

        user_perms = UserPackagePermission.objects.filter(package=current_package)
        group_perms = GroupPackagePermission.objects.filter(package=current_package)

        # Users / groups that do not hold a permission on this package yet
        context['users'] = get_user_model().objects.exclude(
            id__in=user_perms.values_list('user_id', flat=True))
        context['groups'] = Group.objects.exclude(
            id__in=group_perms.values_list('group_id', flat=True))
        context['user_permissions'] = user_perms
        context['group_permissions'] = group_perms

        return render(request, 'objects/package.html', context)

    elif request.method == 'POST':
        log_post_params(request)

        if hidden := request.POST.get('hidden', None):
            if hidden == 'delete':
                logger.debug(current_package.delete())
                return redirect(s.SERVER_URL + '/package')
            elif hidden == 'publish-package':
                for g in Group.objects.filter(public=True):
                    PackageManager.update_permissions(current_user, current_package, g, Permission.READ[0])
                return redirect(current_package.url)
            elif hidden == 'copy':
                object_to_copy = request.POST.get('object_to_copy')
                if not object_to_copy:
                    return error(request, 'Invalid target package.', 'Please select a target package.')

                copied_object = copy_object(current_user, current_package, object_to_copy)
                return JsonResponse({'success': copied_object.url})
            else:
                return HttpResponseBadRequest()

        new_package_name = request.POST.get('package-name')
        new_package_description = request.POST.get('package-description')

        grantee_url = request.POST.get('grantee')
        read = request.POST.get('read') == 'on'
        write = request.POST.get('write') == 'on'
        owner = request.POST.get('owner') == 'on'

        license_choice = request.POST.get('license')
        license_link = request.POST.get('license-link')
        license_image_link = request.POST.get('license-image-link')

        if new_package_name:
            current_package.name = new_package_name

        if new_package_description:
            current_package.description = new_package_description

        if any([new_package_name, new_package_description]):
            current_package.save()
            return redirect(current_package.url)

        elif any([grantee_url, read, write, owner]):
            if not grantee_url:
                # Permission flags without a grantee cannot be applied.
                return HttpResponseBadRequest()

            # NOTE(review): substring test on the URL decides user vs. group - confirm
            # no group URL can contain 'user'.
            if 'user' in grantee_url:
                grantee = UserManager.get_user_lp(grantee_url)
            else:
                grantee = GroupManager.get_group_lp(grantee_url)

            # The strongest checked box wins.
            max_perm = None
            if read:
                max_perm = Permission.READ[0]
            if write:
                max_perm = Permission.WRITE[0]
            if owner:
                max_perm = Permission.ALL[0]

            PackageManager.update_permissions(current_user, current_package, grantee, max_perm)
            return redirect(current_package.url)

        elif license_choice is not None:
            previous_license = current_package.license

            if license_choice == 'no-license':
                current_package.license = None
            else:
                new_license = License()
                new_license.link = license_link
                new_license.image_link = license_image_link
                new_license.save()
                current_package.license = new_license

            current_package.save()

            # Delete the old license only after the package stopped referencing it, so
            # the delete cannot affect the package whatever the FK's on_delete rule is.
            if previous_license is not None:
                previous_license.delete()

            return redirect(current_package.url)
        else:
            return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


@package_permission_required()
def package_compounds(request, package_uuid):
    """List the compounds of a package (GET) or create a compound in it (POST)."""
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name} - Compounds'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'compound'
        context['breadcrumbs'] = breadcrumbs(current_package, 'compound')

        package_compounds_qs = Compound.objects.filter(package=current_package).order_by('name')
        reviewed_compound_qs, unreviewed_compound_qs = _split_by_review_state(
            current_package, package_compounds_qs)

        if request.GET.get('all'):
            return _objects_json(package_compounds_qs, current_package.reviewed)

        context['reviewed_objects'] = reviewed_compound_qs
        context['unreviewed_objects'] = unreviewed_compound_qs

        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        compound_name = request.POST.get('compound-name')
        compound_smiles = request.POST.get('compound-smiles')
        compound_description = request.POST.get('compound-description')

        c = Compound.create(current_package, compound_smiles, compound_name, compound_description)

        return redirect(c.url)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


@package_permission_required()
def package_compound(request, package_uuid, compound_uuid):
    """Show a compound (GET) or delete it, set its scenarios, or rename it (POST)."""
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name} - {current_compound.name}'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'compound'
        context['breadcrumbs'] = breadcrumbs(current_package, 'compound', current_compound)
        context['compound'] = current_compound
        context['current_object'] = current_compound

        return render(request, 'objects/compound.html', context)

    elif request.method == 'POST':
        if hidden := request.POST.get('hidden', None):
            if hidden == 'delete':
                current_compound.delete()
                return redirect(current_package.url + '/compound')
            else:
                return HttpResponseBadRequest()

        selected_scenarios = request.POST.getlist('selected-scenarios')

        if selected_scenarios:
            set_scenarios(current_user, current_compound, selected_scenarios)
            return redirect(current_compound.url)

        new_compound_name = request.POST.get('compound-name')
        new_compound_description = request.POST.get('compound-description')

        if new_compound_name:
            current_compound.name = new_compound_name

        if new_compound_description:
            current_compound.description = new_compound_description

        if any([new_compound_name, new_compound_description]):
            current_compound.save()
            return redirect(current_compound.url)
        else:
            return HttpResponseBadRequest()

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


@package_permission_required()
def package_compound_structures(request, package_uuid, compound_uuid):
    """List the structures of a compound (GET) or add a structure to it (POST)."""
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name} - {current_compound.name} - Structures'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'structure'
        context['breadcrumbs'] = breadcrumbs(current_package, 'compound', current_compound, 'structure')

        reviewed_compound_structure_qs, unreviewed_compound_structure_qs = _split_by_review_state(
            current_package, current_compound.structures.order_by('name'))

        context['reviewed_objects'] = reviewed_compound_structure_qs
        context['unreviewed_objects'] = unreviewed_compound_structure_qs

        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        structure_name = request.POST.get('structure-name')
        structure_smiles = request.POST.get('structure-smiles')
        structure_description = request.POST.get('structure-description')

        cs = current_compound.add_structure(structure_smiles, structure_name, structure_description)

        return redirect(cs.url)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


@package_permission_required()
def package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
    """Show a compound structure (GET) or delete it / set its scenarios (POST).

    Deleting the last structure, or the normalized structure, deletes the whole
    compound. Deleting the default structure promotes another structure to default.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)
    current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
    current_structure = CompoundStructure.objects.get(compound=current_compound, uuid=structure_uuid)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name} - {current_compound.name} - {current_structure.name}'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'structure'
        context['compound_structure'] = current_structure
        context['current_object'] = current_structure
        context['breadcrumbs'] = breadcrumbs(current_package, 'compound', current_compound, 'structure',
                                             current_structure)

        return render(request, 'objects/compound_structure.html', context)

    elif request.method == 'POST':
        if hidden := request.POST.get('hidden', None):
            if hidden != 'delete':
                return HttpResponseBadRequest()

            # Check if we have to delete the compound as no structure is left
            if current_compound.structures.count() == 1 or current_structure.normalized_structure:
                # This will delete the structure as well
                current_compound.delete()
                return redirect(current_package.url + '/compound')

            if current_compound.default_structure == current_structure:
                # Promote a remaining structure and persist that choice before the old
                # default goes away; previously the new default was never saved.
                current_compound.default_structure = current_compound.structures.exclude(
                    pk=current_structure.pk).first()
                current_compound.save()

            current_structure.delete()
            return redirect(current_compound.url + '/structure')

        selected_scenarios = request.POST.getlist('selected-scenarios')

        if selected_scenarios:
            set_scenarios(current_user, current_structure, selected_scenarios)
            return redirect(current_structure.url)

        return HttpResponseBadRequest()
    else:
        # POST is handled above, so advertise it as allowed too.
        return HttpResponseNotAllowed(['GET', 'POST'])


@package_permission_required()
def package_rules(request, package_uuid):
    """List the rules of a package (GET) or create a rule in it (POST).

    On POST, ``rule-type`` must be one of ``SimpleAmbitRule``, ``SimpleRDKitRule``,
    ``ParallelRule`` or ``SequentialRule``; anything else yields a 400.
    """
    current_user = _anonymous_or_real(request)
    current_package = PackageManager.get_package_by_id(current_user, package_uuid)

    if request.method == 'GET':
        context = get_base_context(request)
        context['title'] = f'enviPath - {current_package.name} - Rules'
        context['meta']['current_package'] = current_package
        context['object_type'] = 'rule'
        context['breadcrumbs'] = breadcrumbs(current_package, 'rule')

        package_rules_qs = Rule.objects.filter(package=current_package).order_by('name')
        reviewed_rule_qs, unreviewed_rule_qs = _split_by_review_state(current_package, package_rules_qs)

        if request.GET.get('all'):
            return _objects_json(package_rules_qs, current_package.reviewed)

        context['reviewed_objects'] = reviewed_rule_qs
        context['unreviewed_objects'] = unreviewed_rule_qs

        return render(request, 'collections/objects_list.html', context)

    elif request.method == 'POST':
        log_post_params(request)

        # Generic params
        rule_name = request.POST.get('rule-name')
        rule_description = request.POST.get('rule-description')
        rule_type = request.POST.get('rule-type')

        params = {}

        # Obtain parameters as required by rule type
        if rule_type == 'SimpleAmbitRule':
            params['smirks'] = request.POST.get('rule-smirks')
            params['reactant_filter_smarts'] = request.POST.get('rule-reactant-smarts')
            params['product_filter_smarts'] = request.POST.get('rule-product-smarts')
        elif rule_type == 'SimpleRDKitRule':
            params['reaction_smarts'] = request.POST.get('rule-reaction-smarts')
        elif rule_type not in ('ParallelRule', 'SequentialRule'):
            # Parallel / sequential rules take no extra parameters here.
            return HttpResponseBadRequest()

        r = Rule.create(rule_type=rule_type, package=current_package, name=rule_name,
                        description=rule_description, **params)
        return redirect(r.url)

    else:
        return HttpResponseNotAllowed(['GET', 'POST'])


# NOTE(review): the definition below runs past the end of the reviewed excerpt; its
# visible tokens are carried over unchanged.
@package_permission_required()
def package_rule(request, package_uuid, rule_uuid):
    current_user = _anonymous_or_real(request)
    current_package
= PackageManager.get_package_by_id(current_user, package_uuid) current_rule = Rule.objects.get(package=current_package, uuid=rule_uuid) if request.method == 'GET': context = get_base_context(request) if smiles := request.GET.get('smiles', False): stand_smiles = FormatConverter.standardize(smiles) res = current_rule.apply(stand_smiles) if len(res) > 1: logger.info(f"Rule {current_rule.uuid} returned multiple product sets on {smiles}, picking the first one.") smirks = f"{stand_smiles}>>{'.'.join(sorted(res[0]))}" # Usually the functional groups are a mapping of fg -> count # As we are doing it on the fly here fake a high count to ensure that its properly highlighted educt_functional_groups = {x: 1000 for x in current_rule.reactants_smarts} product_functional_groups = {x: 1000 for x in current_rule.products_smarts} return HttpResponse( IndigoUtils.smirks_to_svg(smirks, False, 0, 0, educt_functional_groups=educt_functional_groups, product_functional_groups=product_functional_groups), content_type='image/svg+xml') context['title'] = f'enviPath - {current_package.name} - {current_rule.name}' context['meta']['current_package'] = current_package context['object_type'] = 'rule' context['breadcrumbs'] = breadcrumbs(current_package, 'rule', current_rule) context['rule'] = current_rule context['current_object'] = current_rule if isinstance(current_rule, SimpleAmbitRule): return render(request, 'objects/simple_rule.html', context) else: # isinstance(current_rule, ParallelRule) or isinstance(current_rule, SequentialRule): return render(request, 'objects/composite_rule.html', context) elif request.method == 'POST': if hidden := request.POST.get('hidden', None): if hidden == 'delete': current_rule.delete() return redirect(current_package.url + '/rule') else: return HttpResponseBadRequest() selected_scenarios = request.POST.getlist('selected-scenarios') if selected_scenarios: set_scenarios(current_user, current_rule, selected_scenarios) return redirect(current_rule.url) rule_name = 
request.POST.get('rule-name', '').strip() rule_description = request.POST.get('rule-description', '').strip() if rule_name: current_rule.name = rule_name if rule_description: current_rule.description = rule_description if any([rule_name, rule_description]): current_rule.save() return redirect(current_rule.url) else: return HttpResponseBadRequest() else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_reactions(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - {current_package.name} - Reactions' context['meta']['current_package'] = current_package context['object_type'] = 'reaction' context['breadcrumbs'] = breadcrumbs(current_package, 'reaction') reviewed_reaction_qs = Reaction.objects.none() unreviewed_reaction_qs = Reaction.objects.none() if current_package.reviewed: reviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by('name') else: unreviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by('name') if request.GET.get('all'): return JsonResponse({ "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in (reviewed_reaction_qs if current_package.reviewed else unreviewed_reaction_qs) ] }) context['reviewed_objects'] = reviewed_reaction_qs context['unreviewed_objects'] = unreviewed_reaction_qs return render(request, 'collections/objects_list.html', context) elif request.method == 'POST': reaction_name = request.POST.get('reaction-name') reaction_description = request.POST.get('reaction-description') reactions_smirks = request.POST.get('reaction-smirks') educts = reactions_smirks.split('>>')[0].split('.') products = reactions_smirks.split('>>')[1].split('.') r = Reaction.create(current_package, name=reaction_name, 
description=reaction_description, educts=educts, products=products) return redirect(r.url) else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_reaction(request, package_uuid, reaction_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_reaction = Reaction.objects.get(package=current_package, uuid=reaction_uuid) if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - {current_reaction.name}' context['meta']['current_package'] = current_package context['object_type'] = 'reaction' context['breadcrumbs'] = breadcrumbs(current_package, 'reaction', current_reaction) context['reaction'] = current_reaction context['current_object'] = current_reaction return render(request, 'objects/reaction.html', context) elif request.method == 'POST': if hidden := request.POST.get('hidden', None): if hidden == 'delete': current_reaction.delete() return redirect(current_package.url + '/reaction') else: return HttpResponseBadRequest() selected_scenarios = request.POST.getlist('selected-scenarios') if selected_scenarios: set_scenarios(current_user, current_reaction, selected_scenarios) return redirect(current_reaction.url) new_reaction_name = request.POST.get('reaction-name') new_reaction_description = request.POST.get('reaction-description') if new_reaction_name: current_reaction.name = new_reaction_name if new_reaction_description: current_reaction.description = new_reaction_description if any([new_reaction_name, new_reaction_description]): current_reaction.save() return redirect(current_reaction.url) else: return HttpResponseBadRequest() else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_pathways(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if 
request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - Pathways' context['meta']['current_package'] = current_package context['object_type'] = 'pathway' context['breadcrumbs'] = breadcrumbs(current_package, 'pathway') reviewed_pathway_qs = Pathway.objects.none() unreviewed_pathway_qs = Pathway.objects.none() if current_package.reviewed: reviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by('name') else: unreviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by('name') if request.GET.get('all'): return JsonResponse({ "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in (reviewed_pathway_qs if current_package.reviewed else unreviewed_pathway_qs) ] }) context['reviewed_objects'] = reviewed_pathway_qs context['unreviewed_objects'] = unreviewed_pathway_qs return render(request, 'collections/objects_list.html', context) elif request.method == 'POST': log_post_params(request) name = request.POST.get('name') description = request.POST.get('description') pw_mode = request.POST.get('predict', 'predict') smiles = request.POST.get('smiles') if smiles is None or smiles.strip() == '': return error(request, "Pathway prediction failed!", "Pathway prediction failed due to missing or empty SMILES") smiles = smiles.strip() try: stand_smiles = FormatConverter.standardize(smiles) except ValueError: return error(request, "Pathway prediction failed!", f'Pathway prediction failed as standardization of SMILES "{smiles}" failed!') modes = ['predict', 'build', 'incremental'] if pw_mode not in modes: return error(request, "Pathway prediction failed!", f'Pathway prediction failed as received mode "{pw_mode}" is none of {modes}') prediction_setting = request.POST.get('prediction-setting', None) if prediction_setting: prediction_setting = SettingManager.get_setting_by_url(current_user, prediction_setting) else: prediction_setting = 
current_user.prediction_settings() pw = Pathway.create(current_package, stand_smiles, name=name, description=description) # set mode pw.kv.update({'mode': pw_mode}) pw.save() if pw_mode == 'predict' or pw_mode == 'incremental': # unlimited pred (will be handled by setting) limit = -1 # For incremental predict first level and return if pw_mode == 'incremental': limit = 1 pw.setting = prediction_setting pw.save() from .tasks import predict predict.delay(pw.pk, prediction_setting.pk, limit=limit) return redirect(pw.url) else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_pathway(request, package_uuid, pathway_uuid): current_user: User = _anonymous_or_real(request) current_package: Package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway: Pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) if request.method == 'GET': if request.GET.get("last_modified", False): return JsonResponse({'modified': current_pathway.modified.strftime('%Y-%m-%d %H:%M:%S')}) if request.GET.get("download", False) == "true": filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv" csv_pw = current_pathway.to_csv() response = HttpResponse(csv_pw, content_type='text/csv') response['Content-Disposition'] = f'attachment; filename="{filename}"' return response context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - {current_pathway.name}' context['meta']['current_package'] = current_package context['object_type'] = 'pathway' context['breadcrumbs'] = breadcrumbs(current_package, 'pathway', current_pathway) context['pathway'] = current_pathway context['current_object'] = current_pathway context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'Package': s.SERVER_URL + '/package'}, {current_package.name: current_package.url}, {'Pathway': current_package.url + '/pathway'}, {current_pathway.name: current_pathway.url}, ] return render(request, 
'objects/pathway.html', context) # return render(request, 'pathway_playground2.html', context) elif request.method == 'POST': if hidden := request.POST.get('hidden', None): if hidden == 'delete': current_pathway.delete() return redirect(current_package.url + '/pathway') else: return HttpResponseBadRequest() selected_scenarios = request.POST.getlist('selected-scenarios') if selected_scenarios: set_scenarios(current_user, current_pathway, selected_scenarios) return redirect(current_pathway.url) pathway_name = request.POST.get('pathway-name') pathway_description = request.POST.get('pathway-description') if any([pathway_name, pathway_description]): if pathway_name is not None and pathway_name.strip() != '': pathway_name = pathway_name.strip() current_pathway.name = pathway_name if pathway_description is not None and pathway_description.strip() != '': pathway_description = pathway_description.strip() current_pathway.description = pathway_description current_pathway.save() return redirect(current_pathway.url) node_url = request.POST.get('node') if node_url: n = current_pathway.get_node(node_url) from .tasks import predict # Dont delay? 
predict(current_pathway.pk, current_pathway.setting.pk, node_pk=n.pk) return JsonResponse({'success': current_pathway.url}) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_pathway_nodes(request, package_uuid, pathway_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - {current_pathway.name} - Nodes' context['meta']['current_package'] = current_package context['object_type'] = 'node' context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'Package': s.SERVER_URL + '/package'}, {current_package.name: current_package.url}, {'Pathway': current_package.url + '/pathway'}, {current_pathway.name: current_pathway.url}, {'Node': current_pathway.url + '/node'}, ] reviewed_node_qs = Node.objects.none() unreviewed_node_qs = Node.objects.none() if current_package.reviewed: reviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by('name') else: unreviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by('name') if request.GET.get('all'): return JsonResponse({ "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in (reviewed_node_qs if current_package.reviewed else unreviewed_node_qs) ] }) context['reviewed_objects'] = reviewed_node_qs context['unreviewed_objects'] = unreviewed_node_qs return render(request, 'collections/objects_list.html', context) elif request.method == 'POST': node_name = request.POST.get('node-name') node_description = request.POST.get('node-description') node_smiles = request.POST.get('node-smiles') current_pathway.add_node(node_smiles, name=node_name, description=node_description) return redirect(current_pathway.url) else: return 
HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_pathway_node(request, package_uuid, pathway_uuid, node_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) current_node = Node.objects.get(pathway=current_pathway, uuid=node_uuid) if request.method == 'GET': is_image_request = request.GET.get('image') if is_image_request: if is_image_request == 'svg': svg_data = current_node.as_svg return HttpResponse(svg_data, content_type="image/svg+xml") context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - {current_pathway.name}' context['meta']['current_package'] = current_package context['object_type'] = 'pathway' context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'Package': s.SERVER_URL + '/package'}, {current_package.name: current_package.url}, {'Pathway': current_package.url + '/pathway'}, {current_pathway.name: current_pathway.url}, {'Node': current_pathway.url + '/node'}, {current_node.name: current_node.url}, ] context['node'] = current_node context['current_object'] = current_node context['app_domain_assessment_data'] = json.dumps(current_node.get_app_domain_assessment_data()) return render(request, 'objects/node.html', context) elif request.method == 'POST': log_post_params(request) if hidden := request.POST.get('hidden', None): if hidden == 'delete': # pre_delete signal will take care of edge deletion current_node.delete() return redirect(current_pathway.url) else: return HttpResponseBadRequest() selected_scenarios = request.POST.getlist('selected-scenarios') if selected_scenarios: set_scenarios(current_user, current_node, selected_scenarios) return redirect(current_node.url) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_pathway_edges(request, package_uuid, 
pathway_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - {current_pathway.name} - Edges' context['meta']['current_package'] = current_package context['object_type'] = 'edge' context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'Package': s.SERVER_URL + '/package'}, {current_package.name: current_package.url}, {'Pathway': current_package.url + '/pathway'}, {current_pathway.name: current_pathway.url}, {'Edge': current_pathway.url + '/edge'}, ] reviewed_edge_qs = Edge.objects.none() unreviewed_edge_qs = Edge.objects.none() if current_package.reviewed: reviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by('name') else: unreviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by('name') if request.GET.get('all'): return JsonResponse({ "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in (reviewed_edge_qs if current_package.reviewed else unreviewed_edge_qs) ] }) context['reviewed_objects'] = reviewed_edge_qs context['unreviewed_objects'] = unreviewed_edge_qs return render(request, 'collections/objects_list.html', context) elif request.method == 'POST': log_post_params(request) edge_name = request.POST.get('edge-name') edge_description = request.POST.get('edge-description') edge_substrates = request.POST.getlist('edge-substrates') edge_products = request.POST.getlist('edge-products') substrate_nodes = [current_pathway.get_node(url) for url in edge_substrates] product_nodes = [current_pathway.get_node(url) for url in edge_products] # TODO in the future consider Rules here? 
current_pathway.add_edge(substrate_nodes, product_nodes, name=edge_name, description=edge_description) return redirect(current_pathway.url) else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid) current_edge = Edge.objects.get(pathway=current_pathway, uuid=edge_uuid) if request.method == 'GET': is_image_request = request.GET.get('image') if is_image_request: if is_image_request == 'svg': svg_data = current_edge.as_svg return HttpResponse(svg_data, content_type="image/svg+xml") context = get_base_context(request) context[ 'title'] = f'enviPath - {current_package.name} - {current_pathway.name} - {current_edge.edge_label.name}' context['meta']['current_package'] = current_package context['object_type'] = 'edge' context['breadcrumbs'] = breadcrumbs(current_package, 'pathway', current_pathway, 'edge', current_edge) context['edge'] = current_edge context['current_object'] = current_edge return render(request, 'objects/edge.html', context) elif request.method == 'POST': log_post_params(request) if hidden := request.POST.get('hidden', None): if hidden == 'delete': current_edge.delete() return redirect(current_pathway.url) selected_scenarios = request.POST.getlist('selected-scenarios') if selected_scenarios: set_scenarios(current_user, current_edge, selected_scenarios) return redirect(current_edge.url) return HttpResponseBadRequest() else: return HttpResponseNotAllowed(['GET', 'POST']) @package_permission_required() def package_scenarios(request, package_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) if request.method == 'GET': if 'application/json' in request.META.get('HTTP_ACCEPT') and not 
request.GET.get('all', False): scens = Scenario.objects.filter(package=current_package).order_by('name') res = [{'name': s.name, 'url': s.url, 'uuid': s.uuid} for s in scens] return JsonResponse(res, safe=False) context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - Scenarios' context['meta']['current_package'] = current_package context['object_type'] = 'scenario' context['breadcrumbs'] = breadcrumbs(current_package, 'scenario') reviewed_scenario_qs = Scenario.objects.none() unreviewed_scenario_qs = Scenario.objects.none() if current_package.reviewed: reviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by('name') else: unreviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by('name') if request.GET.get('all'): return JsonResponse({ "objects": [ {"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed} for pw in (reviewed_scenario_qs if current_package.reviewed else unreviewed_scenario_qs) ] }) context['reviewed_objects'] = reviewed_scenario_qs context['unreviewed_objects'] = unreviewed_scenario_qs from envipy_additional_information import SLUDGE_ADDITIONAL_INFORMATION, SOIL_ADDITIONAL_INFORMATION, \ SEDIMENT_ADDITIONAL_INFORMATION context['scenario_types'] = { 'Soil Data': { 'name': 'soil', 'widgets': [HTMLGenerator.generate_html(ai, prefix=f'soil_{0}') for ai in [x for s in SOIL_ADDITIONAL_INFORMATION.values() for x in s]] }, 'Sludge Data': { 'name': 'sludge', 'widgets': [HTMLGenerator.generate_html(ai, prefix=f'sludge_{0}') for ai in [x for s in SLUDGE_ADDITIONAL_INFORMATION.values() for x in s]] }, 'Water-Sediment System Data': { 'name': 'sediment', 'widgets': [HTMLGenerator.generate_html(ai, prefix=f'sediment_{0}') for ai in [x for s in SEDIMENT_ADDITIONAL_INFORMATION.values() for x in s]] } } context['sludge_additional_information'] = SLUDGE_ADDITIONAL_INFORMATION context['soil_additional_information'] = SOIL_ADDITIONAL_INFORMATION 
context['sediment_additional_information'] = SEDIMENT_ADDITIONAL_INFORMATION return render(request, 'collections/objects_list.html', context) elif request.method == 'POST': log_post_params(request) scenario_name = request.POST.get('scenario-name') scenario_description = request.POST.get('scenario-description') scenario_date_year = request.POST.get('scenario-date-year') scenario_date_month = request.POST.get('scenario-date-month') scenario_date_day = request.POST.get('scenario-date-day') scenario_date = scenario_date_year if scenario_date_month is not None and scenario_date_month.strip() != '': scenario_date += f'-{int(scenario_date_month):02d}' if scenario_date_day is not None and scenario_date_day.strip() != '': scenario_date += f'-{int(scenario_date_day):02d}' scenario_type = request.POST.get('scenario-type') additional_information = HTMLGenerator.build_models(request.POST.dict()) additional_information = [x for s in additional_information.values() for x in s] s = Scenario.create(current_package, name=scenario_name, description=scenario_description, scenario_date=scenario_date, scenario_type=scenario_type, additional_information=additional_information) return redirect(s.url) else: return HttpResponseNotAllowed(['GET', ]) @package_permission_required() def package_scenario(request, package_uuid, scenario_uuid): current_user = _anonymous_or_real(request) current_package = PackageManager.get_package_by_id(current_user, package_uuid) current_scenario = Scenario.objects.get(package=current_package, uuid=scenario_uuid) if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - {current_package.name} - {current_scenario.name}' context['meta']['current_package'] = current_package context['object_type'] = 'scenario' context['breadcrumbs'] = breadcrumbs(current_package, 'scenario', current_scenario) context['scenario'] = current_scenario available_add_infs = [] for add_inf in NAME_MAPPING.values(): available_add_infs.append({ 
'display_name': add_inf.property_name(None), 'name': add_inf.__name__, 'widget': HTMLGenerator.generate_html(add_inf, prefix=f'{0}') }) context['available_additional_information'] = available_add_infs context['update_widgets'] = [HTMLGenerator.generate_html(ai, prefix=f'{i}') for i, ai in enumerate(current_scenario.get_additional_information())] return render(request, 'objects/scenario.html', context) elif request.method == 'POST': log_post_params(request) if hidden := request.POST.get('hidden', None): if hidden == 'delete': current_scenario.delete() return redirect(current_package.url + '/scenario') elif hidden == 'delete-additional-information': uuid = request.POST.get('uuid') current_scenario.remove_additional_information(uuid) return redirect(current_scenario.url) elif hidden == 'delete-all-additional-information': current_scenario.additional_information = dict() current_scenario.save() return redirect(current_scenario.url) elif hidden == 'set-additional-information': ais = HTMLGenerator.build_models(request.POST.dict()) if s.DEBUG: logger.info(ais) current_scenario.set_additional_information(ais) return redirect(current_scenario.url) elif hidden == 'add-additional-information': ais = HTMLGenerator.build_models(request.POST.dict()) if len(ais.keys()) != 1: raise ValueError('Only one additional information field can be added at a time.') ai = list(ais.values())[0][0] if s.DEBUG: logger.info(ais) current_scenario.add_additional_information(ai) return redirect(current_scenario.url) else: return HttpResponseBadRequest() else: return HttpResponseBadRequest() else: return HttpResponseNotAllowed(['GET', 'POST']) ############## # User/Group # ############## def users(request): if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - Users' context['object_type'] = 'user' context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'User': s.SERVER_URL + '/user'}, ] context['objects'] = get_user_model().objects.all() return render(request, 
'collections/objects_list.html', context) else: return HttpResponseNotAllowed(['GET']) def user(request, user_uuid): current_user = _anonymous_or_real(request) if request.method == 'GET': # Check if current user is the one matching to the url if str(current_user.uuid) != user_uuid and not current_user.is_superuser: return HttpResponseBadRequest() requested_user = UserManager.get_user_by_id(current_user, user_uuid) context = get_base_context(request, for_user=requested_user) context['title'] = f'enviPath - User' context['object_type'] = 'user' context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'User': s.SERVER_URL + '/user'}, {current_user.username: requested_user.url} ] context['user'] = requested_user model_qs = EPModel.objects.none() for p in PackageManager.get_all_readable_packages(requested_user, include_reviewed=True): model_qs |= p.models context['models'] = model_qs context['tokens'] = APIToken.objects.filter(user=requested_user) return render(request, 'objects/user.html', context) elif request.method == 'POST': is_hidden_method = bool(request.POST.get('hidden', False)) if is_hidden_method and request.POST['hidden'] == 'request-api-token': name = request.POST.get('name', 'No Name') valid_for = min(max(int(request.POST.get('valid-for', 90)), 1), 90) token, raw_token = APIToken.create_token(request.user, name=name, valid_for=valid_for) return JsonResponse({"raw_token": raw_token, 'token': {'id': token.id, 'name': token.name}}) if is_hidden_method and request.POST['hidden'] == 'delete': token_id = request.POST.get('token-id') if token_id is None: return HttpResponseBadRequest("Token ID missing!") try: APIToken.objects.get(user=current_user, id=token_id).delete() except APIToken.DoesNotExist: return HttpResponseBadRequest("User and Token ID combination invalid!") return HttpResponse("success") default_package = request.POST.get('default-package') default_group = request.POST.get('default-group') default_prediction_setting = 
request.POST.get('default-prediction-setting') if any([default_package, default_group, default_prediction_setting]): current_user.default_package = PackageManager.get_package_by_url(current_user, default_package) current_user.default_group = GroupManager.get_group_by_url(current_user, default_group) current_user.default_setting = SettingManager.get_setting_by_url(current_user, default_prediction_setting) current_user.save() return redirect(current_user.url) prediction_model_pk = request.POST.get('model') prediction_threshold = request.POST.get('threshold') prediction_max_nodes = request.POST.get('max_nodes') prediction_max_depth = request.POST.get('max_depth') if all([prediction_model_pk, prediction_threshold, prediction_max_nodes, prediction_max_depth]): # validate input.. mod = EPModel.objects.get(id=prediction_model_pk) if not PackageManager.readable(current_user, mod.package): return HttpResponseBadRequest() threshold = float(prediction_threshold) if threshold < 0 or threshold > 1: return HttpResponseBadRequest() max_nodes = min(max(int(prediction_max_nodes), 1), 50) max_depth = min(max(int(prediction_max_depth), 1), 8) setting = { 'model': mod, 'model_parameters': { 'threshold': threshold }, 'truncator': { 'max_nodes': max_nodes, 'max_depth': max_depth, } } return HttpResponseBadRequest() else: return HttpResponseNotAllowed(['GET', 'POST']) def groups(request): current_user = _anonymous_or_real(request) if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - Groups' context['object_type'] = 'group' context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'Group': s.SERVER_URL + '/group'}, ] context['objects'] = Group.objects.all() return render(request, 'collections/objects_list.html', context) elif request.method == 'POST': group_name = request.POST.get('group-name') group_description = request.POST.get('group-description', s.DEFAULT_VALUES['description']) g = GroupManager.create_group(current_user, group_name, 
group_description) return redirect(g.url) else: return HttpResponseNotAllowed(['GET', 'POST']) def group(request, group_uuid): current_user = _anonymous_or_real(request) current_group = GroupManager.get_group_by_id(current_user, group_uuid) if request.method == 'GET': context = get_base_context(request) context['title'] = f'enviPath - {current_group.name}' context['object_type'] = 'group' context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'Group': s.SERVER_URL + '/group'}, {current_group.name: current_group.url} ] context['group'] = current_group context['users'] = UserManager.get_users_lp().exclude(id__in=current_group.user_member.all()) context['groups'] = GroupManager.get_groups_lp().exclude(id__in=current_group.group_member.all()).exclude(id=current_group.pk) context['packages'] = Package.objects.filter( id__in=GroupPackagePermission.objects.filter(group=current_group).values('package').distinct()) return render(request, 'objects/group.html', context) elif request.method == 'POST': log_post_params(request) if hidden := request.POST.get('hidden', None): if hidden == 'delete': current_group.delete() return redirect(s.SERVER_URL + '/group') else: return HttpResponseBadRequest() member_url = request.POST.get('member') action = request.POST.get('action') if all([member_url, action]) and action in ['add', 'remove']: if 'user' in member_url: member = UserManager.get_user_lp(member_url) else: member = GroupManager.get_group_lp(member_url) GroupManager.update_members(current_user, current_group, member, action) return redirect(current_group.url) else: return HttpResponseNotAllowed(['GET', 'POST']) def settings(request): current_user = _anonymous_or_real(request) context = get_base_context(request) if request.method == 'GET': context['object_type'] = 'setting' # Even if settings are aready in "meta", for consistency add it on root level context['settings'] = SettingManager.get_all_settings(current_user) context['breadcrumbs'] = [ {'Home': s.SERVER_URL}, {'Group': 
s.SERVER_URL + '/setting'}, ] return elif request.method == 'POST': if s.DEBUG: for k, v in request.POST.items(): logger.info("Parameters received:") logger.info(f"{k}\t{v}") name = request.POST.get('prediction-setting-name') description = request.POST.get('prediction-setting-description') new_default = request.POST.get('prediction-setting-new-default', 'off') == 'on' max_nodes = min(max(int(request.POST.get('prediction-setting-max-nodes', 1)), s.DEFAULT_MAX_NUMBER_OF_NODES), s.DEFAULT_MAX_NUMBER_OF_NODES) max_depth = min(max(int(request.POST.get('prediction-setting-max-depth', 1)), s.DEFAULT_MAX_DEPTH), s.DEFAULT_MAX_DEPTH) tp_gen_method = request.POST.get('tp-generation-method') params = {} if tp_gen_method == 'model-based-prediction-setting': model_url = request.POST.get('model-based-prediction-setting-model') model_uuid = model_url.split('/')[-1] params['model'] = EPModel.objects.get(uuid=model_uuid) params['model_threshold'] = request.POST.get('model-based-prediction-setting-threshold', s.DEFAULT_MODEL_THRESHOLD) if not PackageManager.readable(current_user, params['model'].package): raise ValueError("") elif tp_gen_method == 'rule-based-prediction-setting': rule_packages = request.POST.getlist('rule-based-prediction-setting-packages') params['rule_packages'] = [PackageManager.get_package_by_url(current_user, p) for p in rule_packages] else: raise ValueError("") created_setting = SettingManager.create_setting(current_user, name=name, description=description, max_nodes=max_nodes, max_depth=max_depth, **params) if new_default: current_user.default_setting = created_setting current_user.save() return HttpResponse("Success!") else: return HttpResponseNotAllowed(['GET', 'POST']) def setting(request, setting_uuid): pass ########### # KETCHER # ########### def indigo(request): from indigo import Indigo return JsonResponse({'Indigo': {'version': Indigo().version()}}) @csrf_exempt def aromatize(request): if request.method == 'POST': data = json.loads(request.body) 
mol_data = data.get('struct') aromatized = IndigoUtils.aromatize(mol_data, False) return JsonResponse({"struct": aromatized}) else: return HttpResponseBadRequest() @csrf_exempt def dearomatize(request): if request.method == 'POST': data = json.loads(request.body) mol_data = data.get('struct') dearomatized = IndigoUtils.dearomatize(mol_data, False) return JsonResponse({"struct": dearomatized}) else: return HttpResponseBadRequest() @csrf_exempt def layout(request): if request.method == 'POST': data = json.loads(request.body) mol_data = data.get('struct') lay = IndigoUtils.layout(mol_data) return JsonResponse({"struct": lay}) else: return HttpResponseBadRequest() ########################## # Generic/Non-Persistent # ########################## def depict(request): if smiles := request.GET.get('smiles'): return HttpResponse(IndigoUtils.mol_to_svg(smiles), content_type='image/svg+xml') elif smirks := request.GET.get('smirks'): query_smirks = request.GET.get('is_query_smirks', False) == 'true' return HttpResponse(IndigoUtils.smirks_to_svg(smirks, query_smirks), content_type='image/svg+xml') else: return HttpResponseBadRequest() from django.contrib.auth.decorators import login_required from django.http import JsonResponse @login_required def userinfo(request): user = request.user return JsonResponse({ "sub": str(user.uuid), "email": user.email, "username": user.username, "name": user.get_full_name() or user.username, "email_verified": user.is_active, })