Merge remote-tracking branch 'origin/develop' into feature/frontend_update

This commit is contained in:
2025-11-05 17:25:27 +13:00
11 changed files with 377 additions and 31 deletions

View File

@ -16,3 +16,5 @@ POSTGRES_PORT=
# MAIL # MAIL
EMAIL_HOST_USER= EMAIL_HOST_USER=
EMAIL_HOST_PASSWORD= EMAIL_HOST_PASSWORD=
# MATOMO
MATOMO_SITE_ID

View File

@ -372,3 +372,6 @@ if MS_ENTRA_ENABLED:
MS_ENTRA_AUTHORITY = f"https://login.microsoftonline.com/{MS_ENTRA_TENANT_ID}" MS_ENTRA_AUTHORITY = f"https://login.microsoftonline.com/{MS_ENTRA_TENANT_ID}"
MS_ENTRA_REDIRECT_URI = os.environ["MS_REDIRECT_URI"] MS_ENTRA_REDIRECT_URI = os.environ["MS_REDIRECT_URI"]
MS_ENTRA_SCOPES = os.environ.get("MS_SCOPES", "").split(",") MS_ENTRA_SCOPES = os.environ.get("MS_SCOPES", "").split(",")
# Site ID 10 -> beta.envipath.org
MATOMO_SITE_ID = os.environ.get("MATOMO_SITE_ID", "10")

View File

@ -20,11 +20,12 @@ from .models import (
Setting, Setting,
ExternalDatabase, ExternalDatabase,
ExternalIdentifier, ExternalIdentifier,
JobLog,
) )
class UserAdmin(admin.ModelAdmin): class UserAdmin(admin.ModelAdmin):
pass list_display = ["username", "email", "is_active"]
class UserPackagePermissionAdmin(admin.ModelAdmin): class UserPackagePermissionAdmin(admin.ModelAdmin):
@ -39,8 +40,14 @@ class GroupPackagePermissionAdmin(admin.ModelAdmin):
pass pass
class JobLogAdmin(admin.ModelAdmin):
pass
class EPAdmin(admin.ModelAdmin): class EPAdmin(admin.ModelAdmin):
search_fields = ["name", "description"] search_fields = ["name", "description"]
list_display = ["name", "url", "created"]
ordering = ["-created"]
class PackageAdmin(EPAdmin): class PackageAdmin(EPAdmin):
@ -107,6 +114,7 @@ admin.site.register(User, UserAdmin)
admin.site.register(UserPackagePermission, UserPackagePermissionAdmin) admin.site.register(UserPackagePermission, UserPackagePermissionAdmin)
admin.site.register(Group, GroupAdmin) admin.site.register(Group, GroupAdmin)
admin.site.register(GroupPackagePermission, GroupPackagePermissionAdmin) admin.site.register(GroupPackagePermission, GroupPackagePermissionAdmin)
admin.site.register(JobLog, JobLogAdmin)
admin.site.register(Package, PackageAdmin) admin.site.register(Package, PackageAdmin)
admin.site.register(MLRelativeReasoning, MLRelativeReasoningAdmin) admin.site.register(MLRelativeReasoning, MLRelativeReasoningAdmin)
admin.site.register(EnviFormer, EnviFormerAdmin) admin.site.register(EnviFormer, EnviFormerAdmin)

View File

