Fix App Domain Bug when a Rule can be applied more than once (#49)

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#49
This commit is contained in:
2025-08-19 22:10:18 +12:00
parent c3c1d4f5cf
commit fc8192fb0d
4 changed files with 286 additions and 94 deletions

View File

@ -2,6 +2,8 @@ import abc
import json
import logging
import os
import secrets
import hashlib
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Union, List, Optional, Dict, Tuple, Set
@ -58,27 +60,110 @@ class User(AbstractUser):
return self.default_setting
class APIToken(models.Model):
hashed_key = models.CharField(max_length=128, unique=True)
user = models.ForeignKey(User, on_delete=models.CASCADE)
created = models.DateTimeField(auto_now_add=True)
expires_at = models.DateTimeField(null=True, blank=True, default=timezone.now() + timedelta(days=90))
name = models.CharField(max_length=100, blank=True, help_text="Optional name for the token")
class APIToken(TimeStampedModel):
"""
API authentication token for users.
def is_valid(self):
return not self.expires_at or self.expires_at > timezone.now()
Provides secure token-based authentication with expiration support.
"""
hashed_key = models.CharField(
max_length=128,
unique=True,
help_text="SHA-256 hash of the token key"
)
@staticmethod
def create_token(user, name="", valid_for=90):
import secrets
raw_token = secrets.token_urlsafe(32)
hashed = make_password(raw_token)
token = APIToken.objects.create(user=user, hashed_key=hashed, name=name,
expires_at=timezone.now() + timedelta(days=valid_for))
return token, raw_token
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='api_tokens',
help_text="User who owns this token"
)
def check_token(self, raw_token):
return check_password(raw_token, self.hashed_key)
expires_at = models.DateTimeField(
null=True,
blank=True,
help_text="Token expiration time (null for no expiration)"
)
name = models.CharField(
max_length=100,
help_text="Descriptive name for this token"
)
is_active = models.BooleanField(
default=True,
help_text="Whether this token is active"
)
class Meta:
db_table = 'epdb_api_token'
verbose_name = 'API Token'
verbose_name_plural = 'API Tokens'
ordering = ['-created']
def __str__(self) -> str:
return f"{self.name} ({self.user.username})"
def is_valid(self) -> bool:
"""Check if token is valid and not expired."""
if not self.is_active:
return False
if self.expires_at and timezone.now() > self.expires_at:
return False
return True
@classmethod
def create_token(cls, user: User, name: str, expires_days: Optional[int] = None) -> Tuple['APIToken', str]:
"""
Create a new API token for a user.
Args:
user: User to create token for
name: Descriptive name for the token
expires_days: Number of days until expiration (None for no expiration)
Returns:
Tuple of (token_instance, raw_key)
"""
raw_key = secrets.token_urlsafe(32)
hashed_key = hashlib.sha256(raw_key.encode()).hexdigest()
expires_at = None
if expires_days:
expires_at = timezone.now() + timezone.timedelta(days=expires_days)
token = cls.objects.create(
user=user,
name=name,
hashed_key=hashed_key,
expires_at=expires_at
)
return token, raw_key
@classmethod
def authenticate(cls, raw_key: str) -> Optional[User]:
"""
Authenticate a user using an API token.
Args:
raw_key: Raw token key
Returns:
User if token is valid, None otherwise
"""
hashed_key = hashlib.sha256(raw_key.encode()).hexdigest()
try:
token = cls.objects.select_related('user').get(hashed_key=hashed_key)
if token.is_valid():
return token.user
except cls.DoesNotExist:
pass
return None
class Group(TimeStampedModel):
@ -1090,16 +1175,15 @@ class Node(EnviPathModel, AliasMixin, ScenarioMixin):
data = self.kv.get('app_domain_assessment', None)
if data:
rule_ids = dict()
rule_ids = defaultdict(list)
for e in Edge.objects.filter(start_nodes__in=[self]):
for r in e.edge_label.rules.all():
rule_ids[str(r.uuid)] = e
rule_ids[str(r.uuid)].append(e.simple_json())
for t in data['assessment']['transformations']:
if t['rule']['uuid'] in rule_ids:
t['is_predicted'] = True
t['edge'] = rule_ids[t['rule']['uuid']].simple_json()
t['edges'] = rule_ids[t['rule']['uuid']]
return data
@ -1141,23 +1225,25 @@ class Edge(EnviPathModel, AliasMixin, ScenarioMixin):
if app_domain_data:
for t in app_domain_data['assessment']['transformations']:
if 'edge' in t and t['edge']['uuid'] == str(self.uuid):
passes_app_domain = (
t['local_compatibility'] >= app_domain_data['ad_params']['local_compatibility_threshold']
) and (
t['reliability'] >= app_domain_data['ad_params']['reliability_threshold']
)
if 'edges' in t:
for e in t['edges']:
if e['uuid'] == str(self.uuid):
passes_app_domain = (
t['local_compatibility'] >= app_domain_data['ad_params']['local_compatibility_threshold']
) and (
t['reliability'] >= app_domain_data['ad_params']['reliability_threshold']
)
edge_json['app_domain'] = {
'passes_app_domain': passes_app_domain,
'local_compatibility': t['local_compatibility'],
'local_compatibility_threshold': app_domain_data['ad_params']['local_compatibility_threshold'],
'reliability': t['reliability'],
'reliability_threshold': app_domain_data['ad_params']['reliability_threshold'],
'times_triggered': t['times_triggered'],
}
edge_json['app_domain'] = {
'passes_app_domain': passes_app_domain,
'local_compatibility': t['local_compatibility'],
'local_compatibility_threshold': app_domain_data['ad_params']['local_compatibility_threshold'],
'reliability': t['reliability'],
'reliability_threshold': app_domain_data['ad_params']['reliability_threshold'],
'times_triggered': t['times_triggered'],
}
break
break
return edge_json