[Feature] Create API Key Authenticaton for v1 API (#327)

Add API key authentication to v1 API
Also includes:
- management command to create keys for users
- Improvements to API tests

Minor:
- more robust way to start docker dev container.

Reviewed-on: enviPath/enviPy#327
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
This commit is contained in:
2026-02-11 02:29:54 +13:00
committed by jebus
parent c0cfdb9255
commit 5789f20e7f
15 changed files with 282 additions and 165 deletions

View File

@ -51,47 +51,30 @@ class ValidationErrorUtilityTests(TestCase):
self.assertIn("active", formatted)
self.assertIn("inactive", formatted)
def test_format_string_type_error(self):
"""Test formatting of string type validation error."""
def test_format_type_errors(self):
"""Test formatting of type validation errors (string, int, float)."""
test_cases = [
# (field_type, invalid_value, expected_message)
# Note: We don't check exact error_type as Pydantic may use different types
# (e.g., int_type vs int_parsing) but we verify the formatted message is correct
(str, 123, "Please enter a valid string"),
(int, "not_a_number", "Please enter a valid int"),
(float, "not_a_float", "Please enter a valid float"),
]
class TestModel(BaseModel):
name: str
for field_type, invalid_value, expected_message in test_cases:
with self.subTest(field_type=field_type.__name__):
try:
TestModel(name=123)
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, "Please enter a valid string")
class TestModel(BaseModel):
field: field_type
def test_format_int_type_error(self):
"""Test formatting of integer type validation error."""
class TestModel(BaseModel):
count: int
try:
TestModel(count="not_a_number")
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, "Please enter a valid int")
def test_format_float_type_error(self):
"""Test formatting of float type validation error."""
class TestModel(BaseModel):
value: float
try:
TestModel(value="not_a_float")
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, "Please enter a valid float")
try:
TestModel(field=invalid_value)
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, expected_message)
def test_format_value_error(self):
"""Test formatting of value error from custom validator."""
@ -114,6 +97,19 @@ class ValidationErrorUtilityTests(TestCase):
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, "Age must be positive")
def test_format_unknown_error_type_fallback(self):
"""Test that unknown error types fall back to default formatting."""
# Mock an error with an unknown type
mock_error = {
"type": "unknown_custom_type",
"msg": "Input should be a valid email address",
"ctx": {},
}
formatted = format_validation_error(mock_error)
# Should use the else branch which does replacements on the message
self.assertEqual(formatted, "Please enter a valid email address")
def test_handle_validation_error_structure(self):
"""Test that handle_validation_error raises HttpError with correct structure."""

View File

@ -380,13 +380,6 @@ class AdditionalInformationAPITests(TestCase):
self.assertEqual(items[0]["uuid"], item1_uuid)
self.assertEqual(items[0]["data"]["interval"]["start"], 15)
def test_unauthenticated_access_returns_401(self):
"""Test that unauthenticated requests return 401."""
# Don't log in
response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/")
self.assertEqual(response.status_code, 401)
def test_list_info_denied_without_permission(self):
"""User cannot list info for scenario in package they don't have access to"""
self.client.force_login(self.user)
@ -445,16 +438,6 @@ class AdditionalInformationAPITests(TestCase):
)
self.assertEqual(response.status_code, 403)
def test_anonymous_user_denied(self):
"""Anonymous users cannot access private scenarios"""
# Ensure scenario package is not reviewed (private)
self.scenario.package.reviewed = False
self.scenario.package.save()
# Unauthenticated users get 401 from the auth layer
response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/")
self.assertEqual(response.status_code, 401)
def test_nonexistent_scenario_returns_404(self):
"""Test operations on non-existent scenario return 404."""
self.client.force_login(self.user)

View File

@ -261,13 +261,6 @@ class GlobalCompoundListPermissionTest(APIPermissionTestBase):
self.assertEqual(response.status_code, 200)
payload = response.json()
# user2 should see compounds from:
# - reviewed_package (public)
# - unreviewed_package_read (READ permission)
# - unreviewed_package_write (WRITE permission)
# - unreviewed_package_all (ALL permission)
# - group_package (via group membership)
# Total: 5 compounds
self.assertEqual(payload["total_items"], 5)
visible_uuids = {item["uuid"] for item in payload["items"]}
@ -303,54 +296,6 @@ class GlobalCompoundListPermissionTest(APIPermissionTestBase):
# user1 owns all packages, so sees all compounds
self.assertEqual(payload["total_items"], 7)
def test_read_permission_allows_viewing(self):
"""READ permission allows viewing compounds."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# Check that read_compound is included
uuids = [item["uuid"] for item in payload["items"]]
self.assertIn(str(self.read_compound.uuid), uuids)
def test_write_permission_allows_viewing(self):
"""WRITE permission also allows viewing compounds."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# Check that write_compound is included
uuids = [item["uuid"] for item in payload["items"]]
self.assertIn(str(self.write_compound.uuid), uuids)
def test_all_permission_allows_viewing(self):
"""ALL permission allows viewing compounds."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# Check that all_compound is included
uuids = [item["uuid"] for item in payload["items"]]
self.assertIn(str(self.all_compound.uuid), uuids)
def test_group_permission_allows_viewing(self):
"""Group membership grants access to group-permitted packages."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# Check that group_compound is included
uuids = [item["uuid"] for item in payload["items"]]
self.assertIn(str(self.group_compound.uuid), uuids)
@tag("api", "end2end")
class PackageScopedCompoundListPermissionTest(APIPermissionTestBase):

