"""API tests for the internal admin analytics, trace, coverage and readiness routes.

Each test mounts ``admin_providers.router`` under ``/admin`` on a throwaway
FastAPI app, seeds rows through the ``db_session`` fixture, and exercises the
route in-process through ``httpx.ASGITransport``.
"""

from datetime import datetime, timedelta, timezone
from decimal import Decimal

from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient

from app.api import admin_providers
from app.core.admin_auth import admin_guard
from app.db.admin_models import CostRecord
from app.db.database import get_db
from app.db.models import Story, User, VoiceSession, VoiceSessionEvent, VoiceTurn
from app.services.generation_jobs import create_generation_job, record_generation_event


def _build_admin_test_app(db_session, *, bypass_admin_guard: bool = True) -> FastAPI:
    """Build a FastAPI app exposing the admin router backed by ``db_session``.

    Args:
        db_session: Session yielded by the overridden ``get_db`` dependency.
        bypass_admin_guard: When true (the default), ``admin_guard`` is
            overridden to always return ``True``. When false the real guard
            stays in place.

    Returns:
        The configured application.
    """
    app = FastAPI()
    app.include_router(admin_providers.router, prefix="/admin")

    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    if bypass_admin_guard:

        async def override_admin_guard():
            return True

        app.dependency_overrides[admin_guard] = override_admin_guard

    return app


def _build_admin_auth_required_test_app(db_session) -> FastAPI:
    """Build the admin test app with the real ``admin_guard`` left in place."""
    return _build_admin_test_app(db_session, bypass_admin_guard=False)


def _admin_client(app: FastAPI) -> AsyncClient:
    """Return an in-process ``AsyncClient`` bound to ``app``.

    The caller is expected to use the result as an ``async with`` context
    manager so the client is closed afterwards.
    """
    return AsyncClient(transport=ASGITransport(app=app), base_url="http://test")


async def _assert_requires_admin_auth(db_session, path: str) -> None:
    """Assert that ``GET path`` answers 401 when the admin guard is not overridden."""
    admin_app = _build_admin_auth_required_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get(path)

    assert response.status_code == 401


async def _create_story(
    db_session,
    *,
    user_id: str,
    title: str,
    mode: str = "generated",
) -> Story:
    """Insert, commit and refresh a ``Story`` row owned by ``user_id``.

    The row is created with ready text and no image/audio requested.
    """
    story = Story(
        user_id=user_id,
        title=title,
        story_text="测试内容",
        cover_prompt="A gentle moonlit forest",
        mode=mode,
        generation_status="partial_ready",
        text_status="ready",
        image_status="not_requested",
        audio_status="not_requested",
    )
    db_session.add(story)
    await db_session.commit()
    await db_session.refresh(story)
    return story


async def _record_evaluation_event(
    db_session,
    *,
    user_id: str,
    story_id: int,
    output_mode: str,
    artifact: str,
    status: str,
    metadata: dict,
):
    """Create a generation job and attach one ``evaluation_completed`` event to it.

    ``metadata`` is merged on top of the fixed ``step``/``artifact`` keys, so
    a caller-supplied key with the same name wins.

    Returns:
        Whatever ``record_generation_event`` returns (the tests treat it as
        the event row and mutate its ``created_at``).
    """
    job = await create_generation_job(
        db_session,
        user_id=user_id,
        output_mode=output_mode,
        input_type="keywords",
        request_payload={"data": "测试"},
        story_id=story_id,
    )
    return await record_generation_event(
        db_session,
        job=job,
        story_id=story_id,
        event_type="evaluation_completed",
        status=status,
        metadata={
            "step": "evaluation",
            "artifact": artifact,
            **metadata,
        },
    )


