forked from enviPath/enviPy
2083 lines
78 KiB
Python
2083 lines
78 KiB
Python
import json
|
|
import logging
|
|
from typing import List, Dict, Any
|
|
|
|
from django.conf import settings as s
|
|
from django.contrib.auth import get_user_model
|
|
from django.db.models import F, Value
|
|
from django.db.models.fields import CharField
|
|
from django.db.models.functions import Concat
|
|
from django.http import JsonResponse, HttpResponse, HttpResponseNotAllowed, HttpResponseBadRequest
|
|
from django.shortcuts import render, redirect
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
|
|
from utilities.chem import FormatConverter, IndigoUtils
|
|
from utilities.decorators import package_permission_required
|
|
from .logic import GroupManager, PackageManager, UserManager, SettingManager, SearchManager
|
|
from .models import Package, GroupPackagePermission, Group, CompoundStructure, Compound, Reaction, Rule, Pathway, Node, \
|
|
EPModel, EnviFormer, MLRelativeReasoning, RuleBaseRelativeReasoning, Scenario, SimpleAmbitRule, APIToken, \
|
|
UserPackagePermission, Permission, License, User, Edge
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def log_post_params(request):
|
|
if s.DEBUG:
|
|
for k, v in request.POST.items():
|
|
logger.debug(f"{k}\t{v}")
|
|
|
|
|
|
def error(request, message: str, detail: str, code: int = 400):
|
|
context = get_base_context(request)
|
|
error_context = {
|
|
'error_message': message,
|
|
'error_detail': detail,
|
|
}
|
|
|
|
if request.headers.get('Accept') == 'application/json':
|
|
return JsonResponse(error_context, status=500)
|
|
|
|
context.update(**error_context)
|
|
return render(request, "errors/error.html", context, status=code)
|
|
|
|
|
|
def login(request):
|
|
current_user = _anonymous_or_real(request)
|
|
context = get_base_context(request)
|
|
|
|
if request.method == 'GET':
|
|
context['title'] = 'enviPath'
|
|
return render(request, 'login.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
is_login = bool(request.POST.get('login', False))
|
|
is_register = bool(request.POST.get('register', False))
|
|
|
|
if is_login:
|
|
from django.contrib.auth import authenticate
|
|
from django.contrib.auth import login
|
|
|
|
username = request.POST.get('username')
|
|
password = request.POST.get('password')
|
|
|
|
# Get email for username and check if account is active
|
|
try:
|
|
temp_user = get_user_model().objects.get(username=username)
|
|
|
|
if not temp_user.is_active:
|
|
context['message'] = "User account is not activated yet!"
|
|
return render(request, 'login.html', context)
|
|
|
|
email = temp_user.email
|
|
except get_user_model().DoesNotExist:
|
|
context['message'] = "Login failed!"
|
|
return render(request, 'login.html', context)
|
|
try:
|
|
user = authenticate(username=email, password=password)
|
|
except Exception as e:
|
|
context['message'] = "Login failed!"
|
|
return render(request, 'login.html', context)
|
|
|
|
if user is not None:
|
|
login(request, user)
|
|
return redirect(s.SERVER_URL)
|
|
else:
|
|
context['message'] = "Login failed!"
|
|
return render(request, 'login.html', context)
|
|
|
|
elif is_register:
|
|
username = request.POST.get('username')
|
|
email = request.POST.get('email')
|
|
password = request.POST.get('password', '').strip()
|
|
rpassword = request.POST.get('rpassword', '').strip()
|
|
|
|
if password != rpassword or password == '':
|
|
context['message'] = "Registration failed, provided passwords differ!"
|
|
return render(request, 'login.html', context)
|
|
|
|
try:
|
|
u = UserManager.create_user(username, email, password)
|
|
except Exception:
|
|
context['message'] = "Registration failed! Couldn't create User Account."
|
|
return render(request, 'login.html', context)
|
|
|
|
if s.ADMIN_APPROVAL_REQUIRED:
|
|
context['message'] = "Your account has been created! An admin will activate it soon!"
|
|
else:
|
|
context['message'] = "Account has been created! You'll receive a mail to activate your account shortly."
|
|
return render(request, 'login.html', context)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
def logout(request):
|
|
if request.method == 'POST':
|
|
is_logout = bool(request.POST.get('logout', False))
|
|
|
|
if is_logout:
|
|
from django.contrib.auth import logout
|
|
logout(request)
|
|
return redirect(s.SERVER_URL)
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
def editable(request, user):
|
|
|
|
if user.is_superuser:
|
|
return True
|
|
|
|
url = request.build_absolute_uri(request.path)
|
|
if PackageManager.is_package_url(url):
|
|
_package = PackageManager.get_package_lp(request.build_absolute_uri())
|
|
return PackageManager.writable(user, _package)
|
|
elif GroupManager.is_group_url(url):
|
|
_group = GroupManager.get_group_lp(request.build_absolute_uri())
|
|
return GroupManager.writable(user, _group)
|
|
elif UserManager.is_user_url(url):
|
|
_user = UserManager.get_user_lp(request.build_absolute_uri())
|
|
return UserManager.writable(user, _user)
|
|
elif url in [s.SERVER_URL, f"{s.SERVER_URL}/", f"{s.SERVER_URL}/package", f"{s.SERVER_URL}/user",
|
|
f"{s.SERVER_URL}/group", f"{s.SERVER_URL}/search"]:
|
|
return True
|
|
else:
|
|
logger.debug(f"Unknown url: {url}")
|
|
return False
|
|
|
|
|
|
def get_base_context(request, for_user=None) -> Dict[str, Any]:
|
|
current_user = _anonymous_or_real(request)
|
|
can_edit = editable(request, current_user)
|
|
|
|
if for_user:
|
|
current_user = for_user
|
|
|
|
ctx = {
|
|
'title': 'enviPath',
|
|
'meta': {
|
|
'version': '0.0.1',
|
|
'server_url': s.SERVER_URL,
|
|
'user': current_user,
|
|
'can_edit': can_edit,
|
|
'readable_packages': PackageManager.get_all_readable_packages(current_user, include_reviewed=True),
|
|
'writeable_packages': PackageManager.get_all_writeable_packages(current_user),
|
|
'available_groups': GroupManager.get_groups(current_user),
|
|
'available_settings': SettingManager.get_all_settings(current_user),
|
|
'enabled_features': s.FLAGS,
|
|
'debug': s.DEBUG,
|
|
},
|
|
}
|
|
|
|
return ctx
|
|
|
|
|
|
def _anonymous_or_real(request):
|
|
if request.user.is_authenticated and not request.user.is_anonymous:
|
|
return request.user
|
|
return get_user_model().objects.get(username='anonymous')
|
|
|
|
|
|
def breadcrumbs(first_level_object=None, second_level_namespace=None, second_level_object=None,
|
|
third_level_namespace=None, third_level_object=None) -> List[Dict[str, str]]:
|
|
bread = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Package': s.SERVER_URL + '/package'},
|
|
]
|
|
if first_level_object is not None:
|
|
bread.append({first_level_object.name: first_level_object.url})
|
|
|
|
if second_level_namespace is not None:
|
|
bread.append({f'{second_level_namespace}'.capitalize(): first_level_object.url + f'/{second_level_namespace}'})
|
|
|
|
if second_level_object is not None:
|
|
bread.append({second_level_object.name: second_level_object.url})
|
|
|
|
if third_level_namespace is not None:
|
|
bread.append({f'{third_level_namespace}'.capitalize(): second_level_object.url + f'/{third_level_namespace}'})
|
|
|
|
if third_level_object is not None:
|
|
bread.append({third_level_object.name: third_level_object.url})
|
|
|
|
return bread
|
|
|
|
|
|
def set_scenarios(current_user, attach_object, scenario_urls: List[str]):
|
|
scens = []
|
|
for scenario_url in scenario_urls:
|
|
package = PackageManager.get_package_by_url(current_user, scenario_url)
|
|
scen = Scenario.objects.get(package=package, uuid=scenario_url.split('/')[-1])
|
|
scens.append(scen)
|
|
|
|
attach_object.set_scenarios(scens)
|
|
|
|
def index(request):
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Home'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
|
|
if request.GET.get('getMLServerPath', False):
|
|
return JsonResponse({"mlServerPath": s.SERVER_URL})
|
|
|
|
return render(request, 'index/index.html', context)
|
|
|
|
|
|
def packages(request):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Packages'
|
|
|
|
context['object_type'] = 'package'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
context['meta']['can_edit'] = True
|
|
|
|
reviewed_package_qs = Package.objects.filter(reviewed=True).order_by('created')
|
|
unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user).order_by('name')
|
|
|
|
context['reviewed_objects'] = reviewed_package_qs
|
|
context['unreviewed_objects'] = unreviewed_package_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
hidden = request.POST.get('hidden', None)
|
|
|
|
if hidden is not None:
|
|
pass
|
|
else:
|
|
package_name = request.POST.get('package-name')
|
|
package_description = request.POST.get('package-description', s.DEFAULT_VALUES['description'])
|
|
|
|
created_package = PackageManager.create_package(current_user, package_name, package_description)
|
|
|
|
return redirect(created_package.url)
|
|
|
|
elif request.method == 'OPTIONS':
|
|
response = HttpResponse()
|
|
response['allow'] = ','.join(['GET', 'POST'])
|
|
return response
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def compounds(request):
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Compounds'
|
|
|
|
context['object_type'] = 'compound'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
|
|
reviewed_compound_qs = Compound.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_compound_qs |= Compound.objects.filter(package=p)
|
|
|
|
reviewed_compound_qs = reviewed_compound_qs.order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_compound_qs
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_compound_qs
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_compounds(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def rules(request):
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Rules'
|
|
|
|
context['object_type'] = 'rule'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Rule': s.SERVER_URL + '/rule'},
|
|
]
|
|
reviewed_rule_qs = Rule.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_rule_qs |= Rule.objects.filter(package=p)
|
|
|
|
reviewed_rule_qs = reviewed_rule_qs.order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_rule_qs
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_rule_qs
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_rules(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def reactions(request):
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Reactions'
|
|
|
|
context['object_type'] = 'reaction'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Reaction': s.SERVER_URL + '/reaction'},
|
|
]
|
|
reviewed_reaction_qs = Reaction.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_reaction_qs |= Reaction.objects.filter(package=p).order_by('name')
|
|
|
|
reviewed_reaction_qs = reviewed_reaction_qs.order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_reaction_qs
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_reaction_qs
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_reactions(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def pathways(request):
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Pathways'
|
|
|
|
context['object_type'] = 'pathway'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Pathway': s.SERVER_URL + '/pathway'},
|
|
]
|
|
|
|
reviewed_pathway_qs = Pathway.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_pathway_qs |= Pathway.objects.filter(package=p).order_by('name')
|
|
|
|
reviewed_pathway_qs = reviewed_pathway_qs.order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_pathway_qs
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_pathway_qs
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
# delegate to default package
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_pathways(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def scenarios(request):
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Scenarios'
|
|
|
|
context['object_type'] = 'scenario'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Scenario': s.SERVER_URL + '/scenario'},
|
|
]
|
|
|
|
reviewed_scenario_qs = Scenario.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_scenario_qs |= Scenario.objects.filter(package=p).order_by('name')
|
|
|
|
reviewed_scenario_qs = reviewed_scenario_qs.order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": s.name, "url": s.full_url, "reviewed": True}
|
|
for s in reviewed_scenario_qs.annotate(
|
|
full_url=Concat(
|
|
Value(s.SERVER_URL + '/package/'),
|
|
F("package__uuid"),
|
|
Value("/scenario/"),
|
|
F("uuid"),
|
|
output_field=CharField(),
|
|
)
|
|
)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_scenario_qs
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
# delegate to default package
|
|
default_package = request.user.default_package
|
|
return package_scenarios(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def models(request):
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Models'
|
|
|
|
context['object_type'] = 'model'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Model': s.SERVER_URL + '/model'},
|
|
]
|
|
|
|
context['model_types'] = {
|
|
'ML Relative Reasoning': 'ml-relative-reasoning',
|
|
'Rule Based Relative Reasoning': 'rule-based-relative-reasoning',
|
|
'EnviFormer': 'enviformer',
|
|
}
|
|
|
|
for k, v in s.CLASSIFIER_PLUGINS.items():
|
|
context['model_types'][v.display()] = k
|
|
|
|
reviewed_model_qs = EPModel.objects.none()
|
|
|
|
for p in PackageManager.get_reviewed_packages():
|
|
reviewed_model_qs |= EPModel.objects.filter(package=p).order_by('name')
|
|
|
|
reviewed_model_qs = reviewed_model_qs.order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": True}
|
|
for pw in reviewed_model_qs
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_model_qs
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
current_user = _anonymous_or_real(request)
|
|
default_package = current_user.default_package
|
|
return package_models(request, default_package.uuid)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def search(request):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == 'GET':
|
|
package_urls = request.GET.getlist('packages')
|
|
searchterm = request.GET.get('search')
|
|
mode = request.GET.get('mode')
|
|
|
|
# add HTTP_ACCEPT check to differentiate between index and ajax call
|
|
if 'application/json' in request.META.get('HTTP_ACCEPT') and all([searchterm, mode]):
|
|
if package_urls:
|
|
packages = [PackageManager.get_package_by_url(current_user, p) for p in package_urls]
|
|
else:
|
|
packages = PackageManager.get_reviewed_packages()
|
|
|
|
search_result = SearchManager.search(packages, searchterm, mode)
|
|
|
|
return JsonResponse(search_result, safe=False)
|
|
|
|
context = get_base_context(request)
|
|
context['title'] = 'enviPath - Search'
|
|
|
|
context['object_type'] = 'model'
|
|
context['meta']['current_package'] = context['meta']['user'].default_package
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Search': s.SERVER_URL + '/search'},
|
|
]
|
|
|
|
reviewed_package_qs = PackageManager.get_reviewed_packages()
|
|
unreviewed_package_qs = PackageManager.get_all_readable_packages(current_user)
|
|
|
|
context['reviewed_objects'] = reviewed_package_qs
|
|
context['unreviewed_objects'] = unreviewed_package_qs
|
|
|
|
if all([searchterm, mode]):
|
|
if package_urls:
|
|
packages = [PackageManager.get_package_by_url(current_user, p) for p in package_urls]
|
|
else:
|
|
packages = PackageManager.get_reviewed_packages()
|
|
|
|
context['search_result'] = SearchManager.search(packages, searchterm, mode)
|
|
context['search_result']['searchterm'] = searchterm
|
|
|
|
return render(request, 'search.html', context)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_models(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - Models'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'model'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'model')
|
|
|
|
reviewed_model_qs = EPModel.objects.none()
|
|
unreviewed_model_qs = EPModel.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_model_qs = EPModel.objects.filter(package=current_package).order_by('name')
|
|
else:
|
|
unreviewed_model_qs = EPModel.objects.filter(package=current_package).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_model_qs if current_package.reviewed else unreviewed_model_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_model_qs
|
|
context['unreviewed_objects'] = unreviewed_model_qs
|
|
|
|
context['model_types'] = {
|
|
'ML Relative Reasoning': 'ml-relative-reasoning',
|
|
'Rule Based Relative Reasoning': 'rule-based-relative-reasoning',
|
|
}
|
|
|
|
if s.FLAGS.get('ENVIFORMER', False):
|
|
context['model_types']['EnviFormer'] = 'enviformer'
|
|
|
|
if s.FLAGS.get('PLUGINS', False):
|
|
for k, v in s.CLASSIFIER_PLUGINS.items():
|
|
context['model_types'][v.display()] = k
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
name = request.POST.get('model-name')
|
|
description = request.POST.get('model-description')
|
|
|
|
model_type = request.POST.get('model-type')
|
|
|
|
if model_type == 'enviformer':
|
|
threshold = float(request.POST.get(f'{model_type}-threshold', 0.5))
|
|
|
|
mod = EnviFormer.create(current_package, name, description, threshold)
|
|
|
|
elif model_type == 'ml-relative-reasoning':
|
|
threshold = float(request.POST.get(f'{model_type}-threshold', 0.5))
|
|
fingerprinter = request.POST.get(f'{model_type}-fingerprinter')
|
|
rule_packages = request.POST.getlist(f'{model_type}-rule-packages')
|
|
data_packages = request.POST.getlist(f'{model_type}-data-packages')
|
|
eval_packages = request.POST.getlist(f'{model_type}-evaluation-packages', [])
|
|
|
|
# get Package objects from urls
|
|
rule_package_objs = [PackageManager.get_package_by_url(current_user, p) for p in rule_packages]
|
|
data_package_objs = [PackageManager.get_package_by_url(current_user, p) for p in data_packages]
|
|
eval_packages_objs = [PackageManager.get_package_by_url(current_user, p) for p in eval_packages]
|
|
|
|
# App Domain related parameters
|
|
build_ad = request.POST.get('build-app-domain', False) == 'on'
|
|
num_neighbors = request.POST.get('num-neighbors', 5)
|
|
reliability_threshold = request.POST.get('reliability-threshold', 0.5)
|
|
local_compatibility_threshold = request.POST.get('local-compatibility-threshold', 0.5)
|
|
|
|
mod = MLRelativeReasoning.create(
|
|
package=current_package,
|
|
name=name,
|
|
description=description,
|
|
rule_packages=rule_package_objs,
|
|
data_packages=data_package_objs,
|
|
eval_packages=eval_packages_objs,
|
|
threshold=threshold,
|
|
# fingerprinter=fingerprinter,
|
|
build_app_domain=build_ad,
|
|
app_domain_num_neighbours=num_neighbors,
|
|
app_domain_reliability_threshold=reliability_threshold,
|
|
app_domain_local_compatibility_threshold=local_compatibility_threshold,
|
|
)
|
|
|
|
from .tasks import build_model
|
|
build_model.delay(mod.pk)
|
|
|
|
elif model_type == 'rule-base-relative-reasoning':
|
|
mod = RuleBaseRelativeReasoning()
|
|
|
|
mod.save()
|
|
else:
|
|
return error(request, 'Invalid model type.', f'Model type "{model_type}" is not supported."')
|
|
return redirect(mod.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_model(request, package_uuid, model_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_model = EPModel.objects.get(package=current_package, uuid=model_uuid)
|
|
|
|
if request.method == 'GET':
|
|
|
|
if request.GET.get('classify', False):
|
|
smiles = request.GET['smiles']
|
|
stand_smiles = FormatConverter.standardize(smiles)
|
|
pred_res = current_model.predict(stand_smiles)
|
|
res = []
|
|
|
|
for pr in pred_res:
|
|
if len(pr) > 0:
|
|
products = []
|
|
for prod_set in pr.product_sets:
|
|
logger.debug(f"Checking {prod_set}")
|
|
products.append(tuple([x for x in prod_set]))
|
|
|
|
res.append({
|
|
'products': list(set(products)),
|
|
'probability': pr.probability,
|
|
'btrule': {k: getattr(pr.rule, k) for k in ['url', 'name']} if pr.rule is not None else None
|
|
})
|
|
|
|
return JsonResponse(res, safe=False)
|
|
|
|
elif request.GET.get('app-domain-assessment', False):
|
|
smiles = request.GET['smiles']
|
|
stand_smiles = FormatConverter.standardize(smiles)
|
|
app_domain_assessment = current_model.app_domain.assess(stand_smiles)[0]
|
|
return JsonResponse(app_domain_assessment, safe=False)
|
|
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_model.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'model'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'model', current_model)
|
|
|
|
context['model'] = current_model
|
|
context['current_object'] = current_model
|
|
|
|
return render(request, 'objects/model.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
current_model.delete()
|
|
return redirect(current_package.url + '/model')
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'package'
|
|
context['breadcrumbs'] = breadcrumbs(current_package)
|
|
|
|
context['package'] = current_package
|
|
|
|
user_perms = UserPackagePermission.objects.filter(package=current_package)
|
|
users = get_user_model().objects.exclude(
|
|
id__in=UserPackagePermission.objects.filter(package=current_package).values_list('user_id', flat=True))
|
|
|
|
group_perms = GroupPackagePermission.objects.filter(package=current_package)
|
|
groups = Group.objects.exclude(
|
|
id__in=GroupPackagePermission.objects.filter(package=current_package).values_list('group_id', flat=True))
|
|
|
|
context['users'] = users
|
|
context['groups'] = groups
|
|
context['user_permissions'] = user_perms
|
|
context['group_permissions'] = group_perms
|
|
|
|
return render(request, 'objects/package.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
logger.debug(current_package.delete())
|
|
return redirect(s.SERVER_URL + '/package')
|
|
elif hidden == 'publish-package':
|
|
for g in Group.objects.filter(public=True):
|
|
PackageManager.update_permissions(current_user, current_package, g, Permission.READ[0])
|
|
return redirect(current_package.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
new_package_name = request.POST.get('package-name')
|
|
new_package_description = request.POST.get('package-description')
|
|
|
|
grantee_url = request.POST.get('grantee')
|
|
read = request.POST.get('read') == 'on'
|
|
write = request.POST.get('write') == 'on'
|
|
owner = request.POST.get('owner') == 'on'
|
|
|
|
license = request.POST.get('license')
|
|
license_link = request.POST.get('license-link')
|
|
license_image_link = request.POST.get('license-image-link')
|
|
|
|
if new_package_name:
|
|
current_package.name = new_package_name
|
|
|
|
if new_package_description:
|
|
current_package.description = new_package_description
|
|
|
|
if any([new_package_name, new_package_description]):
|
|
current_package.save()
|
|
return redirect(current_package.url)
|
|
|
|
elif any([grantee_url, read, write, owner]):
|
|
if 'user' in grantee_url:
|
|
grantee = UserManager.get_user_lp(grantee_url)
|
|
else:
|
|
grantee = GroupManager.get_group_lp(grantee_url)
|
|
|
|
max_perm = None
|
|
if read:
|
|
max_perm = Permission.READ[0]
|
|
if write:
|
|
max_perm = Permission.WRITE[0]
|
|
if owner:
|
|
max_perm = Permission.ALL[0]
|
|
|
|
PackageManager.update_permissions(current_user, current_package, grantee, max_perm)
|
|
return redirect(current_package.url)
|
|
elif license is not None:
|
|
if license == 'no-license':
|
|
if current_package.license is not None:
|
|
current_package.license.delete()
|
|
|
|
current_package.license = None
|
|
current_package.save()
|
|
return redirect(current_package.url)
|
|
else:
|
|
if current_package.license is not None:
|
|
current_package.license.delete()
|
|
|
|
l = License()
|
|
l.link = license_link
|
|
l.image_link = license_image_link
|
|
l.save()
|
|
|
|
current_package.license = l
|
|
current_package.save()
|
|
|
|
return redirect(current_package.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compounds(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - Compounds'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'compound'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'compound')
|
|
|
|
reviewed_compound_qs = Compound.objects.none()
|
|
unreviewed_compound_qs = Compound.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_compound_qs = Compound.objects.filter(package=current_package).order_by('name')
|
|
else:
|
|
unreviewed_compound_qs = Compound.objects.filter(package=current_package).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_compound_qs if current_package.reviewed else unreviewed_compound_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_compound_qs
|
|
context['unreviewed_objects'] = unreviewed_compound_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
compound_name = request.POST.get('compound-name')
|
|
compound_smiles = request.POST.get('compound-smiles')
|
|
compound_description = request.POST.get('compound-description')
|
|
|
|
c = Compound.create(current_package, compound_smiles, compound_name, compound_description)
|
|
|
|
return redirect(c.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compound(request, package_uuid, compound_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_compound.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'compound'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'compound', current_compound)
|
|
|
|
context['compound'] = current_compound
|
|
context['current_object'] = current_compound
|
|
|
|
return render(request, 'objects/compound.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
current_compound.delete()
|
|
return redirect(current_package.url + '/compound')
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
selected_scenarios = request.POST.getlist('selected-scenarios')
|
|
|
|
if selected_scenarios:
|
|
set_scenarios(current_user, current_compound, selected_scenarios)
|
|
return redirect(current_compound.url)
|
|
|
|
new_compound_name = request.POST.get('compound-name')
|
|
new_compound_description = request.POST.get('compound-description')
|
|
|
|
if new_compound_name:
|
|
current_compound.name = new_compound_name
|
|
|
|
if new_compound_description:
|
|
current_compound.description = new_compound_description
|
|
|
|
if any([new_compound_name, new_compound_description]):
|
|
current_compound.save()
|
|
return redirect(current_compound.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compound_structures(request, package_uuid, compound_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_compound.name} - Structures'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'structure'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'compound', current_compound, 'structure')
|
|
|
|
reviewed_compound_structure_qs = CompoundStructure.objects.none()
|
|
unreviewed_compound_structure_qs = CompoundStructure.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_compound_structure_qs = current_compound.structures.order_by('name')
|
|
else:
|
|
unreviewed_compound_structure_qs = current_compound.structures.order_by('name')
|
|
|
|
context['reviewed_objects'] = reviewed_compound_structure_qs
|
|
context['unreviewed_objects'] = unreviewed_compound_structure_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
structure_name = request.POST.get('structure-name')
|
|
structure_smiles = request.POST.get('structure-smiles')
|
|
structure_description = request.POST.get('structure-description')
|
|
|
|
cs = current_compound.add_structure(structure_smiles, structure_name, structure_description)
|
|
|
|
return redirect(cs.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_compound_structure(request, package_uuid, compound_uuid, structure_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_compound = Compound.objects.get(package=current_package, uuid=compound_uuid)
|
|
current_structure = CompoundStructure.objects.get(compound=current_compound, uuid=structure_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_compound.name} - {current_structure.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'structure'
|
|
|
|
context['compound_structure'] = current_structure
|
|
context['current_object'] = current_structure
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'compound', current_compound, 'structure', current_structure)
|
|
|
|
return render(request, 'objects/compound_structure.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
# Check if we have to delete the compound as no structure is left
|
|
if len(current_structure.compound.structures.all()) == 1:
|
|
# This will delete the structure as well
|
|
current_compound.delete()
|
|
return redirect(current_package.url + '/compound')
|
|
else:
|
|
if current_structure.normalized_structure:
|
|
current_compound.delete()
|
|
return redirect(current_package.url + '/compound')
|
|
else:
|
|
if current_compound.default_structure == current_structure:
|
|
current_structure.delete()
|
|
current_compound.default_structure = current_compound.structures.all().first()
|
|
return redirect(current_compound.url + '/structure')
|
|
else:
|
|
current_structure.delete()
|
|
return redirect(current_compound.url + '/structure')
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
selected_scenarios = request.POST.getlist('selected-scenarios')
|
|
|
|
if selected_scenarios:
|
|
set_scenarios(current_user, current_structure, selected_scenarios)
|
|
return redirect(current_structure.url)
|
|
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', ])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_rules(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - Rules'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'rule'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'rule')
|
|
|
|
reviewed_rule_qs = Rule.objects.none()
|
|
unreviewed_rule_qs = Rule.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_rule_qs = Rule.objects.filter(package=current_package).order_by('name')
|
|
else:
|
|
unreviewed_rule_qs = Rule.objects.filter(package=current_package).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_rule_qs if current_package.reviewed else unreviewed_rule_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_rule_qs
|
|
context['unreviewed_objects'] = unreviewed_rule_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
# Generic params
|
|
rule_name = request.POST.get('rule-name')
|
|
rule_description = request.POST.get('rule-description')
|
|
|
|
rule_type = request.POST.get('rule-type')
|
|
|
|
params = {}
|
|
|
|
# Obtain parameters as required by rule type
|
|
if rule_type == 'SimpleAmbitRule':
|
|
params['smirks'] = request.POST.get('rule-smirks')
|
|
params['reactant_filter_smarts'] = request.POST.get('rule-reactant-smarts')
|
|
params['product_filter_smarts'] = request.POST.get('rule-product-smarts')
|
|
elif rule_type == 'SimpleRDKitRule':
|
|
params['reaction_smarts'] = request.POST.get('rule-reaction-smarts')
|
|
elif rule_type == 'ParallelRule':
|
|
pass
|
|
elif rule_type == 'SequentialRule':
|
|
pass
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
r = Rule.create(rule_type=rule_type, package=current_package, name=rule_name, description=rule_description,
|
|
**params)
|
|
return redirect(r.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_rule(request, package_uuid, rule_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_rule = Rule.objects.get(package=current_package, uuid=rule_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
|
|
if smiles := request.GET.get('smiles', False):
|
|
stand_smiles = FormatConverter.standardize(smiles)
|
|
res = current_rule.apply(stand_smiles)
|
|
if len(res) > 1:
|
|
logger.info(f"Rule {current_rule.uuid} returned multiple product sets on {smiles}, picking the first one.")
|
|
|
|
smirks = f"{stand_smiles}>>{'.'.join(sorted(res[0]))}"
|
|
# Usually the functional groups are a mapping of fg -> count
|
|
# As we are doing it on the fly here fake a high count to ensure that its properly highlighted
|
|
educt_functional_groups = {x: 1000 for x in current_rule.reactants_smarts}
|
|
product_functional_groups = {x: 1000 for x in current_rule.products_smarts}
|
|
return HttpResponse(
|
|
IndigoUtils.smirks_to_svg(smirks, False, 0, 0,
|
|
educt_functional_groups=educt_functional_groups,
|
|
product_functional_groups=product_functional_groups),
|
|
content_type='image/svg+xml')
|
|
|
|
context['title'] = f'enviPath - {current_package.name} - {current_rule.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'rule'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'rule', current_rule)
|
|
|
|
context['rule'] = current_rule
|
|
context['current_object'] = current_rule
|
|
|
|
if isinstance(current_rule, SimpleAmbitRule):
|
|
return render(request, 'objects/simple_rule.html', context)
|
|
else: # isinstance(current_rule, ParallelRule) or isinstance(current_rule, SequentialRule):
|
|
return render(request, 'objects/composite_rule.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
current_rule.delete()
|
|
return redirect(current_package.url + '/rule')
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
selected_scenarios = request.POST.getlist('selected-scenarios')
|
|
|
|
if selected_scenarios:
|
|
set_scenarios(current_user, current_rule, selected_scenarios)
|
|
return redirect(current_rule.url)
|
|
|
|
rule_name = request.POST.get('rule-name', '').strip()
|
|
rule_description = request.POST.get('rule-description', '').strip()
|
|
|
|
if rule_name:
|
|
current_rule.name = rule_name
|
|
|
|
if rule_description:
|
|
current_rule.description = rule_description
|
|
|
|
if any([rule_name, rule_description]):
|
|
current_rule.save()
|
|
return redirect(current_rule.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_reactions(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_package.name} - Reactions'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'reaction'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'reaction')
|
|
|
|
reviewed_reaction_qs = Reaction.objects.none()
|
|
unreviewed_reaction_qs = Reaction.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by('name')
|
|
else:
|
|
unreviewed_reaction_qs = Reaction.objects.filter(package=current_package).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_reaction_qs if current_package.reviewed else unreviewed_reaction_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_reaction_qs
|
|
context['unreviewed_objects'] = unreviewed_reaction_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
reaction_name = request.POST.get('reaction-name')
|
|
reaction_description = request.POST.get('reaction-description')
|
|
reactions_smirks = request.POST.get('reaction-smirks')
|
|
|
|
educts = reactions_smirks.split('>>')[0].split('.')
|
|
products = reactions_smirks.split('>>')[1].split('.')
|
|
|
|
r = Reaction.create(current_package, name=reaction_name, description=reaction_description, educts=educts,
|
|
products=products)
|
|
|
|
return redirect(r.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_reaction(request, package_uuid, reaction_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_reaction = Reaction.objects.get(package=current_package, uuid=reaction_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_reaction.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'reaction'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'reaction', current_reaction)
|
|
|
|
context['reaction'] = current_reaction
|
|
context['current_object'] = current_reaction
|
|
|
|
return render(request, 'objects/reaction.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
current_reaction.delete()
|
|
return redirect(current_package.url + '/reaction')
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
selected_scenarios = request.POST.getlist('selected-scenarios')
|
|
|
|
if selected_scenarios:
|
|
set_scenarios(current_user, current_reaction, selected_scenarios)
|
|
return redirect(current_reaction.url)
|
|
|
|
new_reaction_name = request.POST.get('reaction-name')
|
|
new_reaction_description = request.POST.get('reaction-description')
|
|
|
|
if new_reaction_name:
|
|
current_reaction.name = new_reaction_name
|
|
|
|
if new_reaction_description:
|
|
current_reaction.description = new_reaction_description
|
|
|
|
if any([new_reaction_name, new_reaction_description]):
|
|
current_reaction.save()
|
|
return redirect(current_reaction.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathways(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - Pathways'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'pathway'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'pathway')
|
|
|
|
reviewed_pathway_qs = Pathway.objects.none()
|
|
unreviewed_pathway_qs = Pathway.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by('name')
|
|
else:
|
|
unreviewed_pathway_qs = Pathway.objects.filter(package=current_package).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_pathway_qs if current_package.reviewed else unreviewed_pathway_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_pathway_qs
|
|
context['unreviewed_objects'] = unreviewed_pathway_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
name = request.POST.get('name')
|
|
description = request.POST.get('description')
|
|
pw_mode = request.POST.get('predict', 'predict')
|
|
smiles = request.POST.get('smiles')
|
|
|
|
if smiles is None or smiles.strip() == '':
|
|
return error(request, "Pathway prediction failed!",
|
|
"Pathway prediction failed due to missing or empty SMILES")
|
|
|
|
smiles = smiles.strip()
|
|
|
|
try:
|
|
stand_smiles = FormatConverter.standardize(smiles)
|
|
except ValueError:
|
|
return error(request, "Pathway prediction failed!",
|
|
f'Pathway prediction failed as standardization of SMILES "{smiles}" failed!')
|
|
|
|
modes = ['predict', 'build', 'incremental']
|
|
if pw_mode not in modes:
|
|
return error(request, "Pathway prediction failed!",
|
|
f'Pathway prediction failed as received mode "{pw_mode}" is none of {modes}')
|
|
|
|
prediction_setting = request.POST.get('prediction-setting', None)
|
|
if prediction_setting:
|
|
prediction_setting = SettingManager.get_setting_by_url(current_user, prediction_setting)
|
|
else:
|
|
prediction_setting = current_user.prediction_settings()
|
|
|
|
pw = Pathway.create(current_package, stand_smiles, name=name, description=description)
|
|
|
|
# set mode
|
|
pw.kv.update({'mode': pw_mode})
|
|
pw.save()
|
|
|
|
if pw_mode == 'predict' or pw_mode == 'incremental':
|
|
# unlimited pred (will be handled by setting)
|
|
limit = -1
|
|
|
|
# For incremental predict first level and return
|
|
if pw_mode == 'incremental':
|
|
limit = 1
|
|
|
|
pw.setting = prediction_setting
|
|
pw.save()
|
|
|
|
from .tasks import predict
|
|
predict.delay(pw.pk, prediction_setting.pk, limit=limit)
|
|
|
|
return redirect(pw.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway(request, package_uuid, pathway_uuid):
|
|
current_user: User = _anonymous_or_real(request)
|
|
current_package: Package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway: Pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
|
|
if request.method == 'GET':
|
|
|
|
if request.GET.get("last_modified", False):
|
|
return JsonResponse({'modified': current_pathway.modified.strftime('%Y-%m-%d %H:%M:%S')})
|
|
|
|
if request.GET.get("download", False) == "true":
|
|
filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv"
|
|
csv_pw = current_pathway.to_csv()
|
|
response = HttpResponse(csv_pw, content_type='text/csv')
|
|
response['Content-Disposition'] = f'attachment; filename="{filename}"'
|
|
|
|
return response
|
|
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_pathway.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'pathway'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'pathway', current_pathway)
|
|
|
|
context['pathway'] = current_pathway
|
|
context['current_object'] = current_pathway
|
|
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Package': s.SERVER_URL + '/package'},
|
|
{current_package.name: current_package.url},
|
|
{'Pathway': current_package.url + '/pathway'},
|
|
{current_pathway.name: current_pathway.url},
|
|
]
|
|
|
|
return render(request, 'objects/pathway.html', context)
|
|
# return render(request, 'pathway_playground2.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
current_pathway.delete()
|
|
return redirect(current_package.url + '/pathway')
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
selected_scenarios = request.POST.getlist('selected-scenarios')
|
|
|
|
if selected_scenarios:
|
|
set_scenarios(current_user, current_pathway, selected_scenarios)
|
|
return redirect(current_pathway.url)
|
|
|
|
pathway_name = request.POST.get('pathway-name')
|
|
pathway_description = request.POST.get('pathway-description')
|
|
|
|
if any([pathway_name, pathway_description]):
|
|
if pathway_name is not None and pathway_name.strip() != '':
|
|
pathway_name = pathway_name.strip()
|
|
|
|
current_pathway.name = pathway_name
|
|
|
|
if pathway_description is not None and pathway_description.strip() != '':
|
|
pathway_description = pathway_description.strip()
|
|
|
|
current_pathway.description = pathway_description
|
|
|
|
current_pathway.save()
|
|
return redirect(current_pathway.url)
|
|
|
|
node_url = request.POST.get('node')
|
|
|
|
if node_url:
|
|
n = current_pathway.get_node(node_url)
|
|
|
|
from .tasks import predict
|
|
# Dont delay?
|
|
predict(current_pathway.pk, current_pathway.setting.pk, node_pk=n.pk)
|
|
return JsonResponse({'success': current_pathway.url})
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_nodes(request, package_uuid, pathway_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_pathway.name} - Nodes'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'node'
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Package': s.SERVER_URL + '/package'},
|
|
{current_package.name: current_package.url},
|
|
{'Pathway': current_package.url + '/pathway'},
|
|
{current_pathway.name: current_pathway.url},
|
|
{'Node': current_pathway.url + '/node'},
|
|
]
|
|
|
|
reviewed_node_qs = Node.objects.none()
|
|
unreviewed_node_qs = Node.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by('name')
|
|
else:
|
|
unreviewed_node_qs = Node.objects.filter(pathway=current_pathway).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_node_qs if current_package.reviewed else unreviewed_node_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_node_qs
|
|
context['unreviewed_objects'] = unreviewed_node_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
node_name = request.POST.get('node-name')
|
|
node_description = request.POST.get('node-description')
|
|
node_smiles = request.POST.get('node-smiles')
|
|
|
|
current_pathway.add_node(node_smiles, name=node_name, description=node_description)
|
|
|
|
return redirect(current_pathway.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_node(request, package_uuid, pathway_uuid, node_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
current_node = Node.objects.get(pathway=current_pathway, uuid=node_uuid)
|
|
|
|
if request.method == 'GET':
|
|
is_image_request = request.GET.get('image')
|
|
if is_image_request:
|
|
if is_image_request == 'svg':
|
|
svg_data = current_node.as_svg
|
|
return HttpResponse(svg_data, content_type="image/svg+xml")
|
|
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_pathway.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'pathway'
|
|
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Package': s.SERVER_URL + '/package'},
|
|
{current_package.name: current_package.url},
|
|
{'Pathway': current_package.url + '/pathway'},
|
|
{current_pathway.name: current_pathway.url},
|
|
{'Node': current_pathway.url + '/node'},
|
|
{current_node.name: current_node.url},
|
|
]
|
|
|
|
context['node'] = current_node
|
|
context['current_object'] = current_node
|
|
|
|
context['app_domain_assessment_data'] = json.dumps(current_node.get_app_domain_assessment_data())
|
|
|
|
return render(request, 'objects/node.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
|
|
# pre_delete signal will take care of edge deletion
|
|
current_node.delete()
|
|
|
|
return redirect(current_pathway.url)
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
selected_scenarios = request.POST.getlist('selected-scenarios')
|
|
|
|
if selected_scenarios:
|
|
set_scenarios(current_user, current_node, selected_scenarios)
|
|
return redirect(current_node.url)
|
|
|
|
return HttpResponseBadRequest()
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_edges(request, package_uuid, pathway_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_pathway.name} - Edges'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'edge'
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Package': s.SERVER_URL + '/package'},
|
|
{current_package.name: current_package.url},
|
|
{'Pathway': current_package.url + '/pathway'},
|
|
{current_pathway.name: current_pathway.url},
|
|
{'Edge': current_pathway.url + '/edge'},
|
|
]
|
|
|
|
reviewed_edge_qs = Edge.objects.none()
|
|
unreviewed_edge_qs = Edge.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by('name')
|
|
else:
|
|
unreviewed_edge_qs = Edge.objects.filter(pathway=current_pathway).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_edge_qs if current_package.reviewed else unreviewed_edge_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_edge_qs
|
|
context['unreviewed_objects'] = unreviewed_edge_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
edge_name = request.POST.get('edge-name')
|
|
edge_description = request.POST.get('edge-description')
|
|
edge_substrates = request.POST.getlist('edge-substrates')
|
|
edge_products = request.POST.getlist('edge-products')
|
|
|
|
substrate_nodes = [current_pathway.get_node(url) for url in edge_substrates]
|
|
product_nodes = [current_pathway.get_node(url) for url in edge_products]
|
|
|
|
# TODO in the future consider Rules here?
|
|
current_pathway.add_edge(substrate_nodes, product_nodes, name=edge_name, description=edge_description)
|
|
|
|
return redirect(current_pathway.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_pathway_edge(request, package_uuid, pathway_uuid, edge_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_pathway = Pathway.objects.get(package=current_package, uuid=pathway_uuid)
|
|
current_edge = Edge.objects.get(pathway=current_pathway, uuid=edge_uuid)
|
|
|
|
if request.method == 'GET':
|
|
is_image_request = request.GET.get('image')
|
|
if is_image_request:
|
|
if is_image_request == 'svg':
|
|
svg_data = current_edge.as_svg
|
|
return HttpResponse(svg_data, content_type="image/svg+xml")
|
|
|
|
context = get_base_context(request)
|
|
context[
|
|
'title'] = f'enviPath - {current_package.name} - {current_pathway.name} - {current_edge.edge_label.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'edge'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'pathway', current_pathway, 'edge', current_edge)
|
|
context['edge'] = current_edge
|
|
context['current_object'] = current_edge
|
|
|
|
return render(request, 'objects/edge.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
current_edge.delete()
|
|
return redirect(current_pathway.url)
|
|
|
|
selected_scenarios = request.POST.getlist('selected-scenarios')
|
|
|
|
if selected_scenarios:
|
|
set_scenarios(current_user, current_edge, selected_scenarios)
|
|
return redirect(current_edge.url)
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_scenarios(request, package_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
|
|
if request.method == 'GET':
|
|
|
|
if 'application/json' in request.META.get('HTTP_ACCEPT'): #request.headers.get('Accept') == 'application/json':
|
|
scens = Scenario.objects.filter(package=current_package).order_by('name')
|
|
res = [{'name': s.name, 'url': s.url, 'uuid': s.uuid} for s in scens]
|
|
return JsonResponse(res, safe=False)
|
|
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - Scenarios'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'scenario'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'pathway')
|
|
|
|
reviewed_scenario_qs = Scenario.objects.none()
|
|
unreviewed_scenario_qs = Scenario.objects.none()
|
|
|
|
if current_package.reviewed:
|
|
reviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by('name')
|
|
else:
|
|
unreviewed_scenario_qs = Scenario.objects.filter(package=current_package).order_by('name')
|
|
|
|
if request.GET.get('all'):
|
|
return JsonResponse({
|
|
"objects": [
|
|
{"name": pw.name, "url": pw.url, "reviewed": current_package.reviewed}
|
|
for pw in (reviewed_scenario_qs if current_package.reviewed else unreviewed_scenario_qs)
|
|
]
|
|
})
|
|
|
|
context['reviewed_objects'] = reviewed_scenario_qs
|
|
context['unreviewed_objects'] = unreviewed_scenario_qs
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', ])
|
|
|
|
|
|
@package_permission_required()
|
|
def package_scenario(request, package_uuid, scenario_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
|
|
current_scenario = Scenario.objects.get(package=current_package, uuid=scenario_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_package.name} - {current_scenario.name}'
|
|
|
|
context['meta']['current_package'] = current_package
|
|
context['object_type'] = 'scenario'
|
|
context['breadcrumbs'] = breadcrumbs(current_package, 'scenario', current_scenario)
|
|
|
|
context['scenario'] = current_scenario
|
|
|
|
return render(request, 'objects/scenario.html', context)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', ])
|
|
|
|
|
|
##############
|
|
# User/Group #
|
|
##############
|
|
def users(request):
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - Users'
|
|
|
|
context['object_type'] = 'user'
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'User': s.SERVER_URL + '/user'},
|
|
]
|
|
|
|
context['objects'] = get_user_model().objects.all()
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET'])
|
|
|
|
|
|
def user(request, user_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == 'GET':
|
|
|
|
# Check if current user is the one matching to the url
|
|
if str(current_user.uuid) != user_uuid and not current_user.is_superuser:
|
|
return HttpResponseBadRequest()
|
|
|
|
requested_user = UserManager.get_user_by_id(current_user, user_uuid)
|
|
|
|
context = get_base_context(request, for_user=requested_user)
|
|
context['title'] = f'enviPath - User'
|
|
|
|
context['object_type'] = 'user'
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'User': s.SERVER_URL + '/user'},
|
|
{current_user.username: requested_user.url}
|
|
]
|
|
|
|
context['user'] = requested_user
|
|
|
|
model_qs = EPModel.objects.none()
|
|
for p in PackageManager.get_all_readable_packages(requested_user, include_reviewed=True):
|
|
model_qs |= p.models
|
|
|
|
context['models'] = model_qs
|
|
|
|
context['tokens'] = APIToken.objects.filter(user=requested_user)
|
|
|
|
return render(request, 'objects/user.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
is_hidden_method = bool(request.POST.get('hidden', False))
|
|
|
|
if is_hidden_method and request.POST['hidden'] == 'request-api-token':
|
|
name = request.POST.get('name', 'No Name')
|
|
valid_for = min(max(int(request.POST.get('valid-for', 90)), 1), 90)
|
|
|
|
token, raw_token = APIToken.create_token(request.user, name=name, valid_for=valid_for)
|
|
|
|
return JsonResponse({"raw_token": raw_token, 'token': {'id': token.id, 'name': token.name}})
|
|
|
|
if is_hidden_method and request.POST['hidden'] == 'delete':
|
|
token_id = request.POST.get('token-id')
|
|
if token_id is None:
|
|
return HttpResponseBadRequest("Token ID missing!")
|
|
|
|
try:
|
|
APIToken.objects.get(user=current_user, id=token_id).delete()
|
|
except APIToken.DoesNotExist:
|
|
return HttpResponseBadRequest("User and Token ID combination invalid!")
|
|
|
|
return HttpResponse("success")
|
|
|
|
default_package = request.POST.get('default-package')
|
|
default_group = request.POST.get('default-group')
|
|
default_prediction_setting = request.POST.get('default-prediction-setting')
|
|
|
|
if any([default_package, default_group, default_prediction_setting]):
|
|
current_user.default_package = PackageManager.get_package_by_url(current_user, default_package)
|
|
current_user.default_group = GroupManager.get_group_by_url(current_user, default_group)
|
|
current_user.default_setting = SettingManager.get_setting_by_url(current_user, default_prediction_setting)
|
|
current_user.save()
|
|
|
|
return redirect(current_user.url)
|
|
|
|
prediction_model_pk = request.POST.get('model')
|
|
prediction_threshold = request.POST.get('threshold')
|
|
prediction_max_nodes = request.POST.get('max_nodes')
|
|
prediction_max_depth = request.POST.get('max_depth')
|
|
|
|
if all([prediction_model_pk, prediction_threshold, prediction_max_nodes, prediction_max_depth]):
|
|
# validate input..
|
|
mod = EPModel.objects.get(id=prediction_model_pk)
|
|
if not PackageManager.readable(current_user, mod.package):
|
|
return HttpResponseBadRequest()
|
|
|
|
threshold = float(prediction_threshold)
|
|
if threshold < 0 or threshold > 1:
|
|
return HttpResponseBadRequest()
|
|
|
|
max_nodes = min(max(int(prediction_max_nodes), 1), 50)
|
|
max_depth = min(max(int(prediction_max_depth), 1), 8)
|
|
|
|
setting = {
|
|
'model': mod,
|
|
'model_parameters': {
|
|
'threshold': threshold
|
|
},
|
|
'truncator': {
|
|
'max_nodes': max_nodes,
|
|
'max_depth': max_depth,
|
|
}
|
|
}
|
|
|
|
return HttpResponseBadRequest()
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def groups(request):
|
|
current_user = _anonymous_or_real(request)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - Groups'
|
|
|
|
context['object_type'] = 'group'
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Group': s.SERVER_URL + '/group'},
|
|
]
|
|
|
|
context['objects'] = Group.objects.all()
|
|
|
|
return render(request, 'collections/objects_list.html', context)
|
|
elif request.method == 'POST':
|
|
group_name = request.POST.get('group-name')
|
|
group_description = request.POST.get('group-description', s.DEFAULT_VALUES['description'])
|
|
|
|
g = GroupManager.create_group(current_user, group_name, group_description)
|
|
|
|
return redirect(g.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def group(request, group_uuid):
|
|
current_user = _anonymous_or_real(request)
|
|
current_group = GroupManager.get_group_by_id(current_user, group_uuid)
|
|
|
|
if request.method == 'GET':
|
|
context = get_base_context(request)
|
|
context['title'] = f'enviPath - {current_group.name}'
|
|
|
|
context['object_type'] = 'group'
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Group': s.SERVER_URL + '/group'},
|
|
{current_group.name: current_group.url}
|
|
]
|
|
|
|
context['group'] = current_group
|
|
|
|
context['users'] = UserManager.get_users_lp().exclude(id__in=current_group.user_member.all())
|
|
context['groups'] = GroupManager.get_groups_lp().exclude(id__in=current_group.group_member.all()).exclude(id=current_group.pk)
|
|
|
|
context['packages'] = Package.objects.filter(
|
|
id__in=GroupPackagePermission.objects.filter(group=current_group).values('package').distinct())
|
|
|
|
return render(request, 'objects/group.html', context)
|
|
|
|
elif request.method == 'POST':
|
|
|
|
log_post_params(request)
|
|
|
|
if hidden := request.POST.get('hidden', None):
|
|
if hidden == 'delete':
|
|
current_group.delete()
|
|
return redirect(s.SERVER_URL + '/group')
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
member_url = request.POST.get('member')
|
|
action = request.POST.get('action')
|
|
|
|
if all([member_url, action]) and action in ['add', 'remove']:
|
|
if 'user' in member_url:
|
|
member = UserManager.get_user_lp(member_url)
|
|
else:
|
|
member = GroupManager.get_group_lp(member_url)
|
|
|
|
GroupManager.update_members(current_user, current_group, member, action)
|
|
|
|
return redirect(current_group.url)
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def settings(request):
|
|
current_user = _anonymous_or_real(request)
|
|
context = get_base_context(request)
|
|
|
|
if request.method == 'GET':
|
|
context['object_type'] = 'setting'
|
|
# Even if settings are aready in "meta", for consistency add it on root level
|
|
context['settings'] = SettingManager.get_all_settings(current_user)
|
|
context['breadcrumbs'] = [
|
|
{'Home': s.SERVER_URL},
|
|
{'Group': s.SERVER_URL + '/setting'},
|
|
]
|
|
return
|
|
elif request.method == 'POST':
|
|
if s.DEBUG:
|
|
for k, v in request.POST.items():
|
|
logger.info("Parameters received:")
|
|
logger.info(f"{k}\t{v}")
|
|
|
|
name = request.POST.get('prediction-setting-name')
|
|
description = request.POST.get('prediction-setting-description')
|
|
new_default = request.POST.get('prediction-setting-new-default', 'off') == 'on'
|
|
|
|
max_nodes = min(max(int(request.POST.get('prediction-setting-max-nodes', 1)), s.DEFAULT_MAX_NUMBER_OF_NODES),
|
|
s.DEFAULT_MAX_NUMBER_OF_NODES)
|
|
max_depth = min(max(int(request.POST.get('prediction-setting-max-depth', 1)), s.DEFAULT_MAX_DEPTH),
|
|
s.DEFAULT_MAX_DEPTH)
|
|
|
|
tp_gen_method = request.POST.get('tp-generation-method')
|
|
|
|
params = {}
|
|
|
|
if tp_gen_method == 'model-based-prediction-setting':
|
|
model_url = request.POST.get('model-based-prediction-setting-model')
|
|
|
|
model_uuid = model_url.split('/')[-1]
|
|
params['model'] = EPModel.objects.get(uuid=model_uuid)
|
|
params['model_threshold'] = request.POST.get('model-based-prediction-setting-threshold',
|
|
s.DEFAULT_MODEL_THRESHOLD)
|
|
|
|
if not PackageManager.readable(current_user, params['model'].package):
|
|
raise ValueError("")
|
|
|
|
elif tp_gen_method == 'rule-based-prediction-setting':
|
|
rule_packages = request.POST.getlist('rule-based-prediction-setting-packages')
|
|
params['rule_packages'] = [PackageManager.get_package_by_url(current_user, p) for p in rule_packages]
|
|
else:
|
|
raise ValueError("")
|
|
|
|
created_setting = SettingManager.create_setting(current_user, name=name, description=description,
|
|
max_nodes=max_nodes, max_depth=max_depth, **params)
|
|
|
|
if new_default:
|
|
current_user.default_setting = created_setting
|
|
current_user.save()
|
|
|
|
return HttpResponse("Success!")
|
|
|
|
else:
|
|
return HttpResponseNotAllowed(['GET', 'POST'])
|
|
|
|
|
|
def setting(request, setting_uuid):
|
|
pass
|
|
|
|
|
|
###########
|
|
# KETCHER #
|
|
###########
|
|
|
|
def indigo(request):
|
|
from indigo import Indigo
|
|
return JsonResponse({'Indigo': {'version': Indigo().version()}})
|
|
|
|
|
|
@csrf_exempt
|
|
def aromatize(request):
|
|
if request.method == 'POST':
|
|
data = json.loads(request.body)
|
|
mol_data = data.get('struct')
|
|
aromatized = IndigoUtils.aromatize(mol_data, False)
|
|
return JsonResponse({"struct": aromatized})
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
@csrf_exempt
|
|
def dearomatize(request):
|
|
if request.method == 'POST':
|
|
data = json.loads(request.body)
|
|
mol_data = data.get('struct')
|
|
dearomatized = IndigoUtils.dearomatize(mol_data, False)
|
|
return JsonResponse({"struct": dearomatized})
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
@csrf_exempt
|
|
def layout(request):
|
|
if request.method == 'POST':
|
|
data = json.loads(request.body)
|
|
mol_data = data.get('struct')
|
|
lay = IndigoUtils.layout(mol_data)
|
|
return JsonResponse({"struct": lay})
|
|
else:
|
|
return HttpResponseBadRequest()
|
|
|
|
|
|
##########################
|
|
# Generic/Non-Persistent #
|
|
##########################
|
|
def depict(request):
|
|
if smiles := request.GET.get('smiles'):
|
|
return HttpResponse(IndigoUtils.mol_to_svg(smiles), content_type='image/svg+xml')
|
|
|
|
elif smirks := request.GET.get('smirks'):
|
|
query_smirks = request.GET.get('is_query_smirks', False) == 'true'
|
|
return HttpResponse(IndigoUtils.smirks_to_svg(smirks, query_smirks), content_type='image/svg+xml')
|
|
else:
|
|
return HttpResponseBadRequest()
|