"""Microsoft Entra ID login views and token helpers built on MSAL."""

import logging

import msal
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.shortcuts import redirect

from epdb.logic import UserManager, GroupManager
from epdb.models import Group

logger = logging.getLogger(__name__)


def _build_msal_app(token_cache=None):
    """
    Build the confidential MSAL client from the MS_ENTRA_* settings.

    ``token_cache`` is handed straight to MSAL; ``None`` keeps MSAL's default.
    """
    return msal.ConfidentialClientApplication(
        client_id=s.MS_ENTRA_CLIENT_ID,
        client_credential=s.MS_ENTRA_CLIENT_SECRET,
        authority=s.MS_ENTRA_AUTHORITY,
        token_cache=token_cache,
    )


def _get_or_create_group(group_uuid, name):
    """
    Return the Group with the given uuid, creating it when it does not exist.

    New groups are created through ``GroupManager.create_group`` on behalf of
    the user named "admin"; that user is only looked up when a group actually
    has to be created.
    """
    try:
        return Group.objects.get(uuid=group_uuid)
    except Group.DoesNotExist:
        group = None

    if group is None:
        admin = get_user_model().objects.get(username="admin")
        group = GroupManager.create_group(
            admin, name, f"Synced Entra Group {name} ", uuid=group_uuid
        )
    return group


def _sync_groups(user, claims):
    """
    Make sure the configured Entra groups exist and add ``user`` to the ones
    listed in the "groups" claim of the ID token.
    """
    for group_uuid, name in s.ENTRA_SECRET_GROUPS.items():
        group = _get_or_create_group(group_uuid, name)
        # Enforce the secret flag on both freshly created and existing groups.
        group.secret = True
        group.save()

    for group_uuid, name in s.ENTRA_GROUPS.items():
        _get_or_create_group(group_uuid, name)

    # Only groups that exist locally are considered; unknown ids are ignored.
    claimed_ids = claims.get("groups") or []
    for group in Group.objects.filter(uuid__in=claimed_ids):
        group.user_member.add(user)


def get_msal_app_with_cache(request):
    """
    Create MSAL app with session-based token cache.

    The cache is restored from ``request.session["msal_token_cache"]`` when
    that entry is present and non-empty.

    Returns a ``(msal_app, cache)`` tuple so callers can write the cache back
    to the session after it changed.
    """
    cache = msal.SerializableTokenCache()

    # Load cache from session if it exists
    serialized = request.session.get("msal_token_cache")
    if serialized:
        cache.deserialize(serialized)

    return _build_msal_app(token_cache=cache), cache


def entra_login(request):
    """
    Start the auth code flow and redirect the browser to the flow's auth URI.

    The flow is stored in the session under "msal_auth_flow" so that
    ``entra_callback`` can complete it.
    """
    msal_app = _build_msal_app()

    flow = msal_app.initiate_auth_code_flow(
        scopes=s.MS_ENTRA_SCOPES,
        redirect_uri=s.MS_ENTRA_REDIRECT_URI,
    )

    request.session["msal_auth_flow"] = flow
    return redirect(flow["auth_uri"])


def entra_callback(request):
    """
    Complete the auth code flow, log the user in and sync group membership.

    Redirects to "/" when no flow is stored in the session or when the token
    could not be acquired; redirects to ``settings.SERVER_URL`` on success.

    Raises ``KeyError`` when the ID token lacks the "name" or "oid" claim, or
    when a new user must be created and neither "emailaddress" nor "email" is
    present.
    """
    msal_app, cache = get_msal_app_with_cache(request)

    flow = request.session.pop("msal_auth_flow", None)
    if not flow:
        return redirect("/")

    # Acquire token using the flow and callback request
    try:
        result = msal_app.acquire_token_by_auth_code_flow(flow, request.GET)
    except ValueError:
        # MSAL rejects the response, e.g. on a state mismatch (possible CSRF).
        logger.warning("Entra callback rejected by MSAL", exc_info=True)
        return redirect("/")

    # Save the token cache to session
    if cache.has_state_changed:
        request.session["msal_token_cache"] = cache.serialize()

    if "id_token_claims" not in result:
        # Error responses carry "error"/"error_description" instead of claims.
        logger.warning(
            "Entra login failed: %s (%s)",
            result.get("error"),
            result.get("error_description"),
        )
        return redirect("/")

    claims = result["id_token_claims"]
    user_name = claims["name"]
    user_oid = claims["oid"]

    # Get implementing class
    User = get_user_model()

    try:
        user = User.objects.get(uuid=user_oid)
    except User.DoesNotExist:
        user = None

    if user is None:
        # The e-mail is only needed for new users; "emailaddress" wins over
        # "email", and "email" is only required when "emailaddress" is absent.
        if "emailaddress" in claims:
            user_email = claims["emailaddress"]
        else:
            user_email = claims["email"]
        user = UserManager.create_user(
            user_name, user_email, None, uuid=user_oid, is_active=True
        )
    elif user.username != user_name:
        # Keep the local username in sync with the name reported by Entra.
        user.username = user_name
        user.save()

    login(request, user)

    # Ensure groups exists in eP
    _sync_groups(user, claims)

    return redirect(s.SERVER_URL)


def get_access_token_from_request(request, scopes=None):
    """
    Get an access token from the request using MSAL token cache.

    ``scopes`` defaults to ``settings.MS_ENTRA_SCOPES``.

    Returns the full MSAL result dict (which contains "access_token"), or
    ``None`` when the user is not authenticated, the cache holds no account,
    or no token could be acquired silently.
    """
    if scopes is None:
        scopes = s.MS_ENTRA_SCOPES

    # Get user from request (must be authenticated)
    if not request.user.is_authenticated:
        return None

    # Create MSAL app with persistent cache
    msal_app, cache = get_msal_app_with_cache(request)

    # Try to get accounts from cache
    accounts = msal_app.get_accounts()
    if not accounts:
        return None

    # Find the account that matches the current user; if no matching account
    # is found, use the first available account.
    user_uuid = str(request.user.uuid)
    user_account = next(
        (acc for acc in accounts if acc.get("local_account_id") == user_uuid),
        accounts[0],
    )

    # Try to acquire token silently from cache
    result = msal_app.acquire_token_silent(scopes=scopes, account=user_account)

    # Save cache changes back to session
    if cache.has_state_changed:
        request.session["msal_token_cache"] = cache.serialize()

    if result and "access_token" in result:
        return result

    return None