Add sql syntax and refactor postpaid users and databases

This commit is contained in:
2025-05-17 00:03:05 +02:00
parent 167101d917
commit d4e01d5109
6 changed files with 461 additions and 125 deletions

View File

@@ -1,31 +1,49 @@
from fastapi import APIRouter, Depends, HTTPException
"""
This module handles OpenID Connect (OIDC) authentication using FastAPI and Authlib.
Routes:
/login/oidc (GET): Initiates the OIDC login flow by redirecting the user to the identity provider's authorization endpoint.
/authorize (ANY): Handles the callback from the identity provider, exchanges the authorization code for tokens, retrieves user profile information, stores it in the session, and ensures the user exists in the database.
/logout (GET): Logs the user out by clearing session data and redirecting to the identity provider's logout endpoint.
Environment Variables:
OIDC_CLIENT_ID: The client ID for the OIDC application.
OIDC_CLIENT_SECRET: The client secret for the OIDC application.
OIDC_CONFIG_URL: The OIDC discovery document URL.
OIDC_REDIRECT_URL: The redirect URI registered with the OIDC provider.
OIDC_LOGOUT_URL: The logout endpoint URL (optional fallback).
Dependencies:
- FastAPI
- Authlib
- SQLAlchemy
- python-dotenv
- Starlette
Session Keys:
SESSION_KEY: Key for storing user profile information in the session ("user_authentik").
"user_db_id": Key for storing the user's database ID in the session.
Database:
Uses SQLAlchemy to check for and create users in the 'users_postpaid' table.
Functions:
login(request): Starts the OIDC login process.
authorize(request): Handles the OIDC callback, processes user info, and manages user records.
logout(request): Logs the user out and redirects to the identity provider's logout endpoint.
"""
import os
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from authlib.integrations.starlette_client import OAuth
from starlette.requests import Request
from db.models import User, SessionLocal
from sqlalchemy import text
from dotenv import load_dotenv
import os
from db.models import (
engine,
create_postpaid_user,
)
oauth = OAuth()
router = APIRouter()
SESSION_KEY = "user_authentik"
# OIDC-Provider konfigurieren (hier als Beispiel Auth0)
# oauth.register(
# name="auth0",
# client_id="ZUZYpdYmqjMwdBDb2GdWWX4xkASNe2gsYqLlF9dy",
# client_secret="o6LXTspeaiAMhvPyX2vplQ0RUsRGhthFadg1M5LOJylpQXm9A0d8YJ4CeNwq0kAg2BrdCM7UyfZFOlnVjrJS2o4fBvvhLWDfbd7LhScCzde4Heh5P3C26ZWCRGQppJhb",
# authorize_url="https://login.fs.cs.uni-frankfurt.de/application/o/authorize/",
# authorize_params=None,
# access_token_url="https://login.fs.cs.uni-frankfurt.de/application/o/token/",
# access_token_params=None,
# client_kwargs={"scope": "openid profile email"},
# server_metadata_url="https://login.fs.cs.uni-frankfurt.de/application/o/testkicker/.well-known/openid-configuration",
# api_base_url="https://login.fs.cs.uni-frankfurt.de/application/o/testkicker/",
# jwks_uri="https://login.fs.cs.uni-frankfurt.de/application/o/testkicker/jwks/",
# userinfo_endpoint="https://login.fs.cs.uni-frankfurt.de/application/o/userinfo/",
# )
load_dotenv()
@@ -41,43 +59,74 @@ oauth.register(
@router.get("/login/oidc")
async def login(request: Request):
"""
Initiates the OAuth2 login flow using Auth0 and redirects the user to the Auth0 authorization endpoint.
Args:
request (Request): The incoming HTTP request object.
Returns:
Response: A redirect response to the Auth0 authorization URL.
Raises:
Exception: If the Auth0 client cannot be created or the redirect fails.
Environment Variables:
OIDC_REDIRECT_URL: The URL to which Auth0 should redirect after authentication.
"""
auth0_client = oauth.create_client("auth0")
redirect_uri = os.getenv("OIDC_REDIRECT_URL")
return await auth0_client.authorize_redirect(request, redirect_uri)
@router.route("/authorize")
async def authorize(request: Request):
"""
Handles the OAuth2 authorization callback, retrieves the user's profile from the identity provider,
stores user information in the session, checks if the user exists in the database (and creates them if not),
and redirects to the home page.
Args:
request (Request): The incoming HTTP request containing the OAuth2 callback.
Returns:
RedirectResponse: A redirect response to the home page after successful authorization and user handling.
Side Effects:
- Stores user profile and database user ID in the session.
- May create a new user in the database if not already present.
"""
token = await oauth.auth0.authorize_access_token(request)
userinfo_endpoint = oauth.auth0.server_metadata.get("userinfo_endpoint")
resp = await oauth.auth0.get(userinfo_endpoint, token=token)
resp.raise_for_status()
profile = resp.json()
print("Profile:", profile)
# save user info in session
request.session["user"] = profile
request.session[SESSION_KEY] = profile
# check if user is already in the database
db = SessionLocal()
user_db = db.query(User).filter(User.username == profile["preferred_username"]).first()
if not user_db:
print("Create User in DB")
user_db = User(
username=profile["preferred_username"],
role="user" # Default role
)
db.add(user_db)
db.commit()
db.refresh(user_db)
db.close()
print("User in DB:", user_db)
with engine.connect() as conn:
t = text("SELECT id FROM users_postpaid WHERE username = :username")
result = conn.execute(t, {"username": profile["preferred_username"]}).fetchone()
if result:
user_db_id = result[0]
else:
print("Create User in DB")
user_db_id = create_postpaid_user(profile["preferred_username"])
request.session["user_db_id"] = user_db_id
return RedirectResponse(url="/", status_code=303)
@router.get("/logout")
async def logout(request: Request):
request.session.pop("user", None)
"""
Logs out the current user by clearing session data and redirecting to the OIDC provider's logout endpoint.
Args:
request (Request): The incoming HTTP request containing the user's session.
Returns:
RedirectResponse: A response that redirects the user to the OIDC provider's logout URL with a 303 status code.
Notes:
- Removes the authentication session key and user database information from the session.
- Determines the logout URL from the OIDC provider's metadata or an environment variable.
"""
request.session.pop(SESSION_KEY, None)
request.session.pop("user_db", None)
logout_url = oauth.auth0.server_metadata.get("end_session_endpoint")
if not logout_url:
logout_url = os.getenv("OIDC_LOGOUT_URL")

View File

@@ -1,31 +1,78 @@
from fastapi import Depends, HTTPException
"""
This module provides session management utilities for postpaid users in a FastAPI application.
It includes functions to log in a user by username, log out the current user, and retrieve the
currently logged-in user from the session.
Functions:
login_postpaid_user(request: Request, username: str):
Raises HTTPException if the user is not found.
logout_user(request: Request):
get_current_user(request: Request):
Retrieves the current user from the session, returning the user object if present.
"""
from fastapi import HTTPException
from starlette.requests import Request
from db.models import User, SessionLocal
from sqlalchemy.orm import Session
from sqlalchemy import text
from starlette.requests import Request
from db.models import User, get_db
from sqlalchemy.orm import Session
SESSION_KEY = "user"
from db.models import engine, get_postpaid_user
def login_user(request: Request, db: Session, username: str):
user = db.query(User).filter(User.username == username).first()
if user:
request.session[SESSION_KEY] = user.id
return user
SESSION_KEY = "user_db_id"
def login_postpaid_user(request: Request, username: str):
"""
Logs in a postpaid user by their username and stores their user ID in the session.
Args:
request (Request): The incoming HTTP request object, which contains the session.
username (str): The username of the postpaid user to log in.
Returns:
int or None: The user ID if the user is found and logged in; otherwise, None.
Raises:
HTTPException: If the user with the given username is not found (404 error).
"""
t = text("SELECT id FROM users_postpaid WHERE username = :username")
with engine.connect() as conn:
result = conn.execute(t, {"username": username}).fetchone()
if result:
user_id = result[0]
else:
raise HTTPException(status_code=404, detail="User nicht gefunden")
if user_id:
request.session[SESSION_KEY] = user_id
return user_id
return None
def logout_user(request: Request):
"""
Logs out the current user by removing their session data.
This function removes the user's session key and any associated user database information
from the session, effectively logging the user out.
Args:
request (Request): The incoming request object containing the session.
Returns:
None
"""
request.session.pop(SESSION_KEY, None)
request.session.pop("user_db", None)
def get_current_user(request: Request, db: Session = Depends(get_db)):
def get_current_user(request: Request):
"""
Retrieve the current user from the session in the given request.
Args:
request (Request): The incoming HTTP request containing the session.
Returns:
User or None: The user object associated with the session if present, otherwise None.
"""
user_id = request.session.get(SESSION_KEY)
# if user_id is None:
# raise HTTPException(status_code=401, detail="Nicht eingeloggt")
# user = db.query(User).filter(User.id == user_id).first()
user = user_id
if not user_id:
return None
user = get_postpaid_user(user_id)
return user