forked from enviPath/enviPy
This is an initial implementation that creates a working minimal .i6z document. It passes schema validation and can be imported into IUCLID. Caveat: IUCLID files target individual compounds. Pathway is not actually covered by the format. It can be added in either soil or water and soil OECD endpoints. **I currently only implemented the soil endpoint for all data.** This sort of works, and I can report all degradation products in a pathway (not a nice view, but we can report many transformation products and add a diagram attachment in the future). Adding additional information is an absolute pain, as we need to explicitly map each type of information to the relevant OECD field. I use the XSD scheme for validation, but unfortunately the IUCLID parser is not fully compliant and requires a specific order, etc. The workflow is: finding the AI structure from the XSD scheme -> make the scheme validation pass -> upload to IUCLID to get obscure error messages -> guess what could be wrong -> repeat 💣 New specifications get released once per year, so we will have to update accordingly. I believe that this should be a more expensive feature, as it requires significant effort to uphold. Currently implemented for root compound only in SOIL: - Soil Texture 2 - Soil Texture 1 - pH value - Half-life per soil sample / scenario (mapped to disappearance; not sure about that). - CEC - Organic Matter (only Carbon) - Moisture content - Humidity <img width="2123" alt="image.png" src="attachments/d29830e1-65ef-4136-8939-1825e0959c62"> <img width="2124" alt="image.png" src="attachments/ac9de2ac-bf68-4ba4-b40b-82f810a9de93"> <img width="2139" alt="image.png" src="attachments/5674c7e6-865e-420e-974a-6b825b331e6c"> Reviewed-on: enviPath/enviPy#338 Co-authored-by: Tobias O <tobias.olenyi@envipath.com> Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
494 lines
18 KiB
Python
494 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from uuid import UUID, uuid4
|
|
|
|
from epapi.v1.interfaces.iuclid.dto import PathwayExportDTO
|
|
from utilities.chem import FormatConverter
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class IUCLIDReferenceSubstanceData:
    """Chemical identity of one compound, as carried into the IUCLID export.

    ``PathwayMapper.map`` creates one instance per distinct compound PK and
    links it to an ``IUCLIDSubstanceData`` through ``uuid``.
    """

    # Identifier of this reference substance (a fresh uuid4 when built by the mapper).
    uuid: UUID
    # Display name; the mapper copies the compound's name.
    name: str
    # Structure as SMILES, if the compound has one.
    smiles: str | None = None
    cas_number: str | None = None
    # ec_number and iupac_name are not populated by PathwayMapper.map in this file.
    ec_number: str | None = None
    iupac_name: str | None = None
    # The following four are derived from the SMILES by the mapper; each stays
    # None when there is no SMILES or the conversion fails.
    molecular_formula: str | None = None
    molecular_weight: float | None = None
    inchi: str | None = None
    inchi_key: str | None = None
|
|
|
|
|
|
@dataclass
class IUCLIDSubstanceData:
    """A substance entry of the export, pointing at its reference substance."""

    # Identifier of this substance (a fresh uuid4 when built by the mapper).
    uuid: UUID
    # Display name; the mapper copies the compound's name.
    name: str
    # ``uuid`` of the matching IUCLIDReferenceSubstanceData, if any.
    reference_substance_uuid: UUID | None = None
|
|
|
|
|
|
@dataclass
class SoilPropertiesData:
    """Soil description collected from one scenario's additional information.

    Every field is optional; ``PathwayMapper._extract_soil_properties`` fills
    in whatever the scenario provides and leaves the rest as ``None``.
    Units are not defined in this file.
    # NOTE(review): confirm units (e.g. percentages for sand/silt/clay) against
    # the additional-information classes.
    """

    # Soil number code assigned by the mapper so half-lives can refer to this soil.
    soil_no_code: str | None = None
    # Texture class name (from SoilTexture1.type.value).
    soil_type: str | None = None
    # Texture fractions (from SoilTexture2).
    sand: float | None = None
    silt: float | None = None
    clay: float | None = None
    # Organic carbon content (from OMContent.in_oc).
    org_carbon: float | None = None
    # pH interval bounds and measurement method (from Acidity).
    ph_lower: float | None = None
    ph_upper: float | None = None
    ph_method: str | None = None
    # Cation exchange capacity (from CEC.capacity).
    cec: float | None = None
    # Moisture content (taken from the Humidity additional information).
    moisture_content: float | None = None
    # Classification system name (from SoilClassification.system.value).
    soil_classification: str | None = None
