from __future__ import annotations

import logging
from dataclasses import dataclass, field
from uuid import UUID, uuid4

from epapi.v1.interfaces.iuclid.dto import PathwayExportDTO
from utilities.chem import FormatConverter

logger = logging.getLogger(__name__)


@dataclass
class IUCLIDReferenceSubstanceData:
    """Identity and structure descriptors of one reference substance.

    ``PathwayMapper`` fills ``uuid``, ``name``, ``smiles``, ``cas_number`` and the
    SMILES-derived fields; ``ec_number`` and ``iupac_name`` are left at their
    defaults by the mapper in this module.
    """

    uuid: UUID
    name: str
    smiles: str | None = None
    cas_number: str | None = None
    ec_number: str | None = None
    iupac_name: str | None = None
    molecular_formula: str | None = None
    molecular_weight: float | None = None
    inchi: str | None = None
    inchi_key: str | None = None


@dataclass
class IUCLIDSubstanceData:
    """A substance record that points at its reference substance by UUID."""

    uuid: UUID
    name: str
    reference_substance_uuid: UUID | None = None


@dataclass
class SoilPropertiesData:
    """Soil description collected from the additional information of one scenario.

    ``soil_no_code`` is not extracted from the input; ``PathwayMapper`` assigns it
    when the entry is added to an endpoint study record.
    """

    soil_no_code: str | None = None
    soil_type: str | None = None
    sand: float | None = None
    silt: float | None = None
    clay: float | None = None
    org_carbon: float | None = None
    ph_lower: float | None = None
    ph_upper: float | None = None
    ph_method: str | None = None
    cec: float | None = None
    moisture_content: float | None = None
    soil_classification: str | None = None


@dataclass
class IUCLIDEndpointStudyRecordData:
    """One endpoint study record, created per root compound of a pathway.

    ``soil_properties`` mirrors the first element of ``soil_properties_entries``
    (or is ``None`` when that list is empty).
    """

    uuid: UUID
    substance_uuid: UUID
    name: str
    half_lives: list[HalfLifeEntry] = field(default_factory=list)
    temperature: tuple[float, float] | None = None
    transformation_products: list[IUCLIDTransformationProductEntry] = field(default_factory=list)
    model_name_and_version: list[str] = field(default_factory=list)
    software_name_and_version: list[str] = field(default_factory=list)
    model_remarks: list[str] = field(default_factory=list)
    soil_properties: SoilPropertiesData | None = None
    soil_properties_entries: list[SoilPropertiesData] = field(default_factory=list)


@dataclass
class HalfLifeEntry:
    """A DT50 range together with the soil and temperature it was reported for."""

    model: str
    dt50_start: float
    dt50_end: float
    unit: str
    source: str
    soil_no_code: str | None = None
    temperature: tuple[float, float] | None = None


@dataclass
class IUCLIDTransformationProductEntry:
    """A product reference substance and the parent reference substances forming it.

    ``kinetic_formation_fraction`` is never set by ``PathwayMapper`` in this module.
    """

    uuid: UUID
    product_reference_uuid: UUID
    parent_reference_uuids: list[UUID] = field(default_factory=list)
    kinetic_formation_fraction: float | None = None
    source_edge_uuid: UUID | None = None


@dataclass
class IUCLIDDocumentBundle:
    """Container for everything ``PathwayMapper.map`` produces for one pathway."""

    substances: list[IUCLIDSubstanceData] = field(default_factory=list)
    reference_substances: list[IUCLIDReferenceSubstanceData] = field(default_factory=list)
    endpoint_study_records: list[IUCLIDEndpointStudyRecordData] = field(default_factory=list)


