Experimental App Domain (#43)

Backend App Domain done, Frontend missing

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#43
2025-08-08 20:52:21 +12:00
parent 280ddc7205
commit 579cd519d0
14 changed files with 1094 additions and 574 deletions

View File

@@ -131,6 +131,24 @@ class FormatConverter(object):
# TODO call to AMBIT Service
return smiles
@staticmethod
def ep_standardize(smiles):
change = True
while change:
change = False
for standardizer in MATCH_STANDARDIZER:
tmp_smiles = standardizer.standardize(smiles)
if tmp_smiles != smiles:
print(f"change {smiles} to {tmp_smiles}")
change = True
smiles = tmp_smiles
if not change:
print("nothing changed")
return smiles
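Note: ep_standardize keeps looping until one full pass over MATCH_STANDARDIZER leaves the SMILES untouched, i.e. until a fixed point is reached. A minimal runnable sketch of that pattern, using a hypothetical string-rewriting standardizer in place of the real MATCH_STANDARDIZER entries (which are assumed to expose standardize(smiles)):

    # Hypothetical stand-in for a MATCH_STANDARDIZER entry.
    class NitroRewrite:
        def standardize(self, smiles: str) -> str:
            # illustrative rewrite of charge-separated nitro groups
            return smiles.replace('[N+](=O)[O-]', 'N(=O)=O')

    standardizers = [NitroRewrite()]
    smiles = 'c1ccccc1[N+](=O)[O-]'
    changed = True
    while changed:
        changed = False
        for s in standardizers:
            out = s.standardize(smiles)
            if out != smiles:
                smiles, changed = out, True
    print(smiles)  # c1ccccc1N(=O)=O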
@staticmethod
def standardize(smiles):
# Taken from https://bitsilla.com/blog/2021/06/standardizing-a-molecule-using-rdkit/
@@ -180,54 +198,6 @@ class FormatConverter(object):
atom.UpdatePropertyCache()
return mol
# @staticmethod
# def apply(smiles, smirks, preprocess_smiles=True, bracketize=False, standardize=True):
# logger.debug(f'Applying {smirks} on {smiles}')
#
# if bracketize:
# smirks = smirks.split('>>')[0] + ">>(" + smirks.split('>>')[1] + ")"
#
# res = set()
# try:
# rxn = rdChemReactions.ReactionFromSmarts(smirks)
# mol = Chem.MolFromSmiles(smiles)
#
# # Inplace
# if preprocess_smiles:
# Chem.SanitizeMol(mol)
# mol = Chem.AddHs(mol)
#
# # apply!
# reacts = rxn.RunReactants((mol,))
# if len(reacts):
# # Sanitize mols
# for product_set in reacts:
# prod_set = list()
# for product in product_set:
# # Fixes
# # [2025-01-30 23:00:50] ERROR chem - Sanitizing and converting failed:
# # non-ring atom 3 marked aromatic
# # But does not improve overall performance
# #
# # for a in product.GetAtoms():
# # if (not a.IsInRing()) and a.GetIsAromatic():
# # a.SetIsAromatic(False)
# # for b in product.GetBonds():
# # if (not b.IsInRing()) and b.GetIsAromatic():
# # b.SetIsAromatic(False)
#
# try:
# Chem.SanitizeMol(product)
# prod_set.append(FormatConverter.standardize(Chem.MolToSmiles(product)))
# except ValueError as e:
# logger.error(f'Sanitizing and converting failed:\n{e}')
# continue
# res.add(tuple(list(set(prod_set))))
# except Exception as e:
# logger.error(f'Applying {smirks} on {smiles} failed:\n{e}')
#
# return list(res)
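For reference, the commented-out apply() above boils down to RDKit's reaction API. A minimal sketch with an illustrative, made-up SMIRKS; only the RDKit calls are real:

    from rdkit import Chem
    from rdkit.Chem import rdChemReactions

    smirks = '[CH3:1]>>[CH2:1]O'  # toy rule: methyl -> hydroxymethyl
    rxn = rdChemReactions.ReactionFromSmarts(smirks)
    mol = Chem.MolFromSmiles('CCO')
    products = set()
    for product_set in rxn.RunReactants((mol,)):
        for product in product_set:
            try:
                Chem.SanitizeMol(product)
                products.add(Chem.MolToSmiles(product))
            except ValueError:
                continue  # skip products that fail sanitization
    print(products)  # e.g. {'OCCO'}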
@staticmethod
def is_valid_smirks(smirks: str) -> bool:
try:

View File

@@ -1,46 +1,29 @@
from __future__ import annotations
import dataclasses
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from typing import List, Optional
from typing import List, Dict, Set, Tuple
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.multioutput import ClassifierChain
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
# @dataclasses.dataclass
# class Feature:
# name: str
# value: float
#
#
#
# class Row:
# def __init__(self, compound_uuid: str, compound_smiles: str, descriptors: List[int]):
# self.data = {}
#
#
#
# class DataSet(object):
#
# def __init__(self):
# self.rows: List[Row] = []
#
# def add_row(self, row: Row):
# pass
logger = logging.getLogger(__name__)
from dataclasses import dataclass, field
from utilities.chem import FormatConverter
from utilities.chem import FormatConverter, PredictionResult
@dataclass
class Compound:
class SCompound:
smiles: str
uuid: str = field(default=None, compare=False, hash=False)
@@ -53,10 +36,10 @@ class Compound:
@dataclass
class Reaction:
educts: List[Compound]
products: List[Compound]
rule_uuid: str = field(default=None, compare=False, hash=False)
class SReaction:
educts: List[SCompound]
products: List[SCompound]
rule_uuid: SRule = field(default=None, compare=False, hash=False)
reaction_uuid: str = field(default=None, compare=False, hash=False)
def __hash__(self):
@@ -68,77 +51,294 @@ class Reaction:
return self._hash
def __eq__(self, other):
if not isinstance(other, Reaction):
if not isinstance(other, SReaction):
return NotImplemented
return (
sorted(self.educts, key=lambda x: x.smiles) == sorted(other.educts, key=lambda x: x.smiles) and
sorted(self.products, key=lambda x: x.smiles) == sorted(other.products, key=lambda x: x.smiles)
sorted(self.educts, key=lambda x: x.smiles) == sorted(other.educts, key=lambda x: x.smiles) and
sorted(self.products, key=lambda x: x.smiles) == sorted(other.products, key=lambda x: x.smiles)
)
class Dataset(object):
@dataclass
class SRule(ABC):
def __init__(self, headers: List[str], data: List[List[str | int | float]]):
self.headers = headers
self.data = data
def features(self):
pass
def labels(self):
pass
def to_json(self):
pass
def to_csv(self):
pass
def to_arff(self):
@abstractmethod
def apply(self):
pass
@dataclass
class SSimpleRule:
pass
class DatasetGenerator(object):
@dataclass
class SParallelRule:
pass
class Dataset:
def __init__(self, columns: List[str], num_labels: int, data: List[List[str | int | float]] = None):
self.columns: List[str] = columns
self.num_labels: int = num_labels
if data is None:
self.data: List[List[str | int | float]] = list()
else:
self.data = data
self.num_features: int = len(columns) - self.num_labels
self._struct_features: Tuple[int, int] = self._block_indices('feature_')
self._triggered: Tuple[int, int] = self._block_indices('trig_')
self._observed: Tuple[int, int] = self._block_indices('obs_')
def _block_indices(self, prefix: str) -> Tuple[int, int]:
indices: List[int] = []
for i, feature in enumerate(self.columns):
if feature.startswith(prefix):
indices.append(i)
return min(indices), max(indices)
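To illustrate, _block_indices returns the first and last index of the columns carrying a given prefix; it assumes each prefix matches at least one column (min() on an empty list would raise a ValueError) and that the block is contiguous:

    # Toy header with hypothetical rule ids r1/r2.
    cols = ['structure_id', 'feature_0', 'feature_1', 'trig_r1', 'trig_r2', 'obs_r1', 'obs_r2']
    ds = Dataset(cols, num_labels=2)
    print(ds.struct_features())  # (1, 2)
    print(ds.triggered())        # (3, 4)
    print(ds.observed())         # (5, 6)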
def structure_id(self):
return self.data[0][0]
def add_row(self, row: List[str | int | float]):
if len(self.columns) != len(row):
raise ValueError(f"Header and Data are not aligned {len(self.columns)} vs. {len(row)}")
self.data.append(row)
def struct_features(self) -> Tuple[int, int]:
return self._struct_features
def triggered(self) -> Tuple[int, int]:
return self._triggered
def observed(self) -> Tuple[int, int]:
return self._observed
def at(self, position: int) -> Dataset:
return Dataset(self.columns, self.num_labels, [self.data[position]])
def limit(self, limit: int) -> Dataset:
return Dataset(self.columns, self.num_labels, self.data[:limit])
def __iter__(self):
return (self.at(i) for i, _ in enumerate(self.data))
def classification_dataset(self, structures: List[str | 'CompoundStructure'], applicable_rules: List['Rule']) -> Tuple[Dataset, List[List[PredictionResult]]]:
classify_data = []
classify_products = []
for struct in structures:
if isinstance(struct, str):
struct_id = None
struct_smiles = struct
else:
struct_id = str(struct.uuid)
struct_smiles = struct.smiles
features = FormatConverter.maccs(struct_smiles)
trig = []
prods = []
for rule in applicable_rules:
products = rule.apply(struct_smiles)
if len(products):
trig.append(1)
prods.append(products)
else:
trig.append(0)
prods.append([])
classify_data.append([struct_id] + features + trig + ([-1] * len(trig)))
classify_products.append(prods)
return Dataset(columns=self.columns, num_labels=self.num_labels, data=classify_data), classify_products
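Each classification row mirrors the training header: structure id, fingerprint bits, trig flags, then one -1 placeholder per rule for the unknown obs labels. A toy illustration of the row assembly (stand-in values; FormatConverter.maccs() would supply the real bits):

    features = [0, 1, 1]  # stand-in fingerprint
    trig = [1, 0]         # rule 1 fired, rule 2 did not
    row = ['struct-uuid'] + features + trig + ([-1] * len(trig))
    print(row)  # ['struct-uuid', 0, 1, 1, 1, 0, -1, -1]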
@staticmethod
def generate_dataset(compounds: List[Compound], reactions: List[Reaction], applicable_rules: 'Rule',
compounds_to_exclude: Optional[Compound] = None, educts_only: bool = False) -> Dataset:
def generate_dataset(reactions: List['Reaction'], applicable_rules: List['Rule'], educts_only: bool = True) -> Dataset:
_structures = set()
rows = []
for r in reactions:
for e in r.educts.all():
_structures.add(e)
if educts_only:
compounds = set()
for r in reactions:
for e in r.educts:
compounds.add(e)
compounds = list(compounds)
if not educts_only:
for e in r.products:
_structures.add(e)
total = len(compounds)
for i, c in enumerate(compounds):
row = []
print(f"{i + 1}/{total} - {c.smiles}")
for r in applicable_rules:
product_sets = r.rule.apply(c.smiles)
compounds = sorted(_structures, key=lambda x: x.url)
triggered: Dict[str, Set[str]] = defaultdict(set)
observed: Set[str] = set()
# Apply rules on collected compounds and store tps
for i, comp in enumerate(compounds):
logger.debug(f"{i + 1}/{len(compounds)}...")
for rule in applicable_rules:
product_sets = rule.apply(comp.smiles)
if len(product_sets) == 0:
row.append([])
continue
#triggered.add(f"{r.uuid} + {c.uuid}")
reacts = set()
for ps in product_sets:
products = []
for p in ps:
products.append(Compound(FormatConverter.standardize(p)))
key = f"{rule.uuid} + {comp.uuid}"
reacts.add(Reaction([c], products, r))
row.append(list(reacts))
if key in triggered:
logger.info(f"{key} already present. Duplicate reaction?")
rows.append(row)
for prod_set in product_sets:
for smi in prod_set:
return rows
try:
smi = FormatConverter.standardize(smi)
except Exception:
# fall back to the unstandardized SMILES
logger.debug(f'Standardizing SMILES failed for {smi}')
triggered[key].add(smi)
for i, r in enumerate(reactions):
logger.debug(f"{i + 1}/{len(reactions)}...")
if len(r.educts.all()) != 1:
logger.debug(f"Skipping {r.url} as it has {len(r.educts.all())} substrates!")
continue
for comp in r.educts.all():
for rule in applicable_rules:
key = f"{rule.uuid} + {comp.uuid}"
if key not in triggered:
continue
# standardize products from reactions for comparison
standardized_products = []
for cs in r.products.all():
smi = cs.smiles
try:
smi = FormatConverter.standardize(smi)
except Exception:
# fall back to the unstandardized SMILES
logger.debug(f'Standardizing SMILES failed for {smi}')
standardized_products.append(smi)
if len(set(standardized_products).difference(triggered[key])) == 0:
observed.add(key)
ds = None
for i, comp in enumerate(compounds):
# Features
feat = FormatConverter.maccs(comp.smiles)
trig = []
obs = []
for rule in applicable_rules:
key = f"{rule.uuid} + {comp.uuid}"
# Check triggered
if key in triggered:
trig.append(1)
else:
trig.append(0)
# Check obs
if key in observed:
obs.append(1)
elif key not in triggered:
obs.append(None)
else:
obs.append(0)
if ds is None:
header = ['structure_id'] + \
[f'feature_{i}' for i, _ in enumerate(feat)] \
+ [f'trig_{r.uuid}' for r in applicable_rules] \
+ [f'obs_{r.uuid}' for r in applicable_rules]
ds = Dataset(header, len(applicable_rules))
ds.add_row([str(comp.uuid)] + feat + trig + obs)
return ds
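To make the label semantics concrete: trig_* is 1 when the rule fires in silico on the educt, and obs_* is 1 when a fired rule also reproduces a database reaction, 0 when it fires without support, and None when it never fires (label unknown). A toy dataset under that convention (hypothetical rule ids):

    cols = ['structure_id', 'feature_0', 'trig_r1', 'trig_r2', 'obs_r1', 'obs_r2']
    ds = Dataset(cols, num_labels=2)
    ds.add_row(['c-1', 1, 1, 0, 1, None])  # r1 fired and was observed; r2 never fired
    ds.add_row(['c-2', 0, 1, 1, 0, 0])     # both fired, neither observed
    print(ds)  # <Dataset #rows=2 #cols=6 #labels=2>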
def X(self, exclude_id_col=True, na_replacement=0):
res = self.__getitem__((slice(None), slice(1 if exclude_id_col else 0, len(self.columns) - self.num_labels)))
if na_replacement is not None:
res = [[x if x is not None else na_replacement for x in row] for row in res]
return res
def y(self, na_replacement=0):
res = self.__getitem__((slice(None), slice(len(self.columns) - self.num_labels, None)))
if na_replacement is not None:
res = [[x if x is not None else na_replacement for x in row] for row in res]
return res
def __getitem__(self, key):
if not isinstance(key, tuple):
raise TypeError("Dataset must be indexed with dataset[rows, columns]")
row_key, col_key = key
# Normalize rows
if isinstance(row_key, int):
rows = [self.data[row_key]]
else:
rows = self.data[row_key]
# Normalize columns
if isinstance(col_key, int):
res = [row[col_key] for row in rows]
else:
res = [[row[i] for i in range(*col_key.indices(len(row)))] if isinstance(col_key, slice)
else [row[i] for i in col_key] for row in rows]
return res
def save(self, path: 'Path'):
import pickle
with open(path, "wb") as fh:
pickle.dump(self, fh)
@staticmethod
def load(path: 'Path'):
import pickle
with open(path, "rb") as fh:
return pickle.load(fh)
def to_arff(self, path: 'Path'):
arff = f"@relation 'enviPy-dataset: -C {self.num_labels}'\n"
arff += "\n"
for c in self.columns[-self.num_labels:] + self.columns[:self.num_features]:
if c == 'structure_id':
arff += f"@attribute {c} string\n"
else:
arff += f"@attribute {c} {{0,1}}\n"
arff += "\n@data\n"
for d in self.data:
ys = ','.join([str(v if v is not None else '?') for v in d[-self.num_labels:]])
xs = ','.join([str(v if v is not None else '?') for v in d[:self.num_features]])
arff += f'{ys},{xs}\n'
with open(path, "w") as fh:
fh.write(arff)
fh.flush()
def __repr__(self):
return f"<Dataset #rows={len(self.data)} #cols={len(self.columns)} #labels={self.num_labels}>"
class SparseLabelECC(BaseEstimator, ClassifierMixin):
@@ -166,8 +366,7 @@ class SparseLabelECC(BaseEstimator, ClassifierMixin):
self.keep_columns_.append(col)
y_reduced = y[:, self.keep_columns_]
self.chains_ = [ClassifierChain(self.base_clf, order='random', random_state=i)
for i in range(self.num_chains)]
self.chains_ = [ClassifierChain(self.base_clf) for _ in range(self.num_chains)]
for i, chain in enumerate(self.chains_):
print(f"{datetime.now()} fitting {i + 1}/{self.num_chains}")
@@ -208,26 +407,169 @@ class SparseLabelECC(BaseEstimator, ClassifierMixin):
return accuracy_score(y_true, y_pred, sample_weight=sample_weight)
class ApplicabilityDomain(PCA):
def __init__(self, n_components=5):
super().__init__(n_components=n_components)
import copy
from sklearn.dummy import DummyClassifier
from sklearn.tree import DecisionTreeClassifier
class BinaryRelevance:
def __init__(self, baseline_clf):
self.clf = baseline_clf
self.classifiers = None
def fit(self, X, Y):
# accept lists as well as arrays; the nan masks below need ndarrays
X = np.asarray(X)
Y = np.asarray(Y)
if self.classifiers is None:
self.classifiers = []
for l in range(len(Y[0])):
X_l = X[~np.isnan(Y[:, l])]
Y_l = (Y[~np.isnan(Y[:, l]), l])
if len(X_l) == 0: # all labels are nan -> predict 0
clf = DummyClassifier(strategy='constant', constant=0)
clf.fit([X[0]], [0])
self.classifiers.append(clf)
continue
elif len(np.unique(Y_l)) == 1: # only one class -> predict that class
clf = DummyClassifier(strategy='most_frequent')
else:
clf = copy.deepcopy(self.clf)
clf.fit(X_l, Y_l)
self.classifiers.append(clf)
def predict(self, X):
labels = []
for clf in self.classifiers:
labels.append(clf.predict(X))
return np.column_stack(labels)
def predict_proba(self, X):
labels = np.empty((len(X), 0))
for clf in self.classifiers:
pred = clf.predict_proba(X)
if pred.shape[1] > 1:
pred = pred[:, 1]
else:
pred = pred * clf.predict([X[0]])[0]
labels = np.column_stack((labels, pred))
return labels
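A minimal sketch of BinaryRelevance on toy multi-label data; np.nan marks unknown labels, which are dropped per label before fitting:

    import numpy as np
    from sklearn.tree import DecisionTreeClassifier

    X = np.array([[0, 1], [1, 0], [1, 1], [0, 0]])
    Y = np.array([[1, np.nan], [0, 1], [1, 0], [0, np.nan]])
    br = BinaryRelevance(DecisionTreeClassifier())
    br.fit(X, Y)
    print(br.predict(X))        # one 0/1 column per label
    print(br.predict_proba(X))  # one P(label=1) column per label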
class MissingValuesClassifierChain:
def __init__(self, base_clf):
self.base_clf = base_clf
self.permutation = None
self.classifiers = None
def fit(self, X, Y):
X = np.array(X)
Y = np.array(Y)
if self.permutation is None:
self.permutation = np.random.permutation(len(Y[0]))
Y = Y[:, self.permutation]
if self.classifiers is None:
self.classifiers = []
for p in range(len(self.permutation)):
X_p = X[~np.isnan(Y[:, p])]
Y_p = Y[~np.isnan(Y[:, p]), p]
if len(X_p) == 0: # all labels are nan -> predict 0
clf = DummyClassifier(strategy='constant', constant=0)
self.classifiers.append(clf.fit([X[0]], [0]))
elif len(np.unique(Y_p)) == 1: # only one class -> predict that class
clf = DummyClassifier(strategy='most_frequent')
self.classifiers.append(clf.fit(X_p, Y_p))
else:
clf = copy.deepcopy(self.base_clf)
self.classifiers.append(clf.fit(X_p, Y_p))
newcol = Y[:, p]
pred = clf.predict(X)
newcol[np.isnan(newcol)] = pred[np.isnan(newcol)] # fill in missing values with clf predictions
X = np.column_stack((X, newcol))
def predict(self, X):
labels = np.empty((len(X), 0))
for clf in self.classifiers:
pred = clf.predict(np.column_stack((X, labels)))
labels = np.column_stack((labels, pred))
return labels[:, np.argsort(self.permutation)]
def predict_proba(self, X):
labels = np.empty((len(X), 0))
for clf in self.classifiers:
pred = clf.predict_proba(np.column_stack((X, np.round(labels))))
if pred.shape[1] > 1:
pred = pred[:, 1]
else:
pred = pred * clf.predict(np.column_stack(([X[0]], np.round([labels[0]]))))[0]
labels = np.column_stack((labels, pred))
return labels[:, np.argsort(self.permutation)]
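Sketch of the chain on the same kind of data: labels are fitted in a random order, earlier predictions become extra features for later labels, and the output columns are restored to the original order; seeding numpy pins the permutation:

    import numpy as np
    from sklearn.tree import DecisionTreeClassifier

    np.random.seed(0)  # fixes self.permutation
    X = np.array([[0.0, 1], [1, 0], [1, 1], [0, 0]])
    Y = np.array([[1, np.nan], [0, 1], [1, 0], [0, 1]])
    cc = MissingValuesClassifierChain(DecisionTreeClassifier())
    cc.fit(X, Y)
    print(cc.predict(X))  # columns in the original label order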
class EnsembleClassifierChain:
def __init__(self, base_clf, num_chains=10):
self.base_clf = base_clf
self.num_chains = num_chains
self.num_labels = None
self.classifiers = None
def fit(self, X, Y):
if self.classifiers is None:
self.classifiers = []
if self.num_labels is None:
self.num_labels = len(Y[0])
for p in range(self.num_chains):
print(f"{datetime.now()} fitting {p + 1}/{self.num_chains}")
clf = MissingValuesClassifierChain(self.base_clf)
clf.fit(X, Y)
self.classifiers.append(clf)
def predict(self, X):
labels = np.zeros((len(X), self.num_labels))
for clf in self.classifiers:
labels += clf.predict(X)
return np.round(labels / self.num_chains)
def predict_proba(self, X):
labels = np.zeros((len(X), self.num_labels))
for clf in self.classifiers:
labels += clf.predict_proba(X)
return labels / self.num_chains
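And the ensemble wrapper, which trains num_chains independent chains and averages them; a sketch on the same toy data:

    import numpy as np
    from sklearn.tree import DecisionTreeClassifier

    X = np.array([[0.0, 1], [1, 0], [1, 1], [0, 0]])
    Y = np.array([[1, np.nan], [0, 1], [1, 0], [0, 1]])
    ecc = EnsembleClassifierChain(DecisionTreeClassifier(), num_chains=3)
    ecc.fit(X, Y)
    print(ecc.predict(X))        # rounded vote across the 3 chains
    print(ecc.predict_proba(X))  # mean probability across the 3 chains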
class ApplicabilityDomainPCA(PCA):
def __init__(self, num_neighbours: int = 5):
super().__init__(n_components=num_neighbours)
self.scaler = StandardScaler()
self.num_neighbours = num_neighbours
self.min_vals = None
self.max_vals = None
def build(self, X):
def build(self, train_dataset: 'Dataset'):
# transform
X_scaled = self.scaler.fit_transform(X)
X_scaled = self.scaler.fit_transform(train_dataset.X())
# fit pca
X_pca = self.fit_transform(X_scaled)
self.max_vals = np.max(X_pca, axis=0)
self.min_vals = np.min(X_pca, axis=0)
def is_applicable(self, instances):
def __transform(self, instances):
instances_scaled = self.scaler.transform(instances)
instances_pca = self.transform(instances_scaled)
return instances_pca
def is_applicable(self, classify_instances: 'Dataset'):
instances_pca = self.__transform(classify_instances.X())
is_applicable = []
for i, instance in enumerate(instances_pca):
@@ -237,3 +579,17 @@ class ApplicabilityDomain(PCA):
is_applicable[i] = False
return is_applicable
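Usage sketch for the applicability domain: build the PCA bounding box on a training Dataset, then flag query instances whose principal components fall outside the training range (toy data; note num_neighbours is really the number of PCA components kept):

    cols = ['structure_id', 'feature_0', 'feature_1', 'feature_2', 'trig_r1', 'obs_r1']
    train = Dataset(cols, num_labels=1)
    train.add_row(['c-1', 0, 0, 1, 1, 1])
    train.add_row(['c-2', 1, 0, 0, 1, 0])
    train.add_row(['c-3', 0, 1, 0, 0, None])
    train.add_row(['c-4', 1, 1, 1, 1, 0])
    ad = ApplicabilityDomainPCA(num_neighbours=2)
    ad.build(train)
    query = Dataset(cols, num_labels=1, data=[['q-1', 0, 0, 1, 1, -1]])
    print(ad.is_applicable(query))  # [True] -- the query matches c-1 exactly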
def tanimoto_distance(a: List[int], b: List[int]) -> float:
if len(a) != len(b):
raise ValueError(f"Lists must be the same length: {len(a)} != {len(b)}")
sum_a = sum(a)
sum_b = sum(b)
sum_c = sum(v1 and v2 for v1, v2 in zip(a, b))
# two all-zero vectors: treat as identical
if sum_a + sum_b - sum_c == 0:
return 0.0
return 1 - (sum_c / (sum_a + sum_b - sum_c))
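A quick worked example: with a = [1, 1, 0, 1] and b = [0, 1, 0, 1] we get sum_a = 3, sum_b = 2 and sum_c = 2, so the distance is 1 - 2 / (3 + 2 - 2) = 1/3:

    print(tanimoto_distance([1, 1, 0, 1], [0, 1, 0, 1]))  # 0.3333...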