forked from enviPath/enviPy
[Feature] Scenario Creation (#78)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch> Reviewed-on: enviPath/enviPy#78
This commit is contained in:
168
utilities/misc.py
Normal file
168
utilities/misc.py
Normal file
@ -0,0 +1,168 @@
|
||||
import html
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from enum import Enum
|
||||
from types import NoneType
|
||||
from typing import Dict, List, Any
|
||||
|
||||
from envipy_additional_information import Interval, EnviPyModel
|
||||
from envipy_additional_information import NAME_MAPPING
|
||||
from pydantic import BaseModel, HttpUrl
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HTMLGenerator:
|
||||
registry = {x.__name__: x for x in NAME_MAPPING.values()}
|
||||
|
||||
@staticmethod
|
||||
def generate_html(additional_information: 'EnviPyModel', prefix='') -> str:
|
||||
from typing import get_origin, get_args, Union
|
||||
|
||||
if isinstance(additional_information, type):
|
||||
clz_name = additional_information.__name__
|
||||
else:
|
||||
clz_name = additional_information.__class__.__name__
|
||||
|
||||
widget = f'<h4>{clz_name}</h4>'
|
||||
|
||||
if hasattr(additional_information, 'uuid'):
|
||||
uuid = additional_information.uuid
|
||||
widget += f'<input type="hidden" name="{clz_name}__{prefix}__uuid" value="{uuid}">'
|
||||
|
||||
for name, field in additional_information.model_fields.items():
|
||||
value = getattr(additional_information, name, None)
|
||||
full_name = f"{clz_name}__{prefix}__{name}"
|
||||
annotation = field.annotation
|
||||
base_type = get_origin(annotation) or annotation
|
||||
|
||||
# Optional[Interval[float]] alias for Union[X, None]
|
||||
if base_type is Union:
|
||||
for arg in get_args(annotation):
|
||||
if arg is not NoneType:
|
||||
field_type = arg
|
||||
break
|
||||
else:
|
||||
field_type = base_type
|
||||
|
||||
is_interval_float = (
|
||||
field_type == Interval[float] or
|
||||
str(field_type) == str(Interval[float]) or
|
||||
'Interval[float]' in str(field_type)
|
||||
)
|
||||
|
||||
if is_interval_float:
|
||||
widget += f"""
|
||||
<div class="form-group row">
|
||||
<div class="col-md-6">
|
||||
<label for="{full_name}__start">{' '.join([x.capitalize() for x in name.split('_')])} Start</label>
|
||||
<input type="number" class="form-control" id="{full_name}__start" name="{full_name}__start" value="{value.start if value else ''}">
|
||||
</div>
|
||||
<div class="col-md-6">
|
||||
<label for="{full_name}__end">{' '.join([x.capitalize() for x in name.split('_')])} End</label>
|
||||
<input type="number" class="form-control" id="{full_name}__end" name="{full_name}__end" value="{value.end if value else ''}">
|
||||
</div>
|
||||
</div>
|
||||
"""
|
||||
elif issubclass(field_type, Enum):
|
||||
options: str = ''
|
||||
for e in field_type:
|
||||
options += f'<option value="{e.value}" {"selected" if e == value else ""}>{html.escape(e.name)}</option>'
|
||||
|
||||
widget += f"""
|
||||
<div class="form-group">
|
||||
<label for="{full_name}">{' '.join([x.capitalize() for x in name.split('_')])}</label>
|
||||
<select class="form-control" id="{full_name}" name="{full_name}">
|
||||
<option value="" disabled selected>Select {' '.join([x.capitalize() for x in name.split('_')])}</option>
|
||||
{options}
|
||||
</select>
|
||||
</div>
|
||||
"""
|
||||
else:
|
||||
if field_type == str or field_type == HttpUrl:
|
||||
input_type = 'text'
|
||||
elif field_type == float or field_type == int:
|
||||
input_type = 'number'
|
||||
elif field_type == bool:
|
||||
input_type = 'checkbox'
|
||||
else:
|
||||
raise ValueError(f"Could not parse field type {field_type} for {name}")
|
||||
|
||||
value_to_use = value if value and field_type != bool else ''
|
||||
|
||||
widget += f"""
|
||||
<div class="form-group">
|
||||
<label for="{full_name}">{' '.join([x.capitalize() for x in name.split('_')])}</label>
|
||||
<input type="{input_type}" class="form-control" id="{full_name}" name="{full_name}" value="{value_to_use}" {"checked" if value and field_type == bool else ""}>
|
||||
</div>
|
||||
"""
|
||||
|
||||
return widget + "<hr>"
|
||||
|
||||
@staticmethod
|
||||
def build_models(params) -> Dict[str, List['EnviPyModel']]:
|
||||
|
||||
def has_non_none(d):
|
||||
"""
|
||||
Recursively checks if any value in a (possibly nested) dict is not None.
|
||||
"""
|
||||
for value in d.values():
|
||||
if isinstance(value, dict):
|
||||
if has_non_none(value): # recursive check
|
||||
return True
|
||||
elif value is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
"""
|
||||
Build Pydantic model instances from flattened HTML parameters.
|
||||
|
||||
Args:
|
||||
params: dict of {param_name: value}, e.g. form data
|
||||
model_registry: mapping of class names (strings) to Pydantic model classes
|
||||
|
||||
Returns:
|
||||
dict: {ClassName: [list of model instances]}
|
||||
"""
|
||||
grouped: Dict[str, Dict[str, Dict[str, Any]]] = {}
|
||||
|
||||
# Step 1: group fields by ClassName and Number
|
||||
for key, value in params.items():
|
||||
if value == '':
|
||||
value = None
|
||||
|
||||
parts = key.split("__")
|
||||
if len(parts) < 3:
|
||||
continue # skip invalid keys
|
||||
|
||||
class_name, number, *field_parts = parts
|
||||
grouped.setdefault(class_name, {}).setdefault(number, {})
|
||||
|
||||
# handle nested fields like interval__start
|
||||
target = grouped[class_name][number]
|
||||
current = target
|
||||
for p in field_parts[:-1]:
|
||||
current = current.setdefault(p, {})
|
||||
current[field_parts[-1]] = value
|
||||
|
||||
# Step 2: instantiate Pydantic models
|
||||
instances: Dict[str, List[BaseModel]] = defaultdict(list)
|
||||
for class_name, number_dict in grouped.items():
|
||||
model_cls = HTMLGenerator.registry.get(class_name)
|
||||
|
||||
if not model_cls:
|
||||
logger.info(f"Could not find model class for {class_name}")
|
||||
continue
|
||||
|
||||
for number, fields in number_dict.items():
|
||||
if not has_non_none(fields):
|
||||
print(f"Skipping empty {class_name} {number} {fields}")
|
||||
continue
|
||||
|
||||
uuid = fields.pop('uuid', None)
|
||||
instance = model_cls(**fields)
|
||||
if uuid:
|
||||
instance.__dict__['uuid'] = uuid
|
||||
instances[class_name].append(instance)
|
||||
|
||||
return instances
|
||||
Reference in New Issue
Block a user