diff --git a/epapi/tests/v1/test_additional_information.py b/epapi/tests/v1/test_additional_information.py new file mode 100644 index 00000000..d8c19b25 --- /dev/null +++ b/epapi/tests/v1/test_additional_information.py @@ -0,0 +1,420 @@ +""" +Tests for Additional Information API endpoints. + +Tests CRUD operations on scenario additional information including the new PATCH endpoint. +""" + +from django.test import TestCase, tag +import json +from uuid import uuid4 + +from epdb.logic import PackageManager, UserManager +from epdb.models import Scenario + + +@tag("api", "additional_information") +class AdditionalInformationAPITests(TestCase): + """Test additional information API endpoints.""" + + @classmethod + def setUpTestData(cls): + """Set up test data: user, package, and scenario.""" + cls.user = UserManager.create_user( + "ai-test-user", + "ai-test@envipath.com", + "SuperSafe", + set_setting=False, + add_to_group=False, + is_active=True, + ) + cls.other_user = UserManager.create_user( + "ai-other-user", + "ai-other@envipath.com", + "SuperSafe", + set_setting=False, + add_to_group=False, + is_active=True, + ) + cls.package = PackageManager.create_package( + cls.user, "AI Test Package", "Test package for additional information" + ) + # Package owned by other_user (no access for cls.user) + cls.other_package = PackageManager.create_package( + cls.other_user, "Other Package", "Package without access" + ) + # Create a scenario for testing + cls.scenario = Scenario.objects.create( + package=cls.package, + name="Test Scenario", + description="Test scenario for additional information tests", + scenario_type="biodegradation", + scenario_date="2024-01-01", + additional_information={}, # Initialize with empty dict + ) + cls.other_scenario = Scenario.objects.create( + package=cls.other_package, + name="Other Scenario", + description="Scenario in package without access", + scenario_type="biodegradation", + scenario_date="2024-01-01", + additional_information={}, + ) + + def test_list_all_schemas(self): + """Test GET /api/v1/information/schema/ returns all schemas.""" + self.client.force_login(self.user) + + response = self.client.get("/api/v1/information/schema/") + + self.assertEqual(response.status_code, 200) + data = response.json() + self.assertIsInstance(data, dict) + # Should have multiple schemas + self.assertGreater(len(data), 0) + # Each schema should have RJSF format + for name, schema in data.items(): + self.assertIn("schema", schema) + self.assertIn("uiSchema", schema) + self.assertIn("formData", schema) + self.assertIn("groups", schema) + + def test_get_specific_schema(self): + """Test GET /api/v1/information/schema/{model_name}/ returns specific schema.""" + self.client.force_login(self.user) + + # Assuming 'temperature' is a valid model + response = self.client.get("/api/v1/information/schema/temperature/") + + self.assertEqual(response.status_code, 200) + data = response.json() + self.assertIn("schema", data) + self.assertIn("uiSchema", data) + + def test_get_nonexistent_schema_returns_404(self): + """Test GET for non-existent schema returns 404.""" + self.client.force_login(self.user) + + response = self.client.get("/api/v1/information/schema/nonexistent/") + + self.assertEqual(response.status_code, 404) + + def test_list_scenario_information_empty(self): + """Test GET /api/v1/scenario/{uuid}/information/ returns empty list initially.""" + self.client.force_login(self.user) + + response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/") + + self.assertEqual(response.status_code, 200) + data = response.json() + self.assertIsInstance(data, list) + self.assertEqual(len(data), 0) + + def test_create_additional_information(self): + """Test POST creates additional information.""" + self.client.force_login(self.user) + + # Create temperature information (assuming temperature model exists) + payload = {"interval": {"start": 20, "end": 25}} + response = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 200) + data = response.json() + self.assertEqual(data["status"], "created") + self.assertIn("uuid", data) + self.assertIsNotNone(data["uuid"]) + + def test_create_with_invalid_data_returns_400(self): + """Test POST with invalid data returns 400 with validation errors.""" + self.client.force_login(self.user) + + # Invalid data (missing required fields or wrong types) + payload = {"invalid_field": "value"} + response = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 400) + data = response.json() + # Should have validation error details in 'detail' field + self.assertIn("detail", data) + + def test_validation_errors_are_user_friendly(self): + """Test that validation errors are user-friendly and field-specific.""" + self.client.force_login(self.user) + + # Invalid data - wrong type (string instead of number in interval) + payload = {"interval": {"start": "not_a_number", "end": 25}} + response = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 400) + data = response.json() + + # Parse the error response - Django Ninja wraps errors in 'detail' + error_str = data.get("detail") or data.get("error") + self.assertIsNotNone(error_str, "Response should contain error details") + + # Parse the JSON error string + error_data = json.loads(error_str) + + # Check structure + self.assertEqual(error_data.get("type"), "validation_error") + self.assertIn("field_errors", error_data) + self.assertIn("message", error_data) + + # Ensure error messages are user-friendly (no Pydantic URLs or technical jargon) + error_str = json.dumps(error_data) + self.assertNotIn("pydantic", error_str.lower()) + self.assertNotIn("https://errors.pydantic.dev", error_str) + self.assertNotIn("loc", error_str) # No technical field like 'loc' + + # Check that error message is helpful + self.assertIn("Please", error_data["message"]) # User-friendly language + + def test_patch_additional_information(self): + """Test PATCH updates existing additional information.""" + self.client.force_login(self.user) + + # First create an item + create_payload = {"interval": {"start": 20, "end": 25}} + create_response = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(create_payload), + content_type="application/json", + ) + item_uuid = create_response.json()["uuid"] + + # Then update it with PATCH + update_payload = {"interval": {"start": 30, "end": 35}} + patch_response = self.client.patch( + f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item_uuid}/", + data=json.dumps(update_payload), + content_type="application/json", + ) + + self.assertEqual(patch_response.status_code, 200) + data = patch_response.json() + self.assertEqual(data["status"], "updated") + self.assertEqual(data["uuid"], item_uuid) # UUID preserved + + # Verify the data was updated + list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/") + items = list_response.json() + self.assertEqual(len(items), 1) + updated_item = items[0] + self.assertEqual(updated_item["uuid"], item_uuid) + self.assertEqual(updated_item["data"]["interval"]["start"], 30) + self.assertEqual(updated_item["data"]["interval"]["end"], 35) + + def test_patch_nonexistent_item_returns_404(self): + """Test PATCH on non-existent item returns 404.""" + self.client.force_login(self.user) + + fake_uuid = str(uuid4()) + payload = {"interval": {"start": 30, "end": 35}} + response = self.client.patch( + f"/api/v1/scenario/{self.scenario.uuid}/information/item/{fake_uuid}/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 404) + + def test_patch_with_invalid_data_returns_400(self): + """Test PATCH with invalid data returns 400.""" + self.client.force_login(self.user) + + # First create an item + create_payload = {"interval": {"start": 20, "end": 25}} + create_response = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(create_payload), + content_type="application/json", + ) + item_uuid = create_response.json()["uuid"] + + # Try to update with invalid data + invalid_payload = {"invalid_field": "value"} + patch_response = self.client.patch( + f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item_uuid}/", + data=json.dumps(invalid_payload), + content_type="application/json", + ) + + self.assertEqual(patch_response.status_code, 400) + + def test_delete_additional_information(self): + """Test DELETE removes additional information.""" + self.client.force_login(self.user) + + # Create an item + create_payload = {"interval": {"start": 20, "end": 25}} + create_response = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(create_payload), + content_type="application/json", + ) + item_uuid = create_response.json()["uuid"] + + # Delete it + delete_response = self.client.delete( + f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item_uuid}/" + ) + + self.assertEqual(delete_response.status_code, 200) + data = delete_response.json() + self.assertEqual(data["status"], "deleted") + + # Verify deletion + list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/") + items = list_response.json() + self.assertEqual(len(items), 0) + + def test_delete_nonexistent_item_returns_404(self): + """Test DELETE on non-existent item returns 404.""" + self.client.force_login(self.user) + + fake_uuid = str(uuid4()) + response = self.client.delete( + f"/api/v1/scenario/{self.scenario.uuid}/information/item/{fake_uuid}/" + ) + + self.assertEqual(response.status_code, 404) + + def test_multiple_items_crud(self): + """Test creating, updating, and deleting multiple items.""" + self.client.force_login(self.user) + + # Create first item + item1_payload = {"interval": {"start": 20, "end": 25}} + response1 = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(item1_payload), + content_type="application/json", + ) + item1_uuid = response1.json()["uuid"] + + # Create second item (different type if available, or same type) + item2_payload = {"interval": {"start": 30, "end": 35}} + response2 = self.client.post( + f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/", + data=json.dumps(item2_payload), + content_type="application/json", + ) + item2_uuid = response2.json()["uuid"] + + # Verify both exist + list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/") + items = list_response.json() + self.assertEqual(len(items), 2) + + # Update first item + update_payload = {"interval": {"start": 15, "end": 20}} + self.client.patch( + f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item1_uuid}/", + data=json.dumps(update_payload), + content_type="application/json", + ) + + # Delete second item + self.client.delete(f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item2_uuid}/") + + # Verify final state: one item with updated data + list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/") + items = list_response.json() + self.assertEqual(len(items), 1) + self.assertEqual(items[0]["uuid"], item1_uuid) + self.assertEqual(items[0]["data"]["interval"]["start"], 15) + + def test_unauthenticated_access_returns_401(self): + """Test that unauthenticated requests return 401.""" + # Don't log in + response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/") + + self.assertEqual(response.status_code, 401) + + def test_list_info_denied_without_permission(self): + """User cannot list info for scenario in package they don't have access to""" + self.client.force_login(self.user) + response = self.client.get(f"/api/v1/scenario/{self.other_scenario.uuid}/information/") + self.assertEqual(response.status_code, 403) + + def test_add_info_denied_without_permission(self): + """User cannot add info to scenario in package they don't have access to""" + self.client.force_login(self.user) + payload = {"interval": {"start": 25, "end": 30}} + response = self.client.post( + f"/api/v1/scenario/{self.other_scenario.uuid}/information/temperature/", + json.dumps(payload), + content_type="application/json", + ) + self.assertEqual(response.status_code, 403) + + def test_update_info_denied_without_permission(self): + """User cannot update info in scenario they don't have access to""" + self.client.force_login(self.other_user) + # First create an item as other_user + create_payload = {"interval": {"start": 20, "end": 25}} + create_response = self.client.post( + f"/api/v1/scenario/{self.other_scenario.uuid}/information/temperature/", + data=json.dumps(create_payload), + content_type="application/json", + ) + item_uuid = create_response.json()["uuid"] + + # Try to update as user (who doesn't have access) + self.client.force_login(self.user) + update_payload = {"interval": {"start": 30, "end": 35}} + response = self.client.patch( + f"/api/v1/scenario/{self.other_scenario.uuid}/information/item/{item_uuid}/", + data=json.dumps(update_payload), + content_type="application/json", + ) + self.assertEqual(response.status_code, 403) + + def test_delete_info_denied_without_permission(self): + """User cannot delete info from scenario they don't have access to""" + self.client.force_login(self.other_user) + # First create an item as other_user + create_payload = {"interval": {"start": 20, "end": 25}} + create_response = self.client.post( + f"/api/v1/scenario/{self.other_scenario.uuid}/information/temperature/", + data=json.dumps(create_payload), + content_type="application/json", + ) + item_uuid = create_response.json()["uuid"] + + # Try to delete as user (who doesn't have access) + self.client.force_login(self.user) + response = self.client.delete( + f"/api/v1/scenario/{self.other_scenario.uuid}/information/item/{item_uuid}/" + ) + self.assertEqual(response.status_code, 403) + + def test_anonymous_user_denied(self): + """Anonymous users cannot access private scenarios""" + # Ensure scenario package is not reviewed (private) + self.scenario.package.reviewed = False + self.scenario.package.save() + + # Unauthenticated users get 401 from the auth layer + response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/") + self.assertEqual(response.status_code, 401) + + def test_nonexistent_scenario_returns_404(self): + """Test operations on non-existent scenario return 404.""" + self.client.force_login(self.user) + + fake_uuid = uuid4() + response = self.client.get(f"/api/v1/scenario/{fake_uuid}/information/") + + self.assertEqual(response.status_code, 404) diff --git a/epapi/tests/v1/test_scenario_creation.py b/epapi/tests/v1/test_scenario_creation.py new file mode 100644 index 00000000..3cd16579 --- /dev/null +++ b/epapi/tests/v1/test_scenario_creation.py @@ -0,0 +1,325 @@ +""" +Tests for Scenario Creation Endpoint Error Handling. + +Tests comprehensive error handling for POST /api/v1/package/{uuid}/scenario/ +including package not found, permission denied, validation errors, and database errors. +""" + +from django.test import TestCase, tag +import json +from uuid import uuid4 + +from epdb.logic import PackageManager, UserManager +from epdb.models import Scenario + + +@tag("api", "scenario_creation") +class ScenarioCreationAPITests(TestCase): + """Test scenario creation endpoint error handling.""" + + @classmethod + def setUpTestData(cls): + """Set up test data: users and packages.""" + cls.user = UserManager.create_user( + "scenario-test-user", + "scenario-test@envipath.com", + "SuperSafe", + set_setting=False, + add_to_group=False, + is_active=True, + ) + cls.other_user = UserManager.create_user( + "other-user", + "other@envipath.com", + "SuperSafe", + set_setting=False, + add_to_group=False, + is_active=True, + ) + cls.package = PackageManager.create_package( + cls.user, "Test Package", "Test package for scenario creation" + ) + + def test_create_scenario_package_not_found(self): + """Test that non-existent package UUID returns 404.""" + self.client.force_login(self.user) + + fake_uuid = uuid4() + payload = { + "name": "Test Scenario", + "description": "Test description", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [], + } + + response = self.client.post( + f"/api/v1/package/{fake_uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 404) + self.assertIn("Package not found", response.json()["detail"]) + + def test_create_scenario_insufficient_permissions(self): + """Test that unauthorized access returns 403.""" + self.client.force_login(self.other_user) + + payload = { + "name": "Test Scenario", + "description": "Test description", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [], + } + + response = self.client.post( + f"/api/v1/package/{self.package.uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 403) + self.assertIn("permission", response.json()["detail"].lower()) + + def test_create_scenario_invalid_ai_type(self): + """Test that unknown additional information type returns 400.""" + self.client.force_login(self.user) + + payload = { + "name": "Test Scenario", + "description": "Test description", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [ + {"type": "invalid_type_that_does_not_exist", "data": {"some_field": "some_value"}} + ], + } + + response = self.client.post( + f"/api/v1/package/{self.package.uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 400) + response_data = response.json() + self.assertIn("Validation errors", response_data["detail"]) + + def test_create_scenario_validation_error(self): + """Test that invalid additional information data returns 400.""" + self.client.force_login(self.user) + + # Use malformed data structure for an actual AI type + payload = { + "name": "Test Scenario", + "description": "Test description", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [ + { + "type": "SomeValidType", + "data": None, # This should cause a validation error + } + ], + } + + response = self.client.post( + f"/api/v1/package/{self.package.uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + # Should return 400 for validation errors + self.assertIn(response.status_code, [400, 422]) + + def test_create_scenario_success(self): + """Test that valid scenario creation returns 200.""" + self.client.force_login(self.user) + + payload = { + "name": "Test Scenario", + "description": "Test description", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [], + } + + response = self.client.post( + f"/api/v1/package/{self.package.uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 200) + data = response.json() + self.assertEqual(data["name"], "Test Scenario") + self.assertEqual(data["description"], "Test description") + + # Verify scenario was actually created + scenario = Scenario.objects.get(name="Test Scenario") + self.assertEqual(scenario.package, self.package) + self.assertEqual(scenario.scenario_type, "biodegradation") + + def test_create_scenario_with_additional_information(self): + """Test creating scenario with valid additional information models.""" + self.client.force_login(self.user) + + # This test will succeed if the registry has valid models + # For now, test with empty additional_information + payload = { + "name": "Scenario with AI", + "description": "Test with additional information", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [], + } + + response = self.client.post( + f"/api/v1/package/{self.package.uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 200) + data = response.json() + self.assertEqual(data["name"], "Scenario with AI") + + def test_create_scenario_auto_name(self): + """Test that empty name triggers auto-generation.""" + self.client.force_login(self.user) + + payload = { + "name": "", # Empty name should be auto-generated + "description": "Test description", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [], + } + + response = self.client.post( + f"/api/v1/package/{self.package.uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 200) + data = response.json() + # Auto-generated name should follow pattern "Scenario N" + self.assertTrue(data["name"].startswith("Scenario ")) + + def test_create_scenario_xss_protection(self): + """Test that XSS attempts are sanitized.""" + self.client.force_login(self.user) + + payload = { + "name": "Clean Name", + "description": "Description", + "scenario_date": "2024-01-01", + "scenario_type": "biodegradation", + "additional_information": [], + } + + response = self.client.post( + f"/api/v1/package/{self.package.uuid}/scenario/", + data=json.dumps(payload), + content_type="application/json", + ) + + self.assertEqual(response.status_code, 200) + data = response.json() + # XSS should be cleaned out + self.assertNotIn(" + + {% include "modals/collections/new_scenario_modal.html" %} +{% endblock action_modals %} + {% block action_button %} {% if meta.can_edit %}