forked from enviPath/enviPy
158 lines
4.5 KiB
Python
158 lines
4.5 KiB
Python
import msal
|
|
from django.conf import settings as s
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth import login
|
|
from django.shortcuts import redirect
|
|
|
|
from epdb.logic import UserManager, GroupManager
|
|
from epdb.models import Group
|
|
|
|
|
|
def get_msal_app_with_cache(request):
|
|
"""
|
|
Create MSAL app with session-based token cache.
|
|
"""
|
|
cache = msal.SerializableTokenCache()
|
|
|
|
# Load cache from session if it exists
|
|
if request.session.get("msal_token_cache"):
|
|
cache.deserialize(request.session["msal_token_cache"])
|
|
|
|
msal_app = msal.ConfidentialClientApplication(
|
|
client_id=s.MS_ENTRA_CLIENT_ID,
|
|
client_credential=s.MS_ENTRA_CLIENT_SECRET,
|
|
authority=s.MS_ENTRA_AUTHORITY,
|
|
token_cache=cache
|
|
)
|
|
|
|
return msal_app, cache
|
|
|
|
|
|
def entra_login(request):
|
|
msal_app = msal.ConfidentialClientApplication(
|
|
client_id=s.MS_ENTRA_CLIENT_ID,
|
|
client_credential=s.MS_ENTRA_CLIENT_SECRET,
|
|
authority=s.MS_ENTRA_AUTHORITY,
|
|
)
|
|
|
|
flow = msal_app.initiate_auth_code_flow(
|
|
scopes=s.MS_ENTRA_SCOPES, redirect_uri=s.MS_ENTRA_REDIRECT_URI
|
|
)
|
|
|
|
request.session["msal_auth_flow"] = flow
|
|
return redirect(flow["auth_uri"])
|
|
|
|
|
|
def entra_callback(request):
|
|
msal_app, cache = get_msal_app_with_cache(request)
|
|
|
|
flow = request.session.pop("msal_auth_flow", None)
|
|
if not flow:
|
|
return redirect("/")
|
|
|
|
# Acquire token using the flow and callback request
|
|
result = msal_app.acquire_token_by_auth_code_flow(flow, request.GET)
|
|
|
|
# Save the token cache to session
|
|
if cache.has_state_changed:
|
|
request.session["msal_token_cache"] = cache.serialize()
|
|
|
|
claims = result["id_token_claims"]
|
|
|
|
user_name = claims["name"]
|
|
user_email = claims.get("emailaddress", claims["email"])
|
|
user_oid = claims["oid"]
|
|
|
|
# Get implementing class
|
|
User = get_user_model()
|
|
|
|
if User.objects.filter(uuid=user_oid).exists():
|
|
u = User.objects.get(uuid=user_oid)
|
|
|
|
if u.username != user_name:
|
|
u.username = user_name
|
|
u.save()
|
|
|
|
else:
|
|
u = UserManager.create_user(user_name, user_email, None, uuid=user_oid, is_active=True)
|
|
|
|
login(request, u)
|
|
|
|
# EDIT START
|
|
# Ensure groups exists in eP
|
|
for id, name in s.ENTRA_SECRET_GROUPS.items():
|
|
if not Group.objects.filter(uuid=id).exists():
|
|
g = GroupManager.create_group(User.objects.get(username="admin"), name, f"Synced Entra Group {name} ",
|
|
uuid=id)
|
|
else:
|
|
g = Group.objects.get(uuid=id)
|
|
# Ensure its secret
|
|
g.secret = True
|
|
g.save()
|
|
|
|
for id, name in s.ENTRA_GROUPS.items():
|
|
if not Group.objects.filter(uuid=id).exists():
|
|
g = GroupManager.create_group(User.objects.get(username="admin"), name, f"Synced Entra Group {name} ",
|
|
uuid=id)
|
|
else:
|
|
g = Group.objects.get(uuid=id)
|
|
|
|
for group_uuid in claims.get("groups", []):
|
|
if Group.objects.filter(uuid=group_uuid).exists():
|
|
g = Group.objects.get(uuid=group_uuid)
|
|
g.user_member.add(u)
|
|
|
|
# EDIT END
|
|
|
|
return redirect(s.SERVER_URL) # Handle errors
|
|
|
|
|
|
def get_access_token_from_request(request, scopes=None):
|
|
"""
|
|
Get an access token from the request using MSAL token cache.
|
|
"""
|
|
if scopes is None:
|
|
scopes = s.MS_ENTRA_SCOPES
|
|
|
|
# Get user from request (must be authenticated)
|
|
if not request.user.is_authenticated:
|
|
return None
|
|
|
|
# Create MSAL app with persistent cache
|
|
msal_app, cache = get_msal_app_with_cache(request)
|
|
|
|
# Try to get accounts from cache
|
|
accounts = msal_app.get_accounts()
|
|
|
|
if not accounts:
|
|
return None
|
|
|
|
# Find the account that matches the current user
|
|
user_account = None
|
|
for account in accounts:
|
|
if account.get("local_account_id") == str(request.user.uuid):
|
|
user_account = account
|
|
break
|
|
|
|
# If no matching account found, use the first available account
|
|
if not user_account and accounts:
|
|
user_account = accounts[0]
|
|
|
|
if not user_account:
|
|
return None
|
|
|
|
# Try to acquire token silently from cache
|
|
result = msal_app.acquire_token_silent(
|
|
scopes=scopes,
|
|
account=user_account
|
|
)
|
|
|
|
# Save cache changes back to session
|
|
if cache.has_state_changed:
|
|
request.session["msal_token_cache"] = cache.serialize()
|
|
|
|
if result and "access_token" in result:
|
|
return result
|
|
|
|
return None
|