feat: add admin provider analytics dashboard

This commit is contained in:
2026-04-19 18:56:17 +08:00
parent b89ca96e4b
commit 395cdf4edd
12 changed files with 886 additions and 51 deletions

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -8,6 +8,7 @@ from app.db.admin_models import Provider
from app.db.database import get_db
from app.services.adapters.registry import AdapterRegistry
from app.services.cost_tracker import cost_tracker
from app.services.generation_jobs import get_admin_provider_analytics
from app.services.provider_policy import DEFAULT_PROVIDERS, list_capability_policies
from app.services.secret_service import SecretService
@@ -56,6 +57,48 @@ class ProviderResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)
class ProviderAnalyticsBucket(BaseModel):
capability: str
adapter: str
call_count: int
success_count: int
failure_count: int
avg_latency_ms: float | None = None
estimated_cost_usd: float
class ProviderAnalyticsUserBucket(BaseModel):
user_id: str
call_count: int
success_count: int
failure_count: int
job_count: int
story_count: int
estimated_cost_usd: float
class ProviderAnalyticsFailureReason(BaseModel):
reason: str
count: int
class ProviderAnalyticsResponse(BaseModel):
scope: str
window_days: int | None = None
capability: str | None = None
total_calls: int
successful_calls: int
failed_calls: int
avg_latency_ms: float | None = None
estimated_cost_usd: float
user_count: int
job_count: int
story_count: int
by_provider: list[ProviderAnalyticsBucket]
by_user: list[ProviderAnalyticsUserBucket]
failure_reasons: list[ProviderAnalyticsFailureReason]
@router.get("/providers/adapters")
async def list_available_adapters():
"""获取所有可用的适配器类型 (定义的类)。"""
@@ -74,6 +117,20 @@ async def list_provider_capabilities():
return list_capability_policies()
@router.get("/providers/analytics", response_model=ProviderAnalyticsResponse)
async def get_provider_analytics(
days: int | None = Query(default=None, ge=1, le=365),
capability: str | None = Query(default=None),
db: AsyncSession = Depends(get_db),
):
"""获取当前环境跨用户的 Provider 运营摘要。"""
return await get_admin_provider_analytics(
db,
days=days,
capability=capability,
)
@router.get("/providers", response_model=list[ProviderResponse])
async def list_providers(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Provider))

View File

