"""HTTP Basic authentication guard for admin endpoints with per-client lockout."""

import math
import secrets

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from app.core.config import settings
from app.core.rate_limiter import (
    clear_failed_attempts,
    is_locked_out,
    record_failed_attempt,
)

security = HTTPBasic()

MAX_ATTEMPTS = 5
LOCKOUT_SECONDS = 900  # 15 minutes


def _get_client_ip(request: Request) -> str:
    """Return the best-effort client address used to key login lockouts.

    The first non-blank entry of ``X-Forwarded-For`` wins; otherwise the
    socket peer address is used, and ``"unknown"`` when neither is available.

    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted
    reverse proxy overwrites it, so a caller may be able to evade the per-IP
    lockout by varying the header. Confirm the deployment topology.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        first_hop = forwarded.split(",")[0].strip()
        # A blank first entry (e.g. " " or ", 1.2.3.4") must not become the
        # lockout key; fall through to the peer address instead.
        if first_hop:
            return first_hop
    if request.client and request.client.host:
        return request.client.host
    return "unknown"


async def admin_guard(
    request: Request,
    credentials: HTTPBasicCredentials = Depends(security),
) -> bool:
    """Validate admin HTTP Basic credentials, enforcing a per-client lockout.

    Args:
        request: Incoming request; used only to derive the client address.
        credentials: Username/password parsed by the ``HTTPBasic`` scheme.

    Returns:
        ``True`` when both username and password match the configured values.

    Raises:
        HTTPException: 429 (with a ``Retry-After`` header) while the client
            is locked out; 401 when the credentials do not match.
    """
    client_ip = _get_client_ip(request)
    lockout_key = f"admin_login:{client_ip}"

    # Refuse early while this client is locked out. The value returned by
    # is_locked_out is reported to the user as a number of seconds.
    remaining = await is_locked_out(lockout_key, MAX_ATTEMPTS, LOCKOUT_SECONDS)
    if remaining > 0:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"登录尝试过多,请 {remaining} 秒后重试",
            # Retry-After takes whole seconds; round up so clients never
            # retry before the lockout has actually expired.
            headers={"Retry-After": str(math.ceil(remaining))},
        )

    # Compare with secrets.compare_digest to avoid timing side channels.
    # Both comparisons always run (no short-circuit) so the response time
    # does not reveal which field was wrong.
    username_ok = secrets.compare_digest(
        credentials.username.encode(), settings.admin_username.encode()
    )
    password_ok = secrets.compare_digest(
        credentials.password.encode(), settings.admin_password.encode()
    )

    if not (username_ok and password_ok):
        await record_failed_attempt(lockout_key, MAX_ATTEMPTS, LOCKOUT_SECONDS)
        # NOTE(review): no WWW-Authenticate header is sent here, as in the
        # original; adding one may trigger native browser login prompts.
        # Confirm the desired client behavior before changing this.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
        )

    # Successful login: clear the recorded failures for this client.
    await clear_failed_attempts(lockout_key)
    return True