@ -1,13 +1,15 @@
import csv
import io
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Callable, Optional from typing import Any, Callable, List, Optional
from uuid import uuid4 from uuid import uuid4
from celery import shared_task from celery import shared_task
from celery.utils.functional import LRUCache from celery.utils.functional import LRUCache
from epdb.logic import SPathway from epdb.logic import SPathway
from epdb.models import EPModel, JobLog, Node, Package, Pathway, Setting, User from epdb.models import EPModel, JobLog, Node, Package, Pathway, Rule, Setting, User, Edge
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ML_CACHE = LRUCache(3) # Cache the three most recent ML models to reduce load times. ML_CACHE = LRUCache(3) # Cache the three most recent ML models to reduce load times.
@ -186,3 +188,96 @@ def predict(
JobLog.objects.filter(task_id=self.request.id).update(status="SUCCESS", task_result=pw.url) JobLog.objects.filter(task_id=self.request.id).update(status="SUCCESS", task_result=pw.url)
return pw.url return pw.url
@shared_task(bind=True, queue="background")
def identify_missing_rules(
    self,
    pw_pks: List[int],
    rule_package_pk: int,
):
    """Build a CSV report of pathway reactions flagged by ``PathwayUtils.find_missing_rules``.

    For every pathway in ``pw_pks`` the applicable rules of the package
    identified by ``rule_package_pk`` are handed to
    ``PathwayUtils.find_missing_rules``; each edge it returns becomes one CSV row.

    Args:
        pw_pks: Primary keys of the ``Pathway`` objects to inspect.
        rule_package_pk: Primary key of the ``Package`` whose applicable rules
            are used for the check.

    Returns:
        str: The CSV document (header row included) as text.

    Notes:
        Every row has exactly as many cells as the header. Multi-valued cells
        (rule names, SMARTS, products) hold a Python list and are written with
        its ``str()`` form. The educt cells hold the bare value when the edge
        has exactly one educt and a list otherwise, so edges with several (or
        no) educts no longer shift the following columns.
    """
    from utilities.misc import PathwayUtils

    def _single_or_list(values: List[Any]) -> Any:
        # One educt is by far the common case; keep that cell a plain string.
        return values[0] if len(values) == 1 else values

    rules = Package.objects.get(pk=rule_package_pk).get_applicable_rules()

    rows: List[Any] = [
        [
            "Package Name",
            "Pathway Name",
            "Educt Name",
            "Educt SMILES",
            "Reaction Name",
            "Reaction SMIRKS",
            "Triggered Rules",
            "Reactant SMARTS",
            "Product SMARTS",
            "Product Names",
            "Product SMILES",
        ]
    ]

    for pw in Pathway.objects.filter(pk__in=pw_pks):
        missing_rules = PathwayUtils(pw).find_missing_rules(rules)
        package_name = pw.package.name
        pathway_name = pw.name

        for edge_url, rule_chain in missing_rules.items():
            edge = Edge.objects.get(url=edge_url)

            educt_names = []
            educt_smiles = []
            for educt in edge.start_nodes.all():
                educt_names.append(educt.default_node_label.name)
                educt_smiles.append(educt.default_node_label.smiles)

            rule_names = []
            reactant_smarts = []
            product_smarts = []
            # Each chain entry is a (rule url, rule name) tuple; the rule itself
            # is loaded because its SMARTS are needed as well.
            for chain_entry in rule_chain:
                rule = Rule.objects.get(url=chain_entry[0])
                rule_names.append(rule.name)

                rs = rule.reactants_smarts
                if isinstance(rs, set):
                    rs = list(rs)

                ps = rule.products_smarts
                if isinstance(ps, set):
                    ps = list(ps)

                reactant_smarts.append(rs)
                product_smarts.append(ps)

            product_names = []
            product_smiles = []
            for product in edge.end_nodes.all():
                product_names.append(product.default_node_label.name)
                product_smiles.append(product.default_node_label.smiles)

            rows.append(
                [
                    package_name,
                    pathway_name,
                    _single_or_list(educt_names),
                    _single_or_list(educt_smiles),
                    edge.edge_label.name,
                    edge.edge_label.smirks(),
                    rule_names,
                    reactant_smarts,
                    product_smarts,
                    product_names,
                    product_smiles,
                ]
            )

    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)
    # getvalue() returns the whole buffer regardless of the stream position.
    return buffer.getvalue()

View File