class PathwayMapper:
    """Translate a ``PathwayExportDTO`` into an ``IUCLIDDocumentBundle``."""

    def map(self, export: PathwayExportDTO) -> IUCLIDDocumentBundle:
        """Build substances, reference substances and endpoint study records.

        Every distinct compound yields one substance and one reference substance.
        Every root compound yields one endpoint study record holding the
        transformation products reachable from that root plus the half-lives,
        soil properties and temperature found on its depth-0 nodes.

        Args:
            export: The pathway export to convert.

        Returns:
            The populated bundle. It contains no endpoint study records when the
            export has no compounds or no usable root compound.
        """
        bundle = IUCLIDDocumentBundle()
        seen_compounds, compound_names = self._register_compounds(export, bundle)
        if not export.compounds:
            return bundle

        root_compound_pks = self._resolve_root_pks(export, seen_compounds)
        if not root_compound_pks:
            return bundle

        edge_templates = self._build_edge_templates(export, seen_compounds)
        model_names, software_names, model_remarks = self._collect_model_metadata(export)
        # Filter on the roots that are actually used. Filtering only on the
        # declared roots would discard the additional information of a fallback
        # root (first compound) and leave its record without half-lives or soil.
        root_node_ai_by_scenario = self._collect_root_scenarios(export, set(root_compound_pks))

        has_multiple_roots = len(root_compound_pks) > 1
        for root_pk in root_compound_pks:
            substance_uuid, _ = seen_compounds[root_pk]

            esr_name = f"Biodegradation in soil - {export.pathway_name}"
            if has_multiple_roots:
                # Disambiguate the records of a multi-root pathway by root name.
                root_name = compound_names.get(root_pk)
                if root_name:
                    esr_name = f"{esr_name} ({root_name})"

            transformation_entries = self._build_transformation_entries(
                root_pk, edge_templates, seen_compounds
            )

            scenarios_for_root = list(root_node_ai_by_scenario.get(root_pk, {}).values())
            if not scenarios_for_root:
                # One empty pseudo-scenario keeps the aggregation below uniform.
                scenarios_for_root = [(None, None, [])]
            soil_entries, half_lives, merged_ai_for_root = self._collect_soil_and_half_lives(
                scenarios_for_root
            )

            esr = IUCLIDEndpointStudyRecordData(
                uuid=uuid4(),
                substance_uuid=substance_uuid,
                name=esr_name,
                half_lives=half_lives,
                temperature=self._extract_temperature(merged_ai_for_root),
                transformation_products=transformation_entries,
                # Copies: records must not share mutable lists with each other.
                model_name_and_version=list(model_names),
                software_name_and_version=list(software_names),
                model_remarks=list(model_remarks),
                soil_properties=soil_entries[0] if soil_entries else None,
                soil_properties_entries=soil_entries,
            )
            bundle.endpoint_study_records.append(esr)

        return bundle

    def _register_compounds(
        self, export: PathwayExportDTO, bundle: IUCLIDDocumentBundle
    ) -> tuple[dict[int, tuple[UUID, UUID]], dict[int, str]]:
        """Add one substance and reference substance per distinct compound PK.

        Returns the ``compound PK -> (substance UUID, ref UUID)`` mapping and the
        ``compound PK -> name`` mapping. Repeated PKs are ignored after the first.
        """
        seen_compounds: dict[int, tuple[UUID, UUID]] = {}  # compound PK -> (substance UUID, ref UUID)
        compound_names: dict[int, str] = {}

        for compound in export.compounds:
            if compound.pk in seen_compounds:
                continue

            derived = self._compute_derived_properties(compound.smiles)
            ref_sub_uuid = uuid4()
            sub_uuid = uuid4()
            seen_compounds[compound.pk] = (sub_uuid, ref_sub_uuid)
            compound_names[compound.pk] = compound.name

            bundle.reference_substances.append(
                IUCLIDReferenceSubstanceData(
                    uuid=ref_sub_uuid,
                    name=compound.name,
                    smiles=compound.smiles,
                    cas_number=compound.cas_number,
                    molecular_formula=derived["molecular_formula"],
                    molecular_weight=derived["molecular_weight"],
                    inchi=derived["inchi"],
                    inchi_key=derived["inchi_key"],
                )
            )
            bundle.substances.append(
                IUCLIDSubstanceData(
                    uuid=sub_uuid,
                    name=compound.name,
                    reference_substance_uuid=ref_sub_uuid,
                )
            )

        return seen_compounds, compound_names

    @staticmethod
    def _resolve_root_pks(
        export: PathwayExportDTO, seen_compounds: dict[int, tuple[UUID, UUID]]
    ) -> list[int]:
        """Return the known root PKs in declared order, without duplicates.

        Falls back to the first compound of the export when none of the declared
        roots refers to a registered compound.
        """
        root_compound_pks: list[int] = []
        seen_root_pks: set[int] = set()
        for root_pk in export.root_compound_pks:
            if root_pk in seen_compounds and root_pk not in seen_root_pks:
                root_compound_pks.append(root_pk)
                seen_root_pks.add(root_pk)

        if not root_compound_pks:
            fallback_root_pk = export.compounds[0].pk
            if fallback_root_pk in seen_compounds:
                root_compound_pks = [fallback_root_pk]

        return root_compound_pks

    @staticmethod
    def _build_edge_templates(
        export: PathwayExportDTO, seen_compounds: dict[int, tuple[UUID, UUID]]
    ) -> list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]]:
        """Normalise edges to ``(edge UUID, parent PKs, product PKs, parent ref UUIDs)``.

        Edges are visited in order of their stringified UUID so the result does
        not depend on the order of ``export.edges``. Unknown compounds are
        removed, and edges left without a parent or without a product are skipped.
        """
        edge_templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]] = []
        for edge in sorted(export.edges, key=lambda item: str(item.edge_uuid)):
            parent_compound_pks = sorted(
                {pk for pk in edge.start_compound_pks if pk in seen_compounds}
            )
            product_compound_pks = sorted(
                {pk for pk in edge.end_compound_pks if pk in seen_compounds}
            )
            if not parent_compound_pks or not product_compound_pks:
                continue

            parent_ref_uuids = tuple(
                sorted({seen_compounds[pk][1] for pk in parent_compound_pks}, key=str)
            )
            edge_templates.append(
                (
                    edge.edge_uuid,
                    frozenset(parent_compound_pks),
                    tuple(product_compound_pks),
                    parent_ref_uuids,
                )
            )
        return edge_templates

    @staticmethod
    def _collect_model_metadata(
        export: PathwayExportDTO,
    ) -> tuple[list[str], list[str], list[str]]:
        """Return ``(model names, software names, model remarks)`` from ``export.model_info``."""
        model_names: list[str] = []
        software_names: list[str] = []
        model_remarks: list[str] = []

        info = export.model_info
        if info:
            if info.model_name:
                model_names.append(info.model_name)
            if info.model_uuid:
                model_remarks.append(f"Model UUID: {info.model_uuid}")
            if info.software_name:
                if info.software_version:
                    software_names.append(f"{info.software_name} {info.software_version}")
                else:
                    software_names.append(info.software_name)

        return model_names, software_names, model_remarks

    @staticmethod
    def _collect_root_scenarios(
        export: PathwayExportDTO, root_pks: set[int]
    ) -> dict[int, dict[str, tuple[UUID | None, str | None, list]]]:
        """Group the additional information of depth-0 nodes by root and scenario.

        Each stored value is ``(scenario_uuid, scenario_name, effective_ai_list)``.
        Information of the same scenario seen on several nodes of one root is
        concatenated into a single list.
        """
        root_node_ai_by_scenario: dict[int, dict[str, tuple[UUID | None, str | None, list]]] = {}
        for node in export.nodes:
            if node.depth != 0 or node.compound_pk not in root_pks:
                continue

            scenario_bucket = root_node_ai_by_scenario.setdefault(node.compound_pk, {})
            if node.scenarios:
                for scenario in node.scenarios:
                    scenario_key = str(scenario.scenario_uuid)
                    existing = scenario_bucket.get(scenario_key)
                    if existing is None:
                        scenario_bucket[scenario_key] = (
                            scenario.scenario_uuid,
                            scenario.name,
                            list(scenario.additional_info),
                        )
                    else:
                        existing[2].extend(scenario.additional_info)
            else:
                # Backward compatibility path for callers that only provide
                # node.additional_info.
                fallback_key = f"fallback:{node.node_uuid}"
                scenario_bucket[fallback_key] = (None, None, list(node.additional_info))

        return root_node_ai_by_scenario

    def _build_transformation_entries(
        self,
        root_pk: int,
        edge_templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]],
        seen_compounds: dict[int, tuple[UUID, UUID]],
    ) -> list[IUCLIDTransformationProductEntry]:
        """Create one entry per distinct (product, parents) pair reachable from the root."""
        reachable_compound_pks = self._reachable_compounds_from_root(root_pk, edge_templates)
        transformation_entries: list[IUCLIDTransformationProductEntry] = []
        seen_transformations: set[tuple[UUID, tuple[UUID, ...]]] = set()

        for (
            edge_uuid,
            parent_compound_pks,
            product_compound_pks,
            parent_reference_uuids,
        ) in edge_templates:
            if not parent_compound_pks.issubset(reachable_compound_pks):
                continue

            for product_compound_pk in product_compound_pks:
                if product_compound_pk not in reachable_compound_pks:
                    continue

                product_ref_uuid = seen_compounds[product_compound_pk][1]
                # Several edges may describe the same product/parents combination;
                # only the first one (in edge-template order) is kept.
                dedupe_key = (product_ref_uuid, parent_reference_uuids)
                if dedupe_key in seen_transformations:
                    continue
                seen_transformations.add(dedupe_key)

                transformation_entries.append(
                    IUCLIDTransformationProductEntry(
                        uuid=uuid4(),
                        product_reference_uuid=product_ref_uuid,
                        parent_reference_uuids=list(parent_reference_uuids),
                        source_edge_uuid=edge_uuid,
                    )
                )

        return transformation_entries

    def _collect_soil_and_half_lives(
        self, scenarios_for_root: list[tuple[UUID | None, str | None, list]]
    ) -> tuple[list[SoilPropertiesData], list[HalfLifeEntry], list]:
        """Return ``(soil entries, half-lives, merged additional information)`` for one root.

        Scenarios with identical soil properties share one soil entry and soil
        number code. Half-lives are tagged with the soil code and temperature of
        the scenario they come from.
        """
        soil_entries: list[SoilPropertiesData] = []
        soil_no_by_signature: dict[tuple, str] = {}
        half_lives: list[HalfLifeEntry] = []
        merged_ai_for_root: list = []

        for _, _, ai_for_scenario in scenarios_for_root:
            merged_ai_for_root.extend(ai_for_scenario)
            soil = self._extract_soil_properties(ai_for_scenario)
            temperature = self._extract_temperature(ai_for_scenario)

            soil_no_code: str | None = None
            if soil is not None:
                soil_signature = self._soil_signature(soil)
                soil_no_code = soil_no_by_signature.get(soil_signature)
                if soil_no_code is None:
                    soil_no_code = self._soil_no_code_for_index(len(soil_entries))
                    if soil_no_code is not None:
                        soil.soil_no_code = soil_no_code
                        soil_entries.append(soil)
                        soil_no_by_signature[soil_signature] = soil_no_code
                    else:
                        # The code table is exhausted: the soil cannot be referenced,
                        # so it is left out and its half-lives carry no soil code.
                        logger.warning(
                            "No soil number code left for soil entry %d; "
                            "soil properties are omitted",
                            len(soil_entries),
                        )

            for hl in self._extract_half_lives(ai_for_scenario):
                hl.soil_no_code = soil_no_code
                hl.temperature = temperature
                half_lives.append(hl)

        return soil_entries, half_lives, merged_ai_for_root

    @staticmethod
    def _extract_half_lives(ai_list: list) -> list[HalfLifeEntry]:
        """Convert every ``HalfLife`` item with a complete DT50 range into an entry.

        Items whose ``dt50.start`` or ``dt50.end`` is ``None`` are skipped.
        """
        from envipy_additional_information.information import HalfLife

        entries = []
        for ai in ai_list:
            if not isinstance(ai, HalfLife):
                continue
            start = ai.dt50.start
            end = ai.dt50.end
            if start is None or end is None:
                continue
            entries.append(
                HalfLifeEntry(
                    model=ai.model,
                    dt50_start=start,
                    dt50_end=end,
                    unit="d",  # presumably days - TODO confirm against the HalfLife definition
                    source=ai.source,
                )
            )
        return entries

    @staticmethod
    def _extract_temperature(ai_list: list) -> tuple[float, float] | None:
        """Return ``(lower, upper)`` of the first ``Temperature`` item with both bounds set."""
        from envipy_additional_information.information import Temperature

        for ai in ai_list:
            if not isinstance(ai, Temperature):
                continue
            lower = ai.interval.start
            upper = ai.interval.end
            if lower is None or upper is None:
                continue
            return (lower, upper)
        return None

    @staticmethod
    def _extract_soil_properties(ai_list: list) -> SoilPropertiesData | None:
        """Collect soil properties from ``ai_list``; the first value per field wins.

        Returns ``None`` when none of the recognised items contributed a value.
        """
        from envipy_additional_information.information import (
            Acidity,
            BulkDensity,
            CEC,
            Humidity,
            OMContent,
            SoilClassification,
            SoilTexture1,
            SoilTexture2,
        )

        props = SoilPropertiesData()
        for ai in ai_list:
            if isinstance(ai, SoilTexture1) and props.soil_type is None:
                props.soil_type = ai.type.value
            elif isinstance(ai, SoilTexture2):
                if props.sand is None:
                    props.sand = ai.sand
                if props.silt is None:
                    props.silt = ai.silt
                if props.clay is None:
                    props.clay = ai.clay
            elif isinstance(ai, OMContent) and props.org_carbon is None:
                props.org_carbon = ai.in_oc
            elif isinstance(ai, Acidity) and props.ph_lower is None:
                props.ph_lower = ai.interval.start
                props.ph_upper = ai.interval.end
                if isinstance(ai.method, str):
                    # Blank method strings are stored as None.
                    props.ph_method = ai.method.strip() or None
                else:
                    props.ph_method = ai.method
            elif isinstance(ai, CEC) and props.cec is None:
                props.cec = ai.capacity
            elif isinstance(ai, Humidity) and props.moisture_content is None:
                # NOTE(review): "humiditiy" is kept exactly as written; it looks like
                # a typo but may be the real attribute name on Humidity - confirm.
                props.moisture_content = ai.humiditiy
            elif isinstance(ai, SoilClassification) and props.soil_classification is None:
                props.soil_classification = ai.system.value
            elif isinstance(ai, BulkDensity):
                pass  # BulkDensity.data is a free-text string; not mapped to SoilPropertiesData

        # The signature lists every extracted field, so it doubles as emptiness check.
        all_none = all(value is None for value in PathwayMapper._soil_signature(props))
        return None if all_none else props

    @staticmethod
    def _reachable_compounds_from_root(
        root_compound_pk: int,
        edge_templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]],
    ) -> set[int]:
        """Return all compound PKs that can be produced starting from the root.

        An edge contributes its products only once all of its parents are already
        reachable; the edges are re-scanned until no new compound is added.
        """
        reachable: set[int] = {root_compound_pk}
        changed = True
        while changed:
            changed = False
            for _, parent_compound_pks, product_compound_pks, _ in edge_templates:
                if not parent_compound_pks.issubset(reachable):
                    continue
                for product_compound_pk in product_compound_pks:
                    if product_compound_pk in reachable:
                        continue
                    reachable.add(product_compound_pk)
                    changed = True
        return reachable

    @staticmethod
    def _soil_signature(props: SoilPropertiesData) -> tuple:
        """Return a hashable key of all soil fields except ``soil_no_code``."""
        return (
            props.soil_type,
            props.sand,
            props.silt,
            props.clay,
            props.org_carbon,
            props.ph_lower,
            props.ph_upper,
            props.ph_method,
            props.cec,
            props.moisture_content,
            props.soil_classification,
        )

    @staticmethod
    def _soil_no_code_for_index(index: int) -> str | None:
        """Return the soil number code for the ``index``-th distinct soil.

        Returns ``None`` when ``index`` is negative or beyond the 20 known codes.
        """
        # NOTE(review): presumably the codes of an IUCLID "F137" picklist, in the
        # order they should be handed out - confirm against the format definition.
        f137_codes = [
            "2",
            "4",
            "5",
            "6",
            "7",
            "8",
            "9",
            "10",
            "11",
            "3",
            "4070",
            "4071",
            "4072",
            "4073",
            "4074",
            "4075",
            "4076",
            "4077",
            "4078",
            "4079",
        ]
        if 0 <= index < len(f137_codes):
            return f137_codes[index]
        return None

    @staticmethod
    def _compute_derived_properties(smiles: str | None) -> dict:
        """Derive formula, mass, InChI and InChIKey from ``smiles`` on a best-effort basis.

        Each property is computed independently; a failing conversion is logged
        at debug level and leaves that property as ``None``. All four values are
        ``None`` when ``smiles`` is empty or ``None``.
        """
        molecular_formula = None
        molecular_weight = None
        inchi = None
        inchi_key = None

        if smiles:
            try:
                molecular_formula = FormatConverter.formula(smiles)
            except Exception:
                logger.debug("Could not compute formula for %s", smiles)
            try:
                molecular_weight = FormatConverter.mass(smiles)
            except Exception:
                logger.debug("Could not compute mass for %s", smiles)
            try:
                inchi = FormatConverter.InChI(smiles)
            except Exception:
                logger.debug("Could not compute InChI for %s", smiles)
            try:
                inchi_key = FormatConverter.InChIKey(smiles)
            except Exception:
                logger.debug("Could not compute InChIKey for %s", smiles)

        return {
            "molecular_formula": molecular_formula,
            "molecular_weight": molecular_weight,
            "inchi": inchi,
            "inchi_key": inchi_key,
        }