import secrets import time from cachetools import TTLCache from fastapi import Depends, HTTPException, Request, status from fastapi.security import HTTPBasic, HTTPBasicCredentials from app.core.config import settings security = HTTPBasic() # 登录失败记录:IP -> (失败次数, 首次失败时间) _failed_attempts: TTLCache[str, tuple[int, float]] = TTLCache(maxsize=1000, ttl=900) # 15分钟 MAX_ATTEMPTS = 5 LOCKOUT_SECONDS = 900 # 15分钟 def _get_client_ip(request: Request) -> str: forwarded = request.headers.get("x-forwarded-for") if forwarded: return forwarded.split(",")[0].strip() if request.client and request.client.host: return request.client.host return "unknown" def admin_guard( request: Request, credentials: HTTPBasicCredentials = Depends(security), ): client_ip = _get_client_ip(request) # 检查是否被锁定 if client_ip in _failed_attempts: attempts, first_fail = _failed_attempts[client_ip] if attempts >= MAX_ATTEMPTS: remaining = int(LOCKOUT_SECONDS - (time.time() - first_fail)) if remaining > 0: raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=f"登录尝试过多,请 {remaining} 秒后重试", ) else: del _failed_attempts[client_ip] # 使用 secrets.compare_digest 防止时序攻击 username_ok = secrets.compare_digest( credentials.username.encode(), settings.admin_username.encode() ) password_ok = secrets.compare_digest( credentials.password.encode(), settings.admin_password.encode() ) if not (username_ok and password_ok): # 记录失败 if client_ip in _failed_attempts: attempts, first_fail = _failed_attempts[client_ip] _failed_attempts[client_ip] = (attempts + 1, first_fail) else: _failed_attempts[client_ip] = (1, time.time()) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误", ) # 登录成功,清除失败记录 if client_ip in _failed_attempts: del _failed_attempts[client_ip] return True