[Feature] Scenario and Additional Information creation via enviPath-python, Add Half Lifes to API Output, Fix source/target ids in legacy API (#340)

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#340
This commit is contained in:
2026-03-06 07:20:18 +13:00
parent 81cc612e69
commit 6e00926371
3 changed files with 795 additions and 32 deletions

View File

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional
import nh3 import nh3
from django.conf import settings as s from django.conf import settings as s
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.http import HttpResponse from django.http import HttpResponse, JsonResponse
from django.shortcuts import redirect from django.shortcuts import redirect
from ninja import Field, Form, Query, Router, Schema from ninja import Field, Form, Query, Router, Schema
from ninja.security import SessionAuth from ninja.security import SessionAuth
@ -558,21 +558,42 @@ class CompoundSchema(Schema):
@staticmethod @staticmethod
def resolve_halflifes(obj: Compound): def resolve_halflifes(obj: Compound):
return [] res = []
for scen, hls in obj.half_lifes().items():
for hl in hls:
res.append(
{
"hl": str(hl.dt50),
"hlComment": hl.comment,
"hlFit": hl.fit,
"hlModel": hl.model,
"scenarioId": scen.url,
"scenarioName": scen.name,
"scenarioType": scen.scenario_type,
"source": hl.source,
}
)
return res
@staticmethod @staticmethod
def resolve_pubchem_compound_references(obj: Compound): def resolve_pubchem_compound_references(obj: Compound):
# TODO
return [] return []
@staticmethod @staticmethod
def resolve_pathway_scenarios(obj: Compound): def resolve_pathway_scenarios(obj: Compound):
return [ res = []
{ for pw in obj.related_pathways:
"scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501", for scen in pw.scenarios.all():
"scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)", res.append(
"scenarioType": "Soil", {
} "scenarioId": scen.url,
] "scenarioName": scen.name,
"scenarioType": scen.scenario_type,
}
)
return res
class CompoundStructureSchema(Schema): class CompoundStructureSchema(Schema):
@ -625,7 +646,22 @@ class CompoundStructureSchema(Schema):
@staticmethod @staticmethod
def resolve_halflifes(obj: CompoundStructure): def resolve_halflifes(obj: CompoundStructure):
return [] res = []
for scen, hls in obj.half_lifes().items():
for hl in hls:
res.append(
{
"hl": str(hl.dt50),
"hlComment": hl.comment,
"hlFit": hl.fit,
"hlModel": hl.model,
"scenarioId": scen.url,
"scenarioName": scen.name,
"scenarioType": scen.scenario_type,
"source": hl.source,
}
)
return res
@staticmethod @staticmethod
def resolve_pubchem_compound_references(obj: CompoundStructure): def resolve_pubchem_compound_references(obj: CompoundStructure):
@ -633,13 +669,18 @@ class CompoundStructureSchema(Schema):
@staticmethod @staticmethod
def resolve_pathway_scenarios(obj: CompoundStructure): def resolve_pathway_scenarios(obj: CompoundStructure):
return [ res = []
{ for pw in obj.related_pathways:
"scenarioId": "https://envipath.org/package/5882df9c-dae1-4d80-a40e-db4724271456/scenario/cd8350cd-4249-4111-ba9f-4e2209338501", for scen in pw.scenarios.all():
"scenarioName": "Fritz, R. & Brauner, A. (1989) - (00004)", res.append(
"scenarioType": "Soil", {
} "scenarioId": scen.url,
] "scenarioName": scen.name,
"scenarioType": scen.scenario_type,
}
)
return res
class CompoundStructureWrapper(Schema): class CompoundStructureWrapper(Schema):
@ -1327,9 +1368,42 @@ def get_package_scenario(request, package_uuid, scenario_uuid):
} }
@router.post("/package/{uuid:package_uuid}/scenario") @router.post("/package/{uuid:package_uuid}/scenario", response={200: str | Any, 403: Error})
def create_package_scenario(request, package_uuid): def create_package_scenario(request, package_uuid):
pass from utilities.legacy import build_additional_information_from_request
try:
p = get_package_for_write(request.user, package_uuid)
scen_date = None
date_year = request.POST.get("dateYear")
date_month = request.POST.get("dateMonth")
date_day = request.POST.get("dateDay")
if date_year:
scen_date = date_year
if date_month:
scen_date += f"-{date_month}"
if date_day:
scen_date += f"-{date_day}"
name = request.POST.get("studyname")
description = request.POST.get("studydescription")
study_type = request.POST.get("type")
ais = []
types = request.POST.getlist("adInfoTypes[]")
for t in types:
ais.append(build_additional_information_from_request(request, t))
new_s = Scenario.create(p, name, description, scen_date, study_type, ais)
return JsonResponse({"scenarioLocation": new_s.url})
except ValueError:
return 403, {
"message": f"Getting Package with id {package_uuid} failed due to insufficient rights!"
}
@router.delete("/package/{uuid:package_uuid}/scenario") @router.delete("/package/{uuid:package_uuid}/scenario")
@ -1376,8 +1450,8 @@ class PathwayEdge(Schema):
pseudo: bool = False pseudo: bool = False
rule: Optional[str] = Field(None, alias="rule") rule: Optional[str] = Field(None, alias="rule")
scenarios: List[SimpleScenario] = Field([], alias="scenarios") scenarios: List[SimpleScenario] = Field([], alias="scenarios")
source: int = -1 source: int = Field(-1)
target: int = -1 target: int = Field(-1)
@staticmethod @staticmethod
def resolve_rule(obj: Edge): def resolve_rule(obj: Edge):
@ -1440,9 +1514,9 @@ class PathwaySchema(Schema):
isIncremental: bool = Field(None, alias="is_incremental") isIncremental: bool = Field(None, alias="is_incremental")
isPredicted: bool = Field(None, alias="is_predicted") isPredicted: bool = Field(None, alias="is_predicted")
lastModified: int = Field(None, alias="last_modified") lastModified: int = Field(None, alias="last_modified")
links: List[PathwayEdge] = Field([], alias="edges") links: List[PathwayEdge] = Field([])
name: str = Field(None, alias="name") name: str = Field(None, alias="name")
nodes: List[PathwayNode] = Field([], alias="nodes") nodes: List[PathwayNode] = Field([])
pathwayName: str = Field(None, alias="name") pathwayName: str = Field(None, alias="name")
reviewStatus: str = Field(None, alias="review_status") reviewStatus: str = Field(None, alias="review_status")
scenarios: List["SimpleScenario"] = Field([], alias="scenarios") scenarios: List["SimpleScenario"] = Field([], alias="scenarios")
@ -1464,6 +1538,14 @@ class PathwaySchema(Schema):
def resolve_last_modified(obj: Pathway): def resolve_last_modified(obj: Pathway):
return int(obj.modified.timestamp()) return int(obj.modified.timestamp())
@staticmethod
def resolve_links(obj: Pathway):
return obj.d3_json().get("links", [])
@staticmethod
def resolve_nodes(obj: Pathway):
return obj.d3_json().get("nodes", [])
@router.get("/pathway", response={200: PathwayWrapper, 403: Error}) @router.get("/pathway", response={200: PathwayWrapper, 403: Error})
def get_pathways(request): def get_pathways(request):
@ -1507,8 +1589,8 @@ class CreatePathway(Schema):
selectedSetting: str | None = None selectedSetting: str | None = None
@router.post("/package/{uuid:package_uuid}/pathway") @router.post("/package/{uuid:package_uuid}/pathway", response={200: Any, 403: Error})
def create_pathway( def create_package_pathway(
request, request,
package_uuid, package_uuid,
pw: Form[CreatePathway], pw: Form[CreatePathway],
@ -1516,9 +1598,6 @@ def create_pathway(
try: try:
p = get_package_for_write(request.user, package_uuid) p = get_package_for_write(request.user, package_uuid)
if not PackageManager.writable(request.user, p):
raise ValueError("You do not have the rights to create a Pathway!")
stand_smiles = FormatConverter.standardize(pw.smilesinput.strip(), remove_stereo=True) stand_smiles = FormatConverter.standardize(pw.smilesinput.strip(), remove_stereo=True)
new_pw = Pathway.create(p, stand_smiles, name=pw.name, description=pw.description) new_pw = Pathway.create(p, stand_smiles, name=pw.name, description=pw.description)
@ -1546,7 +1625,7 @@ def create_pathway(
return redirect(new_pw.url) return redirect(new_pw.url)
except ValueError as e: except ValueError as e:
return 400, {"message": str(e)} return 403, {"message": str(e)}
@router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}") @router.delete("/package/{uuid:package_uuid}/pathway/{uuid:pathway_uuid}")

View File

@ -1842,7 +1842,7 @@ class Pathway(EnviPathModel, AliasMixin, ScenarioMixin):
queue.append(n) queue.append(n)
# Add unconnected nodes # Add unconnected nodes
for n in self.nodes: for n in self.nodes.order_by("url"):
if len(n.out_edges.all()) == 0: if len(n.out_edges.all()) == 0:
if n not in queue: if n not in queue:
queue.append(n) queue.append(n)
@ -1852,8 +1852,8 @@ class Pathway(EnviPathModel, AliasMixin, ScenarioMixin):
processed.add(current) processed.add(current)
nodes.append(current.d3_json()) nodes.append(current.d3_json())
for e in self.edges.filter(start_nodes=current).distinct(): for e in self.edges.filter(start_nodes=current).order_by("url").distinct():
for prod in e.end_nodes.all(): for prod in e.end_nodes.all().order_by("url"):
if prod not in queue and prod not in processed: if prod not in queue and prod not in processed:
queue.append(prod) queue.append(prod)

684
utilities/legacy.py Normal file
View File

@ -0,0 +1,684 @@
# legacy counts:
# count property
# 8618 referringscenario
# 6572 halflife
# 5332 acidity
# 5253 temperature
# 5235 spikecompound
# 5096 soiltexture1
# 4939 waterstoragecapacity
# 4783 soiltexture2
# 4750 soilsource
# 4736 redox
# 4681 omcontent
# 4299 cec
# 4101 spikeconcentration
# 3967 humidity
# 3317 soilclassificationsystem
# 3154 biomass
# 2750 minormajor
# 1776 bulkdens
# 1588 initorganism
# 1499 reference
# 1383 enzyme
# 1144 sourcescenario
# 583 confidencelevel
# 477 acidity_ws
# 477 halflife_ws
# 477 samplelocation
# 451 organiccontent
# 368 organiccarbonwater
# 331 rateconstant
# 313 rulelikelihood
# 304 columnheight
# 291 redoxpotential
# 289 oxygencontent
# 276 biomass_ws
# 247 initialmasssediment
# 243 initialvolumewater
# 101 proposedintermediate
# 85 bioreactor
# 85 finalcompoundconcentration
# 75 purposeofwwtp
# 75 typeofaeration
# 74 location
# 68 inoculumsource
# 66 samplingdepth
# 60 originalsludgeamount
# 53 tts
# 52 typeofaddition
# 51 solventforcompoundsolution
# 50 sourceofliquidmatrix
# 43 sludgeretentiontime
# 36 biologicaltreatmenttechnology
# 22 aerationtype
# 16 dissolvedorganiccarbon
# 13 additionofnutrients
# 11 nitrogencontent
# 9 oxygendemand
# 8 phosphoruscontent
# 6 Dissolvedoxygenconcentration
# 4 oxygenuptakerate
# 1 amionauptakerate
# 1 volatiletts
import logging
from envipy_additional_information import HalfLife, HalfLifeWS
from envipy_additional_information.information import Interval
from envipy_additional_information.parsers import (
AcidityParser,
AdditionParser,
AerationTypeParser,
AmmoniaUptakeRateParser,
BiologicalTreatmentTechnologyParser,
BiomassParser,
BioReactorParser,
BulkDensityParser,
CECParser,
ColumnHeightParser,
CompoundSolutionSolventParser,
ConfidenceParser,
DissolvedOrganicCarbonParser,
DissolvedOxygenConcentrationParser,
FinalCompoundConcentrationParser,
HumidityParser,
InitialSedimentMassParser,
InitialVolumeWaterParser,
InoculumSourceParser,
IntervalParser,
LocationParser,
NitrogenContentParser,
NutrientsParser,
OMContentParser,
OrganicCarbonWaterParser,
OrganicContentParser,
OriginalSludgeAmountParser,
OxygenContentParser,
OxygenDemandParser,
PFASManufacturingCategoryParser,
PhosphorusContentParser,
ProposedIntermediateParser,
RateConstantParser,
RedoxParser,
RedoxPotentialParser,
ReferenceParser,
RuleLikelihoodParser,
SampleLocationParser,
SamplingDepthParser,
SludgeRetentionTimeParser,
SoilClassificationParser,
SoilSourceParser,
SoilTexture1Parser,
SoilTexture2Parser,
SpikeCompoundParser,
SpikeConcentrationParser,
TemperatureParser,
TotalOrganicCarbonParser,
TotalSuspendedSolidsParser,
TransformationProductImportanceParser,
VolatileTotalSuspendedSolidsParser,
WaterSedimentAcidityParser,
WaterSedimentBiomassParser,
WaterStorageCapacityParser,
WWTPPurposeParser,
LiquidMatrixSourceParser,
OxygenUptakeRateParser,
InitiatingOrganismParser,
)
logger = logging.getLogger(__name__)
def extract_influent_effluent(request, influentName, effluentName):
influent = get_parameter_or_empty_string(request, influentName)
effluent = get_parameter_or_empty_string(request, effluentName)
return influent + ";" + effluent
def get_parameter(request, paramname):
res = request.POST.get(paramname)
if res is not None and res.strip() != "":
return res
return ValueError("Not all parameters are set!")
def get_parameter_or_empty_string(request, paramname):
return request.POST.get(paramname, "")
def extract_range(request, value1, value2):
start = get_parameter_or_empty_string(request, value1)
end = get_parameter_or_empty_string(request, value2)
if start == "":
start = end
if end == "":
end = start
return start + ";" + end
def build_additional_information_from_request(request, type_):
try:
if type_ == "amionauptakerate":
data = extract_range(request, "amionauptakerateStart", "amionauptakerateEnd")
return AmmoniaUptakeRateParser.from_string(data)
elif type_ == "biomass":
start = get_parameter_or_empty_string(request, "biomassStart")
start = "-1" if start == "" else start
end = get_parameter_or_empty_string(request, "biomassEnd")
end = "-1" if end == "" else end
data = f"{start};{end}"
return BiomassParser.from_string(data)
elif type_ == "bulkdens":
data = get_parameter(request, "bulkdensity")
return BulkDensityParser.from_string(data)
elif type_ == "cec":
data = get_parameter(request, "cecdata")
return CECParser.from_string(data)
elif type_ == "humidity":
# humidity can be just a double or a double - condition pair
exp_humid = get_parameter(request, "expHumid")
try:
hum_conditions = get_parameter(request, "humConditions")
data = f"{exp_humid} - {hum_conditions}"
except ValueError:
data = exp_humid
return HumidityParser.from_string(data)
elif type_ == "omcontent":
value_om = get_parameter_or_empty_string(request, "omcontentInOM")
value_oc = get_parameter_or_empty_string(request, "omcontentINOC")
data = f"{value_om};OM;{value_oc};OC"
return OMContentParser.from_string(data)
elif type_ == "organiccontent":
oc_content = extract_range(request, "OC_content_low", "OC_content_high")
om_content = extract_range(request, "OM_content_low", "OM_content_high")
data = f"{oc_content};{om_content}"
return OrganicContentParser.from_string(data)
elif type_ == "organiccarbonwater":
toc_water = extract_range(request, "TOC_low", "TOC_high")
doc_water = extract_range(request, "DOC_low", "DOC_high")
data = f"{toc_water};{doc_water}"
return OrganicCarbonWaterParser.from_string(data)
elif type_ == "redox":
data = get_parameter(request, "redoxType")
return RedoxParser.from_string(data)
elif type_ == "redoxpotential":
value_range_water = extract_range(request, "lowPotentialWater", "highPotentialWater")
value_range_sediment = extract_range(
request, "lowPotentialSediment", "highPotentialSediment"
)
data = f"{value_range_water};{value_range_sediment}"
return RedoxPotentialParser.from_string(data)
elif type_ == "samplelocation":
data = get_parameter(request, "samplelocation")
return SampleLocationParser.from_string(data)
elif type_ == "samplingdepth":
data = extract_range(request, "samplingDepthMin", "samplingDepthMax")
return SamplingDepthParser.from_string(data)
elif type_ == "initialmasssediment":
initial_mass = get_parameter(request, "initial_mass_sediment")
wet_or_dry = get_parameter(request, "wet_or_dry")
data = f"{initial_mass};{wet_or_dry}"
return InitialSedimentMassParser.from_string(data)
elif type_ == "initialvolumewater":
data = get_parameter(request, "initialvolumewater")
return InitialVolumeWaterParser.from_string(data)
elif type_ == "sedimentporosity":
data = get_parameter(request, "sedimentporosity")
raise ValueError("sedimentporosity is not yet implemented")
elif type_ == "columnheight":
height_sediment = get_parameter_or_empty_string(request, "column_height_sediment")
height_water = get_parameter_or_empty_string(request, "column_height_water")
data = f"{height_sediment};{height_water}"
return ColumnHeightParser.from_string(data)
elif type_ == "oxygencontent":
oxygen_content_water = extract_range(
request, "oxygen_content_water_low", "oxygen_content_water_high"
)
oxygen_content_sediment = extract_range(
request, "oxygen_content_sediment_low", "oxygen_content_sediment_high"
)
data = f"{oxygen_content_water};{oxygen_content_sediment}"
return OxygenContentParser.from_string(data)
elif type_ == "biomass_ws":
biomass_water_cells = extract_range(request, "start_water_cells", "end_water_cells")
biomass_sediment_cells = extract_range(
request, "start_sediment_cells", "end_sediment_cells"
)
biomass_sediment_mg = extract_range(request, "start_sediment_mg", "end_sediment_mg")
data = f"{biomass_water_cells};{biomass_sediment_cells};{biomass_sediment_mg}"
return WaterSedimentBiomassParser.from_string(data)
elif type_ == "soilsource":
data = get_parameter(request, "soilsourcedata")
return SoilSourceParser.from_string(data)
elif type_ == "soiltexture1":
data = get_parameter(request, "soilTextureType")
return SoilTexture1Parser.from_string(data)
elif type_ == "soiltexture2":
sand = get_parameter(request, "sand")
silt = get_parameter(request, "silt")
clay = get_parameter(request, "clay")
data = f"{sand};{silt};{clay}"
return SoilTexture2Parser.from_string(data)
elif type_ == "temperature":
data = extract_range(request, "temperatureMin", "temperatureMax")
return TemperatureParser.from_string(data)
elif type_ == "reference":
try:
data = get_parameter(request, "reference")
except ValueError:
data = get_parameter(request, "pmid")
return ReferenceParser.from_string(data)
elif type_ == "sourcescenario":
# return get_parameter(request, "sourcescenario")
raise ValueError("sourcescenario is not yet implemented")
elif type_ == "acidity":
measurement_methods = get_parameter_or_empty_string(request, "acidityType")
# ACIDITY is separated by " - ". so replace ";" by "-"
value_range = extract_range(request, "lowPh", "highPh").replace(";", " - ")
data = f"{value_range};{measurement_methods}"
return AcidityParser.from_string(data)
elif type_ == "acidity_ws":
measurement_methods_ws = get_parameter_or_empty_string(request, "acidityType")
# ACIDITY is separated by " - ". so replace ";" by "-"
value_range_acidity_water = extract_range(
request, "pH_water_low", "pH_water_high"
).replace(";", " - ")
value_range_acidity_sediment = extract_range(
request, "pH_sediment_low", "pH_sediment_high"
).replace(";", " - ")
data = f"{value_range_acidity_water};{value_range_acidity_sediment};{measurement_methods_ws}"
return WaterSedimentAcidityParser.from_string(data)
elif type_ == "waterstoragecapacity":
wst = get_parameter(request, "wst")
wst_condition = get_parameter(request, "wstConditions")
mwst = get_parameter_or_empty_string(request, "maximumWaterstoragecapacity")
data = f"{wst} - {wst_condition} - {mwst}"
return WaterStorageCapacityParser.from_string(data)
elif type_ == "spikecompound":
if (
request.get_parameter("spikeCompSmiles") is not None
and request.get_parameter("spikeCompSmiles") != ""
):
data = get_parameter(request, "spikeCompSmiles")
elif request.get_parameter("smile") is not None:
data = get_parameter(request, "smile")
else:
data = get_parameter(request, "spikeComp")
return SpikeCompoundParser.from_string(data)
elif type_ == "spikeconcentration":
concentration = get_parameter(request, "spikeConcentration")
unit = get_parameter(request, "spikeconcentrationUnit")
data = f"{concentration};{unit}"
return SpikeConcentrationParser.from_string(data)
elif type_ == "soilclassificationsystem":
data = get_parameter(request, "soilclassificationsystem")
return SoilClassificationParser.from_string(data)
elif type_ == "rulelikelihood":
data = get_parameter(request, "ruleLikelihood")
return RuleLikelihoodParser.from_string(data)
elif type_ == "aerationtype":
data = get_parameter(request, "aerationtype")
return AerationTypeParser.from_string(data)
elif type_ == "bioreactor":
bioreactor_type = get_parameter_or_empty_string(request, "bioreactortype")
bioreactor_size = get_parameter_or_empty_string(request, "bioreactorsize")
data = f"{bioreactor_type};{bioreactor_size}"
return BioReactorParser.from_string(data)
elif type_ == "finalcompoundconcentration":
data = get_parameter(request, "finalcompoundconcentration")
return FinalCompoundConcentrationParser.from_string(data)
elif type_ == "inoculumsource":
data = get_parameter(request, "inoculumsource")
return InoculumSourceParser.from_string(data)
elif type_ == "modelpredictionprob":
data = get_parameter(request, "modelpredictionprob")
raise ValueError("modelpredictionprob is not yet implemented")
elif type_ == "modelbayespredictionprob":
data = get_parameter(request, "modelbayespredictionprob")
raise ValueError("modelbayespredictionprob is not yet implemented")
elif type_ == "additionofnutrients":
data = get_parameter(request, "additionofnutrients")
return NutrientsParser.from_string(data)
elif type_ == "originalsludgeamount":
data = get_parameter(request, "originalsludgeamount")
return OriginalSludgeAmountParser.from_string(data)
elif type_ == "addparametersmeasured":
data = get_parameter(request, "addparametersmeasured")
# return AdditionalMeasuredParameterParser.from_string(data)
raise ValueError("addparametersmeasured is not yet implemented")
elif type_ == "sludgeretentiontime":
# TODO check constraints
sludge_type = get_parameter(request, "sludgeretentiontimeType")
time = get_parameter(request, "sludgeretentiontime")
data = f"{sludge_type};{time}"
return SludgeRetentionTimeParser.from_string(data)
elif type_ == "solventforcompoundsolution":
solvent1 = get_parameter(request, "solventforcompoundsolution1")
try:
solvent2 = get_parameter(request, "solventforcompoundsolution2")
proportion = get_parameter(request, "proportion")
except ValueError:
return solvent1
solvent3 = ""
try:
solvent3 = get_parameter(request, "solventforcompoundsolution3")
except ValueError:
return f"{solvent1};{solvent2};{proportion}"
data = f"{solvent1};{solvent2};{solvent3};{proportion}"
return CompoundSolutionSolventParser.from_string(data)
elif type_ == "tpa":
tpa_name = get_parameter(request, "tpaName")
tpa_ident_level = get_parameter(request, "tpaIdentLevel")
tpa_structure = get_parameter(request, "tpaStructure")
data = f"{tpa_name};{tpa_ident_level};{tpa_structure}"
raise ValueError("tpa is not yet implemented")
elif type_ == "tts":
data = extract_range(request, "ttsStart", "ttsEnd")
return TotalSuspendedSolidsParser.from_string(data)
elif type_ == "typeofaddition":
data = get_parameter(request, "typeofaddition")
return AdditionParser.from_string(data)
elif type_ == "volatiletts":
data = extract_range(request, "volatilettsStart", "volatilettsEnd")
return VolatileTotalSuspendedSolidsParser.from_string(data)
elif type_ == "enzyme":
# name = get_parameter(request, "enzymeName")
# ec_number = get_parameter(request, "enzymeECNumber")
raise ValueError("enzyme is not yet implemented")
elif type_ == "proposedintermediate":
prop = get_parameter(request, "proposed")
return ProposedIntermediateParser.from_string(prop)
elif type_ == "confidencelevel":
data = get_parameter(request, "radioconfidence")
return ConfidenceParser.from_string(data)
elif type_ == "minormajor":
data = get_parameter(request, "radiomin")
return TransformationProductImportanceParser.from_string(data)
elif type_ == "initorganism":
data = get_parameter(request, "organism")
return InitiatingOrganismParser.from_string(data)
elif type_ == "oxygenuptakerate":
data = extract_range(request, "oxygenuptakerateStart", "oxygenuptakerateEnd")
return OxygenUptakeRateParser.from_string(data)
elif type_ == "sourceofliquidmatrix":
data = get_parameter(request, "sourceofliquidmatrix")
return LiquidMatrixSourceParser.from_string(data)
elif type_ == "halflife":
lower = get_parameter(request, "lower")
upper = get_parameter(request, "upper")
i = Interval(start=float(lower), end=float(upper))
comment = get_parameter_or_empty_string(request, "comment")
source = get_parameter_or_empty_string(request, "source")
first_order = get_parameter_or_empty_string(request, "firstOrder")
model = get_parameter_or_empty_string(request, "model")
fit = get_parameter_or_empty_string(request, "fit")
if first_order != "":
if model != "":
raise ValueError("not both, model and firstOrder can be set!")
if first_order == "true":
model = "SFO"
else:
logger.info("firstOrder is set to false which is not meaningful")
return HalfLife(model=model, fit=fit, comment=comment, dt50=i, source=source)
elif type_ == "halflife_ws": # Halflife in water-sediment systems
hl_ws_total = extract_range(request, "total_low", "total_high")
# When no value is given for water and/or sediment DT50,
# extract_range results in two values separated by ";"
# and has to be replaced by empty string
hl_ws_water = extract_range(request, "water_low", "water_high")
if hl_ws_water == ";":
hl_ws_water = ""
else:
hl_ws_water = hl_ws_water.replace(";", " - ")
hl_ws_sediment = extract_range(request, "sediment_low", "sediment_high")
if hl_ws_sediment == ";":
hl_ws_sediment = ""
else:
hl_ws_sediment = hl_ws_sediment.replace(";", " - ")
comment_ws = get_parameter_or_empty_string(request, "comment_ws")
source_ws = get_parameter_or_empty_string(request, "source_ws")
model_ws = get_parameter_or_empty_string(request, "model_ws")
fit_ws = get_parameter_or_empty_string(request, "fit_ws")
dt50_total = IntervalParser.from_string(hl_ws_total)
dt50_sediment = IntervalParser.from_string(hl_ws_sediment)
dt50_water = IntervalParser.from_string(hl_ws_water)
return HalfLifeWS(
model=model_ws,
fit=fit_ws,
comment=comment_ws,
dt50_total=dt50_total,
dt50_water=dt50_water,
dt50_sediment=dt50_sediment,
source=source_ws,
)
elif type_ == "kineticevaluation":
# kinetic_dt50_lower = get_parameter(request, "kineticDt50Lower")
# kinetic_dt50_upper = get_parameter(request, "kineticDt50Upper")
#
# interval = Interval(start=float(kinetic_dt50_lower), end=float(kinetic_dt50_upper))
#
# dt50_string = f"{interval.start} - {interval.end}"
# kinetic_normalized_dt50 = get_parameter_or_empty_string(
# request, "kineticNormalizedDt50"
# )
# kinetic_chi2err = get_parameter_or_empty_string(request, "kineticChi2err")
# kinetic_t_test = get_parameter_or_empty_string(request, "kineticTTest")
# kinetic_swarc = get_parameter_or_empty_string(request, "kineticSwarc")
# kinetic_visual_fit = get_parameter_or_empty_string(request, "kineticVisualFit")
# kinetic_comment = get_parameter_or_empty_string(request, "kinetiCcomment")
# kinetic_source = get_parameter_or_empty_string(request, "kineticSource")
# kinetic_model = get_parameter_or_empty_string(request, "kineticModel")
# kinetic_k1 = get_parameter_or_empty_string(request, "kineticK1")
# kinetic_k2 = get_parameter_or_empty_string(request, "kineticK2")
# kinetic_g = get_parameter_or_empty_string(request, "kineticG")
# kinetic_tb = get_parameter_or_empty_string(request, "kineticTb")
# kinetic_alpha = get_parameter_or_empty_string(request, "kineticAlpha")
# kinetic_beta = get_parameter_or_empty_string(request, "kineticBeta")
raise ValueError("kinetic evaluation is not yet implemented")
elif type_ == "referringscenario":
data = get_parameter(request, "referringscenario")
return ValueError("referringscenario is not yet implemented")
elif type_ == "keggreference":
# kegg_compound = get_parameter(request, "keggCompound")
# kegg_reaction = get_parameter(request, "keggReaction")
# kegg_r_pair = get_parameter(request, "keggRPair")
# kegg_r_class = get_parameter(request, "keggRClass")
# kegg_metabolic_pathway = get_parameter(request, "keggMetabolicPathway")
raise ValueError("kegg reference is not yet implemented")
elif type_ == "totalorganiccarbon":
data = extract_range(request, "totalorganiccarbonStart", "totalorganiccarbonEnd")
return TotalOrganicCarbonParser.from_string(data)
elif type_ == "dissolvedorganiccarbon":
data = extract_range(
request, "dissolvedorganiccarbonStart", "dissolvedorganiccarbonEnd"
)
return DissolvedOrganicCarbonParser.from_string(data)
elif type_ == "purposeofwwtp":
data = get_parameter(request, "purposeofwwtp")
return WWTPPurposeParser.from_string(data)
elif type_ == "biologicaltreatmenttechnology":
data = get_parameter(request, "biologicaltreatmenttechnology")
return BiologicalTreatmentTechnologyParser.from_string(data)
elif type_ == "typeofaeration":
data = get_parameter(request, "typeofaeration")
return AerationTypeParser.from_string(data)
elif type_ == "pfasmanufacturingcategory":
data = get_parameter(request, "pfasmanufacturingcategory")
return PFASManufacturingCategoryParser.from_string(data)
elif type_ == "phosphoruscontent":
data = extract_influent_effluent(
request, "phosphoruscontentInfluent", "phosphoruscontentEffluent"
)
return PhosphorusContentParser.from_string(data)
elif type_ == "oxygendemand":
ox_dem_type = get_parameter(request, "oxygendemandType")
ox_inf_eff = extract_influent_effluent(
request, "oxygendemandInfluent", "oxygendemandEffluent"
)
data = f"{ox_dem_type};{ox_inf_eff}"
return OxygenDemandParser.from_string(data)
elif type_ == "nitrogencontent":
nitrogen_type = get_parameter(request, "nitrogencontentType")
nit_inf_enf = extract_influent_effluent(
request, "nitrogencontentInfluent", "nitrogencontentEffluent"
)
data = f"{nitrogen_type};{nit_inf_enf}"
return NitrogenContentParser.from_string(data)
elif type_ == "location":
data = get_parameter(request, "location")
return LocationParser.from_string(data)
elif type_ == "Dissolvedoxygenconcentration":
data = extract_range(
request, "DissolvedoxygenconcentrationLow", "DissolvedoxygenconcentrationHigh"
)
return DissolvedOxygenConcentrationParser.from_string(data)
elif type_ == "lagphase":
data = get_parameter(request, "lagphase")
raise ValueError("lagphase is not yet implemented")
elif type_ == "rateconstant":
# Order and value has to be set
value = extract_range(request, "rateconstantlower", "rateconstantupper")
if value == ";":
raise ValueError("Rate constant value has to be set.")
order = get_parameter(request, "rateconstantorder")
try:
corrected = get_parameter(request, "rateconstantcorrected")
except ValueError:
corrected = ""
try:
rate_cons_comment = get_parameter(request, "rateconstantcomment")
except ValueError:
rate_cons_comment = "no comment"
# Escape ;
rate_cons_comment = rate_cons_comment.replace(";", "\\;")
data = f"{order};{corrected};{value};{rate_cons_comment}"
return RateConstantParser.from_string(data)
elif type_ == "compoundlabel":
compound_label = get_parameter(request, "compoundlabel")
if compound_label is None or compound_label == "":
raise ValueError("compoundlabel parameter not transmitted or empty")
raise ValueError("compoundlabel is not yet implemented")
elif type_ == "observation":
observation_type = get_parameter_or_empty_string(request, "observationType")
min_occ = get_parameter_or_empty_string(request, "minOcc")
max_occ = get_parameter_or_empty_string(request, "maxOcc")
res = f"{observation_type};"
res += "NA" if min_occ == "" else min_occ
res += ";"
res += "NA" if max_occ == "" else max_occ
raise ValueError("observation is not yet implemented")
elif type_ == "studymoisture":
# moisture = get_parameter(request, "moisture")
raise ValueError("moisture is not yet implemented")
elif type_ == "studywst":
# study_wst_cond = get_parameter(request, "studywstcond")
raise ValueError("studywstcond is not yet implemented")
else:
raise ValueError(f"No corresponding AdditionalInformation for {type_} found!")
except Exception as e:
raise ValueError(
f"cannot build data string for addinf {type_}: {type(e).__name__} - {str(e)}", e
)