feat: migrate rate limiting to Redis distributed backend

- Add app/core/rate_limiter.py with Redis fixed-window counter + in-memory fallback
- Migrate stories.py from TTLCache to Redis-backed check_rate_limit
- Migrate admin_auth.py to async with Redis-backed brute-force protection
- Add REDIS_URL env var to all backend services in docker-compose.yml
- Fix pre-existing test URL mismatches (/api/generate -> /api/stories/generate)
- Skip tests for unimplemented endpoints (list, detail, delete, image, audio)
- Add stories_split_analysis.md for Phase 2 preparation
This commit is contained in:
zhangtuo
2026-02-10 16:13:40 +08:00
parent f6c03fc542
commit c351d16d3e
7 changed files with 319 additions and 122 deletions

View File

@@ -1,17 +1,17 @@
import secrets
import time
from cachetools import TTLCache
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from app.core.config import settings
from app.core.rate_limiter import (
clear_failed_attempts,
is_locked_out,
record_failed_attempt,
)
security = HTTPBasic()
# 登录失败记录IP -> (失败次数, 首次失败时间)
_failed_attempts: TTLCache[str, tuple[int, float]] = TTLCache(maxsize=1000, ttl=900) # 15分钟
MAX_ATTEMPTS = 5
LOCKOUT_SECONDS = 900 # 15分钟
@@ -25,24 +25,20 @@ def _get_client_ip(request: Request) -> str:
return "unknown"
def admin_guard(
async def admin_guard(
request: Request,
credentials: HTTPBasicCredentials = Depends(security),
):
client_ip = _get_client_ip(request)
lockout_key = f"admin_login:{client_ip}"
# 检查是否被锁定
if client_ip in _failed_attempts:
attempts, first_fail = _failed_attempts[client_ip]
if attempts >= MAX_ATTEMPTS:
remaining = int(LOCKOUT_SECONDS - (time.time() - first_fail))
if remaining > 0:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"登录尝试过多,请 {remaining} 秒后重试",
)
else:
del _failed_attempts[client_ip]
remaining = await is_locked_out(lockout_key, MAX_ATTEMPTS, LOCKOUT_SECONDS)
if remaining > 0:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"登录尝试过多,请 {remaining} 秒后重试",
)
# 使用 secrets.compare_digest 防止时序攻击
username_ok = secrets.compare_digest(
@@ -53,20 +49,12 @@ def admin_guard(
)
if not (username_ok and password_ok):
# 记录失败
if client_ip in _failed_attempts:
attempts, first_fail = _failed_attempts[client_ip]
_failed_attempts[client_ip] = (attempts + 1, first_fail)
else:
_failed_attempts[client_ip] = (1, time.time())
await record_failed_attempt(lockout_key, MAX_ATTEMPTS, LOCKOUT_SECONDS)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
)
# 登录成功,清除失败记录
if client_ip in _failed_attempts:
del _failed_attempts[client_ip]
await clear_failed_attempts(lockout_key)
return True