forked from enviPath/enviPy
minor
This commit is contained in:
@ -1,19 +1,17 @@
|
||||
import hashlib
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import jwt
|
||||
import requests
|
||||
|
||||
import nh3
|
||||
import requests
|
||||
from django.conf import settings as s
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.cache import cache
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import redirect
|
||||
from jwt import InvalidIssuerError
|
||||
from ninja import Field, Form, Query, Router, Schema
|
||||
from ninja.errors import HttpError
|
||||
from ninja.security import HttpBearer
|
||||
from ninja.security import SessionAuth
|
||||
|
||||
from utilities.chem import FormatConverter
|
||||
from utilities.misc import PackageExporter
|
||||
@ -51,6 +49,26 @@ from .models import (
|
||||
Package = s.GET_PACKAGE_MODEL()
|
||||
|
||||
|
||||
def get_cached_jwks(tenant_id: str, force=False) -> Dict:
|
||||
"""Get JWKS using Django cache"""
|
||||
cache_key = f"jwks_{tenant_id}"
|
||||
|
||||
jwks = cache.get(cache_key)
|
||||
|
||||
if jwks is None or force:
|
||||
# Cache miss, fetch new keys
|
||||
jwks_uri = f"https://login.microsoftonline.com/{tenant_id}/discovery/v2.0/keys"
|
||||
response = requests.get(jwks_uri)
|
||||
response.raise_for_status()
|
||||
|
||||
jwks = response.json()
|
||||
|
||||
# Cache for 1 hour (3600 seconds)
|
||||
cache.set(cache_key, jwks, 3600)
|
||||
|
||||
return jwks
|
||||
|
||||
|
||||
def get_package_for_write(user, package_uuid):
|
||||
p = PackageManager.get_package_by_id(user, package_uuid)
|
||||
if not PackageManager.writable(user, p):
|
||||
@ -68,9 +86,7 @@ def validate_token(token: str) -> dict:
|
||||
TENANT_ID = s.MS_ENTRA_TENANT_ID
|
||||
CLIENT_ID = s.MS_ENTRA_CLIENT_ID
|
||||
|
||||
# Fetch Microsoft's public keys
|
||||
jwks_uri = f"https://login.microsoftonline.com/{TENANT_ID}/discovery/v2.0/keys"
|
||||
jwks = requests.get(jwks_uri).json()
|
||||
jwks = get_cached_jwks(TENANT_ID)
|
||||
|
||||
header = jwt.get_unverified_header(token)
|
||||
|
||||
@ -78,13 +94,21 @@ def validate_token(token: str) -> dict:
|
||||
next(k for k in jwks["keys"] if k["kid"] == header["kid"])
|
||||
)
|
||||
|
||||
claims = jwt.decode(
|
||||
token,
|
||||
public_key,
|
||||
algorithms=["RS256"],
|
||||
audience=[CLIENT_ID, f"api://{CLIENT_ID}"],
|
||||
issuer=f"https://sts.windows.net/{TENANT_ID}/",
|
||||
)
|
||||
# Handle V1 and V2 tokens
|
||||
try:
|
||||
claims = jwt.decode(
|
||||
token,
|
||||
public_key,
|
||||
algorithms=["RS256"],
|
||||
audience=[CLIENT_ID, f"api://{CLIENT_ID}"],
|
||||
issuer=[
|
||||
f"https://sts.windows.net/{TENANT_ID}/",
|
||||
f"https://login.microsoftonline.com/{TENANT_ID}/v2.0"
|
||||
]
|
||||
)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Token verification failed! - {e}")
|
||||
|
||||
return claims
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user