|
|
|
|
|
|
@dataclass
class IUCLIDEndpointStudyRecordData:
    """Data for one endpoint study record attached to a substance.

    ``PathwayMapper.map`` creates one record per root compound, named
    "Biodegradation in soil - <pathway name>".
    """

    # Identifier of this record (a fresh uuid4 when built by the mapper).
    uuid: UUID
    # ``uuid`` of the IUCLIDSubstanceData this record belongs to.
    substance_uuid: UUID
    name: str
    # Half-life entries gathered from the root compound's scenarios.
    half_lives: list[HalfLifeEntry] = field(default_factory=list)
    # (lower, upper) temperature interval; the mapper uses the first complete
    # interval found across all of the root's scenarios.
    temperature: tuple[float, float] | None = None
    # Transformation products reachable from the root compound.
    transformation_products: list[IUCLIDTransformationProductEntry] = field(default_factory=list)
    # Free-text model / software descriptions and remarks.
    model_name_and_version: list[str] = field(default_factory=list)
    software_name_and_version: list[str] = field(default_factory=list)
    model_remarks: list[str] = field(default_factory=list)
    # The mapper sets this to the first element of ``soil_properties_entries``
    # (or None when there are no soils).
    soil_properties: SoilPropertiesData | None = None
    # All distinct soils recorded for this study, in soil-number order.
    soil_properties_entries: list[SoilPropertiesData] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class HalfLifeEntry:
    """One half-life (DT50) observation taken from a HalfLife additional information."""

    # Model label copied from the HalfLife additional information.
    model: str
    # Lower and upper bound of the DT50 range.
    dt50_start: float
    dt50_end: float
    # Unit of the DT50 values; the mapper in this file always passes "d".
    unit: str
    # Source text copied from the HalfLife additional information.
    source: str
    # Soil number code of the soil this half-life was measured in, if known.
    soil_no_code: str | None = None
    # (lower, upper) temperature interval of the originating scenario, if known.
    temperature: tuple[float, float] | None = None
|
|
|
|
|
|
@dataclass
class IUCLIDTransformationProductEntry:
    """One transformation product row: a product and the parents it forms from."""

    # Identifier of this row (a fresh uuid4 when built by the mapper).
    uuid: UUID
    # Reference-substance UUID of the product compound.
    product_reference_uuid: UUID
    # Reference-substance UUIDs of the parent compounds of the producing edge.
    parent_reference_uuids: list[UUID] = field(default_factory=list)
    # Not populated by PathwayMapper.map in this file.
    kinetic_formation_fraction: float | None = None
    # UUID of the pathway edge this row was derived from.
    source_edge_uuid: UUID | None = None
|
|
|
|
|
|
@dataclass
class IUCLIDDocumentBundle:
    """Everything produced by one pathway mapping: substances, reference substances and study records."""

    substances: list[IUCLIDSubstanceData] = field(default_factory=list)
    reference_substances: list[IUCLIDReferenceSubstanceData] = field(default_factory=list)
    endpoint_study_records: list[IUCLIDEndpointStudyRecordData] = field(default_factory=list)
