forked from enviPath/enviPy
Initial bayer app Show Pack Classification Adjusted docker compose to bayer specifics Adjusted Dockerfile for Bayer Adding secret flags to group, add secret pools to packages Adjusted View for Package creation Prep configs, added Package Create Modal wip More on PES wip wip
155 lines
4.4 KiB
Python
155 lines
4.4 KiB
Python
import msal
|
|
from django.conf import settings as s
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth import login
|
|
from django.shortcuts import redirect
|
|
|
|
from epdb.logic import UserManager
|
|
|
|
|
|
def get_msal_app_with_cache(request):
|
|
"""
|
|
Create MSAL app with session-based token cache.
|
|
"""
|
|
cache = msal.SerializableTokenCache()
|
|
|
|
# Load cache from session if it exists
|
|
if request.session.get("msal_token_cache"):
|
|
cache.deserialize(request.session["msal_token_cache"])
|
|
|
|
msal_app = msal.ConfidentialClientApplication(
|
|
client_id=s.MS_ENTRA_CLIENT_ID,
|
|
client_credential=s.MS_ENTRA_CLIENT_SECRET,
|
|
authority=s.MS_ENTRA_AUTHORITY,
|
|
token_cache=cache,
|
|
)
|
|
|
|
return msal_app, cache
|
|
|
|
|
|
def entra_login(request):
|
|
msal_app = msal.ConfidentialClientApplication(
|
|
client_id=s.MS_ENTRA_CLIENT_ID,
|
|
client_credential=s.MS_ENTRA_CLIENT_SECRET,
|
|
authority=s.MS_ENTRA_AUTHORITY,
|
|
)
|
|
|
|
flow = msal_app.initiate_auth_code_flow(
|
|
scopes=s.MS_ENTRA_SCOPES, redirect_uri=s.MS_ENTRA_REDIRECT_URI
|
|
)
|
|
|
|
request.session["msal_auth_flow"] = flow
|
|
return redirect(flow["auth_uri"])
|
|
|
|
|
|
def entra_callback(request):
|
|
msal_app, cache = get_msal_app_with_cache(request)
|
|
|
|
flow = request.session.pop("msal_auth_flow", None)
|
|
if not flow:
|
|
return redirect("/")
|
|
|
|
# Acquire token using the flow and callback request
|
|
result = msal_app.acquire_token_by_auth_code_flow(flow, request.GET)
|
|
|
|
# Save the token cache to session
|
|
if cache.has_state_changed:
|
|
request.session["msal_token_cache"] = cache.serialize()
|
|
|
|
claims = result["id_token_claims"]
|
|
|
|
user_name = claims.get("name")
|
|
user_email = claims.get("emailaddress", claims.get("email"))
|
|
user_oid = claims.get("oid")
|
|
|
|
if not all([user_name, user_email, user_oid]):
|
|
raise ValueError("Missing required claims in ID token")
|
|
|
|
# Get implementing class
|
|
User = get_user_model()
|
|
|
|
if User.objects.filter(uuid=user_oid).exists():
|
|
u = User.objects.get(uuid=user_oid)
|
|
|
|
if u.username != user_name:
|
|
u.username = user_name
|
|
u.save()
|
|
|
|
else:
|
|
u = UserManager.create_user(user_name, user_email, None, uuid=user_oid, is_active=True)
|
|
|
|
login(request, u)
|
|
|
|
# EDIT START
|
|
# Ensure groups exists in eP
|
|
for id, name in s.ENTRA_SECRET_GROUPS.items():
|
|
if not Group.objects.filter(uuid=id).exists():
|
|
g = GroupManager.create_group(User.objects.get(username="admin"), name, f"Synced Entra Group {name} ", uuid=id)
|
|
else:
|
|
g = Group.objects.get(uuid=id)
|
|
# Ensure its secret
|
|
g.secret = True
|
|
g.save()
|
|
|
|
for id, name in s.ENTRA_GROUPS.items():
|
|
if not Group.objects.filter(uuid=id).exists():
|
|
g = GroupManager.create_group(User.objects.get(username="admin"), name, f"Synced Entra Group {name} ", uuid=id)
|
|
else:
|
|
g = Group.objects.get(uuid=id)
|
|
|
|
for group_uuid in claims.get("groups", []):
|
|
if Group.objects.filter(uuid=group_uuid).exists():
|
|
g = Group.objects.get(uuid=group_uuid)
|
|
g.user_member.add(u)
|
|
|
|
# EDIT END
|
|
|
|
return redirect(s.SERVER_URL) # Handle errors
|
|
|
|
|
|
def get_access_token_from_request(request, scopes=None):
|
|
"""
|
|
Get an access token from the request using MSAL token cache.
|
|
"""
|
|
if scopes is None:
|
|
scopes = s.MS_ENTRA_SCOPES
|
|
|
|
# Get user from request (must be authenticated)
|
|
if not request.user.is_authenticated:
|
|
return None
|
|
|
|
# Create MSAL app with persistent cache
|
|
msal_app, cache = get_msal_app_with_cache(request)
|
|
|
|
# Try to get accounts from cache
|
|
accounts = msal_app.get_accounts()
|
|
|
|
if not accounts:
|
|
return None
|
|
|
|
# Find the account that matches the current user
|
|
user_account = None
|
|
for account in accounts:
|
|
if account.get("local_account_id") == str(request.user.uuid):
|
|
user_account = account
|
|
break
|
|
|
|
# If no matching account found, use the first available account
|
|
if not user_account and accounts:
|
|
user_account = accounts[0]
|
|
|
|
if not user_account:
|
|
return None
|
|
|
|
# Try to acquire token silently from cache
|
|
result = msal_app.acquire_token_silent(scopes=scopes, account=user_account)
|
|
|
|
# Save cache changes back to session
|
|
if cache.has_state_changed:
|
|
request.session["msal_token_cache"] = cache.serialize()
|
|
|
|
if result and "access_token" in result:
|
|
return result
|
|
|
|
return None
|