forked from enviPath/enviPy
finished app domain conversion #120
epdb/models.py (119 changed lines)

@@ -2764,7 +2764,7 @@ class ApplicabilityDomain(EnviPathModel):
     def training_set_probs(self):
         ds = self.model.load_dataset()
         col_ids = ds.block_indices("prob")
-        return ds[ds.columns[col_ids[0]: col_ids[1]]]
+        return ds[:, col_ids]

     def build(self):
         ds = self.model.load_dataset()
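
A note on this hunk: the old code sliced the probability block out of the column list with a start/end pair, while the new `ds[:, col_ids]` treats the result of `block_indices("prob")` as a list of positional column indices. A minimal plain-polars sketch of that selection (the `RuleBasedDataset` wrapper is replaced by a bare DataFrame; the column names and the `block_indices` semantics are assumptions inferred from the diff):

```python
import polars as pl

# Toy stand-in for the training dataset; column names are illustrative.
df = pl.DataFrame({
    "structure_id": ["s1", "s2"],
    "trig_a": [1, 0],
    "prob_a": [0.8, 0.1],   # the "prob" block the method should return
    "prob_b": [0.2, 0.9],
})

# Assumed equivalent of ds.block_indices("prob"): positions of the prob_ columns.
col_ids = [i for i, c in enumerate(df.columns) if c.startswith("prob_")]

# Mirrors the new `return ds[:, col_ids]`: positional column selection in polars.
probs = df[:, col_ids]
print(probs.columns)  # ['prob_a', 'prob_b']
```
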
@@ -2819,18 +2819,17 @@ class ApplicabilityDomain(EnviPathModel):
         # it identifies all training structures that have the same trigger reaction activated (i.e., value 1).
         # This is used to find "qualified neighbours" — training examples that share the same triggered feature
         # with a given assessment structure under a particular rule.
-        qualified_neighbours_per_rule: Dict = {}

         import polars as pl
+        qualified_neighbours_per_rule: Dict = {}
+        # Select only the triggered columns
         for i, row in enumerate(assessment_ds[:, assessment_ds.triggered()].iter_rows(named=True)):
             # Find the rules the structure triggers. For each rule, filter the training dataset to rows that also
-            # trigger that rule. Select the structure_id of the compounds in those filtered rows
-            train_trig = {col_name: ds.df.filter(pl.col(col_name).eq(1)).select("structure_id") for col_name, value in row.items() if value == 1}
+            # trigger that rule.
+            train_trig = {trig_uuid.split("_")[-1]: ds.filter(pl.col(trig_uuid).eq(1))
+                          for trig_uuid, value in row.items() if value == 1}
             qualified_neighbours_per_rule[i] = train_trig

         probs = self.training_set_probs
-        # preds = self.model.model.predict_proba(assessment_ds.X())
+        rule_to_i = {str(r.uuid): i for i, r in enumerate(self.model.applicable_rules)}
         preds = self.model.combine_products_and_probs(
             self.model.applicable_rules,
             self.model.model.predict_proba(assessment_ds.X().to_numpy())[0],
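
The hunk above builds the "qualified neighbours" mapping: for each assessment structure, every rule it triggers is matched against the training rows that trigger the same rule. A self-contained toy version in plain polars (column names are illustrative, not the real schema):

```python
import polars as pl

# Toy training set: one trigger column per rule.
train = pl.DataFrame({
    "structure_id": ["s1", "s2", "s3"],
    "trig_rule1": [1, 0, 1],
    "trig_rule2": [0, 1, 1],
})

# One assessment row restricted to its trigger columns, as iter_rows(named=True)
# yields it in the loop above.
row = {"trig_rule1": 1, "trig_rule2": 0}

# For every rule the assessment structure triggers (value == 1), keep the training
# rows that also trigger it, keyed by the rule id parsed off the column name.
train_trig = {col.split("_")[-1]: train.filter(pl.col(col).eq(1))
              for col, value in row.items() if value == 1}

print(list(train_trig))                               # ['rule1']
print(train_trig["rule1"]["structure_id"].to_list())  # ['s1', 's3']
```

Keying the result by the UUID suffix parsed from the column name (`trig_uuid.split("_")[-1]`) is what lets all the later bookkeeping be addressed by rule UUID instead of positional index.
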
@@ -2840,57 +2839,28 @@ class ApplicabilityDomain(EnviPathModel):
         assessments = list()

         # loop through our assessment dataset
-        for i, instance in enumerate(assessment_ds):
+        for i, instance in enumerate(assessment_ds[:, assessment_ds.struct_features()]):
             rule_reliabilities = dict()
             local_compatibilities = dict()
             neighbours_per_rule = dict()
             neighbor_probs_per_rule = dict()

-            # loop through rule indices together with the collected neighbours indices from train dataset
-            for rule_idx, vals in qualified_neighbours_per_rule[i].items():
-                # collect the train dataset instances and store them along with the index (a.k.a. row number) of the
-                # train dataset
-                train_instances = []
-                for v in vals:
-                    train_instances.append((v, ds.at(v)))
-
-                # sf is a tuple with start/end index of the features
-                sf = ds.struct_features()
-
-                # compute tanimoto distance for all neighbours
-                # result is a list of tuples with train index and computed distance
-                dists = self._compute_distances(
-                    instance.X()[0][sf[0] : sf[1]],
-                    [ti[1].X()[0][sf[0] : sf[1]] for ti in train_instances],
-                )
-
-                dists_with_index = list()
-                for ti, dist in zip(train_instances, dists):
-                    dists_with_index.append((ti[0], dist[1]))
+            for rule_uuid, train_instances in qualified_neighbours_per_rule[i].items():
+                # compute tanimoto distance for all neighbours and add to dataset
+                dists = self._compute_distances(assessment_ds[i, assessment_ds.struct_features()].to_numpy()[0],
+                                                train_instances[:, train_instances.struct_features()].to_numpy())
+                train_instances = train_instances.with_columns(dist=pl.Series(dists))

                 # sort them in a descending way and take at most `self.num_neighbours`
-                dists_with_index = sorted(dists_with_index, key=lambda x: x[1], reverse=True)
-                dists_with_index = dists_with_index[: self.num_neighbours]
-
+                # TODO: Should this be descending? If we want the most similar then we want values close to zero (ascending)
+                train_instances = train_instances.sort("dist", descending=True)[:self.num_neighbours]

                 # compute average distance
-                rule_reliabilities[rule_idx] = (
-                    sum([d[1] for d in dists_with_index]) / len(dists_with_index)
-                    if len(dists_with_index) > 0
-                    else 0.0
-                )
-
+                rule_reliabilities[rule_uuid] = train_instances.select(pl.mean("dist")).fill_nan(0.0).item()

                 # for local_compatibility we'll need the datasets for the indices having the highest similarity
-                neighbour_datasets = [(d[0], ds.at(d[0])) for d in dists_with_index]
-                local_compatibilities[rule_idx] = self._compute_compatibility(
-                    rule_idx, probs, neighbour_datasets
-                )
-                neighbours_per_rule[rule_idx] = [
-                    CompoundStructure.objects.get(uuid=ds.structure_id(1))
-                    for ds in neighbour_datasets
-                ]
-                neighbor_probs_per_rule[rule_idx] = [
-                    probs[d[0]][rule_idx] for d in dists_with_index
-                ]
+                local_compatibilities[rule_uuid] = self._compute_compatibility(rule_uuid, train_instances)
+                neighbours_per_rule[rule_uuid] = list(CompoundStructure.objects.filter(uuid__in=train_instances["structure_id"]))
+                neighbor_probs_per_rule[rule_uuid] = train_instances[f"prob_{rule_uuid}"].to_list()

             ad_res = {
                 "ad_params": {
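
The TODO in this hunk is the crux: whether `sort("dist", descending=True)` keeps the most similar neighbours depends on whether `utilities.ml.tanimoto_distance` returns a similarity (higher means closer) or a true distance (lower means closer). A toy implementation that makes the distinction explicit (an assumed implementation for illustration, not the enviPy one):

```python
def tanimoto_similarity(a: list[int], b: list[int]) -> float:
    # |A ∩ B| / |A ∪ B| over binary fingerprints; 1.0 means identical.
    both = sum(1 for x, y in zip(a, b) if x and y)
    either = sum(1 for x, y in zip(a, b) if x or y)
    return both / either if either else 0.0

def tanimoto_distance(a: list[int], b: list[int]) -> float:
    # The usual distance form: 0.0 means identical.
    return 1.0 - tanimoto_similarity(a, b)

query = [1, 1, 0, 1]
neighbours = [[1, 1, 0, 0], [1, 1, 0, 1], [0, 0, 1, 0]]

# With similarities, the nearest neighbours are the largest values (sort descending);
# with distances, they are the smallest (sort ascending), which is the TODO's point.
sims = [tanimoto_similarity(query, n) for n in neighbours]
print(sorted(sims, reverse=True)[:2])  # [1.0, 0.6666666666666666]
```

A related edge case: on an empty `train_instances` frame, `pl.mean("dist")` yields null rather than NaN, so `fill_nan(0.0)` would not catch it; the removed code's explicit `len(...) > 0` guard covered that case.
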
@@ -2902,22 +2872,20 @@ class ApplicabilityDomain(EnviPathModel):
                 },
                 "assessment": {
                     "smiles": smiles,
-                    "inside_app_domain": self.pca.is_applicable(instance)[0],
+                    "inside_app_domain": self.pca.is_applicable(assessment_ds[i])[0],
                 },
             }

             transformations = list()
-            for rule_idx in rule_reliabilities.keys():
-                rule = Rule.objects.get(
-                    uuid=instance.columns[instance.observed()[0] + rule_idx].replace("obs_", "")
-                )
+            for rule_uuid in rule_reliabilities.keys():
+                rule = Rule.objects.get(uuid=rule_uuid)

                 rule_data = rule.simple_json()
                 rule_data["image"] = f"{rule.url}?image=svg"

                 neighbors = []
                 for n, n_prob in zip(
-                    neighbours_per_rule[rule_idx], neighbor_probs_per_rule[rule_idx]
+                    neighbours_per_rule[rule_uuid], neighbor_probs_per_rule[rule_uuid]
                 ):
                     neighbor = n.simple_json()
                     neighbor["image"] = f"{n.url}?image=svg"
@@ -2934,14 +2902,14 @@ class ApplicabilityDomain(EnviPathModel):

                 transformation = {
                     "rule": rule_data,
-                    "reliability": rule_reliabilities[rule_idx],
+                    "reliability": rule_reliabilities[rule_uuid],
                     # We're setting it here to False, as we don't know whether "assess" is called during pathway
                     # prediction or from Model Page. For persisted Nodes this field will be overwritten at runtime
                     "is_predicted": False,
-                    "local_compatibility": local_compatibilities[rule_idx],
-                    "probability": preds[rule_idx].probability,
+                    "local_compatibility": local_compatibilities[rule_uuid],
+                    "probability": preds[rule_to_i[rule_uuid]].probability,
                     "transformation_products": [
-                        x.product_set for x in preds[rule_idx].product_sets
+                        x.product_set for x in preds[rule_to_i[rule_uuid]].product_sets
                     ],
                     "times_triggered": ds.times_triggered(str(rule.uuid)),
                     "neighbors": neighbors,
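
`preds` returned by `combine_products_and_probs` stays a positionally ordered sequence aligned with `model.applicable_rules`, while the per-rule dicts are now keyed by UUID, so the `rule_to_i` map built earlier bridges the two. A stand-in illustration (hypothetical values, not the real prediction objects):

```python
# preds is aligned by position with applicable_rules; the bookkeeping dicts use UUIDs.
applicable_rules = ["3f9e-...", "a71c-..."]             # hypothetical UUID strings
preds = [{"probability": 0.83}, {"probability": 0.12}]  # aligned by position

rule_to_i = {uuid: i for i, uuid in enumerate(applicable_rules)}

rule_uuid = "a71c-..."
print(preds[rule_to_i[rule_uuid]]["probability"])  # 0.12
```
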
@@ -2959,32 +2927,23 @@ class ApplicabilityDomain(EnviPathModel):
     def _compute_distances(classify_instance: List[int], train_instances: List[List[int]]):
         from utilities.ml import tanimoto_distance

-        distances = [
-            (i, tanimoto_distance(classify_instance, train))
-            for i, train in enumerate(train_instances)
-        ]
+        distances = [tanimoto_distance(classify_instance, train) for train in train_instances]
         return distances

     @staticmethod
-    def _compute_compatibility(rule_idx: int, preds, neighbours: List[Tuple[int, "RuleBasedDataset"]]):
-        tp, tn, fp, fn = 0.0, 0.0, 0.0, 0.0
+    def _compute_compatibility(rule_idx: int, neighbours: "RuleBasedDataset"):
         accuracy = 0.0

-        for n in neighbours:
-            obs = n[1].y()[0][rule_idx]
-            pred = preds[n[0]][rule_idx]
-            if obs and pred:
-                tp += 1
-            elif not obs and pred:
-                fp += 1
-            elif obs and not pred:
-                fn += 1
-            else:
-                tn += 1
-        # Jaccard Index
-        if tp + tn > 0.0:
-            accuracy = (tp + tn) / (tp + tn + fp + fn)
-
+        import polars as pl
+        # TODO: Use a threshold to convert prob to boolean, or pass boolean in
+        obs_pred = neighbours.select(obs=pl.col(f"obs_{rule_idx}").cast(pl.Boolean),
+                                     pred=pl.col(f"prob_{rule_idx}").cast(pl.Boolean))
+        # Compute tp, tn, fp, fn using polars expressions
+        tp = obs_pred.filter((pl.col("obs")) & (pl.col("pred"))).height
+        tn = obs_pred.filter((~pl.col("obs")) & (~pl.col("pred"))).height
+        fp = obs_pred.filter((~pl.col("obs")) & (pl.col("pred"))).height
+        fn = obs_pred.filter((pl.col("obs")) & (~pl.col("pred"))).height
+        if tp + tn > 0.0:
+            accuracy = (tp + tn) / (tp + tn + fp + fn)
         return accuracy
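
One observation on the rewritten `_compute_compatibility`: the removed code labelled its metric "Jaccard Index", but both versions compute plain accuracy, (tp + tn) / (tp + tn + fp + fn); the Jaccard index over the positive class would be tp / (tp + fp + fn). The polars version can be exercised in isolation (toy frame, illustrative rule id), which also shows why the in-code TODO about thresholding matters, since `cast(pl.Boolean)` turns any non-zero probability into True:

```python
import polars as pl

# Toy neighbour frame for one rule ("r1"); in the real dataset these columns come
# from the observed block (obs_<uuid>) and the probability block (prob_<uuid>).
neighbours = pl.DataFrame({
    "obs_r1":  [1, 0, 1, 0],
    "prob_r1": [0.9, 0.7, 0.0, 0.0],
})

# Mirrors the new code: any non-zero probability casts to True (the TODO's concern).
obs_pred = neighbours.select(obs=pl.col("obs_r1").cast(pl.Boolean),
                             pred=pl.col("prob_r1").cast(pl.Boolean))

tp = obs_pred.filter(pl.col("obs") & pl.col("pred")).height
tn = obs_pred.filter(~pl.col("obs") & ~pl.col("pred")).height
fp = obs_pred.filter(~pl.col("obs") & pl.col("pred")).height
fn = obs_pred.filter(pl.col("obs") & ~pl.col("pred")).height

accuracy = (tp + tn) / (tp + tn + fp + fn) if tp + tn > 0.0 else 0.0
print(tp, tn, fp, fn, accuracy)  # 1 1 1 1 0.5
```
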