[Feature] Identify Missing Rules (#177)

Fixes #97
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#177
This commit is contained in:
2025-10-30 00:47:45 +13:00
parent f1b4c5aadb
commit 13ed86a780
7 changed files with 361 additions and 29 deletions

View File

@ -9,36 +9,37 @@ from collections import defaultdict
from datetime import datetime
from enum import Enum
from types import NoneType
from typing import Dict, Any, List
from typing import Any, Dict, List
from django.db import transaction
from envipy_additional_information import Interval, EnviPyModel
from envipy_additional_information import NAME_MAPPING
from envipy_additional_information import NAME_MAPPING, EnviPyModel, Interval
from pydantic import BaseModel, HttpUrl
from epdb.models import (
Package,
Compound,
CompoundStructure,
SimpleRule,
Edge,
EnviFormer,
EPModel,
ExternalDatabase,
ExternalIdentifier,
License,
MLRelativeReasoning,
Node,
Package,
ParallelRule,
Pathway,
PluginModel,
Reaction,
Rule,
RuleBasedRelativeReasoning,
Scenario,
SequentialRule,
SimpleAmbitRule,
SimpleRDKitRule,
ParallelRule,
SequentialRule,
Reaction,
Pathway,
Node,
Edge,
Scenario,
EPModel,
MLRelativeReasoning,
RuleBasedRelativeReasoning,
EnviFormer,
PluginModel,
ExternalIdentifier,
ExternalDatabase,
License,
SimpleRule,
)
from utilities.chem import FormatConverter
logger = logging.getLogger(__name__)
@ -48,7 +49,7 @@ class HTMLGenerator:
@staticmethod
def generate_html(additional_information: "EnviPyModel", prefix="") -> str:
from typing import get_origin, get_args, Union
from typing import Union, get_args, get_origin
if isinstance(additional_information, type):
clz_name = additional_information.__name__
@ -1171,3 +1172,89 @@ class PackageImporter:
url=identifier_data.get("url", ""),
is_primary=identifier_data.get("is_primary", False),
)
class PathwayUtils:
def __init__(self, pathway: "Pathway"):
self.pathway = pathway
@staticmethod
def _get_products(smiles: str, rules: List["Rule"]):
educt_rule_products: Dict[str, Dict[str, List[str]]] = defaultdict(
lambda: defaultdict(list)
)
for r in rules:
product_sets = r.apply(smiles)
for product_set in product_sets:
for product in product_set:
educt_rule_products[smiles][r.url].append(product)
return educt_rule_products
def find_missing_rules(self, rules: List["Rule"]):
print(f"Processing {self.pathway.name}")
# compute products for each node / rule combination in the pathway
educt_rule_products = defaultdict(lambda: defaultdict(list))
for node in self.pathway.nodes:
educt_rule_products.update(**self._get_products(node.default_node_label.smiles, rules))
# loop through edges and determine reactions that can't be constructed by
# any of the rules or a combination of two rules in a chained fashion
res: Dict[str, List["Rule"]] = dict()
for edge in self.pathway.edges:
found = False
reaction = edge.edge_label
educts = [cs for cs in reaction.educts.all()]
products = [cs.smiles for cs in reaction.products.all()]
rule_chain = []
for educt in educts:
educt = educt.smiles
triggered_rules = list(educt_rule_products.get(educt, {}).keys())
for triggered_rule in triggered_rules:
if rule_products := educt_rule_products[educt][triggered_rule]:
# check if this rule covers the reaction
if FormatConverter.smiles_covered_by(
products, rule_products, standardize=True, canonicalize_tautomers=True
):
found = True
else:
# Check if another prediction step would cover the reaction
for product in rule_products:
prod_rule_products = self._get_products(product, rules)
prod_triggered_rules = list(
prod_rule_products.get(product, {}).keys()
)
for prod_triggered_rule in prod_triggered_rules:
if second_step_products := prod_rule_products[product][
prod_triggered_rule
]:
if FormatConverter.smiles_covered_by(
products,
second_step_products,
standardize=True,
canonicalize_tautomers=True,
):
rule_chain.append(
(
triggered_rule,
Rule.objects.get(url=triggered_rule).name,
)
)
rule_chain.append(
(
prod_triggered_rule,
Rule.objects.get(url=prod_triggered_rule).name,
)
)
res[edge.url] = rule_chain
if not found:
res[edge.url] = rule_chain
return res