|
|
|
|
|
|
class PathwayMapper:
    """Translate a ``PathwayExportDTO`` into an ``IUCLIDDocumentBundle``.

    Every distinct compound becomes a substance plus a reference substance.
    Every root compound additionally gets one endpoint study record carrying
    the transformation products reachable from it and the soil / half-life /
    temperature information found on its depth-0 nodes.
    """

    # Soil number codes handed out to distinct soils, in assignment order.
    # NOTE(review): presumably the IUCLID picklist (phrase group F137) codes for
    # "soil no." — confirm against the IUCLID format specification.
    _F137_SOIL_NO_CODES: tuple[str, ...] = (
        "2",
        "4",
        "5",
        "6",
        "7",
        "8",
        "9",
        "10",
        "11",
        "3",
        "4070",
        "4071",
        "4072",
        "4073",
        "4074",
        "4075",
        "4076",
        "4077",
        "4078",
        "4079",
    )

    def map(self, export: PathwayExportDTO) -> IUCLIDDocumentBundle:
        """Build the document bundle for *export*.

        Returns a bundle with substances only (no study records) when the
        export has no compounds or no usable root compound.
        """
        bundle = IUCLIDDocumentBundle()

        # compound PK -> (substance UUID, reference substance UUID)
        seen_compounds: dict[int, tuple[UUID, UUID]] = {}
        compound_names: dict[int, str] = {}

        for compound in export.compounds:
            if compound.pk in seen_compounds:
                # The same compound may be listed more than once; map it once.
                continue

            derived = self._compute_derived_properties(compound.smiles)
            ref_sub_uuid = uuid4()
            sub_uuid = uuid4()
            seen_compounds[compound.pk] = (sub_uuid, ref_sub_uuid)
            compound_names[compound.pk] = compound.name

            bundle.reference_substances.append(
                IUCLIDReferenceSubstanceData(
                    uuid=ref_sub_uuid,
                    name=compound.name,
                    smiles=compound.smiles,
                    cas_number=compound.cas_number,
                    molecular_formula=derived["molecular_formula"],
                    molecular_weight=derived["molecular_weight"],
                    inchi=derived["inchi"],
                    inchi_key=derived["inchi_key"],
                )
            )
            bundle.substances.append(
                IUCLIDSubstanceData(
                    uuid=sub_uuid,
                    name=compound.name,
                    reference_substance_uuid=ref_sub_uuid,
                )
            )

        root_compound_pks = self._select_root_pks(export, seen_compounds)
        if not root_compound_pks:
            return bundle

        edge_templates = self._build_edge_templates(export.edges, seen_compounds)
        model_names, software_names, model_remarks = self._collect_model_info(export.model_info)
        # Use the FINAL root list here so that a fallback root (first compound)
        # also gets the additional information of its depth-0 nodes.
        scenarios_by_root = self._group_root_scenarios(export.nodes, set(root_compound_pks))

        has_multiple_roots = len(root_compound_pks) > 1
        for root_pk in root_compound_pks:
            substance_uuid, _ = seen_compounds[root_pk]

            esr_name = f"Biodegradation in soil - {export.pathway_name}"
            if has_multiple_roots:
                # Disambiguate the records of a multi-root pathway by root name.
                root_name = compound_names.get(root_pk)
                if root_name:
                    esr_name = f"{esr_name} ({root_name})"

            transformation_entries = self._build_transformation_entries(
                root_pk, edge_templates, seen_compounds
            )
            soil_entries, half_lives, merged_ai_for_root = self._collect_soils_and_half_lives(
                scenarios_by_root.get(root_pk, {}).values()
            )

            bundle.endpoint_study_records.append(
                IUCLIDEndpointStudyRecordData(
                    uuid=uuid4(),
                    substance_uuid=substance_uuid,
                    name=esr_name,
                    half_lives=half_lives,
                    temperature=self._extract_temperature(merged_ai_for_root),
                    transformation_products=transformation_entries,
                    # Copies: records must not share (and accidentally co-mutate)
                    # the same list objects.
                    model_name_and_version=list(model_names),
                    software_name_and_version=list(software_names),
                    model_remarks=list(model_remarks),
                    soil_properties=soil_entries[0] if soil_entries else None,
                    soil_properties_entries=soil_entries,
                )
            )

        return bundle

    @staticmethod
    def _select_root_pks(
        export: PathwayExportDTO,
        seen_compounds: dict[int, tuple[UUID, UUID]],
    ) -> list[int]:
        """Return the root compound PKs to report, de-duplicated and in order.

        Only roots that are known compounds count. If none qualifies, the first
        compound of the export is used as the single root. Returns an empty
        list when the export has no compounds.
        """
        if not export.compounds:
            return []

        # dict.fromkeys drops duplicates while preserving first-seen order.
        root_pks = list(
            dict.fromkeys(pk for pk in export.root_compound_pks if pk in seen_compounds)
        )
        if root_pks:
            return root_pks

        fallback_root_pk = export.compounds[0].pk
        return [fallback_root_pk] if fallback_root_pk in seen_compounds else []

    @staticmethod
    def _build_edge_templates(
        edges,
        seen_compounds: dict[int, tuple[UUID, UUID]],
    ) -> list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]]:
        """Normalise pathway edges into (edge UUID, parent PKs, product PKs, parent ref UUIDs).

        Edges are processed in order of their UUID string so the output is
        deterministic. Unknown compounds are dropped; edges left without a
        parent or without a product are skipped.
        """
        templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]] = []
        for edge in sorted(edges, key=lambda item: str(item.edge_uuid)):
            parent_compound_pks = sorted(
                {pk for pk in edge.start_compound_pks if pk in seen_compounds}
            )
            product_compound_pks = sorted(
                {pk for pk in edge.end_compound_pks if pk in seen_compounds}
            )
            if not parent_compound_pks or not product_compound_pks:
                continue

            parent_ref_uuids = tuple(
                sorted({seen_compounds[pk][1] for pk in parent_compound_pks}, key=str)
            )
            templates.append(
                (
                    edge.edge_uuid,
                    frozenset(parent_compound_pks),
                    tuple(product_compound_pks),
                    parent_ref_uuids,
                )
            )
        return templates

    @staticmethod
    def _collect_model_info(model_info) -> tuple[list[str], list[str], list[str]]:
        """Return (model names, software names, model remarks) from *model_info*.

        All three lists are empty when *model_info* is falsy.
        """
        model_names: list[str] = []
        software_names: list[str] = []
        model_remarks: list[str] = []
        if not model_info:
            return model_names, software_names, model_remarks

        if model_info.model_name:
            model_names.append(model_info.model_name)
        if model_info.model_uuid:
            model_remarks.append(f"Model UUID: {model_info.model_uuid}")
        if model_info.software_name:
            if model_info.software_version:
                software_names.append(f"{model_info.software_name} {model_info.software_version}")
            else:
                software_names.append(model_info.software_name)
        return model_names, software_names, model_remarks

    @staticmethod
    def _group_root_scenarios(
        nodes,
        root_pks: set[int],
    ) -> dict[int, dict[str, tuple[UUID | None, str | None, list]]]:
        """Group additional information of depth-0 root nodes by compound and scenario.

        Result: compound PK -> scenario key -> (scenario UUID, scenario name,
        additional-information list). Information of the same scenario found on
        several root nodes is concatenated.
        """
        grouped: dict[int, dict[str, tuple[UUID | None, str | None, list]]] = {}
        for node in nodes:
            if node.depth != 0 or node.compound_pk not in root_pks:
                continue

            scenario_bucket = grouped.setdefault(node.compound_pk, {})
            if not node.scenarios:
                # Backward compatibility path for callers that only provide
                # node.additional_info.
                fallback_key = f"fallback:{node.node_uuid}"
                scenario_bucket[fallback_key] = (None, None, list(node.additional_info))
                continue

            for scenario in node.scenarios:
                scenario_key = str(scenario.scenario_uuid)
                existing = scenario_bucket.get(scenario_key)
                if existing is None:
                    scenario_bucket[scenario_key] = (
                        scenario.scenario_uuid,
                        scenario.name,
                        list(scenario.additional_info),
                    )
                else:
                    existing[2].extend(scenario.additional_info)
        return grouped

    @staticmethod
    def _build_transformation_entries(
        root_pk: int,
        edge_templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]],
        seen_compounds: dict[int, tuple[UUID, UUID]],
    ) -> list[IUCLIDTransformationProductEntry]:
        """Create one entry per distinct (product, parents) pair reachable from *root_pk*."""
        reachable_compound_pks = PathwayMapper._reachable_compounds_from_root(
            root_pk, edge_templates
        )
        entries: list[IUCLIDTransformationProductEntry] = []
        seen_transformations: set[tuple[UUID, tuple[UUID, ...]]] = set()

        for edge_uuid, parent_pks, product_pks, parent_reference_uuids in edge_templates:
            if not parent_pks.issubset(reachable_compound_pks):
                continue

            for product_pk in product_pks:
                if product_pk not in reachable_compound_pks:
                    continue

                product_ref_uuid = seen_compounds[product_pk][1]
                # Several edges may describe the same parents -> product step;
                # report it once (the first edge in template order wins).
                dedupe_key = (product_ref_uuid, parent_reference_uuids)
                if dedupe_key in seen_transformations:
                    continue

                seen_transformations.add(dedupe_key)
                entries.append(
                    IUCLIDTransformationProductEntry(
                        uuid=uuid4(),
                        product_reference_uuid=product_ref_uuid,
                        parent_reference_uuids=list(parent_reference_uuids),
                        source_edge_uuid=edge_uuid,
                    )
                )
        return entries

    def _collect_soils_and_half_lives(
        self,
        scenarios,
    ) -> tuple[list[SoilPropertiesData], list[HalfLifeEntry], list]:
        """Extract soils and half-lives from *scenarios*.

        *scenarios* yields (scenario UUID, scenario name, additional-information
        list) tuples. Returns (distinct soils in soil-number order, half-lives,
        all additional information concatenated).
        """
        soil_entries: list[SoilPropertiesData] = []
        soil_no_by_signature: dict[tuple, str] = {}
        half_lives: list[HalfLifeEntry] = []
        merged_ai: list = []

        for _, _, ai_for_scenario in scenarios:
            merged_ai.extend(ai_for_scenario)

            soil = self._extract_soil_properties(ai_for_scenario)
            temperature = self._extract_temperature(ai_for_scenario)

            soil_no_code: str | None = None
            if soil is not None:
                # Scenarios describing the same soil share one soil number.
                soil_signature = self._soil_signature(soil)
                soil_no_code = soil_no_by_signature.get(soil_signature)
                if soil_no_code is None:
                    soil_no_code = self._soil_no_code_for_index(len(soil_entries))
                    if soil_no_code is not None:
                        soil.soil_no_code = soil_no_code
                        soil_entries.append(soil)
                        soil_no_by_signature[soil_signature] = soil_no_code
                    else:
                        # All soil number codes are used up: the soil is not
                        # reported; its half-lives are kept without a soil number.
                        logger.warning(
                            "No soil number code left; skipping soil entry %d",
                            len(soil_entries) + 1,
                        )

            for hl in self._extract_half_lives(ai_for_scenario):
                hl.soil_no_code = soil_no_code
                hl.temperature = temperature
                half_lives.append(hl)

        return soil_entries, half_lives, merged_ai

    @staticmethod
    def _extract_half_lives(ai_list: list) -> list[HalfLifeEntry]:
        """Return a ``HalfLifeEntry`` for every HalfLife item with a complete DT50 range."""
        from envipy_additional_information.information import HalfLife

        entries = []
        for ai in ai_list:
            if not isinstance(ai, HalfLife):
                continue
            start = ai.dt50.start
            end = ai.dt50.end
            if start is None or end is None:
                # An open-ended range cannot be reported; skip it.
                continue
            entries.append(
                HalfLifeEntry(
                    model=ai.model,
                    dt50_start=start,
                    dt50_end=end,
                    unit="d",
                    source=ai.source,
                )
            )
        return entries

    @staticmethod
    def _extract_temperature(ai_list: list) -> tuple[float, float] | None:
        """Return (lower, upper) of the first Temperature item with both bounds, else None."""
        from envipy_additional_information.information import Temperature

        for ai in ai_list:
            if not isinstance(ai, Temperature):
                continue
            lower = ai.interval.start
            upper = ai.interval.end
            if lower is None or upper is None:
                continue
            return (lower, upper)
        return None

    @staticmethod
    def _extract_soil_properties(ai_list: list) -> SoilPropertiesData | None:
        """Collect soil properties from *ai_list*; the first value found per field wins.

        Returns None when none of the soil fields could be filled.
        """
        from envipy_additional_information.information import (
            Acidity,
            BulkDensity,
            CEC,
            Humidity,
            OMContent,
            SoilClassification,
            SoilTexture1,
            SoilTexture2,
        )

        props = SoilPropertiesData()

        for ai in ai_list:
            if isinstance(ai, SoilTexture1) and props.soil_type is None:
                props.soil_type = ai.type.value
            elif isinstance(ai, SoilTexture2):
                if props.sand is None:
                    props.sand = ai.sand
                if props.silt is None:
                    props.silt = ai.silt
                if props.clay is None:
                    props.clay = ai.clay
            elif isinstance(ai, OMContent) and props.org_carbon is None:
                props.org_carbon = ai.in_oc
            elif isinstance(ai, Acidity) and props.ph_lower is None:
                props.ph_lower = ai.interval.start
                props.ph_upper = ai.interval.end
                if isinstance(ai.method, str):
                    # Blank method strings are treated as "not given".
                    props.ph_method = ai.method.strip() or None
                else:
                    props.ph_method = ai.method
            elif isinstance(ai, CEC) and props.cec is None:
                props.cec = ai.capacity
            elif isinstance(ai, Humidity) and props.moisture_content is None:
                # NOTE(review): "humiditiy" (sic) is kept as in the original;
                # presumably the attribute is spelled this way upstream — confirm.
                props.moisture_content = ai.humiditiy
            elif isinstance(ai, SoilClassification) and props.soil_classification is None:
                props.soil_classification = ai.system.value
            elif isinstance(ai, BulkDensity):
                pass  # BulkDensity.data is a free-text string; not mapped to SoilPropertiesData

        # The signature covers exactly the fields filled above.
        if all(value is None for value in PathwayMapper._soil_signature(props)):
            return None
        return props

    @staticmethod
    def _reachable_compounds_from_root(
        root_compound_pk: int,
        edge_templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]],
    ) -> set[int]:
        """Return all compound PKs reachable from the root (root included).

        An edge only fires once ALL of its parents are reachable; the edge list
        is re-scanned until a full pass adds nothing new.
        """
        reachable: set[int] = {root_compound_pk}
        changed = True

        while changed:
            changed = False
            for _, parent_compound_pks, product_compound_pks, _ in edge_templates:
                if not parent_compound_pks.issubset(reachable):
                    continue

                for product_compound_pk in product_compound_pks:
                    if product_compound_pk in reachable:
                        continue
                    reachable.add(product_compound_pk)
                    changed = True

        return reachable

    @staticmethod
    def _soil_signature(props: SoilPropertiesData) -> tuple:
        """Return the tuple of soil values used to recognise identical soils.

        ``soil_no_code`` is deliberately excluded: it is assigned after matching.
        """
        return (
            props.soil_type,
            props.sand,
            props.silt,
            props.clay,
            props.org_carbon,
            props.ph_lower,
            props.ph_upper,
            props.ph_method,
            props.cec,
            props.moisture_content,
            props.soil_classification,
        )

    @staticmethod
    def _soil_no_code_for_index(index: int) -> str | None:
        """Return the soil number code for the *index*-th distinct soil, or None if out of range."""
        codes = PathwayMapper._F137_SOIL_NO_CODES
        if 0 <= index < len(codes):
            return codes[index]
        return None

    @staticmethod
    def _compute_derived_properties(smiles: str | None) -> dict:
        """Derive formula, mass, InChI and InChIKey from *smiles* (best effort).

        Returns a dict with the keys ``molecular_formula``, ``molecular_weight``,
        ``inchi`` and ``inchi_key``. A value is None when *smiles* is empty or
        the corresponding conversion raised.
        """
        derived: dict = {
            "molecular_formula": None,
            "molecular_weight": None,
            "inchi": None,
            "inchi_key": None,
        }
        if not smiles:
            return derived

        # (result key, FormatConverter method name); the method name doubles as
        # the label in the log message.
        conversions = (
            ("molecular_formula", "formula"),
            ("molecular_weight", "mass"),
            ("inchi", "InChI"),
            ("inchi_key", "InChIKey"),
        )
        for key, method_name in conversions:
            try:
                derived[key] = getattr(FormatConverter, method_name)(smiles)
            except Exception:
                # Deliberately broad: a failed conversion must not abort the
                # export; the field simply stays empty.
                logger.debug("Could not compute %s for %s", method_name, smiles, exc_info=True)

        return derived
|