View File

@ -134,7 +134,7 @@ class BaseTestAPIGetPaginated:
f"({self.total_reviewed} <= {self.default_page_size})"
)
response = self.client.get(self.global_endpoint, {"page": 2})
response = self.client.get(self.global_endpoint, {"page": 2, "review_status": True})
self.assertEqual(response.status_code, 200)
payload = response.json()

View File

@ -119,7 +119,7 @@ class ScenarioCreationAPITests(TestCase):
"scenario_type": "biodegradation",
"additional_information": [
{
"type": "SomeValidType",
"type": "invalid_type_name",
"data": None, # This should cause a validation error
}
],
@ -131,8 +131,8 @@ class ScenarioCreationAPITests(TestCase):
content_type="application/json",
)
# Should return 400 for validation errors
self.assertIn(response.status_code, [400, 422])
# Should return 422 for validation errors
self.assertEqual(response.status_code, 422)
def test_create_scenario_success(self):
"""Test that valid scenario creation returns 200."""
@ -162,30 +162,6 @@ class ScenarioCreationAPITests(TestCase):
self.assertEqual(scenario.package, self.package)
self.assertEqual(scenario.scenario_type, "biodegradation")
def test_create_scenario_with_additional_information(self):
"""Test creating scenario with valid additional information models."""
self.client.force_login(self.user)
# This test will succeed if the registry has valid models
# For now, test with empty additional_information
payload = {
"name": "Scenario with AI",
"description": "Test with additional information",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["name"], "Scenario with AI")
def test_create_scenario_auto_name(self):
"""Test that empty name triggers auto-generation."""
self.client.force_login(self.user)
@ -265,7 +241,7 @@ class ScenarioCreationAPITests(TestCase):
"scenario_type": "biodegradation",
"additional_information": [
{
"type": "SomeType",
"type": "invalid_type_name",
"data": "string instead of dict", # Wrong type
}
],
@ -277,8 +253,8 @@ class ScenarioCreationAPITests(TestCase):
content_type="application/json",
)
# Should return 400 for validation errors
self.assertIn(response.status_code, [400, 422])
# Should return 422 for validation errors
self.assertEqual(response.status_code, 422)
def test_create_scenario_default_values(self):
"""Test that default values are applied correctly."""

View File

@ -0,0 +1,94 @@
from datetime import timedelta
from django.test import TestCase, tag
from django.utils import timezone
from epdb.logic import PackageManager, UserManager
from epdb.models import APIToken
@tag("api", "auth")
class BearerTokenAuthTests(TestCase):
@classmethod
def setUpTestData(cls):
cls.user = UserManager.create_user(
"token-user",
"token-user@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
default_pkg = cls.user.default_package
cls.user.default_package = None
cls.user.save()
if default_pkg:
default_pkg.delete()
cls.unreviewed_package = PackageManager.create_package(
cls.user, "Token Auth Package", "Package for token auth tests"
)
def _auth_header(self, raw_token):
return {"HTTP_AUTHORIZATION": f"Bearer {raw_token}"}
def test_valid_token_allows_access(self):
_, raw_token = APIToken.create_token(self.user, name="Valid Token", expires_days=1)
response = self.client.get("/api/v1/compounds/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 200)
def test_expired_token_rejected(self):
token, raw_token = APIToken.create_token(self.user, name="Expired Token", expires_days=1)
token.expires_at = timezone.now() - timedelta(days=1)
token.save(update_fields=["expires_at"])
response = self.client.get("/api/v1/compounds/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 401)
def test_inactive_token_rejected(self):
token, raw_token = APIToken.create_token(self.user, name="Inactive Token", expires_days=1)
token.is_active = False
token.save(update_fields=["is_active"])
response = self.client.get("/api/v1/compounds/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 401)
def test_invalid_token_rejected(self):
response = self.client.get("/api/v1/compounds/", HTTP_AUTHORIZATION="Bearer invalid-token")
self.assertEqual(response.status_code, 401)
def test_no_token_rejected(self):
self.client.logout()
response = self.client.get("/api/v1/compounds/")
self.assertEqual(response.status_code, 401)
def test_bearer_populates_request_user_for_packages(self):
response = self.client.get("/api/v1/packages/")
self.assertEqual(response.status_code, 200)
payload = response.json()
uuids = {item["uuid"] for item in payload["items"]}
self.assertNotIn(str(self.unreviewed_package.uuid), uuids)
_, raw_token = APIToken.create_token(self.user, name="Package Token", expires_days=1)
response = self.client.get("/api/v1/packages/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 200)
payload = response.json()
uuids = {item["uuid"] for item in payload["items"]}
self.assertIn(str(self.unreviewed_package.uuid), uuids)
def test_session_auth_still_works_without_bearer(self):
self.client.force_login(self.user)
response = self.client.get("/api/v1/packages/")
self.assertEqual(response.status_code, 200)
payload = response.json()
uuids = {item["uuid"] for item in payload["items"]}
self.assertIn(str(self.unreviewed_package.uuid), uuids)