"""OAuth sign-in routes (GitHub and Google), sign-out, and session lookup."""

import logging
import secrets
from urllib.parse import urlencode

import httpx
from fastapi import APIRouter, Cookie, Depends, HTTPException, Query
from fastapi.responses import RedirectResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.deps import get_current_user
from app.core.security import create_access_token
from app.db.database import get_db
from app.db.models import User

logger = logging.getLogger(__name__)

router = APIRouter()

# OAuth endpoints
GITHUB_AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
GITHUB_TOKEN_URL = "https://github.com/login/oauth/access_token"
GITHUB_USER_URL = "https://api.github.com/user"

GOOGLE_AUTHORIZE_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USER_URL = "https://www.googleapis.com/oauth2/v2/userinfo"

STATE_COOKIE = "oauth_state"
STATE_MAX_AGE = 600  # 10 minutes

ACCESS_TOKEN_COOKIE = "access_token"
ACCESS_TOKEN_MAX_AGE = 60 * 60 * 24 * 7  # align with ACCESS_TOKEN_EXPIRE_DAYS

# Used when settings.cors_origins is empty or unset.
DEFAULT_FRONTEND_URL = "http://localhost:5173"


def _frontend_url() -> str:
    """Return the first configured CORS origin, or the local default."""
    origins = settings.cors_origins
    return origins[0] if origins else DEFAULT_FRONTEND_URL


def _login_failed(provider_label: str) -> HTTPException:
    """Build the 502 error returned when a provider exchange fails."""
    return HTTPException(status_code=502, detail=f"{provider_label} login failed")


def _set_state_cookie(response: RedirectResponse, provider: str, state: str) -> None:
    """Store the OAuth state on the response as a ``provider:state`` cookie.

    The cookie is HTTP-only, SameSite=Lax, expires after ``STATE_MAX_AGE``
    seconds, and is marked Secure unless ``settings.debug`` is set.
    """
    response.set_cookie(
        key=STATE_COOKIE,
        value=f"{provider}:{state}",
        httponly=True,
        secure=not settings.debug,
        samesite="lax",
        max_age=STATE_MAX_AGE,
    )


def _validate_state(
    state_from_query: str | None,
    state_cookie: str | None,
    provider: str,
) -> None:
    """Check that the callback's ``state`` matches the state cookie.

    Raises:
        HTTPException: 400 if either value is missing, if the cookie was
            issued for a different provider, or if the values differ.
    """
    if not state_from_query or not state_cookie:
        raise HTTPException(status_code=400, detail="Missing OAuth state")

    expected_prefix = f"{provider}:"
    if not state_cookie.startswith(expected_prefix):
        raise HTTPException(status_code=400, detail="OAuth state mismatch")

    expected_state = state_cookie.removeprefix(expected_prefix)
    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str,
    # which would turn a malformed ``state`` parameter into a 500.
    if not secrets.compare_digest(
        state_from_query.encode("utf-8"),
        expected_state.encode("utf-8"),
    ):
        raise HTTPException(status_code=400, detail="OAuth state mismatch")


async def _fetch_oauth_profile(
    *,
    provider_label: str,
    token_url: str,
    token_payload: dict[str, str],
    user_url: str,
    token_headers: dict[str, str] | None = None,
) -> dict:
    """Exchange an authorization code for a token and fetch the user profile.

    Args:
        provider_label: Human-readable provider name used in the error detail.
        token_url: Provider endpoint that the form-encoded payload is POSTed to.
        token_payload: Form fields for the token request.
        user_url: Provider endpoint queried with the bearer token.
        token_headers: Optional extra headers for the token request.

    Returns:
        The decoded profile object; its ``"id"`` key is present and not None.

    Raises:
        HTTPException: 502 if any request fails, a response is not a JSON
            object, the access token is missing, or the profile has no id.
    """
    try:
        async with httpx.AsyncClient() as client:
            token_resp = await client.post(
                token_url,
                data=token_payload,
                headers=token_headers,
            )
            token_resp.raise_for_status()
            token_data = token_resp.json()

            access_token = (
                token_data.get("access_token") if isinstance(token_data, dict) else None
            )
            if not access_token:
                raise _login_failed(provider_label)

            user_resp = await client.get(
                user_url,
                headers={"Authorization": f"Bearer {access_token}"},
            )
            user_resp.raise_for_status()
            user_data = user_resp.json()
    except (httpx.HTTPError, ValueError) as exc:
        # httpx.HTTPError covers both bad status codes and transport errors;
        # ValueError covers a response body that is not valid JSON.
        raise _login_failed(provider_label) from exc

    if not isinstance(user_data, dict) or user_data.get("id") is None:
        raise _login_failed(provider_label)
    return user_data


@router.get("/github/signin")
async def github_signin():
    """Start GitHub OAuth with state protection."""
    state = secrets.token_urlsafe(16)
    params = {
        "client_id": settings.github_client_id,
        "redirect_uri": f"{settings.base_url}/auth/github/callback",
        "scope": "read:user user:email",
        "state": state,
    }
    url = f"{GITHUB_AUTHORIZE_URL}?{urlencode(params)}"
    response = RedirectResponse(url=url)
    _set_state_cookie(response, "github", state)
    return response