@ -240,6 +240,7 @@ def get_base_context(request, for_user=None) -> Dict[str, Any]:
"enabled_features": s.FLAGS, "enabled_features": s.FLAGS,
"debug": s.DEBUG, "debug": s.DEBUG,
"external_databases": ExternalDatabase.get_databases(), "external_databases": ExternalDatabase.get_databases(),
"site_id": s.MATOMO_SITE_ID,
}, },
} }
@ -1869,6 +1870,25 @@ def package_pathway(request, package_uuid, pathway_uuid):
return response return response
if (
request.GET.get("identify-missing-rules", False) == "true"
and request.GET.get("rule-package") is not None
):
from .tasks import dispatch_eager, identify_missing_rules
rule_package = PackageManager.get_package_by_url(
current_user, request.GET.get("rule-package")
)
res = dispatch_eager(
current_user, identify_missing_rules, [current_pathway.pk], rule_package.pk
)
filename = f"{current_pathway.name.replace(' ', '_')}_{current_pathway.uuid}.csv"
response = HttpResponse(res, content_type="text/csv")
response["Content-Disposition"] = f'attachment; filename="{filename}"'
return response
# Pathway d3_json() relies on a lot of related objects (Nodes, Structures, Edges, Reaction, Rules, ...) # Pathway d3_json() relies on a lot of related objects (Nodes, Structures, Edges, Reaction, Rules, ...)
# we will again fetch the current pathway identified by this url, but this time together with nearly all # we will again fetch the current pathway identified by this url, but this time together with nearly all
# related objects # related objects

View File

@ -22,6 +22,10 @@
<i class="glyphicon glyphicon-floppy-save"></i> Download Pathway as Image</a> <i class="glyphicon glyphicon-floppy-save"></i> Download Pathway as Image</a>
</li> </li>
{% if meta.can_edit %} {% if meta.can_edit %}
<li>
<a class="button" data-toggle="modal" data-target="#identify_missing_rules_modal">
<i class="glyphicon glyphicon-question-sign"></i> Identify Missing Rules</a>
</li>
<li role="separator" class="divider"></li> <li role="separator" class="divider"></li>
<li> <li>
<a class="button" data-toggle="modal" data-target="#edit_pathway_modal"> <a class="button" data-toggle="modal" data-target="#edit_pathway_modal">

View File

