forked from enviPath/enviPy
This is an initial implementation that creates a working minimal .i6z document. It passes schema validation and can be imported into IUCLID. Caveat: IUCLID files target individual compounds. Pathway is not actually covered by the format. It can be added in either soil or water and soil OECD endpoints. **I currently only implemented the soil endpoint for all data.** This sort of works, and I can report all degradation products in a pathway (not a nice view, but we can report many transformation products and add a diagram attachment in the future). Adding additional information is an absolute pain, as we need to explicitly map each type of information to the relevant OECD field. I use the XSD scheme for validation, but unfortunately the IUCLID parser is not fully compliant and requires a specific order, etc. The workflow is: finding the AI structure from the XSD scheme -> make the scheme validation pass -> upload to IUCLID to get obscure error messages -> guess what could be wrong -> repeat 💣 New specifications get released once per year, so we will have to update accordingly. I believe that this should be a more expensive feature, as it requires significant effort to uphold. Currently implemented for root compound only in SOIL: - Soil Texture 2 - Soil Texture 1 - pH value - Half-life per soil sample / scenario (mapped to disappearance; not sure about that). - CEC - Organic Matter (only Carbon) - Moisture content - Humidity <img width="2123" alt="image.png" src="attachments/d29830e1-65ef-4136-8939-1825e0959c62"> <img width="2124" alt="image.png" src="attachments/ac9de2ac-bf68-4ba4-b40b-82f810a9de93"> <img width="2139" alt="image.png" src="attachments/5674c7e6-865e-420e-974a-6b825b331e6c"> Reviewed-on: enviPath/enviPy#338 Co-authored-by: Tobias O <tobias.olenyi@envipath.com> Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
142 lines
5.1 KiB
Python
142 lines
5.1 KiB
Python
from uuid import UUID
|
|
|
|
from django.conf import settings as s
|
|
from django.db.models import Model
|
|
from epdb.logic import PackageManager
|
|
from epdb.models import CompoundStructure, User, Compound, Scenario
|
|
|
|
from .errors import EPAPINotFoundError, EPAPIPermissionDeniedError
|
|
|
|
Package = s.GET_PACKAGE_MODEL()
|
|
|
|
|
|
def get_compound_for_read(user, compound_uuid: UUID):
|
|
"""
|
|
Get compound by UUID with permission check.
|
|
"""
|
|
try:
|
|
compound = Compound.objects.get(uuid=compound_uuid)
|
|
package = compound.package
|
|
except Compound.DoesNotExist:
|
|
raise EPAPINotFoundError(f"Compound with UUID {compound_uuid} not found")
|
|
|
|
# FIXME: optimize package manager to exclusively work with UUIDs
|
|
if not user or user.is_anonymous or not PackageManager.readable(user, package):
|
|
raise EPAPIPermissionDeniedError("Insufficient permissions to access this compound.")
|
|
|
|
return compound
|
|
|
|
|
|
def get_package_for_read(user, package_uuid: UUID):
|
|
"""
|
|
Get package by UUID with permission check.
|
|
"""
|
|
|
|
# FIXME: update package manager with custom exceptions to avoid manual checks here
|
|
try:
|
|
package = Package.objects.get(uuid=package_uuid)
|
|
except Package.DoesNotExist:
|
|
raise EPAPINotFoundError(f"Package with UUID {package_uuid} not found")
|
|
|
|
# FIXME: optimize package manager to exclusively work with UUIDs
|
|
if not user or user.is_anonymous or not PackageManager.readable(user, package):
|
|
raise EPAPIPermissionDeniedError("Insufficient permissions to access this package.")
|
|
|
|
return package
|
|
|
|
|
|
def get_package_for_write(user, package_uuid: UUID):
|
|
"""
|
|
Get package by UUID with permission check.
|
|
"""
|
|
|
|
# FIXME: update package manager with custom exceptions to avoid manual checks here
|
|
try:
|
|
package = Package.objects.get(uuid=package_uuid)
|
|
except Package.DoesNotExist:
|
|
raise EPAPINotFoundError(f"Package with UUID {package_uuid} not found")
|
|
|
|
# FIXME: optimize package manager to exclusively work with UUIDs
|
|
if not user or user.is_anonymous or not PackageManager.writable(user, package):
|
|
raise EPAPIPermissionDeniedError("Insufficient permissions to access this package.")
|
|
|
|
return package
|
|
|
|
|
|
def get_scenario_for_read(user, scenario_uuid: UUID):
|
|
"""Get scenario by UUID with read permission check."""
|
|
try:
|
|
scenario = Scenario.objects.select_related("package").get(uuid=scenario_uuid)
|
|
except Scenario.DoesNotExist:
|
|
raise EPAPINotFoundError(f"Scenario with UUID {scenario_uuid} not found")
|
|
|
|
if not user or user.is_anonymous or not PackageManager.readable(user, scenario.package):
|
|
raise EPAPIPermissionDeniedError("Insufficient permissions to access this scenario.")
|
|
|
|
return scenario
|
|
|
|
|
|
def get_scenario_for_write(user, scenario_uuid: UUID):
|
|
"""Get scenario by UUID with write permission check."""
|
|
try:
|
|
scenario = Scenario.objects.select_related("package").get(uuid=scenario_uuid)
|
|
except Scenario.DoesNotExist:
|
|
raise EPAPINotFoundError(f"Scenario with UUID {scenario_uuid} not found")
|
|
|
|
if not user or user.is_anonymous or not PackageManager.writable(user, scenario.package):
|
|
raise EPAPIPermissionDeniedError("Insufficient permissions to modify this scenario.")
|
|
|
|
return scenario
|
|
|
|
|
|
def get_user_packages_for_read(user: User | None):
|
|
"""Get all packages readable by the user."""
|
|
if not user or user.is_anonymous:
|
|
return PackageManager.get_reviewed_packages()
|
|
return PackageManager.get_all_readable_packages(user, include_reviewed=True)
|
|
|
|
|
|
def get_user_entities_for_read(model_class: Model, user: User | None):
|
|
"""Build queryset for reviewed package entities."""
|
|
|
|
if not user or user.is_anonymous:
|
|
return model_class.objects.filter(package__reviewed=True).select_related("package")
|
|
|
|
qs = model_class.objects.filter(
|
|
package__in=PackageManager.get_all_readable_packages(user, include_reviewed=True)
|
|
).select_related("package")
|
|
return qs
|
|
|
|
|
|
def get_package_entities_for_read(model_class: Model, package_uuid: UUID, user: User | None = None):
|
|
"""Build queryset for specific package entities."""
|
|
package = get_package_for_read(user, package_uuid)
|
|
qs = model_class.objects.filter(package=package).select_related("package")
|
|
return qs
|
|
|
|
|
|
def get_user_structure_for_read(user: User | None):
|
|
"""Build queryset for structures accessible to the user (via compound->package)."""
|
|
|
|
if not user or user.is_anonymous:
|
|
return CompoundStructure.objects.filter(compound__package__reviewed=True).select_related(
|
|
"compound__package"
|
|
)
|
|
|
|
qs = CompoundStructure.objects.filter(
|
|
compound__package__in=PackageManager.get_all_readable_packages(user, include_reviewed=True)
|
|
).select_related("compound__package")
|
|
return qs
|
|
|
|
|
|
def get_package_compound_structure_for_read(
|
|
package_uuid: UUID, compound_uuid: UUID, user: User | None = None
|
|
):
|
|
"""Build queryset for specific package compound structures."""
|
|
|
|
get_package_for_read(user, package_uuid)
|
|
compound = get_compound_for_read(user, compound_uuid)
|
|
|
|
qs = CompoundStructure.objects.filter(compound=compound).select_related("compound__package")
|
|
return qs
|