forked from enviPath/enviPy
[Feature] Minimal IUCLID export (#338)
This is an initial implementation that creates a working minimal .i6z document. It passes schema validation and can be imported into IUCLID. Caveat: IUCLID files target individual compounds. Pathway is not actually covered by the format. It can be added in either soil or water and soil OECD endpoints. **I currently only implemented the soil endpoint for all data.** This sort of works, and I can report all degradation products in a pathway (not a nice view, but we can report many transformation products and add a diagram attachment in the future). Adding additional information is an absolute pain, as we need to explicitly map each type of information to the relevant OECD field. I use the XSD scheme for validation, but unfortunately the IUCLID parser is not fully compliant and requires a specific order, etc. The workflow is: finding the AI structure from the XSD scheme -> make the scheme validation pass -> upload to IUCLID to get obscure error messages -> guess what could be wrong -> repeat 💣 New specifications get released once per year, so we will have to update accordingly. I believe that this should be a more expensive feature, as it requires significant effort to uphold. Currently implemented for root compound only in SOIL: - Soil Texture 2 - Soil Texture 1 - pH value - Half-life per soil sample / scenario (mapped to disappearance; not sure about that). - CEC - Organic Matter (only Carbon) - Moisture content - Humidity <img width="2123" alt="image.png" src="attachments/d29830e1-65ef-4136-8939-1825e0959c62"> <img width="2124" alt="image.png" src="attachments/ac9de2ac-bf68-4ba4-b40b-82f810a9de93"> <img width="2139" alt="image.png" src="attachments/5674c7e6-865e-420e-974a-6b825b331e6c"> Reviewed-on: enviPath/enviPy#338 Co-authored-by: Tobias O <tobias.olenyi@envipath.com> Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
This commit is contained in:
0
epiuclid/serializers/__init__.py
Normal file
0
epiuclid/serializers/__init__.py
Normal file
118
epiuclid/serializers/i6z.py
Normal file
118
epiuclid/serializers/i6z.py
Normal file
@ -0,0 +1,118 @@
|
||||
import io
|
||||
import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
|
||||
from epiuclid.builders.base import NS_PLATFORM_CONTAINER, document_key
|
||||
from epiuclid.builders.endpoint_study import EndpointStudyRecordBuilder
|
||||
from epiuclid.builders.reference_substance import ReferenceSubstanceBuilder
|
||||
from epiuclid.builders.substance import SubstanceBuilder
|
||||
from epiuclid.serializers.manifest import ManifestBuilder
|
||||
from epiuclid.serializers.pathway_mapper import IUCLIDDocumentBundle
|
||||
from epiuclid.schemas.loader import get_content_schema
|
||||
|
||||
|
||||
def _i6d_filename(uuid) -> str:
|
||||
return f"{uuid}_0.i6d"
|
||||
|
||||
|
||||
class I6ZSerializer:
|
||||
"""Serialize a IUCLIDDocumentBundle to a ZIP file containing the manifest.xml and the i6d files in memory."""
|
||||
|
||||
def serialize(self, bundle: IUCLIDDocumentBundle, *, validate: bool = False) -> bytes:
|
||||
return self._assemble(bundle, validate=validate)
|
||||
|
||||
def _assemble(self, bundle: IUCLIDDocumentBundle, *, validate: bool = False) -> bytes:
|
||||
sub_builder = SubstanceBuilder()
|
||||
ref_builder = ReferenceSubstanceBuilder()
|
||||
esr_builder = EndpointStudyRecordBuilder()
|
||||
|
||||
# (filename, xml_string, doc_type, uuid, subtype)
|
||||
files: list[tuple[str, str, str, str, str | None]] = []
|
||||
|
||||
for sub in bundle.substances:
|
||||
fname = _i6d_filename(sub.uuid)
|
||||
xml = sub_builder.build(sub)
|
||||
files.append((fname, xml, "SUBSTANCE", str(sub.uuid), None))
|
||||
|
||||
for ref in bundle.reference_substances:
|
||||
fname = _i6d_filename(ref.uuid)
|
||||
xml = ref_builder.build(ref)
|
||||
files.append((fname, xml, "REFERENCE_SUBSTANCE", str(ref.uuid), None))
|
||||
|
||||
for esr in bundle.endpoint_study_records:
|
||||
fname = _i6d_filename(esr.uuid)
|
||||
xml = esr_builder.build(esr)
|
||||
files.append(
|
||||
(fname, xml, "ENDPOINT_STUDY_RECORD", str(esr.uuid), "BiodegradationInSoil")
|
||||
)
|
||||
|
||||
if validate:
|
||||
self._validate_documents(files)
|
||||
|
||||
# Build document relationship links for manifest
|
||||
links = self._build_links(bundle)
|
||||
|
||||
# Build manifest
|
||||
manifest_docs = [(f[0], f[2], f[3], f[4]) for f in files]
|
||||
base_uuid = str(bundle.substances[0].uuid) if bundle.substances else ""
|
||||
manifest_xml = ManifestBuilder().build(manifest_docs, base_uuid, links=links)
|
||||
|
||||
# Assemble ZIP
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
zf.writestr("manifest.xml", manifest_xml)
|
||||
for fname, xml, _, _, _ in files:
|
||||
zf.writestr(fname, xml)
|
||||
return buf.getvalue()
|
||||
|
||||
@staticmethod
|
||||
def _validate_documents(
|
||||
files: list[tuple[str, str, str, str, str | None]],
|
||||
) -> None:
|
||||
"""Validate each i6d document against its XSD schema.
|
||||
|
||||
Raises ``xmlschema.XMLSchemaValidationError`` on the first failure.
|
||||
"""
|
||||
|
||||
for fname, xml, doc_type, _uuid, subtype in files:
|
||||
root = ET.fromstring(xml)
|
||||
content = root.find(f"{{{NS_PLATFORM_CONTAINER}}}Content")
|
||||
if content is None or len(content) == 0:
|
||||
continue
|
||||
content_el = list(content)[0]
|
||||
schema = get_content_schema(doc_type, subtype)
|
||||
schema.validate(content_el)
|
||||
|
||||
@staticmethod
|
||||
def _build_links(bundle: IUCLIDDocumentBundle) -> dict[str, list[tuple[str, str]]]:
|
||||
"""Build manifest link relationships between documents.
|
||||
|
||||
Returns a dict mapping document UUID (str) to list of (target_doc_key, ref_type).
|
||||
"""
|
||||
links: dict[str, list[tuple[str, str]]] = {}
|
||||
|
||||
def _add(uuid_str: str, target_key: str, ref_type: str) -> None:
|
||||
doc_links = links.setdefault(uuid_str, [])
|
||||
link = (target_key, ref_type)
|
||||
if link not in doc_links:
|
||||
doc_links.append(link)
|
||||
|
||||
# Substance -> REFERENCE link to its reference substance
|
||||
for sub in bundle.substances:
|
||||
if sub.reference_substance_uuid:
|
||||
ref_key = document_key(sub.reference_substance_uuid)
|
||||
_add(str(sub.uuid), ref_key, "REFERENCE")
|
||||
|
||||
# ESR -> PARENT link to its substance; substance -> CHILD link to ESR
|
||||
for esr in bundle.endpoint_study_records:
|
||||
sub_key = document_key(esr.substance_uuid)
|
||||
esr_key = document_key(esr.uuid)
|
||||
_add(str(esr.uuid), sub_key, "PARENT")
|
||||
_add(str(esr.substance_uuid), esr_key, "CHILD")
|
||||
|
||||
for tp in esr.transformation_products:
|
||||
_add(str(esr.uuid), document_key(tp.product_reference_uuid), "REFERENCE")
|
||||
for parent_ref_uuid in tp.parent_reference_uuids:
|
||||
_add(str(esr.uuid), document_key(parent_ref_uuid), "REFERENCE")
|
||||
|
||||
return links
|
||||
120
epiuclid/serializers/manifest.py
Normal file
120
epiuclid/serializers/manifest.py
Normal file
@ -0,0 +1,120 @@
|
||||
import xml.etree.ElementTree as ET
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from epiuclid.builders.base import document_key
|
||||
|
||||
NS_MANIFEST = "http://iuclid6.echa.europa.eu/namespaces/manifest/v1"
|
||||
NS_XLINK = "http://www.w3.org/1999/xlink"
|
||||
|
||||
ET.register_namespace("", NS_MANIFEST)
|
||||
ET.register_namespace("xlink", NS_XLINK)
|
||||
|
||||
|
||||
def _i6d_filename(uuid) -> str:
|
||||
"""Convert UUID to i6d filename (uuid_0.i6d for raw data)."""
|
||||
return f"{uuid}_0.i6d"
|
||||
|
||||
|
||||
def _tag(local: str) -> str:
|
||||
return f"{{{NS_MANIFEST}}}{local}"
|
||||
|
||||
|
||||
def _add_link(links_elem: ET.Element, ref_uuid: str, ref_type: str) -> None:
|
||||
"""Add a <link> element with ref-uuid and ref-type."""
|
||||
link = ET.SubElement(links_elem, _tag("link"))
|
||||
ref_uuid_elem = ET.SubElement(link, _tag("ref-uuid"))
|
||||
ref_uuid_elem.text = ref_uuid
|
||||
ref_type_elem = ET.SubElement(link, _tag("ref-type"))
|
||||
ref_type_elem.text = ref_type
|
||||
|
||||
|
||||
class ManifestBuilder:
|
||||
def build(
|
||||
self,
|
||||
documents: list[tuple[str, str, str, str | None]],
|
||||
base_document_uuid: str,
|
||||
links: dict[str, list[tuple[str, str]]] | None = None,
|
||||
) -> str:
|
||||
"""Build manifest.xml.
|
||||
|
||||
Args:
|
||||
documents: List of (filename, doc_type, uuid, subtype) tuples.
|
||||
base_document_uuid: UUID of the base document (the substance export started from).
|
||||
links: Optional dict mapping document UUID to list of (target_doc_key, ref_type) tuples.
|
||||
ref_type is one of: PARENT, CHILD, REFERENCE.
|
||||
"""
|
||||
if links is None:
|
||||
links = {}
|
||||
|
||||
root = ET.Element(_tag("manifest"))
|
||||
|
||||
# general-information
|
||||
gi = ET.SubElement(root, _tag("general-information"))
|
||||
title = ET.SubElement(gi, _tag("title"))
|
||||
title.text = "IUCLID 6 container manifest file"
|
||||
|
||||
created = ET.SubElement(gi, _tag("created"))
|
||||
created.text = datetime.now(timezone.utc).strftime("%a %b %d %H:%M:%S %Z %Y")
|
||||
|
||||
author = ET.SubElement(gi, _tag("author"))
|
||||
author.text = "enviPath"
|
||||
|
||||
application = ET.SubElement(gi, _tag("application"))
|
||||
application.text = "enviPath IUCLID Export"
|
||||
|
||||
submission_type = ET.SubElement(gi, _tag("submission-type"))
|
||||
submission_type.text = "R_INT_ONSITE"
|
||||
|
||||
archive_type = ET.SubElement(gi, _tag("archive-type"))
|
||||
archive_type.text = "RAW_DATA"
|
||||
|
||||
legislations = ET.SubElement(gi, _tag("legislations-info"))
|
||||
leg = ET.SubElement(legislations, _tag("legislation"))
|
||||
leg_id = ET.SubElement(leg, _tag("id"))
|
||||
leg_id.text = "core"
|
||||
leg_ver = ET.SubElement(leg, _tag("version"))
|
||||
leg_ver.text = "10.0"
|
||||
|
||||
# base-document-uuid
|
||||
base_doc = ET.SubElement(root, _tag("base-document-uuid"))
|
||||
base_doc.text = document_key(base_document_uuid)
|
||||
|
||||
# contained-documents
|
||||
contained = ET.SubElement(root, _tag("contained-documents"))
|
||||
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
for filename, doc_type, uuid, subtype in documents:
|
||||
doc_key = document_key(uuid)
|
||||
doc_elem = ET.SubElement(contained, _tag("document"))
|
||||
doc_elem.set("id", doc_key)
|
||||
|
||||
type_elem = ET.SubElement(doc_elem, _tag("type"))
|
||||
type_elem.text = doc_type
|
||||
|
||||
if subtype:
|
||||
subtype_elem = ET.SubElement(doc_elem, _tag("subtype"))
|
||||
subtype_elem.text = subtype
|
||||
|
||||
name_elem = ET.SubElement(doc_elem, _tag("name"))
|
||||
name_elem.set(f"{{{NS_XLINK}}}type", "simple")
|
||||
name_elem.set(f"{{{NS_XLINK}}}href", filename)
|
||||
name_elem.text = filename
|
||||
|
||||
first_mod = ET.SubElement(doc_elem, _tag("first-modification-date"))
|
||||
first_mod.text = now
|
||||
|
||||
last_mod = ET.SubElement(doc_elem, _tag("last-modification-date"))
|
||||
last_mod.text = now
|
||||
|
||||
uuid_elem = ET.SubElement(doc_elem, _tag("uuid"))
|
||||
uuid_elem.text = doc_key
|
||||
|
||||
# Add links for this document if any
|
||||
doc_links = links.get(uuid, [])
|
||||
if doc_links:
|
||||
links_elem = ET.SubElement(doc_elem, _tag("links"))
|
||||
for target_key, ref_type in doc_links:
|
||||
_add_link(links_elem, target_key, ref_type)
|
||||
|
||||
return ET.tostring(root, encoding="unicode", xml_declaration=True)
|
||||
493
epiuclid/serializers/pathway_mapper.py
Normal file
493
epiuclid/serializers/pathway_mapper.py
Normal file
@ -0,0 +1,493 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from epapi.v1.interfaces.iuclid.dto import PathwayExportDTO
|
||||
from utilities.chem import FormatConverter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IUCLIDReferenceSubstanceData:
|
||||
uuid: UUID
|
||||
name: str
|
||||
smiles: str | None = None
|
||||
cas_number: str | None = None
|
||||
ec_number: str | None = None
|
||||
iupac_name: str | None = None
|
||||
molecular_formula: str | None = None
|
||||
molecular_weight: float | None = None
|
||||
inchi: str | None = None
|
||||
inchi_key: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class IUCLIDSubstanceData:
|
||||
uuid: UUID
|
||||
name: str
|
||||
reference_substance_uuid: UUID | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class SoilPropertiesData:
|
||||
soil_no_code: str | None = None
|
||||
soil_type: str | None = None
|
||||
sand: float | None = None
|
||||
silt: float | None = None
|
||||
clay: float | None = None
|
||||
org_carbon: float | None = None
|
||||
ph_lower: float | None = None
|
||||
ph_upper: float | None = None
|
||||
ph_method: str | None = None
|
||||
cec: float | None = None
|
||||
moisture_content: float | None = None
|
||||
soil_classification: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class IUCLIDEndpointStudyRecordData:
|
||||
uuid: UUID
|
||||
substance_uuid: UUID
|
||||
name: str
|
||||
half_lives: list[HalfLifeEntry] = field(default_factory=list)
|
||||
temperature: tuple[float, float] | None = None
|
||||
transformation_products: list[IUCLIDTransformationProductEntry] = field(default_factory=list)
|
||||
model_name_and_version: list[str] = field(default_factory=list)
|
||||
software_name_and_version: list[str] = field(default_factory=list)
|
||||
model_remarks: list[str] = field(default_factory=list)
|
||||
soil_properties: SoilPropertiesData | None = None
|
||||
soil_properties_entries: list[SoilPropertiesData] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HalfLifeEntry:
|
||||
model: str
|
||||
dt50_start: float
|
||||
dt50_end: float
|
||||
unit: str
|
||||
source: str
|
||||
soil_no_code: str | None = None
|
||||
temperature: tuple[float, float] | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class IUCLIDTransformationProductEntry:
|
||||
uuid: UUID
|
||||
product_reference_uuid: UUID
|
||||
parent_reference_uuids: list[UUID] = field(default_factory=list)
|
||||
kinetic_formation_fraction: float | None = None
|
||||
source_edge_uuid: UUID | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class IUCLIDDocumentBundle:
|
||||
substances: list[IUCLIDSubstanceData] = field(default_factory=list)
|
||||
reference_substances: list[IUCLIDReferenceSubstanceData] = field(default_factory=list)
|
||||
endpoint_study_records: list[IUCLIDEndpointStudyRecordData] = field(default_factory=list)
|
||||
|
||||
|
||||
class PathwayMapper:
|
||||
def map(self, export: PathwayExportDTO) -> IUCLIDDocumentBundle:
|
||||
bundle = IUCLIDDocumentBundle()
|
||||
|
||||
seen_compounds: dict[
|
||||
int, tuple[UUID, UUID]
|
||||
] = {} # compound PK -> (substance UUID, ref UUID)
|
||||
compound_names: dict[int, str] = {}
|
||||
|
||||
for compound in export.compounds:
|
||||
if compound.pk in seen_compounds:
|
||||
continue
|
||||
|
||||
derived = self._compute_derived_properties(compound.smiles)
|
||||
ref_sub_uuid = uuid4()
|
||||
sub_uuid = uuid4()
|
||||
seen_compounds[compound.pk] = (sub_uuid, ref_sub_uuid)
|
||||
compound_names[compound.pk] = compound.name
|
||||
|
||||
ref_sub = IUCLIDReferenceSubstanceData(
|
||||
uuid=ref_sub_uuid,
|
||||
name=compound.name,
|
||||
smiles=compound.smiles,
|
||||
cas_number=compound.cas_number,
|
||||
molecular_formula=derived["molecular_formula"],
|
||||
molecular_weight=derived["molecular_weight"],
|
||||
inchi=derived["inchi"],
|
||||
inchi_key=derived["inchi_key"],
|
||||
)
|
||||
bundle.reference_substances.append(ref_sub)
|
||||
|
||||
sub = IUCLIDSubstanceData(
|
||||
uuid=sub_uuid,
|
||||
name=compound.name,
|
||||
reference_substance_uuid=ref_sub_uuid,
|
||||
)
|
||||
bundle.substances.append(sub)
|
||||
|
||||
if not export.compounds:
|
||||
return bundle
|
||||
|
||||
root_compound_pks: list[int] = []
|
||||
seen_root_pks: set[int] = set()
|
||||
for root_pk in export.root_compound_pks:
|
||||
if root_pk in seen_compounds and root_pk not in seen_root_pks:
|
||||
root_compound_pks.append(root_pk)
|
||||
seen_root_pks.add(root_pk)
|
||||
|
||||
if not root_compound_pks:
|
||||
fallback_root_pk = export.compounds[0].pk
|
||||
if fallback_root_pk in seen_compounds:
|
||||
root_compound_pks = [fallback_root_pk]
|
||||
|
||||
if not root_compound_pks:
|
||||
return bundle
|
||||
|
||||
edge_templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]] = []
|
||||
for edge in sorted(export.edges, key=lambda item: str(item.edge_uuid)):
|
||||
parent_compound_pks = sorted(
|
||||
{pk for pk in edge.start_compound_pks if pk in seen_compounds}
|
||||
)
|
||||
product_compound_pks = sorted(
|
||||
{pk for pk in edge.end_compound_pks if pk in seen_compounds}
|
||||
)
|
||||
|
||||
if not parent_compound_pks or not product_compound_pks:
|
||||
continue
|
||||
|
||||
parent_ref_uuids = tuple(
|
||||
sorted({seen_compounds[pk][1] for pk in parent_compound_pks}, key=str)
|
||||
)
|
||||
edge_templates.append(
|
||||
(
|
||||
edge.edge_uuid,
|
||||
frozenset(parent_compound_pks),
|
||||
tuple(product_compound_pks),
|
||||
parent_ref_uuids,
|
||||
)
|
||||
)
|
||||
|
||||
model_names: list[str] = []
|
||||
software_names: list[str] = []
|
||||
model_remarks: list[str] = []
|
||||
if export.model_info:
|
||||
if export.model_info.model_name:
|
||||
model_names.append(export.model_info.model_name)
|
||||
if export.model_info.model_uuid:
|
||||
model_remarks.append(f"Model UUID: {export.model_info.model_uuid}")
|
||||
if export.model_info.software_name:
|
||||
if export.model_info.software_version:
|
||||
software_names.append(
|
||||
f"{export.model_info.software_name} {export.model_info.software_version}"
|
||||
)
|
||||
else:
|
||||
software_names.append(export.model_info.software_name)
|
||||
|
||||
# Aggregate scenario-aware AI from all root nodes for each root compound.
|
||||
# Each entry is (scenario_uuid, scenario_name, effective_ai_list).
|
||||
root_node_ai_by_scenario: dict[int, dict[str, tuple[UUID | None, str | None, list]]] = {}
|
||||
for node in export.nodes:
|
||||
if node.depth == 0 and node.compound_pk in seen_root_pks:
|
||||
scenario_bucket = root_node_ai_by_scenario.setdefault(node.compound_pk, {})
|
||||
if node.scenarios:
|
||||
for scenario in node.scenarios:
|
||||
scenario_key = str(scenario.scenario_uuid)
|
||||
existing = scenario_bucket.get(scenario_key)
|
||||
if existing is None:
|
||||
scenario_bucket[scenario_key] = (
|
||||
scenario.scenario_uuid,
|
||||
scenario.name,
|
||||
list(scenario.additional_info),
|
||||
)
|
||||
else:
|
||||
existing[2].extend(scenario.additional_info)
|
||||
else:
|
||||
# Backward compatibility path for callers that only provide node.additional_info.
|
||||
fallback_key = f"fallback:{node.node_uuid}"
|
||||
scenario_bucket[fallback_key] = (None, None, list(node.additional_info))
|
||||
|
||||
has_multiple_roots = len(root_compound_pks) > 1
|
||||
for root_pk in root_compound_pks:
|
||||
substance_uuid, _ = seen_compounds[root_pk]
|
||||
esr_name = f"Biodegradation in soil - {export.pathway_name}"
|
||||
if has_multiple_roots:
|
||||
root_name = compound_names.get(root_pk)
|
||||
if root_name:
|
||||
esr_name = f"{esr_name} ({root_name})"
|
||||
|
||||
transformation_entries: list[IUCLIDTransformationProductEntry] = []
|
||||
reachable_compound_pks = self._reachable_compounds_from_root(root_pk, edge_templates)
|
||||
seen_transformations: set[tuple[UUID, tuple[UUID, ...]]] = set()
|
||||
for (
|
||||
edge_uuid,
|
||||
parent_compound_pks,
|
||||
product_compound_pks,
|
||||
parent_reference_uuids,
|
||||
) in edge_templates:
|
||||
if not parent_compound_pks.issubset(reachable_compound_pks):
|
||||
continue
|
||||
|
||||
for product_compound_pk in product_compound_pks:
|
||||
if product_compound_pk not in reachable_compound_pks:
|
||||
continue
|
||||
|
||||
product_ref_uuid = seen_compounds[product_compound_pk][1]
|
||||
dedupe_key = (product_ref_uuid, parent_reference_uuids)
|
||||
if dedupe_key in seen_transformations:
|
||||
continue
|
||||
|
||||
seen_transformations.add(dedupe_key)
|
||||
transformation_entries.append(
|
||||
IUCLIDTransformationProductEntry(
|
||||
uuid=uuid4(),
|
||||
product_reference_uuid=product_ref_uuid,
|
||||
parent_reference_uuids=list(parent_reference_uuids),
|
||||
source_edge_uuid=edge_uuid,
|
||||
)
|
||||
)
|
||||
|
||||
scenarios_for_root = list(root_node_ai_by_scenario.get(root_pk, {}).values())
|
||||
if not scenarios_for_root:
|
||||
scenarios_for_root = [(None, None, [])]
|
||||
|
||||
soil_entries: list[SoilPropertiesData] = []
|
||||
soil_no_by_signature: dict[tuple, str] = {}
|
||||
half_lives: list[HalfLifeEntry] = []
|
||||
merged_ai_for_root: list = []
|
||||
|
||||
for _, _, ai_for_scenario in scenarios_for_root:
|
||||
merged_ai_for_root.extend(ai_for_scenario)
|
||||
|
||||
soil = self._extract_soil_properties(ai_for_scenario)
|
||||
temperature = self._extract_temperature(ai_for_scenario)
|
||||
|
||||
soil_no_code: str | None = None
|
||||
if soil is not None:
|
||||
soil_signature = self._soil_signature(soil)
|
||||
soil_no_code = soil_no_by_signature.get(soil_signature)
|
||||
if soil_no_code is None:
|
||||
soil_no_code = self._soil_no_code_for_index(len(soil_entries))
|
||||
if soil_no_code is not None:
|
||||
soil.soil_no_code = soil_no_code
|
||||
soil_entries.append(soil)
|
||||
soil_no_by_signature[soil_signature] = soil_no_code
|
||||
|
||||
for hl in self._extract_half_lives(ai_for_scenario):
|
||||
hl.soil_no_code = soil_no_code
|
||||
hl.temperature = temperature
|
||||
half_lives.append(hl)
|
||||
|
||||
esr = IUCLIDEndpointStudyRecordData(
|
||||
uuid=uuid4(),
|
||||
substance_uuid=substance_uuid,
|
||||
name=esr_name,
|
||||
half_lives=half_lives,
|
||||
temperature=self._extract_temperature(merged_ai_for_root),
|
||||
transformation_products=transformation_entries,
|
||||
model_name_and_version=model_names,
|
||||
software_name_and_version=software_names,
|
||||
model_remarks=model_remarks,
|
||||
soil_properties=soil_entries[0] if soil_entries else None,
|
||||
soil_properties_entries=soil_entries,
|
||||
)
|
||||
bundle.endpoint_study_records.append(esr)
|
||||
|
||||
return bundle
|
||||
|
||||
@staticmethod
|
||||
def _extract_half_lives(ai_list: list) -> list[HalfLifeEntry]:
|
||||
from envipy_additional_information.information import HalfLife
|
||||
|
||||
entries = []
|
||||
for ai in ai_list:
|
||||
if not isinstance(ai, HalfLife):
|
||||
continue
|
||||
start = ai.dt50.start
|
||||
end = ai.dt50.end
|
||||
if start is None or end is None:
|
||||
continue
|
||||
entries.append(
|
||||
HalfLifeEntry(
|
||||
model=ai.model,
|
||||
dt50_start=start,
|
||||
dt50_end=end,
|
||||
unit="d",
|
||||
source=ai.source,
|
||||
)
|
||||
)
|
||||
return entries
|
||||
|
||||
@staticmethod
|
||||
def _extract_temperature(ai_list: list) -> tuple[float, float] | None:
|
||||
from envipy_additional_information.information import Temperature
|
||||
|
||||
for ai in ai_list:
|
||||
if not isinstance(ai, Temperature):
|
||||
continue
|
||||
lower = ai.interval.start
|
||||
upper = ai.interval.end
|
||||
if lower is None or upper is None:
|
||||
continue
|
||||
return (lower, upper)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _extract_soil_properties(ai_list: list) -> SoilPropertiesData | None:
|
||||
from envipy_additional_information.information import (
|
||||
Acidity,
|
||||
BulkDensity,
|
||||
CEC,
|
||||
Humidity,
|
||||
OMContent,
|
||||
SoilClassification,
|
||||
SoilTexture1,
|
||||
SoilTexture2,
|
||||
)
|
||||
|
||||
props = SoilPropertiesData()
|
||||
|
||||
for ai in ai_list:
|
||||
if isinstance(ai, SoilTexture1) and props.soil_type is None:
|
||||
props.soil_type = ai.type.value
|
||||
elif isinstance(ai, SoilTexture2):
|
||||
if props.sand is None:
|
||||
props.sand = ai.sand
|
||||
if props.silt is None:
|
||||
props.silt = ai.silt
|
||||
if props.clay is None:
|
||||
props.clay = ai.clay
|
||||
elif isinstance(ai, OMContent) and props.org_carbon is None:
|
||||
props.org_carbon = ai.in_oc
|
||||
elif isinstance(ai, Acidity) and props.ph_lower is None:
|
||||
props.ph_lower = ai.interval.start
|
||||
props.ph_upper = ai.interval.end
|
||||
if isinstance(ai.method, str):
|
||||
props.ph_method = ai.method.strip() or None
|
||||
else:
|
||||
props.ph_method = ai.method
|
||||
elif isinstance(ai, CEC) and props.cec is None:
|
||||
props.cec = ai.capacity
|
||||
elif isinstance(ai, Humidity) and props.moisture_content is None:
|
||||
props.moisture_content = ai.humiditiy
|
||||
elif isinstance(ai, SoilClassification) and props.soil_classification is None:
|
||||
props.soil_classification = ai.system.value
|
||||
elif isinstance(ai, BulkDensity):
|
||||
pass # BulkDensity.data is a free-text string; not mapped to SoilPropertiesData
|
||||
|
||||
all_none = all(
|
||||
v is None
|
||||
for v in (
|
||||
props.soil_type,
|
||||
props.sand,
|
||||
props.silt,
|
||||
props.clay,
|
||||
props.org_carbon,
|
||||
props.ph_lower,
|
||||
props.ph_upper,
|
||||
props.ph_method,
|
||||
props.cec,
|
||||
props.moisture_content,
|
||||
props.soil_classification,
|
||||
)
|
||||
)
|
||||
return None if all_none else props
|
||||
|
||||
@staticmethod
|
||||
def _reachable_compounds_from_root(
|
||||
root_compound_pk: int,
|
||||
edge_templates: list[tuple[UUID, frozenset[int], tuple[int, ...], tuple[UUID, ...]]],
|
||||
) -> set[int]:
|
||||
reachable: set[int] = {root_compound_pk}
|
||||
changed = True
|
||||
|
||||
while changed:
|
||||
changed = False
|
||||
for _, parent_compound_pks, product_compound_pks, _ in edge_templates:
|
||||
if not parent_compound_pks.issubset(reachable):
|
||||
continue
|
||||
|
||||
for product_compound_pk in product_compound_pks:
|
||||
if product_compound_pk in reachable:
|
||||
continue
|
||||
reachable.add(product_compound_pk)
|
||||
changed = True
|
||||
|
||||
return reachable
|
||||
|
||||
@staticmethod
|
||||
def _soil_signature(props: SoilPropertiesData) -> tuple:
|
||||
return (
|
||||
props.soil_type,
|
||||
props.sand,
|
||||
props.silt,
|
||||
props.clay,
|
||||
props.org_carbon,
|
||||
props.ph_lower,
|
||||
props.ph_upper,
|
||||
props.ph_method,
|
||||
props.cec,
|
||||
props.moisture_content,
|
||||
props.soil_classification,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _soil_no_code_for_index(index: int) -> str | None:
|
||||
f137_codes = [
|
||||
"2",
|
||||
"4",
|
||||
"5",
|
||||
"6",
|
||||
"7",
|
||||
"8",
|
||||
"9",
|
||||
"10",
|
||||
"11",
|
||||
"3",
|
||||
"4070",
|
||||
"4071",
|
||||
"4072",
|
||||
"4073",
|
||||
"4074",
|
||||
"4075",
|
||||
"4076",
|
||||
"4077",
|
||||
"4078",
|
||||
"4079",
|
||||
]
|
||||
if 0 <= index < len(f137_codes):
|
||||
return f137_codes[index]
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _compute_derived_properties(smiles: str | None) -> dict:
|
||||
molecular_formula = None
|
||||
molecular_weight = None
|
||||
inchi = None
|
||||
inchi_key = None
|
||||
|
||||
if smiles:
|
||||
try:
|
||||
molecular_formula = FormatConverter.formula(smiles)
|
||||
except Exception:
|
||||
logger.debug("Could not compute formula for %s", smiles)
|
||||
try:
|
||||
molecular_weight = FormatConverter.mass(smiles)
|
||||
except Exception:
|
||||
logger.debug("Could not compute mass for %s", smiles)
|
||||
try:
|
||||
inchi = FormatConverter.InChI(smiles)
|
||||
except Exception:
|
||||
logger.debug("Could not compute InChI for %s", smiles)
|
||||
try:
|
||||
inchi_key = FormatConverter.InChIKey(smiles)
|
||||
except Exception:
|
||||
logger.debug("Could not compute InChIKey for %s", smiles)
|
||||
|
||||
return {
|
||||
"molecular_formula": molecular_formula,
|
||||
"molecular_weight": molecular_weight,
|
||||
"inchi": inchi,
|
||||
"inchi_key": inchi_key,
|
||||
}
|
||||
Reference in New Issue
Block a user