@ -66,7 +66,7 @@
(function () { (function () {
var u = "//matomo.envipath.com/"; var u = "//matomo.envipath.com/";
_paq.push(['setTrackerUrl', u + 'matomo.php']); _paq.push(['setTrackerUrl', u + 'matomo.php']);
_paq.push(['setSiteId', '10']); _paq.push(['setSiteId', '{{ meta.site_id }}']);
var d = document, g = d.createElement('script'), s = d.getElementsByTagName('script')[0]; var d = document, g = d.createElement('script'), s = d.getElementsByTagName('script')[0];
g.async = true; g.async = true;
g.src = u + 'matomo.js'; g.src = u + 'matomo.js';

View File

@ -0,0 +1,54 @@
{% load static %}
<!-- Identify Missing Rules -->
<div id="identify_missing_rules_modal" class="modal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h3 class="modal-title">Identify Missing Rules</h3>
<button type="button" class="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">&times;</span>
</button>
</div>
<div class="modal-body">
By clicking on Download we'll search the Pathway for Reactions that are not backed by
a Rule or which can be assembled by chaining two rules.
<form id="identify-missing-rules-modal-form" accept-charset="UTF-8" action="{{ pathway.url }}"
data-remote="true" method="GET">
<label for="rule-package">Select the Rule Package</label>
<select id="rule-package" name="rule-package" data-actions-box='true' class="form-control"
data-width='100%'>
<option disabled>Reviewed Packages</option>
{% for obj in meta.readable_packages %}
{% if obj.reviewed %}
<option value="{{ obj.url }}">{{ obj.name }}</option>
{% endif %}
{% endfor %}
<option disabled>Unreviewed Packages</option>
{% for obj in meta.readable_packages %}
{% if not obj.reviewed %}
<option value="{{ obj.url }}">{{ obj.name }}</option>
{% endif %}
{% endfor %}
</select>
<input type="hidden" name="identify-missing-rules" value="true"/>
</form>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-dismiss="modal">Close</button>
<button type="button" class="btn btn-primary" id="identify-missing-rules-modal-submit">Download</button>
</div>
</div>
</div>
</div>
<script>
$(function () {
$('#identify-missing-rules-modal-submit').click(function (e) {
e.preventDefault();
$('#identify-missing-rules-modal-form').submit();
$('#identify_missing_rules_modal').modal('hide');
});
})
</script>

View File

@ -83,6 +83,7 @@
{% include "modals/objects/add_pathway_edge_modal.html" %} {% include "modals/objects/add_pathway_edge_modal.html" %}
{% include "modals/objects/download_pathway_csv_modal.html" %} {% include "modals/objects/download_pathway_csv_modal.html" %}
{% include "modals/objects/download_pathway_image_modal.html" %} {% include "modals/objects/download_pathway_image_modal.html" %}
{% include "modals/objects/identify_missing_rules_modal.html" %}
{% include "modals/objects/generic_copy_object_modal.html" %} {% include "modals/objects/generic_copy_object_modal.html" %}
{% include "modals/objects/edit_pathway_modal.html" %} {% include "modals/objects/edit_pathway_modal.html" %}
{% include "modals/objects/generic_set_aliases_modal.html" %} {% include "modals/objects/generic_set_aliases_modal.html" %}

View File

@ -185,7 +185,7 @@ class FormatConverter(object):
return smiles return smiles
@staticmethod @staticmethod
def standardize(smiles, remove_stereo=False): def standardize(smiles, remove_stereo=False, canonicalize_tautomers=False):
# Taken from https://bitsilla.com/blog/2021/06/standardizing-a-molecule-using-rdkit/ # Taken from https://bitsilla.com/blog/2021/06/standardizing-a-molecule-using-rdkit/
# follows the steps in # follows the steps in
# https://github.com/greglandrum/RSC_OpenScience_Standardization_202104/blob/main/MolStandardize%20pieces.ipynb # https://github.com/greglandrum/RSC_OpenScience_Standardization_202104/blob/main/MolStandardize%20pieces.ipynb
@ -203,19 +203,21 @@ class FormatConverter(object):
uncharger = ( uncharger = (
rdMolStandardize.Uncharger() rdMolStandardize.Uncharger()
) # annoying, but necessary as no convenience method exists ) # annoying, but necessary as no convenience method exists
uncharged_parent_clean_mol = uncharger.uncharge(parent_clean_mol) res_mol = uncharger.uncharge(parent_clean_mol)
# note that no attempt is made at reionization at this step # note that no attempt is made at reionization at this step
# nor at ionization at some pH (rdkit has no pKa caculator) # nor at ionization at some pH (rdkit has no pKa caculator)
# the main aim to to represent all molecules from different sources # the main aim to to represent all molecules from different sources
# in a (single) standard way, for use in ML, catalogue, etc. # in a (single) standard way, for use in ML, catalogue, etc.
# te = rdMolStandardize.TautomerEnumerator() # idem
# taut_uncharged_parent_clean_mol = te.Canonicalize(uncharged_parent_clean_mol)
if remove_stereo: if remove_stereo:
Chem.RemoveStereochemistry(uncharged_parent_clean_mol) Chem.RemoveStereochemistry(res_mol)
return Chem.MolToSmiles(uncharged_parent_clean_mol, kekuleSmiles=True) if canonicalize_tautomers:
te = rdMolStandardize.TautomerEnumerator() # idem
res_mol = te.Canonicalize(res_mol)
return Chem.MolToSmiles(res_mol, kekuleSmiles=True)
@staticmethod @staticmethod
def neutralize_smiles(smiles): def neutralize_smiles(smiles):
@ -363,6 +365,76 @@ class FormatConverter(object):
return parsed_smiles, errors return parsed_smiles, errors
@staticmethod
def smiles_covered_by(
    l_smiles: List[str],
    r_smiles: List[str],
    standardize: bool = True,
    canonicalize_tautomers: bool = True,
) -> bool:
    """
    Check if all SMILES in the left list are covered by (contained in) the right list.

    This function performs a subset check to determine if every chemical structure
    represented in l_smiles has a corresponding representation in r_smiles.

    Args:
        l_smiles (List[str]): List of SMILES strings to check for coverage.
        r_smiles (List[str]): List of SMILES strings that should contain all l_smiles.
        standardize (bool, optional): Whether to standardize SMILES before comparison.
            Defaults to True. When True, applies FormatConverter.standardize() to
            both lists so that both sides are compared in the same representation.
        canonicalize_tautomers (bool, optional): Whether to canonicalize tautomers.
            Defaults to True. Forwarded to FormatConverter.standardize() for BOTH
            lists; only has an effect when standardize is True.

    Returns:
        bool: True if all SMILES in l_smiles are found in r_smiles (i.e., l_smiles
              is a subset of r_smiles), False otherwise.

    Note:
        - Comparison treats lists as sets, ignoring duplicates and order
        - Failed standardization attempts are silently ignored (original SMILES used)
        - This is a one-directional check: l_smiles is a subset of r_smiles
        - For bidirectional equality, both directions must be checked separately

    Example:
        >>> FormatConverter.smiles_covered_by(["CCO", "CC"], ["CCO", "CC", "CCC"])
        True
        >>> FormatConverter.smiles_covered_by(["CCO", "CCCC"], ["CCO", "CC", "CCC"])
        False
    """

    def _normalise(smiles_list: List[str]) -> set:
        # Single normalisation path for both sides: applying different options
        # to the left and right list would make identical structures compare
        # as different.
        if not standardize:
            return set(smiles_list)

        normalised = set()
        for smi in smiles_list:
            try:
                smi = FormatConverter.standardize(
                    smi, canonicalize_tautomers=canonicalize_tautomers
                )
            except Exception:
                # Deliberate best effort: a SMILES that cannot be standardized
                # takes part in the comparison unchanged.
                pass
            normalised.add(smi)
        return normalised

    return _normalise(l_smiles) <= _normalise(r_smiles)
class Standardizer(ABC): class Standardizer(ABC):
def __init__(self, name): def __init__(self, name):

View File

@ -9,36 +9,37 @@ from collections import defaultdict
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from types import NoneType from types import NoneType
from typing import Dict, Any, List from typing import Any, Dict, List
from django.db import transaction from django.db import transaction
from envipy_additional_information import Interval, EnviPyModel from envipy_additional_information import NAME_MAPPING, EnviPyModel, Interval
from envipy_additional_information import NAME_MAPPING
from pydantic import BaseModel, HttpUrl from pydantic import BaseModel, HttpUrl
from epdb.models import ( from epdb.models import (
Package,
Compound, Compound,
CompoundStructure, CompoundStructure,
SimpleRule, Edge,
EnviFormer,
EPModel,
ExternalDatabase,
ExternalIdentifier,
License,
MLRelativeReasoning,
Node,
Package,
ParallelRule,
Pathway,
PluginModel,
Reaction,
Rule,
RuleBasedRelativeReasoning,
Scenario,
SequentialRule,
SimpleAmbitRule, SimpleAmbitRule,
SimpleRDKitRule, SimpleRDKitRule,
ParallelRule, SimpleRule,
SequentialRule,
Reaction,
Pathway,
Node,
Edge,
Scenario,
EPModel,
MLRelativeReasoning,
RuleBasedRelativeReasoning,
EnviFormer,
PluginModel,
ExternalIdentifier,
ExternalDatabase,
License,
) )
from utilities.chem import FormatConverter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -48,7 +49,7 @@ class HTMLGenerator:
@staticmethod @staticmethod
def generate_html(additional_information: "EnviPyModel", prefix="") -> str: def generate_html(additional_information: "EnviPyModel", prefix="") -> str:
from typing import get_origin, get_args, Union from typing import Union, get_args, get_origin
if isinstance(additional_information, type): if isinstance(additional_information, type):
clz_name = additional_information.__name__ clz_name = additional_information.__name__
@ -1171,3 +1172,89 @@ class PackageImporter:
url=identifier_data.get("url", ""), url=identifier_data.get("url", ""),
is_primary=identifier_data.get("is_primary", False), is_primary=identifier_data.get("is_primary", False),
) )
class PathwayUtils:
    """Analysis helpers that operate on a single pathway."""

    def __init__(self, pathway: "Pathway"):
        # The pathway whose nodes and edges are analysed.
        self.pathway = pathway

    @staticmethod
    def _get_products(smiles: str, rules: List["Rule"]):
        """Apply every rule to ``smiles`` and collect the generated products.

        Args:
            smiles: Structure the rules are applied to.
            rules: Rules to apply; each must offer ``apply(smiles)`` (an iterable
                of product sets) and a ``url`` attribute.

        Returns:
            A nested mapping ``{smiles: {rule_url: [product, ...]}}``. Rules that
            yield no product do not appear; if no rule yields a product the
            mapping is empty.
        """
        educt_rule_products: Dict[str, Dict[str, List[str]]] = defaultdict(
            lambda: defaultdict(list)
        )

        for r in rules:
            for product_set in r.apply(smiles):
                for product in product_set:
                    educt_rule_products[smiles][r.url].append(product)

        return educt_rule_products

    def find_missing_rules(self, rules: List["Rule"]) -> Dict[str, List[tuple]]:
        """Find pathway edges whose reaction is not reproduced by a single rule.

        For every edge the reaction products are compared (standardized, with
        canonical tautomers) against the products each rule generates from the
        reaction's educts. If a rule's direct products do not cover the
        reaction, the rules are applied once more to those products to look for
        a two-step chain that does.

        Args:
            rules: Rules to test the pathway against.

        Returns:
            Mapping of edge url to a flat list of ``(rule_url, rule_name)``
            tuples. Tuples come in consecutive pairs (first-step rule,
            second-step rule), one pair per covering chain. An edge mapped to
            an empty list is covered neither directly nor by a two-step chain.
            Edges covered by a single rule are omitted unless a chain was
            recorded for them as well.
        """
        logger.info("Processing %s", self.pathway.name)

        # Every triggered rule url stems from `rules`, so names can be resolved
        # locally instead of querying the database inside the innermost loop.
        rule_names = {r.url: r.name for r in rules}

        # compute products for each node / rule combination in the pathway
        educt_rule_products = defaultdict(lambda: defaultdict(list))
        for node in self.pathway.nodes:
            educt_rule_products.update(self._get_products(node.default_node_label.smiles, rules))

        # The same intermediate product may show up for many rules / edges;
        # apply the rules to it only once.
        second_step_cache: Dict[str, Dict[str, List[str]]] = {}

        # loop through edges and determine reactions that can't be constructed by
        # any of the rules or a combination of two rules in a chained fashion
        res: Dict[str, List[tuple]] = {}
        for edge in self.pathway.edges:
            found = False
            reaction = edge.edge_label

            educts = [cs.smiles for cs in reaction.educts.all()]
            products = [cs.smiles for cs in reaction.products.all()]
            rule_chain = []

            for educt in educts:
                for triggered_rule, rule_products in educt_rule_products.get(educt, {}).items():
                    if not rule_products:
                        continue

                    # check if this rule covers the reaction
                    if FormatConverter.smiles_covered_by(
                        products, rule_products, standardize=True, canonicalize_tautomers=True
                    ):
                        found = True
                        continue

                    # Check if another prediction step would cover the reaction
                    for product in rule_products:
                        if product not in second_step_cache:
                            second_step_cache[product] = self._get_products(product, rules).get(
                                product, {}
                            )

                        for prod_triggered_rule, second_step_products in second_step_cache[
                            product
                        ].items():
                            if second_step_products and FormatConverter.smiles_covered_by(
                                products,
                                second_step_products,
                                standardize=True,
                                canonicalize_tautomers=True,
                            ):
                                rule_chain.append((triggered_rule, rule_names[triggered_rule]))
                                rule_chain.append(
                                    (prod_triggered_rule, rule_names[prod_triggered_rule])
                                )

            # NOTE(review): an edge that is covered directly by one rule is still
            # reported when a two-step chain via a different rule exists - confirm
            # that this is intended.
            if rule_chain or not found:
                res[edge.url] = rule_chain

        return res