async def test_admin_provider_analytics_aggregate_across_users(db_session, test_user):
    """Provider analytics sums provider-call events from two different users."""
    second_user = User(
        id="github:67890",
        name="Another User",
        avatar_url="https://example.com/avatar-2.png",
        provider="github",
    )
    db_session.add(second_user)
    await db_session.commit()

    first_story = await _create_story(
        db_session,
        user_id=test_user.id,
        title="第一位用户的故事",
    )
    second_story = await _create_story(
        db_session,
        user_id=second_user.id,
        title="第二位用户的故事",
    )

    image_job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="asset_retry",
        input_type="image",
        request_payload={"assets": ["image"]},
        story_id=first_story.id,
    )
    await record_generation_event(
        db_session,
        job=image_job,
        story_id=first_story.id,
        event_type="provider_call_succeeded",
        status="succeeded",
        metadata={
            "capability": "image",
            "adapter": "demo",
            "strategy": "priority",
            "latency_ms": 42,
            "estimated_cost_usd": 0.01,
        },
    )
    await record_generation_event(
        db_session,
        job=image_job,
        story_id=first_story.id,
        event_type="provider_call_failed",
        status="failed",
        metadata={
            "capability": "image",
            "adapter": "cqtai",
            "strategy": "priority",
            "latency_ms": 120,
            "error": "timeout",
        },
    )

    audio_job = await create_generation_job(
        db_session,
        user_id=second_user.id,
        output_mode="asset_retry",
        input_type="audio",
        request_payload={"assets": ["audio"]},
        story_id=second_story.id,
    )
    await record_generation_event(
        db_session,
        job=audio_job,
        story_id=second_story.id,
        event_type="provider_call_succeeded",
        status="succeeded",
        metadata={
            "capability": "tts",
            "adapter": "edge_tts",
            "strategy": "priority",
            "latency_ms": 18,
            "estimated_cost_usd": 0.003,
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/providers/analytics")

        assert response.status_code == 200
        data = response.json()
        assert data["scope"] == "current_environment"
        assert data["user_count"] == 2
        assert data["job_count"] == 2
        assert data["story_count"] == 2
        assert data["total_calls"] == 3
        assert data["successful_calls"] == 2
        assert data["failed_calls"] == 1
        assert data["avg_latency_ms"] == 60.0
        assert data["estimated_cost_usd"] == 0.013
        assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]
        assert data["by_provider"] == [
            {
                "capability": "image",
                "adapter": "cqtai",
                "call_count": 1,
                "success_count": 0,
                "failure_count": 1,
                "avg_latency_ms": 120.0,
                "estimated_cost_usd": 0.0,
            },
            {
                "capability": "image",
                "adapter": "demo",
                "call_count": 1,
                "success_count": 1,
                "failure_count": 0,
                "avg_latency_ms": 42.0,
                "estimated_cost_usd": 0.01,
            },
            {
                "capability": "tts",
                "adapter": "edge_tts",
                "call_count": 1,
                "success_count": 1,
                "failure_count": 0,
                "avg_latency_ms": 18.0,
                "estimated_cost_usd": 0.003,
            },
        ]
        assert data["by_user"] == [
            {
                "user_id": test_user.id,
                "call_count": 2,
                "success_count": 1,
                "failure_count": 1,
                "job_count": 1,
                "story_count": 1,
                "estimated_cost_usd": 0.01,
            },
            {
                "user_id": second_user.id,
                "call_count": 1,
                "success_count": 1,
                "failure_count": 0,
                "job_count": 1,
                "story_count": 1,
                "estimated_cost_usd": 0.003,
            },
        ]


async def test_admin_evaluation_analytics_aggregate_internal_events(
    db_session,
    test_user,
):
    """Evaluation analytics aggregates events and omits titles, reasons and issue messages."""
    second_user = User(
        id="google:evaluation-user",
        name="Evaluation User",
        avatar_url="https://example.com/eval.png",
        provider="google",
    )
    db_session.add(second_user)
    await db_session.commit()

    story = await _create_story(db_session, user_id=test_user.id, title="评测故事")
    storybook = await _create_story(
        db_session,
        user_id=second_user.id,
        title="评测绘本",
        mode="storybook",
    )

    await _record_evaluation_event(
        db_session,
        user_id=test_user.id,
        story_id=story.id,
        output_mode="story",
        artifact="story_text",
        status="succeeded",
        metadata={
            "overall_score": 0.92,
            "passed": True,
            "blocking": False,
            "scores": [
                {"dimension": "structure", "score": 1.0, "reason": "完整"},
                {"dimension": "readability", "score": 0.84, "reason": "可读"},
            ],
            "warnings": [],
        },
    )
    await _record_evaluation_event(
        db_session,
        user_id=second_user.id,
        story_id=storybook.id,
        output_mode="storybook",
        artifact="storybook_pages",
        status="failed",
        metadata={
            "overall_score": 0.0,
            "passed": False,
            "blocking": True,
            "scores": [
                {"dimension": "structure", "score": 0.0, "reason": "结构失败"},
                {"dimension": "safety", "score": 0.0, "reason": "安全失败"},
            ],
            "quality_gate": {
                "issues": [
                    {
                        "code": "unsafe_child_content",
                        "message": "风险词",
                        "failure_category": "safety_error",
                        "field": "pages",
                    }
                ]
            },
            "warnings": ["绘本分页正文长度可能不适合 3-8 岁儿童的翻页阅读体验。"],
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/evaluations/analytics")

        assert response.status_code == 200
        data = response.json()
        assert data["scope"] == "admin_internal_evaluations"
        assert data["total_evaluations"] == 2
        assert data["passed_evaluations"] == 1
        assert data["blocked_evaluations"] == 1
        assert data["pass_rate"] == 0.5
        assert data["average_score"] == 0.46
        assert data["job_count"] == 2
        assert data["story_count"] == 2
        assert data["user_count"] == 2
        assert data["by_artifact"] == [
            {"artifact": "story_text", "count": 1},
            {"artifact": "storybook_pages", "count": 1},
        ]
        assert data["by_output_mode"] == [
            {"output_mode": "story", "count": 1},
            {"output_mode": "storybook", "count": 1},
        ]
        assert data["score_bands"] == [
            {"band": "blocked_quality_gate", "count": 1},
            {"band": "excellent", "count": 1},
        ]
        assert data["dimension_scores"] == [
            {"dimension": "structure", "average_score": 0.5, "count": 2},
            {"dimension": "readability", "average_score": 0.84, "count": 1},
            {"dimension": "safety", "average_score": 0.0, "count": 1},
        ]
        assert data["quality_gate_issues"] == [
            {"code": "unsafe_child_content", "count": 1},
        ]
        assert data["failure_categories"] == [
            {"category": "safety_error", "count": 1},
        ]
        assert data["warnings"] == [
            {
                "message": "绘本分页正文长度可能不适合 3-8 岁儿童的翻页阅读体验。",
                "count": 1,
            },
        ]
        # Story titles, issue messages and per-score reasons must not leak.
        assert "评测故事" not in str(data)
        assert "风险词" not in str(data)
        assert "完整" not in str(data)


async def test_admin_evaluation_analytics_support_days_and_artifact_filters(
    db_session,
    test_user,
):
    """Evaluation analytics honours ``days`` and ``artifact`` and rejects unknown artifacts."""
    story = await _create_story(db_session, user_id=test_user.id, title="旧评测")
    storybook = await _create_story(
        db_session,
        user_id=test_user.id,
        title="新评测",
        mode="storybook",
    )

    old_event = await _record_evaluation_event(
        db_session,
        user_id=test_user.id,
        story_id=story.id,
        output_mode="story",
        artifact="story_text",
        status="succeeded",
        metadata={
            "overall_score": 0.96,
            "passed": True,
            "blocking": False,
            "scores": [{"dimension": "structure", "score": 1.0, "reason": "完整"}],
            "warnings": [],
        },
    )
    # Back-date the first event so it falls outside the 7-day window below.
    old_event.created_at = datetime.now(timezone.utc) - timedelta(days=10)
    await db_session.commit()

    await _record_evaluation_event(
        db_session,
        user_id=test_user.id,
        story_id=storybook.id,
        output_mode="storybook",
        artifact="storybook_pages",
        status="failed",
        metadata={
            "overall_score": 0.72,
            "passed": False,
            "blocking": True,
            "scores": [{"dimension": "readability", "score": 0.62, "reason": "过短"}],
            "warnings": ["分页正文长度偏短"],
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/evaluations/analytics?days=7")
        assert response.status_code == 200
        data = response.json()
        assert data["window_days"] == 7
        assert data["total_evaluations"] == 1
        assert data["artifact"] is None
        assert data["by_artifact"] == [{"artifact": "storybook_pages", "count": 1}]

        response = await client.get(
            "/admin/evaluations/analytics?artifact=story_text"
        )
        assert response.status_code == 200
        data = response.json()
        assert data["artifact"] == "story_text"
        assert data["total_evaluations"] == 1
        assert data["average_score"] == 0.96

        response = await client.get("/admin/evaluations/analytics?artifact=image")
        assert response.status_code == 422


async def test_admin_evaluation_analytics_requires_admin_auth(db_session):
    """Evaluation analytics answers 401 without the admin guard override."""
    await _assert_requires_admin_auth(db_session, "/admin/evaluations/analytics")


async def test_admin_generation_job_trace_returns_internal_event_stream(
    db_session,
    test_user,
):
    """The job trace returns the raw payload, ordered events and executor coverage."""
    story = await _create_story(db_session, user_id=test_user.id, title="内部链路故事")
    job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="story",
        input_type="keywords",
        request_payload={
            "output_mode": "story",
            "type": "keywords",
            "data": "月亮森林",
            "internal_dispatch_token": "admin-visible-token",
            "provider_override": "internal-provider",
            "evaluation_policy": {"threshold": 0.9},
        },
        story_id=story.id,
    )
    await record_generation_event(
        db_session,
        job=job,
        story_id=story.id,
        event_type="workflow_planned",
        status="succeeded",
        metadata={
            "step": "request_acceptance",
            "artifact": "none",
            "plan": {
                "mode": "story",
                "tasks": [
                    {
                        "key": "generate_narrative",
                        "step": "text_generation",
                        "artifact": "story_text",
                        "required": True,
                        "recoverable": False,
                    }
                ],
            },
            "internal_threshold": 0.9,
        },
    )
    await record_generation_event(
        db_session,
        job=job,
        story_id=story.id,
        event_type="evaluation_completed",
        status="succeeded",
        metadata={
            "step": "evaluation",
            "artifact": "story_text",
            "overall_score": 0.94,
            "passed": True,
            "blocking": False,
            "scores": [{"dimension": "structure", "score": 1.0}],
        },
    )
    await record_generation_event(
        db_session,
        job=job,
        story_id=story.id,
        event_type="executor_completed",
        status="succeeded",
        metadata={
            "plan_mode": "asset_generation",
            "planned_task_count": 3,
            "executed_task_count": 1,
            "ignored_task_count": 2,
            "executed_task_keys": ["complete_image_asset"],
            "ignored_task_keys": [
                "start_asset_generation",
                "complete_asset_generation",
            ],
            "result_assets": ["cover_image"],
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get(f"/admin/generations/jobs/{job.id}/trace")

        assert response.status_code == 200
        data = response.json()
        assert data["id"] == job.id
        assert data["user_id"] == test_user.id
        assert data["request_payload"]["data"] == "月亮森林"
        assert (
            data["request_payload"]["internal_dispatch_token"]
            == "admin-visible-token"
        )
        assert data["request_payload"]["evaluation_policy"] == {"threshold": 0.9}

        # The first event is expected to be present before any event recorded
        # in this test (presumably emitted by create_generation_job).
        event_types = [event["event_type"] for event in data["events"]]
        assert event_types == [
            "request_accepted",
            "workflow_planned",
            "evaluation_completed",
            "executor_completed",
        ]

        workflow_event = data["events"][1]
        assert workflow_event["event_metadata"]["plan"]["tasks"][0]["key"] == (
            "generate_narrative"
        )
        assert workflow_event["event_metadata"]["internal_threshold"] == 0.9

        evaluation_event = data["events"][2]
        assert evaluation_event["event_metadata"]["overall_score"] == 0.94
        assert evaluation_event["event_metadata"]["scores"] == [
            {"dimension": "structure", "score": 1.0}
        ]

        executor_event = data["events"][3]
        assert executor_event["event_metadata"]["executed_task_keys"] == [
            "complete_image_asset"
        ]
        assert executor_event["event_metadata"]["result_assets"] == ["cover_image"]

        executor_coverage = data["executor_coverage"]
        assert executor_coverage["scope"] == "admin_internal_job_executor_coverage"
        assert executor_coverage["total_runs"] == 1
        assert executor_coverage["total_planned_tasks"] == 3
        assert executor_coverage["total_executed_tasks"] == 1
        assert executor_coverage["total_ignored_tasks"] == 2
        assert executor_coverage["coverage_ratio"] == 0.3333
        assert executor_coverage["job_count"] == 1
        assert executor_coverage["story_count"] == 1
        assert executor_coverage["user_count"] == 1
        assert executor_coverage["by_plan_mode"] == [
            {"plan_mode": "asset_generation", "count": 1}
        ]
        assert executor_coverage["by_output_mode"] == [
            {"output_mode": "story", "count": 1}
        ]
        assert executor_coverage["executed_task_keys"] == [
            {"task_key": "complete_image_asset", "count": 1}
        ]
        assert executor_coverage["ignored_task_keys"] == [
            {"task_key": "complete_asset_generation", "count": 1},
            {"task_key": "start_asset_generation", "count": 1},
        ]
        assert executor_coverage["result_assets"] == [
            {"asset": "cover_image", "count": 1}
        ]


async def test_admin_generation_job_trace_requires_admin_auth(db_session):
    """The job trace answers 401 without the admin guard override."""
    await _assert_requires_admin_auth(
        db_session,
        "/admin/generations/jobs/missing-job/trace",
    )


async def test_admin_executor_coverage_aggregates_internal_events(
    db_session,
    test_user,
):
    """Executor coverage aggregates ``executor_completed`` events and filters by plan mode."""
    story = await _create_story(db_session, user_id=test_user.id, title="执行器覆盖故事")

    asset_job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="asset_generation",
        input_type="audio,image",
        request_payload={"story_id": story.id, "assets": ["audio", "image"]},
        story_id=story.id,
    )
    await record_generation_event(
        db_session,
        job=asset_job,
        story_id=story.id,
        event_type="executor_completed",
        status="succeeded",
        metadata={
            "plan_mode": "asset_generation",
            "planned_task_count": 4,
            "executed_task_count": 2,
            "ignored_task_count": 2,
            "executed_task_keys": ["complete_audio_asset", "complete_image_asset"],
            "ignored_task_keys": [
                "start_asset_generation",
                "complete_asset_generation",
            ],
            "result_assets": ["audio", "cover_image"],
        },
    )

    retry_job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="asset_retry",
        input_type="image",
        request_payload={"story_id": story.id, "assets": ["image"]},
        story_id=story.id,
    )
    await record_generation_event(
        db_session,
        job=retry_job,
        story_id=story.id,
        event_type="executor_completed",
        status="succeeded",
        metadata={
            "plan_mode": "asset_retry",
            "planned_task_count": 3,
            "executed_task_count": 1,
            "ignored_task_count": 2,
            "executed_task_keys": ["complete_image_asset"],
            "ignored_task_keys": ["start_asset_retry", "complete_asset_retry"],
            "result_assets": ["cover_image"],
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/executors/coverage")
        assert response.status_code == 200
        data = response.json()
        assert data["scope"] == "admin_internal_executor_coverage"
        assert data["total_runs"] == 2
        assert data["total_planned_tasks"] == 7
        assert data["total_executed_tasks"] == 3
        assert data["total_ignored_tasks"] == 4
        assert data["coverage_ratio"] == 0.4286
        assert data["job_count"] == 2
        assert data["story_count"] == 1
        assert data["user_count"] == 1
        assert data["by_plan_mode"] == [
            {"plan_mode": "asset_generation", "count": 1},
            {"plan_mode": "asset_retry", "count": 1},
        ]
        assert data["executed_task_keys"] == [
            {"task_key": "complete_image_asset", "count": 2},
            {"task_key": "complete_audio_asset", "count": 1},
        ]
        assert data["result_assets"] == [
            {"asset": "cover_image", "count": 2},
            {"asset": "audio", "count": 1},
        ]

        response = await client.get(
            "/admin/executors/coverage?plan_mode=asset_retry"
        )
        assert response.status_code == 200
        data = response.json()
        assert data["plan_mode"] == "asset_retry"
        assert data["total_runs"] == 1
        assert data["total_planned_tasks"] == 3
        assert data["total_executed_tasks"] == 1

        response = await client.get("/admin/executors/coverage?plan_mode=story")
        assert response.status_code == 422


async def test_admin_executor_coverage_requires_admin_auth(db_session):
    """Executor coverage answers 401 without the admin guard override."""
    await _assert_requires_admin_auth(db_session, "/admin/executors/coverage")


async def test_admin_harness_readiness_returns_ready_when_internal_gates_pass(
    db_session,
    test_user,
):
    """Readiness reports ``ready`` given one passing evaluation and one executor run."""
    story = await _create_story(
        db_session,
        user_id=test_user.id,
        title="readiness 故事",
    )
    await _record_evaluation_event(
        db_session,
        user_id=test_user.id,
        story_id=story.id,
        output_mode="story",
        artifact="story_text",
        status="succeeded",
        metadata={
            "overall_score": 0.92,
            "passed": True,
            "blocking": False,
            "scores": [
                {"dimension": "structure", "score": 1.0, "reason": "内部 reason"},
                {"dimension": "readability", "score": 0.84, "reason": "内部 reason"},
            ],
            "warnings": [],
        },
    )

    asset_job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="asset_generation",
        input_type="image",
        request_payload={"story_id": story.id, "assets": ["image"]},
        story_id=story.id,
    )
    await record_generation_event(
        db_session,
        job=asset_job,
        story_id=story.id,
        event_type="executor_completed",
        status="succeeded",
        metadata={
            "plan_mode": "asset_generation",
            "planned_task_count": 3,
            "executed_task_count": 1,
            "ignored_task_count": 2,
            "executed_task_keys": ["complete_image_asset"],
            "ignored_task_keys": [
                "start_asset_generation",
                "complete_asset_generation",
            ],
            "result_assets": ["cover_image"],
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/harness/readiness")

        assert response.status_code == 200
        data = response.json()
        assert data["scope"] == "admin_internal_harness_readiness"
        assert data["status"] == "ready"
        assert data["thresholds"] == {
            "min_runtime_evaluations": 1,
            "min_executor_runs": 1,
            "min_evaluation_pass_rate": 0.7,
            "min_evaluation_average_score": 0.7,
            "min_executor_coverage_ratio": 0.2,
        }
        assert {check["code"]: check["status"] for check in data["checks"]} == {
            "golden_replay": "ready",
            "runtime_evaluation_samples": "ready",
            "runtime_evaluation_quality": "ready",
            "executor_coverage_samples": "ready",
            "executor_coverage_ratio": "ready",
        }
        assert data["golden_replay"]["passed"] is True
        assert data["golden_replay"]["total_cases"] == 11
        assert data["evaluation_analytics"]["total_evaluations"] == 1
        assert data["evaluation_analytics"]["pass_rate"] == 1.0
        assert data["executor_coverage"]["total_runs"] == 1
        assert data["executor_coverage"]["coverage_ratio"] == 0.3333
        # Per-score reasons and story titles must not leak.
        assert "内部 reason" not in str(data)
        assert "readiness 故事" not in str(data)


async def test_admin_harness_readiness_blocks_low_runtime_quality(
    db_session,
    test_user,
):
    """Readiness reports ``blocked`` when the only evaluation failed and no executor ran."""
    story = await _create_story(
        db_session,
        user_id=test_user.id,
        title="低质量 readiness",
    )
    await _record_evaluation_event(
        db_session,
        user_id=test_user.id,
        story_id=story.id,
        output_mode="story",
        artifact="story_text",
        status="failed",
        metadata={
            "overall_score": 0.0,
            "passed": False,
            "blocking": True,
            "scores": [{"dimension": "structure", "score": 0.0, "reason": "缺失"}],
            "quality_gate": {
                "issues": [
                    {
                        "code": "missing_story_text",
                        "message": "正文缺失",
                        "failure_category": "schema_error",
                        "field": "story_text",
                    }
                ]
            },
            "warnings": [],
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/harness/readiness")

        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "blocked"
        checks = {check["code"]: check for check in data["checks"]}
        assert checks["golden_replay"]["status"] == "ready"
        assert checks["runtime_evaluation_samples"]["status"] == "ready"
        assert checks["runtime_evaluation_quality"]["status"] == "blocked"
        assert checks["executor_coverage_samples"]["status"] == "needs_attention"
        assert checks["executor_coverage_ratio"]["status"] == "needs_attention"
        assert data["evaluation_analytics"]["blocked_evaluations"] == 1
        assert data["executor_coverage"]["total_runs"] == 0
        # Issue messages and story titles must not leak.
        assert "正文缺失" not in str(data)
        assert "低质量 readiness" not in str(data)


async def test_admin_harness_readiness_requires_admin_auth(db_session):
    """Harness readiness answers 401 without the admin guard override."""
    await _assert_requires_admin_auth(db_session, "/admin/harness/readiness")


async def test_admin_provider_analytics_support_days_and_capability_filters(
    db_session,
    test_user,
):
    """Provider analytics honours ``days`` and ``capability`` and rejects unknown capabilities."""
    second_user = User(
        id="google:22222",
        name="Filter User",
        avatar_url="https://example.com/avatar-3.png",
        provider="google",
    )
    db_session.add(second_user)
    await db_session.commit()

    first_story = await _create_story(
        db_session,
        user_id=test_user.id,
        title="旧事件故事",
    )
    second_story = await _create_story(
        db_session,
        user_id=second_user.id,
        title="最近事件故事",
    )

    image_job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="asset_retry",
        input_type="image",
        request_payload={"assets": ["image"]},
        story_id=first_story.id,
    )
    old_event = await record_generation_event(
        db_session,
        job=image_job,
        story_id=first_story.id,
        event_type="provider_call_failed",
        status="failed",
        metadata={
            "capability": "image",
            "adapter": "cqtai",
            "strategy": "priority",
            "latency_ms": 120,
            "error": "timeout",
        },
    )
    # Back-date the failure so it falls outside the 7-day window below.
    old_event.created_at = datetime.now(timezone.utc) - timedelta(days=10)
    await db_session.commit()

    audio_job = await create_generation_job(
        db_session,
        user_id=second_user.id,
        output_mode="asset_retry",
        input_type="audio",
        request_payload={"assets": ["audio"]},
        story_id=second_story.id,
    )
    await record_generation_event(
        db_session,
        job=audio_job,
        story_id=second_story.id,
        event_type="provider_call_succeeded",
        status="succeeded",
        metadata={
            "capability": "tts",
            "adapter": "edge_tts",
            "strategy": "priority",
            "latency_ms": 18,
            "estimated_cost_usd": 0.003,
        },
    )

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/providers/analytics?days=7")
        assert response.status_code == 200
        data = response.json()
        assert data["window_days"] == 7
        assert data["total_calls"] == 1
        assert data["user_count"] == 1
        assert data["job_count"] == 1
        assert data["story_count"] == 1
        assert data["failure_reasons"] == []

        response = await client.get("/admin/providers/analytics?capability=image")
        assert response.status_code == 200
        data = response.json()
        assert data["capability"] == "image"
        assert data["total_calls"] == 1
        assert data["failed_calls"] == 1
        assert data["user_count"] == 1
        assert data["job_count"] == 1
        assert data["story_count"] == 1
        assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]

        response = await client.get("/admin/providers/analytics?capability=unknown")
        assert response.status_code == 422


async def test_admin_provider_analytics_includes_voice_asr_calls(
    db_session,
    test_user,
):
    """The ``asr`` capability view counts one voice turn and one failed transcription event."""
    second_user = User(
        id="google:asr-user",
        name="ASR User",
        avatar_url="https://example.com/asr.png",
        provider="google",
    )
    db_session.add(second_user)
    await db_session.commit()

    successful_session = VoiceSession(user_id=test_user.id, status="active")
    failed_session = VoiceSession(user_id=second_user.id, status="active")
    db_session.add_all([successful_session, failed_session])
    await db_session.commit()
    await db_session.refresh(successful_session)
    await db_session.refresh(failed_session)

    db_session.add_all(
        [
            VoiceTurn(
                session_id=successful_session.id,
                turn_index=1,
                status="completed",
                user_audio_path="/tmp/voice-turn.webm",
                user_audio_mime_type="audio/webm",
                user_audio_duration_ms=1300,
                user_transcript="我想听一个星星故事",
                transcript_confidence=0.96,
                detected_intent="continue_story",
                intent_confidence=0.9,
                story_patch={"transcription_provider": "demo"},
            ),
            VoiceSessionEvent(
                session_id=failed_session.id,
                event_type="turn_transcription_failed",
                status="failed",
                message="Voice transcription failed.",
                event_metadata={"error": "OPENAI_API_KEY 未配置"},
            ),
            CostRecord(
                user_id=test_user.id,
                provider_name="demo",
                capability="asr",
                estimated_cost=Decimal("0.002"),
            ),
        ]
    )
    await db_session.commit()

    admin_app = _build_admin_test_app(db_session)
    async with _admin_client(admin_app) as client:
        response = await client.get("/admin/providers/analytics?capability=asr")

        assert response.status_code == 200
        data = response.json()
        assert data["capability"] == "asr"
        assert data["total_calls"] == 2
        assert data["successful_calls"] == 1
        assert data["failed_calls"] == 1
        assert data["user_count"] == 2
        assert data["job_count"] == 0
        assert data["story_count"] == 0
        assert data["voice_session_count"] == 2
        assert data["voice_turn_count"] == 1
        assert data["estimated_cost_usd"] == 0.002
        assert data["failure_reasons"] == [
            {"reason": "OPENAI_API_KEY 未配置", "count": 1}
        ]
        assert data["by_provider"] == [
            {
                "capability": "asr",
                "adapter": "demo",
                "call_count": 1,
                "success_count": 1,
                "failure_count": 0,
                "avg_latency_ms": None,
                "estimated_cost_usd": 0.002,
            },
            {
                "capability": "asr",
                "adapter": "unknown",
                "call_count": 1,
                "success_count": 0,
                "failure_count": 1,
                "avg_latency_ms": None,
                "estimated_cost_usd": 0.0,
            },
        ]

        users = {row["user_id"]: row for row in data["by_user"]}
        assert users[test_user.id]["call_count"] == 1
        assert users[test_user.id]["success_count"] == 1
        assert users[test_user.id]["estimated_cost_usd"] == 0.002
        assert users[second_user.id]["call_count"] == 1
        assert users[second_user.id]["failure_count"] == 1