286 lines
8.4 KiB
Python
286 lines
8.4 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
|
|
from fastapi import FastAPI
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.api import admin_providers
|
|
from app.core.admin_auth import admin_guard
|
|
from app.db.database import get_db
|
|
from app.db.models import Story, User
|
|
from app.services.generation_jobs import create_generation_job, record_generation_event
|
|
|
|
|
|
def _build_admin_test_app(db_session) -> FastAPI:
|
|
app = FastAPI()
|
|
app.include_router(admin_providers.router, prefix="/admin")
|
|
|
|
async def override_get_db():
|
|
yield db_session
|
|
|
|
async def override_admin_guard():
|
|
return True
|
|
|
|
app.dependency_overrides[get_db] = override_get_db
|
|
app.dependency_overrides[admin_guard] = override_admin_guard
|
|
return app
|
|
|
|
|
|
async def _create_story(
|
|
db_session,
|
|
*,
|
|
user_id: str,
|
|
title: str,
|
|
mode: str = "generated",
|
|
) -> Story:
|
|
story = Story(
|
|
user_id=user_id,
|
|
title=title,
|
|
story_text="测试内容",
|
|
cover_prompt="A gentle moonlit forest",
|
|
mode=mode,
|
|
generation_status="partial_ready",
|
|
text_status="ready",
|
|
image_status="not_requested",
|
|
audio_status="not_requested",
|
|
)
|
|
db_session.add(story)
|
|
await db_session.commit()
|
|
await db_session.refresh(story)
|
|
return story
|
|
|
|
|
|
async def test_admin_provider_analytics_aggregate_across_users(db_session, test_user):
|
|
second_user = User(
|
|
id="github:67890",
|
|
name="Another User",
|
|
avatar_url="https://example.com/avatar-2.png",
|
|
provider="github",
|
|
)
|
|
db_session.add(second_user)
|
|
await db_session.commit()
|
|
|
|
first_story = await _create_story(db_session, user_id=test_user.id, title="第一位用户的故事")
|
|
second_story = await _create_story(
|
|
db_session,
|
|
user_id=second_user.id,
|
|
title="第二位用户的故事",
|
|
)
|
|
|
|
image_job = await create_generation_job(
|
|
db_session,
|
|
user_id=test_user.id,
|
|
output_mode="asset_retry",
|
|
input_type="image",
|
|
request_payload={"assets": ["image"]},
|
|
story_id=first_story.id,
|
|
)
|
|
await record_generation_event(
|
|
db_session,
|
|
job=image_job,
|
|
story_id=first_story.id,
|
|
event_type="provider_call_succeeded",
|
|
status="succeeded",
|
|
metadata={
|
|
"capability": "image",
|
|
"adapter": "demo",
|
|
"strategy": "priority",
|
|
"latency_ms": 42,
|
|
"estimated_cost_usd": 0.01,
|
|
},
|
|
)
|
|
await record_generation_event(
|
|
db_session,
|
|
job=image_job,
|
|
story_id=first_story.id,
|
|
event_type="provider_call_failed",
|
|
status="failed",
|
|
metadata={
|
|
"capability": "image",
|
|
"adapter": "cqtai",
|
|
"strategy": "priority",
|
|
"latency_ms": 120,
|
|
"error": "timeout",
|
|
},
|
|
)
|
|
|
|
audio_job = await create_generation_job(
|
|
db_session,
|
|
user_id=second_user.id,
|
|
output_mode="asset_retry",
|
|
input_type="audio",
|
|
request_payload={"assets": ["audio"]},
|
|
story_id=second_story.id,
|
|
)
|
|
await record_generation_event(
|
|
db_session,
|
|
job=audio_job,
|
|
story_id=second_story.id,
|
|
event_type="provider_call_succeeded",
|
|
status="succeeded",
|
|
metadata={
|
|
"capability": "tts",
|
|
"adapter": "edge_tts",
|
|
"strategy": "priority",
|
|
"latency_ms": 18,
|
|
"estimated_cost_usd": 0.003,
|
|
},
|
|
)
|
|
|
|
admin_app = _build_admin_test_app(db_session)
|
|
transport = ASGITransport(app=admin_app)
|
|
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
response = await client.get("/admin/providers/analytics")
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["scope"] == "current_environment"
|
|
assert data["user_count"] == 2
|
|
assert data["job_count"] == 2
|
|
assert data["story_count"] == 2
|
|
assert data["total_calls"] == 3
|
|
assert data["successful_calls"] == 2
|
|
assert data["failed_calls"] == 1
|
|
assert data["avg_latency_ms"] == 60.0
|
|
assert data["estimated_cost_usd"] == 0.013
|
|
assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]
|
|
assert data["by_provider"] == [
|
|
{
|
|
"capability": "image",
|
|
"adapter": "cqtai",
|
|
"call_count": 1,
|
|
"success_count": 0,
|
|
"failure_count": 1,
|
|
"avg_latency_ms": 120.0,
|
|
"estimated_cost_usd": 0.0,
|
|
},
|
|
{
|
|
"capability": "image",
|
|
"adapter": "demo",
|
|
"call_count": 1,
|
|
"success_count": 1,
|
|
"failure_count": 0,
|
|
"avg_latency_ms": 42.0,
|
|
"estimated_cost_usd": 0.01,
|
|
},
|
|
{
|
|
"capability": "tts",
|
|
"adapter": "edge_tts",
|
|
"call_count": 1,
|
|
"success_count": 1,
|
|
"failure_count": 0,
|
|
"avg_latency_ms": 18.0,
|
|
"estimated_cost_usd": 0.003,
|
|
},
|
|
]
|
|
assert data["by_user"] == [
|
|
{
|
|
"user_id": test_user.id,
|
|
"call_count": 2,
|
|
"success_count": 1,
|
|
"failure_count": 1,
|
|
"job_count": 1,
|
|
"story_count": 1,
|
|
"estimated_cost_usd": 0.01,
|
|
},
|
|
{
|
|
"user_id": second_user.id,
|
|
"call_count": 1,
|
|
"success_count": 1,
|
|
"failure_count": 0,
|
|
"job_count": 1,
|
|
"story_count": 1,
|
|
"estimated_cost_usd": 0.003,
|
|
},
|
|
]
|
|
|
|
|
|
async def test_admin_provider_analytics_support_days_and_capability_filters(
|
|
db_session,
|
|
test_user,
|
|
):
|
|
second_user = User(
|
|
id="google:22222",
|
|
name="Filter User",
|
|
avatar_url="https://example.com/avatar-3.png",
|
|
provider="google",
|
|
)
|
|
db_session.add(second_user)
|
|
await db_session.commit()
|
|
|
|
first_story = await _create_story(db_session, user_id=test_user.id, title="旧事件故事")
|
|
second_story = await _create_story(db_session, user_id=second_user.id, title="最近事件故事")
|
|
|
|
image_job = await create_generation_job(
|
|
db_session,
|
|
user_id=test_user.id,
|
|
output_mode="asset_retry",
|
|
input_type="image",
|
|
request_payload={"assets": ["image"]},
|
|
story_id=first_story.id,
|
|
)
|
|
old_event = await record_generation_event(
|
|
db_session,
|
|
job=image_job,
|
|
story_id=first_story.id,
|
|
event_type="provider_call_failed",
|
|
status="failed",
|
|
metadata={
|
|
"capability": "image",
|
|
"adapter": "cqtai",
|
|
"strategy": "priority",
|
|
"latency_ms": 120,
|
|
"error": "timeout",
|
|
},
|
|
)
|
|
old_event.created_at = datetime.now(timezone.utc) - timedelta(days=10)
|
|
await db_session.commit()
|
|
|
|
audio_job = await create_generation_job(
|
|
db_session,
|
|
user_id=second_user.id,
|
|
output_mode="asset_retry",
|
|
input_type="audio",
|
|
request_payload={"assets": ["audio"]},
|
|
story_id=second_story.id,
|
|
)
|
|
await record_generation_event(
|
|
db_session,
|
|
job=audio_job,
|
|
story_id=second_story.id,
|
|
event_type="provider_call_succeeded",
|
|
status="succeeded",
|
|
metadata={
|
|
"capability": "tts",
|
|
"adapter": "edge_tts",
|
|
"strategy": "priority",
|
|
"latency_ms": 18,
|
|
"estimated_cost_usd": 0.003,
|
|
},
|
|
)
|
|
|
|
admin_app = _build_admin_test_app(db_session)
|
|
transport = ASGITransport(app=admin_app)
|
|
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
response = await client.get("/admin/providers/analytics?days=7")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["window_days"] == 7
|
|
assert data["total_calls"] == 1
|
|
assert data["user_count"] == 1
|
|
assert data["job_count"] == 1
|
|
assert data["story_count"] == 1
|
|
assert data["failure_reasons"] == []
|
|
|
|
response = await client.get("/admin/providers/analytics?capability=image")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["capability"] == "image"
|
|
assert data["total_calls"] == 1
|
|
assert data["failed_calls"] == 1
|
|
assert data["user_count"] == 1
|
|
assert data["job_count"] == 1
|
|
assert data["story_count"] == 1
|
|
assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]
|