Working prototype

This commit is contained in:
2025-05-13 14:19:37 +02:00
parent 2678372ced
commit d1adfe9f93
12 changed files with 491 additions and 0 deletions

66
auth/oidc.py Normal file
View File

@@ -0,0 +1,66 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from authlib.integrations.starlette_client import OAuth
from starlette.requests import Request
from dotenv import load_dotenv
import os
oauth = OAuth()
router = APIRouter()
# OIDC-Provider konfigurieren (hier als Beispiel Auth0)
# oauth.register(
# name="auth0",
# client_id="ZUZYpdYmqjMwdBDb2GdWWX4xkASNe2gsYqLlF9dy",
# client_secret="o6LXTspeaiAMhvPyX2vplQ0RUsRGhthFadg1M5LOJylpQXm9A0d8YJ4CeNwq0kAg2BrdCM7UyfZFOlnVjrJS2o4fBvvhLWDfbd7LhScCzde4Heh5P3C26ZWCRGQppJhb",
# authorize_url="https://login.fs.cs.uni-frankfurt.de/application/o/authorize/",
# authorize_params=None,
# access_token_url="https://login.fs.cs.uni-frankfurt.de/application/o/token/",
# access_token_params=None,
# client_kwargs={"scope": "openid profile email"},
# server_metadata_url="https://login.fs.cs.uni-frankfurt.de/application/o/testkicker/.well-known/openid-configuration",
# api_base_url="https://login.fs.cs.uni-frankfurt.de/application/o/testkicker/",
# jwks_uri="https://login.fs.cs.uni-frankfurt.de/application/o/testkicker/jwks/",
# userinfo_endpoint="https://login.fs.cs.uni-frankfurt.de/application/o/userinfo/",
# )
load_dotenv()
oauth.register(
name="auth0",
client_id=os.getenv("OIDC_CLIENT_ID"),
client_secret=os.getenv("OIDC_CLIENT_SECRET"),
server_metadata_url=os.getenv("OIDC_CONFIG_URL"),
client_kwargs={
"scope": "openid email profile"
},
)
@router.get("/login/oidc")
async def login(request: Request):
auth0_client = oauth.create_client("auth0")
redirect_uri = os.getenv("OIDC_REDIRECT_URI")
return await auth0_client.authorize_redirect(request, redirect_uri)
@router.route("/authorize")
async def authorize(request: Request):
token = await oauth.auth0.authorize_access_token(request)
userinfo_endpoint = oauth.auth0.server_metadata.get("userinfo_endpoint")
resp = await oauth.auth0.get(userinfo_endpoint, token=token)
resp.raise_for_status()
profile = resp.json()
print("Profile:", profile)
# save user info in session
request.session["user"] = profile
return RedirectResponse(url="/", status_code=303)
@router.get("/logout")
async def logout(request: Request):
request.session.pop("user", None)
logout_url = oauth.auth0.server_metadata.get("end_session_endpoint")
if not logout_url:
logout_url = os.getenv("OIDC_LOGOUT_URL")
return RedirectResponse(url=logout_url, status_code=303)

31
auth/session.py Normal file
View File

@@ -0,0 +1,31 @@
from fastapi import Depends, HTTPException
from starlette.requests import Request
from db.models import User, SessionLocal
from sqlalchemy.orm import Session
from starlette.requests import Request
from db.models import User, get_db
from sqlalchemy.orm import Session
SESSION_KEY = "user"
def login_user(request: Request, db: Session, username: str):
user = db.query(User).filter(User.username == username).first()
if user:
request.session[SESSION_KEY] = user.id
return user
return None
def logout_user(request: Request):
request.session.pop(SESSION_KEY, None)
def get_current_user(request: Request, db: Session = Depends(get_db)):
user_id = request.session.get(SESSION_KEY)
# if user_id is None:
# raise HTTPException(status_code=401, detail="Nicht eingeloggt")
# user = db.query(User).filter(User.id == user_id).first()
user = user_id
return user

86
auth/webauthn.py Normal file
View File

@@ -0,0 +1,86 @@
from fastapi import APIRouter, Request, Response, HTTPException
from fastapi.responses import JSONResponse, RedirectResponse
from webauthn import (
generate_authentication_options,
verify_authentication_response,
)
from webauthn import (
generate_registration_options,
verify_registration_response,
options_to_json,
base64url_to_bytes,
)
from webauthn.helpers.cose import COSEAlgorithmIdentifier
from webauthn.helpers.structs import (
AttestationConveyancePreference,
AuthenticatorAttachment,
AuthenticatorSelectionCriteria,
PublicKeyCredentialDescriptor,
PublicKeyCredentialHint,
ResidentKeyRequirement,
)
import os
router = APIRouter()
# Simulierte Userdatenbank (nur zum Testen!)
fake_users = {
"admin@example.com": {
"id": b"user-id-in-bytes",
"credential_id": b"credential-id-in-bytes",
"public_key": b"public-key-in-bytes",
"sign_count": 0
}
}
RP_ID = "localhost" # Oder deine Domain bei Produktivbetrieb
ORIGIN = "http://localhost:8000"
@router.get("/login/webauthn/start")
async def start_webauthn(request: Request):
email = "admin@example.com" # Hardcoded Demo-User
if email not in fake_users:
raise HTTPException(status_code=404, detail="User nicht gefunden")
user = fake_users[email]
options = PublicKeyCredentialRequestOptions(
challenge=os.urandom(32),
rp_id=RP_ID,
allow_credentials=[...],
timeout=60000,
)
# Speichere Challenge für später
request.session["challenge"] = options.challenge
return JSONResponse(content=options.model_dump())
@router.post("/login/webauthn/finish")
async def finish_webauthn(request: Request):
body = await request.json()
email = "admin@example.com" # Again, Demo-User
if email not in fake_users:
raise HTTPException(status_code=404, detail="User nicht gefunden")
user = fake_users[email]
try:
verified_auth = verify_authentication_response(
credential=AuthenticationCredential.parse_obj(body),
expected_challenge=request.session.get("challenge"),
expected_rp_id=RP_ID,
expected_origin=ORIGIN,
credential_public_key=user["public_key"],
credential_current_sign_count=user["sign_count"],
credential_id=user["credential_id"]
)
# Erfolg setze Session
request.session["user"] = email
return RedirectResponse(url="/", status_code=303)
except Exception as e:
return JSONResponse({"detail": f"WebAuthn fehlgeschlagen: {str(e)}"}, status_code=400)