forked from enviPath/enviPy
[Misc] Performance improvements, SMIRKS Coverage, Minor Bugfixes (#132)
Bump Python Version to 3.12 Make use of "epauth" optional Cache `srs` property of rules to speed up apply Adjust view names for use of `reverse()` Fix Views for Scenario Attachments Added Simply Compare View/Template to identify differences between rdkit and ambit Make migrations consistent with tests + compare Fixes #76 Set default year for Scenario Modal Fix html tags for package description Added Tests for Pathway / Rule Added remove stereo for apply Co-authored-by: Tim Lorsbach <tim@lorsba.ch> Reviewed-on: enviPath/enviPy#132
This commit is contained in:
@ -4,6 +4,9 @@ import json
|
||||
from django.conf import settings as s
|
||||
from django.test import TestCase, tag
|
||||
from utilities.chem import FormatConverter
|
||||
from envipy_ambit import apply
|
||||
from rdkit import Chem
|
||||
from rdkit.Chem.MolStandardize import rdMolStandardize
|
||||
|
||||
@tag("slow")
|
||||
class RuleApplicationTest(TestCase):
|
||||
@ -16,17 +19,56 @@ class RuleApplicationTest(TestCase):
|
||||
super(RuleApplicationTest, cls).setUpClass()
|
||||
cls.data = json.load(gzip.open(s.BASE_DIR / 'fixtures' / 'ambit_rules.json.gz', 'rb'))
|
||||
cls.error_smiles = list()
|
||||
from collections import defaultdict
|
||||
cls.triggered = defaultdict(lambda: defaultdict(lambda: 0))
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
super().tearDownClass()
|
||||
print(f"\nTotal Errors across Rules {len(cls.error_smiles)}")
|
||||
# print(cls.error_smiles)
|
||||
from pprint import pprint
|
||||
from collections import Counter
|
||||
pprint(Counter(cls.error_smiles))
|
||||
# import json
|
||||
# pprint(json.loads(json.dumps(cls.triggered)))
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
print(f"\nTotal errors {self.total_errors}")
|
||||
|
||||
@staticmethod
|
||||
def normalize_smiles(smiles):
|
||||
m1 = Chem.MolFromSmiles(smiles)
|
||||
if m1 is None:
|
||||
print("Couldnt read smi: ", smiles)
|
||||
return smiles
|
||||
Chem.RemoveStereochemistry(m1)
|
||||
# Normalizer takes care of charge/tautomer/resonance standardization
|
||||
normalizer = rdMolStandardize.Normalizer()
|
||||
return Chem.MolToSmiles(normalizer.normalize(m1), canonical=True)
|
||||
|
||||
@staticmethod
|
||||
def run_both_engines(smiles, smirks):
|
||||
ambit_res = apply(smirks, smiles)
|
||||
|
||||
ambit_res = list(
|
||||
set([RuleApplicationTest.normalize_smiles(str(x)) for x in FormatConverter.sanitize_smiles([str(s) for s in ambit_res])[0]]))
|
||||
|
||||
products = FormatConverter.apply(smiles, smirks)
|
||||
|
||||
all_rdkit_prods = []
|
||||
for ps in products:
|
||||
for p in ps:
|
||||
all_rdkit_prods.append(p)
|
||||
|
||||
all_rdkit_prods = list(set(all_rdkit_prods))
|
||||
|
||||
all_rdkit_res = list(set([RuleApplicationTest.normalize_smiles(str(x)) for x in
|
||||
FormatConverter.sanitize_smiles([str(s) for s in all_rdkit_prods])[0]]))
|
||||
|
||||
return ambit_res, 0, all_rdkit_res, 0
|
||||
|
||||
def run_bt_test(self, bt_rule_name):
|
||||
bt_rule = self.data[bt_rule_name]
|
||||
smirks = bt_rule['smirks']
|
||||
@ -34,35 +76,26 @@ class RuleApplicationTest(TestCase):
|
||||
res = True
|
||||
|
||||
all_prods = set()
|
||||
for comp, ambit_prod in zip(bt_rule['compounds'], bt_rule['products']):
|
||||
for comp in bt_rule['compounds']:
|
||||
|
||||
smi = comp['smiles']
|
||||
products = FormatConverter.apply(smi, smirks)
|
||||
|
||||
all_rdkit_prods = []
|
||||
for ps in products:
|
||||
for p in ps:
|
||||
all_rdkit_prods.append(p)
|
||||
|
||||
all_rdkit_prods = list(set(all_rdkit_prods))
|
||||
|
||||
ambit_smiles, ambit_errors = FormatConverter.sanitize_smiles(ambit_prod)
|
||||
rdkit_smiles, rdkit_errors = FormatConverter.sanitize_smiles(all_rdkit_prods)
|
||||
ambit_smiles, ambit_errors, rdkit_smiles, rdkit_errors = self.run_both_engines(smi, smirks)
|
||||
|
||||
for x in ambit_smiles:
|
||||
all_prods.add(x)
|
||||
|
||||
# TODO mode "intersection"
|
||||
# partial_res = (len(set(ambit_smiles).intersection(set(rdkit_smiles))) > 0) or (len(ambit_smiles) == 0)
|
||||
# FAILED (failures=33)
|
||||
# FAILED (failures=18)
|
||||
|
||||
# TODO mode = "full ambit"
|
||||
# partial_res = len(set(ambit_smiles).intersection(set(rdkit_smiles))) == len(ambit_smiles)
|
||||
# FAILED (failures=44)
|
||||
# partial_res = len(set(ambit_smiles).intersection(set(rdkit_smiles))) == len(set(ambit_smiles))
|
||||
# FAILED (failures=22)
|
||||
|
||||
# TODO mode = "equality"
|
||||
partial_res = set(ambit_smiles) == set(rdkit_smiles)
|
||||
# FAILED (failures=64)
|
||||
# FAILED (failures=30)
|
||||
|
||||
if len(ambit_smiles) and not partial_res:
|
||||
print(f"""
|
||||
|
||||
98
tests/views/test_pathway_views.py
Normal file
98
tests/views/test_pathway_views.py
Normal file
@ -0,0 +1,98 @@
|
||||
from django.test import TestCase, override_settings
|
||||
from django.urls import reverse
|
||||
from django.conf import settings as s
|
||||
|
||||
from epdb.logic import UserManager, PackageManager
|
||||
from epdb.models import Pathway, Edge
|
||||
|
||||
@override_settings(MODEL_DIR=s.FIXTURE_DIRS[0] / "models")
|
||||
class PathwayViewTest(TestCase):
|
||||
fixtures = ["test_fixtures_incl_model.jsonl.gz"]
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(PathwayViewTest, cls).setUpClass()
|
||||
cls.user1 = UserManager.create_user("user1", "user1@envipath.com", "SuperSafe",
|
||||
set_setting=True, add_to_group=True, is_active=True)
|
||||
cls.user1_default_package = cls.user1.default_package
|
||||
cls.package = PackageManager.create_package(cls.user1, 'Test', 'Test Pack')
|
||||
|
||||
def setUp(self):
|
||||
self.client.force_login(self.user1)
|
||||
|
||||
def test_predict_pathway(self):
|
||||
response = self.client.post(reverse("pathways"), {
|
||||
'name': 'Test Pathway',
|
||||
'description': 'Just a Description',
|
||||
'predict': 'predict',
|
||||
'smiles': 'CCN(CC)C(=O)C1=CC(=CC=C1)CO',
|
||||
})
|
||||
self.assertEqual(response.status_code, 302)
|
||||
pathway_url = response.url
|
||||
|
||||
pw = Pathway.objects.get(url=pathway_url)
|
||||
self.assertEqual(self.user1_default_package, pw.package)
|
||||
|
||||
self.assertEqual(pw.name, 'Test Pathway')
|
||||
self.assertEqual(pw.description, 'Just a Description')
|
||||
self.assertEqual(len(pw.root_nodes), 1)
|
||||
self.assertEqual(pw.root_nodes.first().default_node_label.smiles, 'CCN(CC)C(=O)C1=CC(CO)=CC=C1')
|
||||
|
||||
first_level_nodes = {
|
||||
# Edge 1
|
||||
'CCN(CC)C(=O)C1=CC(C=O)=CC=C1',
|
||||
# Edge 2
|
||||
'CCNC(=O)C1=CC(CO)=CC=C1',
|
||||
'CC=O',
|
||||
# Edge 3
|
||||
'CCNCC',
|
||||
'O=C(O)C1=CC(CO)=CC=C1',
|
||||
}
|
||||
|
||||
predicted_nodes = set()
|
||||
edges = Edge.objects.filter(start_nodes__in=[pw.root_nodes.first()])
|
||||
|
||||
for edge in edges:
|
||||
for n in edge.end_nodes.all():
|
||||
predicted_nodes.add(n.default_node_label.smiles)
|
||||
|
||||
self.assertEqual(first_level_nodes, predicted_nodes)
|
||||
|
||||
def test_predict_package_pathway(self):
|
||||
response = self.client.post(
|
||||
reverse("package pathway list", kwargs={'package_uuid': str(self.package.uuid)}), {
|
||||
'name': 'Test Pathway',
|
||||
'description': 'Just a Description',
|
||||
'predict': 'predict',
|
||||
'smiles': 'CCN(CC)C(=O)C1=CC(=CC=C1)CO',
|
||||
})
|
||||
self.assertEqual(response.status_code, 302)
|
||||
pathway_url = response.url
|
||||
|
||||
pw = Pathway.objects.get(url=pathway_url)
|
||||
self.assertEqual(self.package, pw.package)
|
||||
|
||||
self.assertEqual(pw.name, 'Test Pathway')
|
||||
self.assertEqual(pw.description, 'Just a Description')
|
||||
self.assertEqual(len(pw.root_nodes), 1)
|
||||
self.assertEqual(pw.root_nodes.first().default_node_label.smiles, 'CCN(CC)C(=O)C1=CC(CO)=CC=C1')
|
||||
|
||||
first_level_nodes = {
|
||||
# Edge 1
|
||||
'CCN(CC)C(=O)C1=CC(C=O)=CC=C1',
|
||||
# Edge 2
|
||||
'CCNC(=O)C1=CC(CO)=CC=C1',
|
||||
'CC=O',
|
||||
# Edge 3
|
||||
'CCNCC',
|
||||
'O=C(O)C1=CC(CO)=CC=C1',
|
||||
}
|
||||
|
||||
predicted_nodes = set()
|
||||
edges = Edge.objects.filter(start_nodes__in=[pw.root_nodes.first()])
|
||||
|
||||
for edge in edges:
|
||||
for n in edge.end_nodes.all():
|
||||
predicted_nodes.add(n.default_node_label.smiles)
|
||||
|
||||
self.assertEqual(first_level_nodes, predicted_nodes)
|
||||
225
tests/views/test_rule_views.py
Normal file
225
tests/views/test_rule_views.py
Normal file
@ -0,0 +1,225 @@
|
||||
from django.test import TestCase
|
||||
from django.urls import reverse
|
||||
from envipy_additional_information import Temperature, Interval
|
||||
|
||||
from epdb.logic import UserManager, PackageManager
|
||||
from epdb.models import Rule, Scenario
|
||||
|
||||
|
||||
class RuleViewTest(TestCase):
|
||||
fixtures = ["test_fixtures.jsonl.gz"]
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(RuleViewTest, cls).setUpClass()
|
||||
cls.user1 = UserManager.create_user("user1", "user1@envipath.com", "SuperSafe",
|
||||
set_setting=False, add_to_group=True, is_active=True)
|
||||
cls.user1_default_package = cls.user1.default_package
|
||||
cls.package = PackageManager.create_package(cls.user1, 'Test', 'Test Pack')
|
||||
|
||||
def setUp(self):
|
||||
self.client.force_login(self.user1)
|
||||
|
||||
def test_create_rule(self):
|
||||
response = self.client.post(
|
||||
reverse("rules"), {
|
||||
"rule-name": "Test Rule",
|
||||
"rule-description": "Just a Description",
|
||||
"rule-smirks": "[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]",
|
||||
"rule-type": "SimpleAmbitRule",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
rule_url = response.url
|
||||
|
||||
r = Rule.objects.get(url=rule_url)
|
||||
|
||||
self.assertEqual(r.package, self.user1_default_package)
|
||||
self.assertEqual(r.name, "Test Rule")
|
||||
self.assertEqual(r.description, "Just a Description")
|
||||
self.assertEqual(r.smirks,
|
||||
"[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]")
|
||||
self.assertEqual(self.user1_default_package.rules.count(), 1)
|
||||
|
||||
# Adding the same rule again should return the existing one, hence not increasing the number of rules
|
||||
response = self.client.post(
|
||||
reverse("rules"), {
|
||||
"rule-name": "Test Rule",
|
||||
"rule-description": "Just a Description",
|
||||
"rule-smirks": "[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]",
|
||||
"rule-type": "SimpleAmbitRule",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.url, rule_url)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(self.user1_default_package.rules.count(), 1)
|
||||
|
||||
# Adding the same rule in a different package should create a new rule
|
||||
response = self.client.post(
|
||||
reverse("package rule list", kwargs={'package_uuid': self.package.uuid}), {
|
||||
"rule-name": "Test Rule",
|
||||
"rule-description": "Just a Description",
|
||||
"rule-smirks": "[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]",
|
||||
"rule-type": "SimpleAmbitRule",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertNotEqual(rule_url, response.url)
|
||||
|
||||
# Edit
|
||||
def test_edit_rule(self):
|
||||
response = self.client.post(
|
||||
reverse("rules"), {
|
||||
"rule-name": "Test Rule",
|
||||
"rule-description": "Just a Description",
|
||||
"rule-smirks": "[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]",
|
||||
"rule-type": "SimpleAmbitRule",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
rule_url = response.url
|
||||
|
||||
r = Rule.objects.get(url=rule_url)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("package rule detail", kwargs={
|
||||
'package_uuid': str(self.user1_default_package.uuid),
|
||||
'rule_uuid': str(r.uuid)
|
||||
}), {
|
||||
"rule-name": "Test Rule Adjusted",
|
||||
"rule-description": "New Description",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
r = Rule.objects.get(url=rule_url)
|
||||
self.assertEqual(r.name, "Test Rule Adjusted")
|
||||
self.assertEqual(r.description, "New Description")
|
||||
|
||||
# Scenario
|
||||
def test_set_scenario(self):
|
||||
s1 = Scenario.create(
|
||||
self.user1_default_package,
|
||||
"Test Scen",
|
||||
"Test Desc",
|
||||
"2025-10",
|
||||
"soil",
|
||||
[Temperature(interval=Interval(start=20, end=30))]
|
||||
)
|
||||
|
||||
s2 = Scenario.create(
|
||||
self.user1_default_package,
|
||||
"Test Scen2",
|
||||
"Test Desc2",
|
||||
"2025-10",
|
||||
"soil",
|
||||
[Temperature(interval=Interval(start=10, end=20))]
|
||||
)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("rules"), {
|
||||
"rule-name": "Test Rule",
|
||||
"rule-description": "Just a Description",
|
||||
"rule-smirks": "[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]",
|
||||
"rule-type": "SimpleAmbitRule",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
rule_url = response.url
|
||||
r = Rule.objects.get(url=rule_url)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("package rule detail", kwargs={
|
||||
'package_uuid': str(r.package.uuid),
|
||||
'rule_uuid': str(r.uuid)
|
||||
}), {
|
||||
"selected-scenarios": [s1.url, s2.url]
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(len(r.scenarios.all()), 2)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("package rule detail", kwargs={
|
||||
'package_uuid': str(r.package.uuid),
|
||||
'rule_uuid': str(r.uuid)
|
||||
}), {
|
||||
"selected-scenarios": [s1.url]
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(len(r.scenarios.all()), 1)
|
||||
self.assertEqual(r.scenarios.first().url, s1.url)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("package rule detail", kwargs={
|
||||
'package_uuid': str(r.package.uuid),
|
||||
'rule_uuid': str(r.uuid)
|
||||
}), {
|
||||
"selected-scenarios": []
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(len(r.scenarios.all()), 0)
|
||||
|
||||
def test_copy(self):
|
||||
response = self.client.post(
|
||||
reverse("rules"), {
|
||||
"rule-name": "Test Rule",
|
||||
"rule-description": "Just a Description",
|
||||
"rule-smirks": "[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]",
|
||||
"rule-type": "SimpleAmbitRule",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
rule_url = response.url
|
||||
r = Rule.objects.get(url=rule_url)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("package detail", kwargs={
|
||||
'package_uuid': str(r.package.uuid),
|
||||
}), {
|
||||
"hidden": "copy",
|
||||
"object_to_copy": r.url
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
copied_object_url = response.json()["success"]
|
||||
copied_rule = Rule.objects.get(url=copied_object_url)
|
||||
|
||||
self.assertEqual(copied_rule.name, r.name)
|
||||
self.assertEqual(copied_rule.description, r.description)
|
||||
self.assertEqual(copied_rule.smirks, r.smirks)
|
||||
|
||||
def test_delete(self):
|
||||
response = self.client.post(
|
||||
reverse("rules"), {
|
||||
"rule-name": "Test Rule",
|
||||
"rule-description": "Just a Description",
|
||||
"rule-smirks": "[H:5][C:1]([#6:6])([#1,#9,#17,#35,#53:4])[#9,#17,#35,#53]>>[H:5][C:1]([#6:6])([#8])[#1,#9,#17,#35,#53:4]",
|
||||
"rule-type": "SimpleAmbitRule",
|
||||
}
|
||||
)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
rule_url = response.url
|
||||
r = Rule.objects.get(url=rule_url)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("package rule detail", kwargs={
|
||||
'package_uuid': str(r.package.uuid),
|
||||
'rule_uuid': str(r.uuid)
|
||||
}), {
|
||||
"hidden": "delete"
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(self.user1_default_package.rules.count(), 0)
|
||||
Reference in New Issue
Block a user