More database handling and admin interface
This commit is contained in:
20
auth/oidc.py
20
auth/oidc.py
@@ -3,6 +3,8 @@ from fastapi.responses import RedirectResponse
|
||||
from authlib.integrations.starlette_client import OAuth
|
||||
from starlette.requests import Request
|
||||
|
||||
from db.models import User, SessionLocal
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
|
||||
@@ -40,7 +42,7 @@ oauth.register(
|
||||
@router.get("/login/oidc")
|
||||
async def login(request: Request):
|
||||
auth0_client = oauth.create_client("auth0")
|
||||
redirect_uri = os.getenv("OIDC_REDIRECT_URI")
|
||||
redirect_uri = os.getenv("OIDC_REDIRECT_URL")
|
||||
return await auth0_client.authorize_redirect(request, redirect_uri)
|
||||
|
||||
@router.route("/authorize")
|
||||
@@ -55,6 +57,22 @@ async def authorize(request: Request):
|
||||
# save user info in session
|
||||
request.session["user"] = profile
|
||||
|
||||
# check if user is already in the database
|
||||
db = SessionLocal()
|
||||
user_db = db.query(User).filter(User.username == profile["preferred_username"]).first()
|
||||
if not user_db:
|
||||
print("Create User in DB")
|
||||
user_db = User(
|
||||
username=profile["preferred_username"],
|
||||
role="user" # Default role
|
||||
)
|
||||
db.add(user_db)
|
||||
db.commit()
|
||||
db.refresh(user_db)
|
||||
db.close()
|
||||
|
||||
print("User in DB:", user_db)
|
||||
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
@router.get("/logout")
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
from fastapi import APIRouter, Request, Response, HTTPException
|
||||
from fastapi.responses import JSONResponse, RedirectResponse
|
||||
from webauthn import (
|
||||
generate_authentication_options,
|
||||
verify_authentication_response,
|
||||
)
|
||||
from webauthn import (
|
||||
generate_registration_options,
|
||||
verify_registration_response,
|
||||
options_to_json,
|
||||
base64url_to_bytes,
|
||||
)
|
||||
from webauthn.helpers.cose import COSEAlgorithmIdentifier
|
||||
from webauthn.helpers.structs import (
|
||||
AttestationConveyancePreference,
|
||||
AuthenticatorAttachment,
|
||||
AuthenticatorSelectionCriteria,
|
||||
PublicKeyCredentialDescriptor,
|
||||
PublicKeyCredentialHint,
|
||||
ResidentKeyRequirement,
|
||||
)
|
||||
|
||||
import os
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
# Simulierte Userdatenbank (nur zum Testen!)
|
||||
fake_users = {
|
||||
"admin@example.com": {
|
||||
"id": b"user-id-in-bytes",
|
||||
"credential_id": b"credential-id-in-bytes",
|
||||
"public_key": b"public-key-in-bytes",
|
||||
"sign_count": 0
|
||||
}
|
||||
}
|
||||
|
||||
RP_ID = "localhost" # Oder deine Domain bei Produktivbetrieb
|
||||
ORIGIN = "http://localhost:8000"
|
||||
|
||||
@router.get("/login/webauthn/start")
|
||||
async def start_webauthn(request: Request):
|
||||
email = "admin@example.com" # Hardcoded Demo-User
|
||||
|
||||
if email not in fake_users:
|
||||
raise HTTPException(status_code=404, detail="User nicht gefunden")
|
||||
|
||||
user = fake_users[email]
|
||||
|
||||
options = PublicKeyCredentialRequestOptions(
|
||||
challenge=os.urandom(32),
|
||||
rp_id=RP_ID,
|
||||
allow_credentials=[...],
|
||||
timeout=60000,
|
||||
)
|
||||
|
||||
# Speichere Challenge für später
|
||||
request.session["challenge"] = options.challenge
|
||||
return JSONResponse(content=options.model_dump())
|
||||
|
||||
@router.post("/login/webauthn/finish")
|
||||
async def finish_webauthn(request: Request):
|
||||
body = await request.json()
|
||||
email = "admin@example.com" # Again, Demo-User
|
||||
|
||||
if email not in fake_users:
|
||||
raise HTTPException(status_code=404, detail="User nicht gefunden")
|
||||
|
||||
user = fake_users[email]
|
||||
|
||||
try:
|
||||
verified_auth = verify_authentication_response(
|
||||
credential=AuthenticationCredential.parse_obj(body),
|
||||
expected_challenge=request.session.get("challenge"),
|
||||
expected_rp_id=RP_ID,
|
||||
expected_origin=ORIGIN,
|
||||
credential_public_key=user["public_key"],
|
||||
credential_current_sign_count=user["sign_count"],
|
||||
credential_id=user["credential_id"]
|
||||
)
|
||||
|
||||
# Erfolg – setze Session
|
||||
request.session["user"] = email
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
except Exception as e:
|
||||
return JSONResponse({"detail": f"WebAuthn fehlgeschlagen: {str(e)}"}, status_code=400)
|
||||
|
||||
Reference in New Issue
Block a user