Files
enviPy-bayer/epauth/views.py
2026-04-22 06:08:39 +12:00

131 lines
3.6 KiB
Python

import logging

import msal
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.shortcuts import redirect

from epdb.logic import UserManager
def get_msal_app_with_cache(request):
    """
    Build a confidential MSAL client backed by a session-stored token cache.

    Returns a ``(msal_app, cache)`` tuple. This function only reads the
    session; it never writes the cache back.
    """
    token_cache = msal.SerializableTokenCache()

    # Restore whatever cache state an earlier request left in the session
    stored_state = request.session.get("msal_token_cache")
    if stored_state:
        token_cache.deserialize(stored_state)

    client = msal.ConfidentialClientApplication(
        client_id=s.MS_ENTRA_CLIENT_ID,
        client_credential=s.MS_ENTRA_CLIENT_SECRET,
        authority=s.MS_ENTRA_AUTHORITY,
        token_cache=token_cache,
    )
    return client, token_cache
def entra_login(request):
    """
    Start the Microsoft Entra authorization-code flow.

    Stores the initiated flow in the session under ``"msal_auth_flow"`` and
    redirects the browser to the flow's ``auth_uri``.
    """
    client = msal.ConfidentialClientApplication(
        client_id=s.MS_ENTRA_CLIENT_ID,
        client_credential=s.MS_ENTRA_CLIENT_SECRET,
        authority=s.MS_ENTRA_AUTHORITY,
    )
    auth_flow = client.initiate_auth_code_flow(
        scopes=s.MS_ENTRA_SCOPES,
        redirect_uri=s.MS_ENTRA_REDIRECT_URI,
    )

    # Keep the flow around so the callback view can redeem it
    request.session["msal_auth_flow"] = auth_flow
    return redirect(auth_flow["auth_uri"])
def entra_callback(request):
    """
    Complete the Microsoft Entra sign-in and log the user into Django.

    Redeems the auth-code flow stored in the session under
    ``"msal_auth_flow"``, saves the MSAL token cache back to the session when
    it changed, then fetches the local user whose ``uuid`` equals the ``oid``
    claim (creating one if none exists) and logs that user in.

    Returns:
        A redirect to ``"/"`` when no flow is pending or the token exchange
        fails, otherwise a redirect to ``s.SERVER_URL``.

    Raises:
        ValueError: If the ID token lacks a name, e-mail address or oid claim.
    """
    msal_app, cache = get_msal_app_with_cache(request)
    flow = request.session.pop("msal_auth_flow", None)
    if not flow:
        return redirect("/")

    log = logging.getLogger(__name__)

    # Acquire token using the flow and callback request
    try:
        result = msal_app.acquire_token_by_auth_code_flow(flow, request.GET)
    except ValueError:
        # MSAL rejects a response that does not belong to the stored flow
        # (e.g. a state mismatch); treat it as a failed sign-in, not a crash.
        log.warning("Entra sign-in rejected by MSAL", exc_info=True)
        return redirect("/")

    # Save the token cache to session
    if cache.has_state_changed:
        request.session["msal_token_cache"] = cache.serialize()

    # On failure MSAL returns an error dict that has no "id_token_claims"
    claims = result.get("id_token_claims")
    if "error" in result or not claims:
        log.warning(
            "Entra sign-in failed: %s (%s)",
            result.get("error"),
            result.get("error_description"),
        )
        return redirect("/")

    user_name = claims.get("name")
    # `or` also falls back when "emailaddress" is present but empty
    user_email = claims.get("emailaddress") or claims.get("email")
    user_oid = claims.get("oid")
    if not all([user_name, user_email, user_oid]):
        raise ValueError("Missing required claims in ID token")

    # Get implementing class
    User = get_user_model()
    try:
        # Single query instead of exists() followed by get()
        u = User.objects.get(uuid=user_oid)
    except User.DoesNotExist:
        u = UserManager.create_user(user_name, user_email, None, uuid=user_oid, is_active=True)
    else:
        # Keep the local username in sync with the name claim
        if u.username != user_name:
            u.username = user_name
            u.save()

    login(request, u)
    return redirect(s.SERVER_URL)
def get_access_token_from_request(request, scopes=None):
    """
    Silently acquire a token for the current user from the session's MSAL cache.

    Args:
        request: Request whose session holds the serialized MSAL token cache
            and whose ``user`` must be authenticated.
        scopes: Scopes to request; defaults to ``s.MS_ENTRA_SCOPES``.

    Returns:
        The MSAL result dict (it contains an ``"access_token"`` key) on
        success, or ``None`` when the user is not authenticated, the cache
        holds no accounts, or no token could be acquired silently.
    """
    if scopes is None:
        scopes = s.MS_ENTRA_SCOPES

    # Get user from request (must be authenticated)
    if not request.user.is_authenticated:
        return None

    # Create MSAL app with persistent cache
    msal_app, cache = get_msal_app_with_cache(request)

    # Try to get accounts from cache
    accounts = msal_app.get_accounts()
    if not accounts:
        return None

    # Prefer the cached account matching the current user; otherwise use the
    # first cached account.
    # NOTE(review): the fallback can select an account whose id differs from
    # request.user.uuid -- confirm this is intended.
    user_oid = str(request.user.uuid)
    user_account = next(
        (acct for acct in accounts if acct.get("local_account_id") == user_oid),
        accounts[0],
    )

    # Try to acquire token silently from cache
    result = msal_app.acquire_token_silent(scopes=scopes, account=user_account)

    # Save cache changes back to session
    if cache.has_state_changed:
        request.session["msal_token_cache"] = cache.serialize()

    if result and "access_token" in result:
        return result
    return None