forked from enviPath/enviPy
Adds a way to input/display timeseries data to the additional information Reviewed-on: enviPath/enviPy#313 Reviewed-by: jebus <lorsbach@envipath.com> Co-authored-by: Tobias O <tobias.olenyi@envipath.com> Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
175 lines
5.5 KiB
Python
175 lines
5.5 KiB
Python
from ninja import Router, Body
|
|
from ninja.errors import HttpError
|
|
from uuid import UUID
|
|
from pydantic import ValidationError
|
|
from typing import Dict, Any
|
|
import logging
|
|
|
|
from envipy_additional_information import registry
|
|
from envipy_additional_information.groups import GroupEnum
|
|
from epapi.utils.schema_transformers import build_rjsf_output
|
|
from epapi.utils.validation_errors import handle_validation_error
|
|
from ..dal import get_scenario_for_read, get_scenario_for_write
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = Router(tags=["Additional Information"])
|
|
|
|
|
|
@router.get("/information/schema/")
|
|
def list_all_schemas(request):
|
|
"""Return all schemas in RJSF format with lowercase class names as keys."""
|
|
result = {}
|
|
for name, cls in registry.list_models().items():
|
|
try:
|
|
result[name] = build_rjsf_output(cls)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to generate schema for {name}: {e}")
|
|
continue
|
|
return result
|
|
|
|
|
|
@router.get("/information/schema/{model_name}/")
|
|
def get_model_schema(request, model_name: str):
|
|
"""Return RJSF schema for specific model."""
|
|
cls = registry.get_model(model_name.lower())
|
|
if not cls:
|
|
raise HttpError(404, f"Unknown model: {model_name}")
|
|
return build_rjsf_output(cls)
|
|
|
|
|
|
@router.get("/scenario/{uuid:scenario_uuid}/information/")
|
|
def list_scenario_info(request, scenario_uuid: UUID):
|
|
"""List all additional information for a scenario"""
|
|
scenario = get_scenario_for_read(request.user, scenario_uuid)
|
|
|
|
result = []
|
|
for ai in scenario.get_additional_information():
|
|
result.append(
|
|
{
|
|
"type": ai.__class__.__name__,
|
|
"uuid": getattr(ai, "uuid", None),
|
|
"data": ai.model_dump(mode="json"),
|
|
}
|
|
)
|
|
return result
|
|
|
|
|
|
@router.post("/scenario/{uuid:scenario_uuid}/information/{model_name}/")
|
|
def add_scenario_info(
|
|
request, scenario_uuid: UUID, model_name: str, payload: Dict[str, Any] = Body(...)
|
|
):
|
|
"""Add new additional information to scenario"""
|
|
cls = registry.get_model(model_name.lower())
|
|
if not cls:
|
|
raise HttpError(404, f"Unknown model: {model_name}")
|
|
|
|
try:
|
|
instance = cls(**payload) # Pydantic validates
|
|
except ValidationError as e:
|
|
handle_validation_error(e)
|
|
|
|
scenario = get_scenario_for_write(request.user, scenario_uuid)
|
|
|
|
# Model method now returns the UUID
|
|
created_uuid = scenario.add_additional_information(instance)
|
|
|
|
return {"status": "created", "uuid": created_uuid}
|
|
|
|
|
|
@router.patch("/scenario/{uuid:scenario_uuid}/information/item/{uuid:ai_uuid}/")
|
|
def update_scenario_info(
|
|
request, scenario_uuid: UUID, ai_uuid: UUID, payload: Dict[str, Any] = Body(...)
|
|
):
|
|
"""Update existing additional information for a scenario"""
|
|
scenario = get_scenario_for_write(request.user, scenario_uuid)
|
|
ai_uuid_str = str(ai_uuid)
|
|
|
|
# Find item to determine type for validation
|
|
found_type = None
|
|
for type_name, items in scenario.additional_information.items():
|
|
if any(item.get("uuid") == ai_uuid_str for item in items):
|
|
found_type = type_name
|
|
break
|
|
|
|
if found_type is None:
|
|
raise HttpError(404, f"Additional information not found: {ai_uuid}")
|
|
|
|
# Get the model class for validation
|
|
cls = registry.get_model(found_type.lower())
|
|
if not cls:
|
|
raise HttpError(500, f"Unknown model type in data: {found_type}")
|
|
|
|
# Validate the payload against the model
|
|
try:
|
|
instance = cls(**payload)
|
|
except ValidationError as e:
|
|
handle_validation_error(e)
|
|
|
|
# Use model method for update
|
|
try:
|
|
scenario.update_additional_information(ai_uuid_str, instance)
|
|
except ValueError as e:
|
|
raise HttpError(404, str(e))
|
|
|
|
return {"status": "updated", "uuid": ai_uuid_str}
|
|
|
|
|
|
@router.delete("/scenario/{uuid:scenario_uuid}/information/item/{uuid:ai_uuid}/")
|
|
def delete_scenario_info(request, scenario_uuid: UUID, ai_uuid: UUID):
|
|
"""Delete additional information from scenario"""
|
|
scenario = get_scenario_for_write(request.user, scenario_uuid)
|
|
|
|
try:
|
|
scenario.remove_additional_information(str(ai_uuid))
|
|
except ValueError as e:
|
|
raise HttpError(404, str(e))
|
|
|
|
return {"status": "deleted"}
|
|
|
|
|
|
@router.get("/information/groups/")
|
|
def list_groups(request):
|
|
"""Return list of available group names."""
|
|
return {"groups": GroupEnum.values()}
|
|
|
|
|
|
@router.get("/information/groups/{group_name}/")
|
|
def get_group_models(request, group_name: str):
|
|
"""
|
|
Return models for a specific group organized by subcategory.
|
|
|
|
Args:
|
|
group_name: One of "sludge", "soil", or "sediment" (string)
|
|
|
|
Returns:
|
|
Dictionary with subcategories (exp, spike, comp, misc, or group name)
|
|
as keys and lists of model info as values
|
|
"""
|
|
# Convert string to enum (raises ValueError if invalid)
|
|
try:
|
|
group_enum = GroupEnum(group_name)
|
|
except ValueError:
|
|
valid = ", ".join(GroupEnum.values())
|
|
raise HttpError(400, f"Invalid group '{group_name}'. Valid: {valid}")
|
|
|
|
try:
|
|
group_data = registry.collect_group(group_enum)
|
|
except (ValueError, TypeError) as e:
|
|
raise HttpError(400, str(e))
|
|
|
|
result = {}
|
|
for subcategory, models in group_data.items():
|
|
result[subcategory] = [
|
|
{
|
|
"name": cls.__name__.lower(),
|
|
"class": cls.__name__,
|
|
"title": getattr(cls.UI, "title", cls.__name__)
|
|
if hasattr(cls, "UI")
|
|
else cls.__name__,
|
|
}
|
|
for cls in models
|
|
]
|
|
|
|
return result
|