[Fix] Mitigate XSS attack vector by cleaning input before it hits our Database (#171)

## Changes

- All text input fields are now cleaned with nh3 to remove html tags. We allow certain html tags under `settings.py/ALLOWED_HTML_TAGS` so we can easily update the tags we allow in the future.
- All names and descriptions now use the template tag `nh_safe` in all html files.
- Usernames and emails are a small exception and are not allowed any html tags

Co-authored-by: Liam Brydon <62733830+MyCreativityOutlet@users.noreply.github.com>
Co-authored-by: jebus <lorsbach@envipath.com>
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#171
Reviewed-by: jebus <lorsbach@envipath.com>
Co-authored-by: liambrydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: liambrydon <lbry121@aucklanduni.ac.nz>
This commit is contained in:
2025-11-11 22:49:55 +13:00
committed by jebus
parent 1cccefa991
commit 34589efbde
53 changed files with 444 additions and 230 deletions

View File

@ -11,6 +11,7 @@ from typing import Union, List, Optional, Dict, Tuple, Set, Any
from uuid import uuid4
import math
import joblib
import nh3
import numpy as np
from django.conf import settings as s
from django.contrib.auth.models import AbstractUser
@ -28,8 +29,14 @@ from sklearn.metrics import precision_score, recall_score, jaccard_score
from sklearn.model_selection import ShuffleSplit
from utilities.chem import FormatConverter, ProductSet, PredictionResult, IndigoUtils
from utilities.ml import RuleBasedDataset, ApplicabilityDomainPCA, EnsembleClassifierChain, RelativeReasoning, \
EnviFormerDataset, Dataset
from utilities.ml import (
RuleBasedDataset,
ApplicabilityDomainPCA,
EnsembleClassifierChain,
RelativeReasoning,
EnviFormerDataset,
Dataset,
)
logger = logging.getLogger(__name__)
@ -803,14 +810,16 @@ class Compound(EnviPathModel, AliasMixin, ScenarioMixin, ChemicalIdentifierMixin
c = Compound()
c.package = package
if name is None or name.strip() == "":
if name is not None:
# Clean for potential XSS
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"Compound {Compound.objects.filter(package=package).count() + 1}"
c.name = name
# We have a default here only set the value if it carries some payload
if description is not None and description.strip() != "":
c.description = description.strip()
c.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
c.save()
@ -982,11 +991,11 @@ class CompoundStructure(EnviPathModel, AliasMixin, ScenarioMixin, ChemicalIdenti
raise ValueError("Unpersisted Compound! Persist compound first!")
cs = CompoundStructure()
# Clean for potential XSS
if name is not None:
cs.name = name
cs.name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if description is not None:
cs.description = description
cs.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
cs.smiles = smiles
cs.compound = compound
@ -1188,21 +1197,29 @@ class SimpleAmbitRule(SimpleRule):
r = SimpleAmbitRule()
r.package = package
if name is None or name.strip() == "":
if name is not None:
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"Rule {Rule.objects.filter(package=package).count() + 1}"
r.name = name
if description is not None and description.strip() != "":
r.description = description
r.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
r.smirks = smirks
if reactant_filter_smarts is not None and reactant_filter_smarts.strip() != "":
r.reactant_filter_smarts = reactant_filter_smarts
if not FormatConverter.is_valid_smarts(reactant_filter_smarts.strip()):
raise ValueError(f'Reactant Filter SMARTS "{reactant_filter_smarts}" is invalid!')
else:
r.reactant_filter_smarts = reactant_filter_smarts.strip()
if product_filter_smarts is not None and product_filter_smarts.strip() != "":
r.product_filter_smarts = product_filter_smarts
if not FormatConverter.is_valid_smarts(product_filter_smarts.strip()):
raise ValueError(f'Product Filter SMARTS "{product_filter_smarts}" is invalid!')
else:
r.product_filter_smarts = product_filter_smarts.strip()
r.save()
return r
@ -1403,12 +1420,11 @@ class Reaction(EnviPathModel, AliasMixin, ScenarioMixin, ReactionIdentifierMixin
r = Reaction()
r.package = package
# Clean for potential XSS
if name is not None and name.strip() != "":
r.name = name
r.name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if description is not None and name.strip() != "":
r.description = description
r.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
r.multi_step = multi_step
@ -1716,14 +1732,15 @@ class Pathway(EnviPathModel, AliasMixin, ScenarioMixin):
):
pw = Pathway()
pw.package = package
if name is None or name.strip() == "":
if name is not None:
# Clean for potential XSS
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"Pathway {Pathway.objects.filter(package=package).count() + 1}"
pw.name = name
if description is not None and description.strip() != "":
pw.description = description
pw.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
pw.save()
try:
@ -2018,11 +2035,16 @@ class Edge(EnviPathModel, AliasMixin, ScenarioMixin):
for node in end_nodes:
e.end_nodes.add(node)
if name is None:
# Clean for potential XSS
# Cleaning technically not needed as it is also done in Reaction.create, including it here for consistency
if name is not None:
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"Reaction {pathway.package.reactions.count() + 1}"
if description is None:
description = s.DEFAULT_VALUES["description"]
description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
r = Reaction.create(
pathway.package,
@ -2344,7 +2366,9 @@ class PackageBasedModel(EPModel):
eval_reactions = list(
Reaction.objects.filter(package__in=self.eval_packages.all()).distinct()
)
ds = RuleBasedDataset.generate_dataset(eval_reactions, self.applicable_rules, educts_only=True)
ds = RuleBasedDataset.generate_dataset(
eval_reactions, self.applicable_rules, educts_only=True
)
if isinstance(self, RuleBasedRelativeReasoning):
X = ds.X(exclude_id_col=False, na_replacement=None).to_numpy()
y = ds.y(na_replacement=np.nan).to_numpy()
@ -2542,14 +2566,15 @@ class RuleBasedRelativeReasoning(PackageBasedModel):
):
rbrr = RuleBasedRelativeReasoning()
rbrr.package = package
if name is None or name.strip() == "":
if name is not None:
# Clean for potential XSS
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"RuleBasedRelativeReasoning {RuleBasedRelativeReasoning.objects.filter(package=package).count() + 1}"
rbrr.name = name
if description is not None and description.strip() != "":
rbrr.description = description
rbrr.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
if threshold is None or (threshold <= 0 or 1 <= threshold):
raise ValueError("Threshold must be a float between 0 and 1.")
@ -2646,14 +2671,15 @@ class MLRelativeReasoning(PackageBasedModel):
):
mlrr = MLRelativeReasoning()
mlrr.package = package
if name is None or name.strip() == "":
if name is not None:
# Clean for potential XSS
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"MLRelativeReasoning {MLRelativeReasoning.objects.filter(package=package).count() + 1}"
mlrr.name = name
if description is not None and description.strip() != "":
mlrr.description = description
mlrr.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
if threshold is None or (threshold <= 0 or 1 <= threshold):
raise ValueError("Threshold must be a float between 0 and 1.")
@ -2807,7 +2833,9 @@ class ApplicabilityDomain(EnviPathModel):
else:
smiles.append(structures)
assessment_ds, assessment_prods = ds.classification_dataset(structures, self.model.applicable_rules)
assessment_ds, assessment_prods = ds.classification_dataset(
structures, self.model.applicable_rules
)
# qualified_neighbours_per_rule is a nested dictionary structured as:
# {
@ -2823,12 +2851,16 @@ class ApplicabilityDomain(EnviPathModel):
qualified_neighbours_per_rule: Dict = {}
import polars as pl
# Select only the triggered columns
for i, row in enumerate(assessment_ds[:, assessment_ds.triggered()].iter_rows(named=True)):
# Find the rules the structure triggers. For each rule, filter the training dataset to rows that also
# trigger that rule.
train_trig = {trig_uuid.split("_")[-1]: ds.filter(pl.col(trig_uuid).eq(1))
for trig_uuid, value in row.items() if value == 1}
train_trig = {
trig_uuid.split("_")[-1]: ds.filter(pl.col(trig_uuid).eq(1))
for trig_uuid, value in row.items()
if value == 1
}
qualified_neighbours_per_rule[i] = train_trig
rule_to_i = {str(r.uuid): i for i, r in enumerate(self.model.applicable_rules)}
preds = self.model.combine_products_and_probs(
@ -2848,18 +2880,28 @@ class ApplicabilityDomain(EnviPathModel):
# loop through rule indices together with the collected neighbours indices from train dataset
for rule_uuid, train_instances in qualified_neighbours_per_rule[i].items():
# compute tanimoto distance for all neighbours and add to dataset
dists = self._compute_distances(assessment_ds[i, assessment_ds.struct_features()].to_numpy()[0],
train_instances[:, train_instances.struct_features()].to_numpy())
dists = self._compute_distances(
assessment_ds[i, assessment_ds.struct_features()].to_numpy()[0],
train_instances[:, train_instances.struct_features()].to_numpy(),
)
train_instances = train_instances.with_columns(dist=pl.Series(dists))
# sort them in a descending way and take at most `self.num_neighbours`
# TODO: Should this be descending? If we want the most similar then we want values close to zero (ascending)
train_instances = train_instances.sort("dist", descending=True)[:self.num_neighbours]
train_instances = train_instances.sort("dist", descending=True)[
: self.num_neighbours
]
# compute average distance
rule_reliabilities[rule_uuid] = train_instances.select(pl.mean("dist")).fill_nan(0.0).item()
rule_reliabilities[rule_uuid] = (
train_instances.select(pl.mean("dist")).fill_nan(0.0).item()
)
# for local_compatibility we'll need the datasets for the indices having the highest similarity
local_compatibilities[rule_uuid] = self._compute_compatibility(rule_uuid, train_instances)
neighbours_per_rule[rule_uuid] = list(CompoundStructure.objects.filter(uuid__in=train_instances["structure_id"]))
local_compatibilities[rule_uuid] = self._compute_compatibility(
rule_uuid, train_instances
)
neighbours_per_rule[rule_uuid] = list(
CompoundStructure.objects.filter(uuid__in=train_instances["structure_id"])
)
neighbor_probs_per_rule[rule_uuid] = train_instances[f"prob_{rule_uuid}"].to_list()
ad_res = {
@ -2933,8 +2975,11 @@ class ApplicabilityDomain(EnviPathModel):
def _compute_compatibility(self, rule_idx: int, neighbours: "RuleBasedDataset"):
accuracy = 0.0
import polars as pl
obs_pred = neighbours.select(obs=pl.col(f"obs_{rule_idx}").cast(pl.Boolean),
pred=pl.col(f"prob_{rule_idx}") >= self.model.threshold)
obs_pred = neighbours.select(
obs=pl.col(f"obs_{rule_idx}").cast(pl.Boolean),
pred=pl.col(f"prob_{rule_idx}") >= self.model.threshold,
)
# Compute tp, tn, fp, fn using polars expressions
tp = obs_pred.filter((pl.col("obs")) & (pl.col("pred"))).height
tn = obs_pred.filter((~pl.col("obs")) & (~pl.col("pred"))).height
@ -2961,14 +3006,15 @@ class EnviFormer(PackageBasedModel):
):
mod = EnviFormer()
mod.package = package
if name is None or name.strip() == "":
if name is not None:
# Clean for potential XSS
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"EnviFormer {EnviFormer.objects.filter(package=package).count() + 1}"
mod.name = name
if description is not None and description.strip() != "":
mod.description = description
mod.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
if threshold is None or (threshold <= 0 or 1 <= threshold):
raise ValueError("Threshold must be a float between 0 and 1.")
@ -3103,7 +3149,7 @@ class EnviFormer(PackageBasedModel):
pred_dict = {}
for k, pred in enumerate(predictions):
pred_smiles, pred_proba = zip(*pred.items())
reactant, true_product = test_ds[k, "educts"], test_ds[k, "products"]
reactant, _ = test_ds[k, "educts"], test_ds[k, "products"]
pred_dict.setdefault(reactant, {"predict": [], "scores": []})
for smiles, proba in zip(pred_smiles, pred_proba):
smiles = set(smiles.split("."))
@ -3217,8 +3263,9 @@ class EnviFormer(PackageBasedModel):
# If there are eval packages perform single generation evaluation on them instead of random splits
if self.eval_packages.count() > 0:
ds = EnviFormerDataset.generate_dataset(Reaction.objects.filter(
package__in=self.eval_packages.all()).distinct())
ds = EnviFormerDataset.generate_dataset(
Reaction.objects.filter(package__in=self.eval_packages.all()).distinct()
)
test_result = self.model.predict_batch(ds.X())
single_gen_result = evaluate_sg(ds, test_result, self.threshold)
self.eval_results = self.compute_averages([single_gen_result])
@ -3236,7 +3283,9 @@ class EnviFormer(PackageBasedModel):
train = ds[train_index]
test = ds[test_index]
start = datetime.now()
model = fine_tune(train.X(), train.y(), s.MODEL_DIR, str(split_id), device=s.ENVIFORMER_DEVICE)
model = fine_tune(
train.X(), train.y(), s.MODEL_DIR, str(split_id), device=s.ENVIFORMER_DEVICE
)
end = datetime.now()
logger.debug(
f"EnviFormer finetuning took {(end - start).total_seconds():.2f} seconds"
@ -3313,7 +3362,12 @@ class EnviFormer(PackageBasedModel):
for pathway in train_pathways:
for reaction in pathway.edges:
reaction = reaction.edge_label
if any([educt in test_educts for educt in reaction_to_educts[str(reaction.uuid)]]):
if any(
[
educt in test_educts
for educt in reaction_to_educts[str(reaction.uuid)]
]
):
overlap += 1
continue
train_reactions.append(reaction)
@ -3370,41 +3424,44 @@ class Scenario(EnviPathModel):
scenario_type: str,
additional_information: List["EnviPyModel"],
):
s = Scenario()
s.package = package
if name is None or name.strip() == "":
new_s = Scenario()
new_s.package = package
if name is not None:
# Clean for potential XSS
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"Scenario {Scenario.objects.filter(package=package).count() + 1}"
s.name = name
new_s.name = name
if description is not None and description.strip() != "":
s.description = description
new_s.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
if scenario_date is not None and scenario_date.strip() != "":
s.scenario_date = scenario_date
new_s.scenario_date = nh3.clean(scenario_date).strip()
if scenario_type is not None and scenario_type.strip() != "":
s.scenario_type = scenario_type
new_s.scenario_type = scenario_type
add_inf = defaultdict(list)
for info in additional_information:
cls_name = info.__class__.__name__
ai_data = json.loads(info.model_dump_json())
# Clean for potential XSS hidden in the additional information fields.
ai_data = json.loads(nh3.clean(info.model_dump_json()).strip())
ai_data["uuid"] = f"{uuid4()}"
add_inf[cls_name].append(ai_data)
s.additional_information = add_inf
new_s.additional_information = add_inf
s.save()
new_s.save()
return s
return new_s
@transaction.atomic
def add_additional_information(self, data: "EnviPyModel"):
cls_name = data.__class__.__name__
ai_data = json.loads(data.model_dump_json())
# Clean for potential XSS hidden in the additional information fields.
ai_data = json.loads(nh3.clean(data.model_dump_json()).strip())
ai_data["uuid"] = f"{uuid4()}"
if cls_name not in self.additional_information:
@ -3439,7 +3496,8 @@ class Scenario(EnviPathModel):
new_ais = defaultdict(list)
for k, vals in data.items():
for v in vals:
ai_data = json.loads(v.model_dump_json())
# Clean for potential XSS hidden in the additional information fields.
ai_data = json.loads(nh3.clean(v.model_dump_json()).strip())
if hasattr(v, "uuid"):
ai_data["uuid"] = str(v.uuid)
else: