from __future__ import annotations import dataclasses from collections import defaultdict from datetime import datetime from typing import List, Optional import numpy as np from sklearn.base import BaseEstimator, ClassifierMixin from sklearn.decomposition import PCA from sklearn.metrics import accuracy_score from sklearn.multioutput import ClassifierChain from sklearn.preprocessing import StandardScaler from sklearn.tree import DecisionTreeClassifier from sklearn.ensemble import RandomForestClassifier # @dataclasses.dataclass # class Feature: # name: str # value: float # # # # class Row: # def __init__(self, compound_uuid: str, compound_smiles: str, descriptors: List[int]): # self.data = {} # # # # class DataSet(object): # # def __init__(self): # self.rows: List[Row] = [] # # def add_row(self, row: Row): # pass from dataclasses import dataclass, field from utilities.chem import FormatConverter @dataclass class Compound: smiles: str uuid: str = field(default=None, compare=False, hash=False) def __hash__(self): if not hasattr(self, '_hash'): self._hash = hash(( self.smiles )) return self._hash @dataclass class Reaction: educts: List[Compound] products: List[Compound] rule_uuid: str = field(default=None, compare=False, hash=False) reaction_uuid: str = field(default=None, compare=False, hash=False) def __hash__(self): if not hasattr(self, '_hash'): self._hash = hash(( tuple(sorted(self.educts, key=lambda x: x.smiles)), tuple(sorted(self.products, key=lambda x: x.smiles)), )) return self._hash def __eq__(self, other): if not isinstance(other, Reaction): return NotImplemented return ( sorted(self.educts, key=lambda x: x.smiles) == sorted(other.educts, key=lambda x: x.smiles) and sorted(self.products, key=lambda x: x.smiles) == sorted(other.products, key=lambda x: x.smiles) ) class Dataset(object): def __init__(self, headers=List['str'], data=List[List[str|int|float]]): self.headers = headers self.data = data def features(self): pass def labels(self): pass def to_json(self): pass def to_csv(self): pass def to_arff(self): pass class DatasetGenerator(object): @staticmethod def generate_dataset(compounds: List[Compound], reactions: List[Reaction], applicable_rules: 'Rule', compounds_to_exclude: Optional[Compound] = None, educts_only: bool = False) -> Dataset: rows = [] if educts_only: compounds = set() for r in reactions: for e in r.educts: compounds.add(e) compounds = list(compounds) total = len(compounds) for i, c in enumerate(compounds): row = [] print(f"{i + 1}/{total} - {c.smiles}") for r in applicable_rules: product_sets = r.rule.apply(c.smiles) if len(product_sets) == 0: row.append([]) continue #triggered.add(f"{r.uuid} + {c.uuid}") reacts = set() for ps in product_sets: products = [] for p in ps: products.append(Compound(FormatConverter.standardize(p))) reacts.add(Reaction([c], products, r)) row.append(list(reacts)) rows.append(row) return rows class SparseLabelECC(BaseEstimator, ClassifierMixin): """ Ensemble of Classifier Chains with sparse label removal. Removes labels that are constant across all samples in training. """ def __init__(self, base_clf=RandomForestClassifier(n_estimators=100, max_features='log2', random_state=42), num_chains: int = 10): self.base_clf = base_clf self.num_chains = num_chains def fit(self, X, Y): y = np.array(Y) self.n_labels_ = y.shape[1] self.removed_labels_ = {} self.keep_columns_ = [] for col in range(self.n_labels_): unique_values = np.unique(y[:, col]) if len(unique_values) == 1: self.removed_labels_[col] = unique_values[0] else: self.keep_columns_.append(col) y_reduced = y[:, self.keep_columns_] self.chains_ = [ClassifierChain(self.base_clf, order='random', random_state=i) for i in range(self.num_chains)] for i, chain in enumerate(self.chains_): print(f"{datetime.now()} fitting {i + 1}/{self.num_chains}") chain.fit(X, y_reduced) return self def predict(self, X, threshold=0.5): avg_preds = np.mean([chain.predict(X) for chain in self.chains_], axis=0) > threshold full_y = np.zeros((avg_preds.shape[0], self.n_labels_)) for idx, col in enumerate(self.keep_columns_): full_y[:, col] = avg_preds[:, idx] for col, value in self.removed_labels_.items(): full_y[:, col] = bool(value) return full_y def predict_proba(self, X): avg_proba = np.mean([chain.predict_proba(X) for chain in self.chains_], axis=0) full_y = np.zeros((avg_proba.shape[0], self.n_labels_)) for idx, col in enumerate(self.keep_columns_): full_y[:, col] = avg_proba[:, idx] for col, value in self.removed_labels_.items(): full_y[:, col] = float(value) return full_y def score(self, X, Y, sample_weight=None): """ Default scoring using subset accuracy (exact match). """ y_true = np.array(Y) y_pred = self.predict(X) return accuracy_score(y_true, y_pred, sample_weight=sample_weight) class ApplicabilityDomain(PCA): def __init__(self, n_components=5): super().__init__(n_components=n_components) self.scaler = StandardScaler() self.min_vals = None self.max_vals = None def build(self, X): # transform X_scaled = self.scaler.fit_transform(X) # fit pca X_pca = self.fit_transform(X_scaled) self.max_vals = np.max(X_pca, axis=0) self.min_vals = np.min(X_pca, axis=0) def is_applicable(self, instances): instances_scaled = self.scaler.transform(instances) instances_pca = self.transform(instances_scaled) is_applicable = [] for i, instance in enumerate(instances_pca): is_applicable.append(True) for min_v, max_v, new_v in zip(self.min_vals, self.max_vals, instance): if not min_v <= new_v <= max_v: is_applicable[i] = False return is_applicable