@@ -606,23 +606,37 @@ def _aggregate_provider_events(
}
def _event_matches_capability(
event: GenerationJobEvent,
capability: str | None = None,
) -> bool:
event_capability = str((event.event_metadata or {}).get("capability") or "unknown")
return capability is None or event_capability == capability
def _provider_events_query(
*,
user_id: str,
user_id: str | None = None,
story_id: int | None = None,
days: int | None = None,
):
query = (
select(GenerationJobEvent)
select(
GenerationJobEvent,
GenerationJob.user_id,
GenerationJob.story_id,
)
.join(GenerationJob, GenerationJobEvent.job_id == GenerationJob.id)
.where(
GenerationJob.user_id == user_id,
GenerationJobEvent.event_type.in_(
["provider_call_succeeded", "provider_call_failed"]
),
)
)
if user_id is not None:
query = query.where(GenerationJob.user_id == user_id)
if story_id is not None:
query = query.where(GenerationJob.story_id == story_id)
@@ -681,17 +695,12 @@ async def get_user_provider_analytics(
filtered_event_job_ids = {
event.job_id
for event in events
if capability is None
or str((event.event_metadata or {}).get("capability") or "unknown") == capability
if _event_matches_capability(event, capability)
}
filtered_story_ids = {
event.story_id
for event in events
if event.story_id is not None
and (
capability is None
or str((event.event_metadata or {}).get("capability") or "unknown") == capability
)
if event.story_id is not None and _event_matches_capability(event, capability)
}
return {
@@ -703,6 +712,87 @@ async def get_user_provider_analytics(
}
async def get_admin_provider_analytics(
db: AsyncSession,
*,
days: int | None = None,
capability: str | None = None,
) -> dict[str, Any]:
"""Aggregate provider telemetry across every user in the current environment."""
rows = (await db.execute(_provider_events_query(days=days))).all()
events = [event for event, _, _ in rows]
filtered_rows = [
(event, user_id, story_id)
for event, user_id, story_id in rows
if _event_matches_capability(event, capability)
]
by_user: dict[str, dict[str, Any]] = {}
filtered_job_ids = {event.job_id for event, _, _ in filtered_rows}
filtered_story_ids = {
story_id for _, _, story_id in filtered_rows if story_id is not None
}
filtered_user_ids = {user_id for _, user_id, _ in filtered_rows}
for event, user_id, story_id in filtered_rows:
bucket = by_user.setdefault(
user_id,
{
"user_id": user_id,
"call_count": 0,
"success_count": 0,
"failure_count": 0,
"estimated_cost_usd": 0.0,
"job_ids": set(),
"story_ids": set(),
},
)
bucket["call_count"] += 1
bucket["job_ids"].add(event.job_id)
if story_id is not None:
bucket["story_ids"].add(story_id)
if event.event_type == "provider_call_succeeded":
bucket["success_count"] += 1
bucket["estimated_cost_usd"] += (
_as_float((event.event_metadata or {}).get("estimated_cost_usd")) or 0.0
)
else:
bucket["failure_count"] += 1
serialized_users = [
{
"user_id": user_id,
"call_count": bucket["call_count"],
"success_count": bucket["success_count"],
"failure_count": bucket["failure_count"],
"job_count": len(bucket["job_ids"]),
"story_count": len(bucket["story_ids"]),
"estimated_cost_usd": round(bucket["estimated_cost_usd"], 6),
}
for user_id, bucket in by_user.items()
]
serialized_users.sort(
key=lambda item: (
-int(item["call_count"]),
-float(item["estimated_cost_usd"]),
str(item["user_id"]),
)
)
return {
"scope": "current_environment",
"window_days": days,
"capability": capability,
**_aggregate_provider_events(events, capability=capability),
"user_count": len(filtered_user_ids),
"job_count": len(filtered_job_ids),
"story_count": len(filtered_story_ids),
"by_user": serialized_users,
}
async def get_user_generation_ops_summary(
db: AsyncSession,
*,

View File

@@ -0,0 +1,285 @@
from datetime import datetime, timedelta, timezone
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from app.api import admin_providers
from app.core.admin_auth import admin_guard
from app.db.database import get_db
from app.db.models import Story, User
from app.services.generation_jobs import create_generation_job, record_generation_event
def _build_admin_test_app(db_session) -> FastAPI:
app = FastAPI()
app.include_router(admin_providers.router, prefix="/admin")
async def override_get_db():
yield db_session
async def override_admin_guard():
return True
app.dependency_overrides[get_db] = override_get_db
app.dependency_overrides[admin_guard] = override_admin_guard
return app
async def _create_story(
db_session,
*,
user_id: str,
title: str,
mode: str = "generated",
) -> Story:
story = Story(
user_id=user_id,
title=title,
story_text="测试内容",
cover_prompt="A gentle moonlit forest",
mode=mode,
generation_status="partial_ready",
text_status="ready",
image_status="not_requested",
audio_status="not_requested",
)
db_session.add(story)
await db_session.commit()
await db_session.refresh(story)
return story
async def test_admin_provider_analytics_aggregate_across_users(db_session, test_user):
second_user = User(
id="github:67890",
name="Another User",
avatar_url="https://example.com/avatar-2.png",
provider="github",
)
db_session.add(second_user)
await db_session.commit()
first_story = await _create_story(db_session, user_id=test_user.id, title="第一位用户的故事")
second_story = await _create_story(
db_session,
user_id=second_user.id,
title="第二位用户的故事",
)
image_job = await create_generation_job(
db_session,
user_id=test_user.id,
output_mode="asset_retry",
input_type="image",
request_payload={"assets": ["image"]},
story_id=first_story.id,
)
await record_generation_event(
db_session,
job=image_job,
story_id=first_story.id,
event_type="provider_call_succeeded",
status="succeeded",
metadata={
"capability": "image",
"adapter": "demo",
"strategy": "priority",
"latency_ms": 42,
"estimated_cost_usd": 0.01,
},
)
await record_generation_event(
db_session,
job=image_job,
story_id=first_story.id,
event_type="provider_call_failed",
status="failed",
metadata={
"capability": "image",
"adapter": "cqtai",
"strategy": "priority",
"latency_ms": 120,
"error": "timeout",
},
)
audio_job = await create_generation_job(
db_session,
user_id=second_user.id,
output_mode="asset_retry",
input_type="audio",
request_payload={"assets": ["audio"]},
story_id=second_story.id,
)
await record_generation_event(
db_session,
job=audio_job,
story_id=second_story.id,
event_type="provider_call_succeeded",
status="succeeded",
metadata={
"capability": "tts",
"adapter": "edge_tts",
"strategy": "priority",
"latency_ms": 18,
"estimated_cost_usd": 0.003,
},
)
admin_app = _build_admin_test_app(db_session)
transport = ASGITransport(app=admin_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.get("/admin/providers/analytics")
assert response.status_code == 200
data = response.json()
assert data["scope"] == "current_environment"
assert data["user_count"] == 2
assert data["job_count"] == 2
assert data["story_count"] == 2
assert data["total_calls"] == 3
assert data["successful_calls"] == 2
assert data["failed_calls"] == 1
assert data["avg_latency_ms"] == 60.0
assert data["estimated_cost_usd"] == 0.013
assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]
assert data["by_provider"] == [
{
"capability": "image",
"adapter": "cqtai",
"call_count": 1,
"success_count": 0,
"failure_count": 1,
"avg_latency_ms": 120.0,
"estimated_cost_usd": 0.0,
},
{
"capability": "image",
"adapter": "demo",
"call_count": 1,
"success_count": 1,
"failure_count": 0,
"avg_latency_ms": 42.0,
"estimated_cost_usd": 0.01,
},
{
"capability": "tts",
"adapter": "edge_tts",
"call_count": 1,
"success_count": 1,
"failure_count": 0,
"avg_latency_ms": 18.0,
"estimated_cost_usd": 0.003,
},
]
assert data["by_user"] == [
{
"user_id": test_user.id,
"call_count": 2,
"success_count": 1,
"failure_count": 1,
"job_count": 1,
"story_count": 1,
"estimated_cost_usd": 0.01,
},
{
"user_id": second_user.id,
"call_count": 1,
"success_count": 1,
"failure_count": 0,
"job_count": 1,
"story_count": 1,
"estimated_cost_usd": 0.003,
},
]
async def test_admin_provider_analytics_support_days_and_capability_filters(
db_session,
test_user,
):
second_user = User(
id="google:22222",
name="Filter User",
avatar_url="https://example.com/avatar-3.png",
provider="google",
)
db_session.add(second_user)
await db_session.commit()
first_story = await _create_story(db_session, user_id=test_user.id, title="旧事件故事")
second_story = await _create_story(db_session, user_id=second_user.id, title="最近事件故事")
image_job = await create_generation_job(
db_session,
user_id=test_user.id,
output_mode="asset_retry",
input_type="image",
request_payload={"assets": ["image"]},
story_id=first_story.id,
)
old_event = await record_generation_event(
db_session,
job=image_job,
story_id=first_story.id,
event_type="provider_call_failed",
status="failed",
metadata={
"capability": "image",
"adapter": "cqtai",
"strategy": "priority",
"latency_ms": 120,
"error": "timeout",
},
)
old_event.created_at = datetime.now(timezone.utc) - timedelta(days=10)
await db_session.commit()
audio_job = await create_generation_job(
db_session,
user_id=second_user.id,
output_mode="asset_retry",
input_type="audio",
request_payload={"assets": ["audio"]},
story_id=second_story.id,
)
await record_generation_event(
db_session,
job=audio_job,
story_id=second_story.id,
event_type="provider_call_succeeded",
status="succeeded",
metadata={
"capability": "tts",
"adapter": "edge_tts",
"strategy": "priority",
"latency_ms": 18,
"estimated_cost_usd": 0.003,
},
)
admin_app = _build_admin_test_app(db_session)
transport = ASGITransport(app=admin_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.get("/admin/providers/analytics?days=7")
assert response.status_code == 200
data = response.json()
assert data["window_days"] == 7
assert data["total_calls"] == 1
assert data["user_count"] == 1
assert data["job_count"] == 1
assert data["story_count"] == 1
assert data["failure_reasons"] == []
response = await client.get("/admin/providers/analytics?capability=image")
assert response.status_code == 200
data = response.json()
assert data["capability"] == "image"
assert data["total_calls"] == 1
assert data["failed_calls"] == 1
assert data["user_count"] == 1
assert data["job_count"] == 1
assert data["story_count"] == 1
assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]