@router.get("/github/callback")
async def github_callback(
    code: str,
    state: str | None = Query(default=None),
    state_cookie: str | None = Cookie(default=None, alias=STATE_COOKIE),
    db: AsyncSession = Depends(get_db),
):
    """Handle GitHub OAuth callback."""
    _validate_state(state, state_cookie, "github")

    user_data = await _fetch_oauth_profile(
        provider_label="GitHub",
        token_url=GITHUB_TOKEN_URL,
        token_payload={
            "client_id": settings.github_client_id,
            "client_secret": settings.github_client_secret,
            "code": code,
        },
        # Ask GitHub's token endpoint for a JSON response.
        token_headers={"Accept": "application/json"},
        user_url=GITHUB_USER_URL,
    )

    return await _handle_oauth_user(
        db=db,
        provider="github",
        user_id=str(user_data["id"]),
        name=user_data.get("name") or user_data.get("login") or "GitHub User",
        avatar_url=user_data.get("avatar_url"),
    )


@router.get("/google/signin")
async def google_signin():
    """Start Google OAuth with state protection."""
    state = secrets.token_urlsafe(16)
    params = {
        "client_id": settings.google_client_id,
        "redirect_uri": f"{settings.base_url}/auth/google/callback",
        "response_type": "code",
        "scope": "openid email profile",
        "state": state,
    }
    url = f"{GOOGLE_AUTHORIZE_URL}?{urlencode(params)}"
    response = RedirectResponse(url=url)
    _set_state_cookie(response, "google", state)
    return response


@router.get("/google/callback")
async def google_callback(
    code: str,
    state: str | None = Query(default=None),
    state_cookie: str | None = Cookie(default=None, alias=STATE_COOKIE),
    db: AsyncSession = Depends(get_db),
):
    """Handle Google OAuth callback."""
    _validate_state(state, state_cookie, "google")

    user_data = await _fetch_oauth_profile(
        provider_label="Google",
        token_url=GOOGLE_TOKEN_URL,
        token_payload={
            "client_id": settings.google_client_id,
            "client_secret": settings.google_client_secret,
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": f"{settings.base_url}/auth/google/callback",
        },
        user_url=GOOGLE_USER_URL,
    )

    return await _handle_oauth_user(
        db=db,
        provider="google",
        user_id=str(user_data["id"]),
        name=user_data.get("name") or user_data.get("email") or "Google User",
        avatar_url=user_data.get("picture"),
    )


async def _handle_oauth_user(
    db: AsyncSession,
    provider: str,
    user_id: str,
    name: str,
    avatar_url: str | None,
) -> RedirectResponse:
    """Create/update user and issue session cookie.

    The user row is keyed by ``"{provider}:{user_id}"``. A new row is inserted
    if none exists; otherwise its name and avatar are overwritten. The response
    redirects to ``/my-stories`` on the frontend, sets the access-token cookie,
    and clears the OAuth state cookie.
    """
    full_id = f"{provider}:{user_id}"

    result = await db.execute(select(User).where(User.id == full_id))
    user = result.scalar_one_or_none()

    if not user:
        user = User(
            id=full_id,
            name=name,
            avatar_url=avatar_url,
            provider=provider,
        )
        db.add(user)
    else:
        user.name = name
        user.avatar_url = avatar_url

    await db.commit()

    # full_id equals user.id by construction; using it avoids touching ORM
    # attributes after commit.
    # NOTE(review): presumably matters if the session expires objects on
    # commit -- confirm against the session factory configuration.
    token = create_access_token({"sub": full_id})

    response = RedirectResponse(url=f"{_frontend_url()}/my-stories", status_code=302)
    response.set_cookie(
        key=ACCESS_TOKEN_COOKIE,
        value=token,
        httponly=True,
        secure=not settings.debug,
        samesite="lax",
        max_age=ACCESS_TOKEN_MAX_AGE,
    )
    # Same attributes as used when the cookie is cleared in signout().
    response.delete_cookie(STATE_COOKIE, samesite="lax", secure=not settings.debug)
    return response


@router.post("/signout")
async def signout():
    """Sign out and clear cookies."""
    response = RedirectResponse(url=_frontend_url(), status_code=302)
    response.delete_cookie(
        ACCESS_TOKEN_COOKIE, samesite="lax", secure=not settings.debug
    )
    response.delete_cookie(STATE_COOKIE, samesite="lax", secure=not settings.debug)
    return response


@router.get("/session")
async def get_session(user: User | None = Depends(get_current_user)):
    """Fetch current session info.

    Returns ``{"user": None}`` when no user is resolved, otherwise the user's
    id, name, avatar URL, and provider.
    """
    if not user:
        return {"user": None}
    return {
        "user": {
            "id": user.id,
            "name": user.name,
            "avatar_url": user.avatar_url,
            "provider": user.provider,
        }
    }


@router.get("/dev/signin")
async def dev_signin(db: AsyncSession = Depends(get_db)):
    """Developer backdoor login. Only works in DEBUG mode.

    Raises:
        HTTPException: 403 when ``settings.debug`` is false; 500 if creating
            the developer session fails.
    """
    # This guard must stay active: without it anyone can obtain a session
    # for the fixed developer account.
    if not settings.debug:
        raise HTTPException(status_code=403, detail="Developer login disabled")

    try:
        return await _handle_oauth_user(
            db=db,
            provider="github",
            user_id="dev_user_001",
            name="Developer",
            avatar_url="https://api.dicebear.com/7.x/avataaars/svg?seed=Developer",
        )
    except Exception as exc:
        logger.exception("Dev login failed")
        raise HTTPException(
            status_code=500, detail=f"Dev login failed: {exc}"
        ) from exc