forked from enviPath/enviPy
This implements a version of #274, relying on Pydantics built in JSON schema and JSON rendering. Requires additional UI tagging in the ai model repo but will remove HTML tags. Example scenario with filled information: 5882df9c-dae1-4d80-a40e-db4724271456/scenario/3a4d395a-6a6d-4154-8ce3-ced667fceec0 Reviewed-on: enviPath/enviPy#282 Co-authored-by: Tobias O <tobias.olenyi@envipath.com> Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
238 lines
8.0 KiB
Python
238 lines
8.0 KiB
Python
from ninja import Router, Body
|
|
from ninja.errors import HttpError
|
|
from uuid import UUID
|
|
from pydantic import ValidationError
|
|
from pydantic_core import ErrorDetails
|
|
from typing import Dict, Any
|
|
import logging
|
|
import json
|
|
|
|
from envipy_additional_information import registry
|
|
from envipy_additional_information.groups import GroupEnum
|
|
from epapi.utils.schema_transformers import build_rjsf_output
|
|
from ..dal import get_scenario_for_read, get_scenario_for_write
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = Router(tags=["Additional Information"])
|
|
|
|
|
|
@router.get("/information/schema/")
|
|
def list_all_schemas(request):
|
|
"""Return all schemas in RJSF format with lowercase class names as keys."""
|
|
result = {}
|
|
for name, cls in registry.list_models().items():
|
|
try:
|
|
result[name] = build_rjsf_output(cls)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to generate schema for {name}: {e}")
|
|
continue
|
|
return result
|
|
|
|
|
|
@router.get("/information/schema/{model_name}/")
|
|
def get_model_schema(request, model_name: str):
|
|
"""Return RJSF schema for specific model."""
|
|
cls = registry.get_model(model_name.lower())
|
|
if not cls:
|
|
raise HttpError(404, f"Unknown model: {model_name}")
|
|
return build_rjsf_output(cls)
|
|
|
|
|
|
@router.get("/scenario/{uuid:scenario_uuid}/information/")
|
|
def list_scenario_info(request, scenario_uuid: UUID):
|
|
"""List all additional information for a scenario"""
|
|
scenario = get_scenario_for_read(request.user, scenario_uuid)
|
|
|
|
result = []
|
|
for ai in scenario.get_additional_information():
|
|
result.append(
|
|
{
|
|
"type": ai.__class__.__name__,
|
|
"uuid": getattr(ai, "uuid", None),
|
|
"data": ai.model_dump(mode="json"),
|
|
}
|
|
)
|
|
return result
|
|
|
|
|
|
def _format_validation_error(error: ErrorDetails) -> str:
|
|
"""Format a Pydantic validation error into a user-friendly message."""
|
|
msg = error["msg"] or "Invalid value"
|
|
error_type = error["type"] or ""
|
|
|
|
# Handle common validation types with friendly messages
|
|
if error_type == "enum":
|
|
expected = error["ctx"]["expected"] if error["ctx"] else ""
|
|
return f"Please select a valid option{': ' + expected if expected else ''}"
|
|
elif error_type == "missing":
|
|
return "This field is required"
|
|
elif error_type in ("string_type", "int_type", "float_type"):
|
|
type_name = error_type.replace("_type", "")
|
|
return f"Please enter a valid {type_name}"
|
|
elif error_type == "value_error":
|
|
# Use the message as-is for value errors
|
|
return msg
|
|
else:
|
|
# Default: use the message from Pydantic but clean it up
|
|
return msg.replace("Input should be ", "Please enter ").replace("Value error, ", "")
|
|
|
|
|
|
@router.post("/scenario/{uuid:scenario_uuid}/information/{model_name}/")
|
|
def add_scenario_info(
|
|
request, scenario_uuid: UUID, model_name: str, payload: Dict[str, Any] = Body(...)
|
|
):
|
|
"""Add new additional information to scenario"""
|
|
cls = registry.get_model(model_name.lower())
|
|
if not cls:
|
|
raise HttpError(404, f"Unknown model: {model_name}")
|
|
|
|
try:
|
|
instance = cls(**payload) # Pydantic validates
|
|
except ValidationError as e:
|
|
# Transform Pydantic validation errors into user-friendly format
|
|
field_errors: dict[str, list[str]] = {}
|
|
for error in e.errors():
|
|
# Get the field name from location tuple
|
|
loc = error.get("loc", ())
|
|
field = str(loc[-1]) if loc else "root"
|
|
|
|
# Format the error message
|
|
friendly_msg = _format_validation_error(error)
|
|
|
|
if field not in field_errors:
|
|
field_errors[field] = []
|
|
field_errors[field].append(friendly_msg)
|
|
|
|
# Return structured error for frontend parsing
|
|
error_response = {
|
|
"type": "validation_error",
|
|
"field_errors": field_errors,
|
|
"message": "Please correct the errors below",
|
|
}
|
|
raise HttpError(400, json.dumps(error_response))
|
|
|
|
scenario = get_scenario_for_write(request.user, scenario_uuid)
|
|
|
|
# Model method now returns the UUID
|
|
created_uuid = scenario.add_additional_information(instance)
|
|
|
|
return {"status": "created", "uuid": created_uuid}
|
|
|
|
|
|
@router.patch("/scenario/{uuid:scenario_uuid}/information/item/{uuid:ai_uuid}/")
|
|
def update_scenario_info(
|
|
request, scenario_uuid: UUID, ai_uuid: UUID, payload: Dict[str, Any] = Body(...)
|
|
):
|
|
"""Update existing additional information for a scenario"""
|
|
scenario = get_scenario_for_write(request.user, scenario_uuid)
|
|
ai_uuid_str = str(ai_uuid)
|
|
|
|
# Find item to determine type for validation
|
|
found_type = None
|
|
for type_name, items in scenario.additional_information.items():
|
|
if any(item.get("uuid") == ai_uuid_str for item in items):
|
|
found_type = type_name
|
|
break
|
|
|
|
if found_type is None:
|
|
raise HttpError(404, f"Additional information not found: {ai_uuid}")
|
|
|
|
# Get the model class for validation
|
|
cls = registry.get_model(found_type.lower())
|
|
if not cls:
|
|
raise HttpError(500, f"Unknown model type in data: {found_type}")
|
|
|
|
# Validate the payload against the model
|
|
try:
|
|
instance = cls(**payload)
|
|
except ValidationError as e:
|
|
# Transform Pydantic validation errors into user-friendly format
|
|
field_errors: dict[str, list[str]] = {}
|
|
for error in e.errors():
|
|
# Get the field name from location tuple
|
|
loc = error.get("loc", ())
|
|
field = str(loc[-1]) if loc else "root"
|
|
|
|
# Format the error message
|
|
friendly_msg = _format_validation_error(error)
|
|
|
|
if field not in field_errors:
|
|
field_errors[field] = []
|
|
field_errors[field].append(friendly_msg)
|
|
|
|
# Return structured error for frontend parsing
|
|
error_response = {
|
|
"type": "validation_error",
|
|
"field_errors": field_errors,
|
|
"message": "Please correct the errors below",
|
|
}
|
|
raise HttpError(400, json.dumps(error_response))
|
|
|
|
# Use model method for update
|
|
try:
|
|
scenario.update_additional_information(ai_uuid_str, instance)
|
|
except ValueError as e:
|
|
raise HttpError(404, str(e))
|
|
|
|
return {"status": "updated", "uuid": ai_uuid_str}
|
|
|
|
|
|
@router.delete("/scenario/{uuid:scenario_uuid}/information/item/{uuid:ai_uuid}/")
|
|
def delete_scenario_info(request, scenario_uuid: UUID, ai_uuid: UUID):
|
|
"""Delete additional information from scenario"""
|
|
scenario = get_scenario_for_write(request.user, scenario_uuid)
|
|
|
|
try:
|
|
scenario.remove_additional_information(str(ai_uuid))
|
|
except ValueError as e:
|
|
raise HttpError(404, str(e))
|
|
|
|
return {"status": "deleted"}
|
|
|
|
|
|
@router.get("/information/groups/")
|
|
def list_groups(request):
|
|
"""Return list of available group names."""
|
|
return {"groups": GroupEnum.values()}
|
|
|
|
|
|
@router.get("/information/groups/{group_name}/")
|
|
def get_group_models(request, group_name: str):
|
|
"""
|
|
Return models for a specific group organized by subcategory.
|
|
|
|
Args:
|
|
group_name: One of "sludge", "soil", or "sediment" (string)
|
|
|
|
Returns:
|
|
Dictionary with subcategories (exp, spike, comp, misc, or group name)
|
|
as keys and lists of model info as values
|
|
"""
|
|
# Convert string to enum (raises ValueError if invalid)
|
|
try:
|
|
group_enum = GroupEnum(group_name)
|
|
except ValueError:
|
|
valid = ", ".join(GroupEnum.values())
|
|
raise HttpError(400, f"Invalid group '{group_name}'. Valid: {valid}")
|
|
|
|
try:
|
|
group_data = registry.collect_group(group_enum)
|
|
except (ValueError, TypeError) as e:
|
|
raise HttpError(400, str(e))
|
|
|
|
result = {}
|
|
for subcategory, models in group_data.items():
|
|
result[subcategory] = [
|
|
{
|
|
"name": cls.__name__.lower(),
|
|
"class": cls.__name__,
|
|
"title": getattr(cls.UI, "title", cls.__name__)
|
|
if hasattr(cls, "UI")
|
|
else cls.__name__,
|
|
}
|
|
for cls in models
|
|
]
|
|
|
|
return result
|