@@ -236,6 +319,70 @@ onMounted(() => {
当前样本中最前面的能力组合是 {{ topProvider.capability }} / {{ topProvider.adapter }},成功 {{ topProvider.success_count }} 次,失败 {{ topProvider.failure_count }} 次。
+
+ 最常见失败原因:{{ topFailureReason.reason }}({{ topFailureReason.count }} 次)
+
+
+
+
+
+
+
任务运行概览
+
+ 最近 {{ opsSummary.window_hours }} 小时的任务健康度,运行超过
+ {{ opsSummary.stale_threshold_minutes }} 分钟会被视为卡住。
+
+
+
+
+
运行中
+
{{ opsSummary.active_jobs }}
+
+
+
超时待收敛
+
+ {{ opsSummary.stale_running_jobs }}
+
+
+
+
最近失败
+
+ {{ opsSummary.failed_jobs }}
+
+
+
+
资源任务
+
{{ opsSummary.asset_retry_jobs }}
+
+
+
+
+ 最近 {{ opsSummary.window_hours }} 小时有 {{ opsSummary.degraded_jobs }} 个任务以降级完成收尾。
+
+
+
+
+
+ {{ failure.story_title || `${formatOutputMode(failure.output_mode)}任务` }}
+
+
{{ formatDate(failure.updated_at) }}
+
+
+ {{ failure.failure_label }} · {{ failure.error_message || '请打开任务轨迹查看原因' }}
+
+
+
+
+ 最近 {{ opsSummary.window_hours }} 小时没有失败任务,当前链路比较稳定。
+
diff --git a/backend/.env.example b/backend/.env.example
index 6c4d0ef..20c7887 100644
--- a/backend/.env.example
+++ b/backend/.env.example
@@ -48,6 +48,8 @@ STORYBOOK_PROVIDERS=["storybook_primary"]
TEXT_MODEL=gemini-2.0-flash
IMAGE_MODEL=nano-banana
IMAGE_RESOLUTION=1K
+STORY_AUDIO_CACHE_TTL_DAYS=30
+GENERATION_JOB_STALE_MINUTES=60
# TTS_MODEL=speech-2.6-turbo (MiniMax) / zh-CN-XiaoxiaoNeural (Edge)
# [API 密钥池]
diff --git a/backend/app/api/stories.py b/backend/app/api/stories.py
index e4473db..434c866 100644
--- a/backend/app/api/stories.py
+++ b/backend/app/api/stories.py
@@ -4,7 +4,7 @@ import json
import uuid
from typing import AsyncGenerator
-from fastapi import APIRouter, Depends, Response
+from fastapi import APIRouter, Depends, Query, Response
from sqlalchemy.ext.asyncio import AsyncSession
from sse_starlette.sse import EventSourceResponse
@@ -19,6 +19,7 @@ from app.schemas.story_schemas import (
GenerateRequest,
GenerationJobDetailResponse,
GenerationJobSummaryResponse,
+ GenerationOpsSummaryResponse,
GenerationProviderAnalyticsResponse,
GenerationProviderStatsResponse,
GenerationRequest,
@@ -36,6 +37,7 @@ from app.services import story_service
from app.services.generation_jobs import (
get_generation_job_detail,
get_story_provider_stats,
+ get_user_generation_ops_summary,
get_user_provider_analytics,
list_story_generation_jobs,
)
@@ -86,16 +88,36 @@ async def get_generation_job(
return await get_generation_job_detail(db, job_id=job_id, user_id=user.id)
+@router.get(
+ "/generations/ops-summary",
+ response_model=GenerationOpsSummaryResponse,
+)
+async def get_generation_ops_summary(
+ hours: int = Query(default=24, ge=1, le=168),
+ user: User = Depends(require_user),
+ db: AsyncSession = Depends(get_db),
+):
+ """Get a compact recent operations summary for generation workflows."""
+ return await get_user_generation_ops_summary(db, user_id=user.id, hours=hours)
+
+
@router.get(
"/generations/provider-analytics",
response_model=GenerationProviderAnalyticsResponse,
)
async def get_generation_provider_analytics(
+ days: int | None = Query(default=None, ge=1, le=365),
+ capability: str | None = Query(default=None),
user: User = Depends(require_user),
db: AsyncSession = Depends(get_db),
):
"""Get provider call stats aggregated across the user's generation history."""
- return await get_user_provider_analytics(db, user_id=user.id)
+ return await get_user_provider_analytics(
+ db,
+ user_id=user.id,
+ days=days,
+ capability=capability,
+ )
@router.get(
@@ -117,11 +139,19 @@ async def list_generation_jobs(
)
async def get_generation_provider_stats(
story_id: int,
+ days: int | None = Query(default=None, ge=1, le=365),
+ capability: str | None = Query(default=None),
user: User = Depends(require_user),
db: AsyncSession = Depends(get_db),
):
"""Get provider call stats aggregated from generation job events."""
- return await get_story_provider_stats(db, story_id=story_id, user_id=user.id)
+ return await get_story_provider_stats(
+ db,
+ story_id=story_id,
+ user_id=user.id,
+ days=days,
+ capability=capability,
+ )
@router.get("/generations/{story_id}", response_model=StoryDetailResponse)
diff --git a/backend/app/core/celery_app.py b/backend/app/core/celery_app.py
index 18aafc5..fedced3 100644
--- a/backend/app/core/celery_app.py
+++ b/backend/app/core/celery_app.py
@@ -49,6 +49,14 @@ celery_app.conf.update(
"task": "app.tasks.memory.prune_memories_task",
"schedule": crontab(minute="0", hour="3"), # Daily at 03:00
},
+ "prune_story_audio_cache": {
+ "task": "app.tasks.audio_cache.prune_story_audio_cache_task",
+ "schedule": crontab(minute="30", hour="3"), # Daily at 03:30
+ },
+ "prune_stale_generation_jobs": {
+ "task": "app.tasks.generation_maintenance.prune_stale_generation_jobs_task",
+ "schedule": crontab(minute="*/30"),
+ },
},
)
diff --git a/backend/app/core/config.py b/backend/app/core/config.py
index d5e7834..f7ca3ea 100644
--- a/backend/app/core/config.py
+++ b/backend/app/core/config.py
@@ -62,12 +62,20 @@ class Settings(BaseSettings):
False,
description="Enable local deterministic demo providers for portfolio demos",
)
- story_audio_cache_dir: str = Field(
- "storage/audio",
- description="Directory for cached story audio files",
- )
-
- # Celery (Redis)
+ story_audio_cache_dir: str = Field(
+ "storage/audio",
+ description="Directory for cached story audio files",
+ )
+ story_audio_cache_ttl_days: int = Field(
+ 30,
+ description="TTL in days before cached story audio is pruned",
+ )
+ generation_job_stale_minutes: int = Field(
+ 60,
+ description="Minutes before a running generation job is considered stale",
+ )
+
+ # Celery (Redis)
celery_broker_url: str = Field("redis://localhost:6379/0")
celery_result_backend: str = Field("redis://localhost:6379/0")
diff --git a/backend/app/schemas/story_schemas.py b/backend/app/schemas/story_schemas.py
index 42c4324..381f821 100644
--- a/backend/app/schemas/story_schemas.py
+++ b/backend/app/schemas/story_schemas.py
@@ -220,21 +220,33 @@ class GenerationProviderStatResponse(BaseModel):
estimated_cost_usd: float = 0.0
+class GenerationProviderFailureReasonResponse(BaseModel):
+ """Aggregated failed provider call reason."""
+
+ reason: str
+ count: int
+
+
class GenerationProviderStatsResponse(BaseModel):
"""Provider call stats aggregated from generation job events."""
story_id: int
+ window_days: int | None = None
+ capability: str | None = None
total_calls: int
successful_calls: int
failed_calls: int
avg_latency_ms: float | None = None
estimated_cost_usd: float = 0.0
by_provider: list[GenerationProviderStatResponse] = Field(default_factory=list)
+ failure_reasons: list[GenerationProviderFailureReasonResponse] = Field(default_factory=list)
class GenerationProviderAnalyticsResponse(BaseModel):
"""Provider call stats aggregated across one user's generation history."""
+ window_days: int | None = None
+ capability: str | None = None
total_calls: int
successful_calls: int
failed_calls: int
@@ -243,6 +255,33 @@ class GenerationProviderAnalyticsResponse(BaseModel):
job_count: int
story_count: int
by_provider: list[GenerationProviderStatResponse] = Field(default_factory=list)
+ failure_reasons: list[GenerationProviderFailureReasonResponse] = Field(default_factory=list)
+
+
+class GenerationRecentFailureResponse(BaseModel):
+ """One recent failed generation task for operations summary."""
+
+ job_id: str
+ story_id: int | None = None
+ story_title: str | None = None
+ output_mode: str
+ current_step: str
+ error_message: str | None = None
+ failure_label: str
+ updated_at: datetime
+
+
+class GenerationOpsSummaryResponse(BaseModel):
+ """Recent generation health summary for one user."""
+
+ window_hours: int
+ stale_threshold_minutes: int
+ active_jobs: int
+ stale_running_jobs: int
+ failed_jobs: int
+ degraded_jobs: int
+ asset_retry_jobs: int
+ recent_failures: list[GenerationRecentFailureResponse] = Field(default_factory=list)
class AchievementItem(BaseModel):
diff --git a/backend/app/services/generation_jobs.py b/backend/app/services/generation_jobs.py
index 1980692..abb38b9 100644
--- a/backend/app/services/generation_jobs.py
+++ b/backend/app/services/generation_jobs.py
@@ -2,14 +2,19 @@
from __future__ import annotations
+from datetime import datetime, timedelta, timezone
from typing import Any
from fastapi import HTTPException
-from sqlalchemy import desc, distinct, func, select
+from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession
+from app.core.config import settings
+from app.core.logging import get_logger
from app.db.models import GenerationJob, GenerationJobEvent, Story
+logger = get_logger(__name__)
+
def _story_snapshot(story: Story | None) -> dict[str, Any]:
if story is None:
@@ -68,6 +73,7 @@ def _job_progress(job: GenerationJob) -> dict[str, Any]:
"asset_generation_completed": (100, "资源已完成"),
"asset_retry_completed": (100, "资源重试完成"),
"generation_completed": (100, "生成完成"),
+ "generation_stale_failed": (100, "任务超时已收敛"),
}
percent, label = progress_map.get(job.current_step, (10, "生成处理中"))
return {
@@ -77,6 +83,27 @@ def _job_progress(job: GenerationJob) -> dict[str, Any]:
}
+def _normalize_datetime(value: datetime) -> datetime:
+ if value.tzinfo is None:
+ return value.replace(tzinfo=timezone.utc)
+ return value.astimezone(timezone.utc)
+
+
+def _is_stale_job(job: GenerationJob, *, stale_after_minutes: int) -> bool:
+ cutoff = datetime.now(timezone.utc) - timedelta(minutes=stale_after_minutes)
+ return job.status == "running" and _normalize_datetime(job.updated_at) <= cutoff
+
+
+def _failure_label(job: GenerationJob) -> str:
+ if job.current_step == "generation_stale_failed":
+ return "任务超时"
+ if job.output_mode == "asset_retry":
+ return "资源重试失败"
+ if job.output_mode == "asset_generation":
+ return "资源生成失败"
+ return "生成失败"
+
+
async def create_generation_job(
db: AsyncSession,
*,
@@ -266,16 +293,64 @@ async def list_story_generation_jobs(
return [generation_job_to_summary(job) for job in jobs]
+async def get_active_story_generation_job(
+ db: AsyncSession,
+ *,
+ story_id: int,
+ user_id: str,
+) -> GenerationJob | None:
+ """Return the most recent running job for a story, if any."""
+
+ result = await db.execute(
+ select(GenerationJob)
+ .where(
+ GenerationJob.story_id == story_id,
+ GenerationJob.user_id == user_id,
+ GenerationJob.status == "running",
+ )
+ .order_by(desc(GenerationJob.updated_at), desc(GenerationJob.id))
+ .limit(1)
+ )
+ return result.scalar_one_or_none()
+
+
+async def ensure_no_active_story_generation_job(
+ db: AsyncSession,
+ *,
+ story_id: int,
+ user_id: str,
+) -> None:
+ """Prevent duplicate asset work while a story already has a running job."""
+
+ active_job = await get_active_story_generation_job(db, story_id=story_id, user_id=user_id)
+ if active_job is None:
+ return
+
+ progress = _job_progress(active_job)
+ raise HTTPException(
+ status_code=409,
+ detail=(
+ f"当前故事已有运行中的任务({progress['progress_label']}),"
+ "请等待当前任务完成后再试。"
+ ),
+ )
+
+
def _as_float(value: Any) -> float | None:
if isinstance(value, int | float):
return float(value)
return None
-def _aggregate_provider_events(events: list[GenerationJobEvent]) -> dict[str, Any]:
+def _aggregate_provider_events(
+ events: list[GenerationJobEvent],
+ *,
+ capability: str | None = None,
+) -> dict[str, Any]:
"""Aggregate provider telemetry from provider call events."""
by_key: dict[tuple[str, str], dict[str, Any]] = {}
+ failure_reasons: dict[str, int] = {}
total_latency = 0.0
latency_count = 0
total_cost = 0.0
@@ -284,13 +359,16 @@ def _aggregate_provider_events(events: list[GenerationJobEvent]) -> dict[str, An
for event in events:
metadata = event.event_metadata or {}
- capability = str(metadata.get("capability") or "unknown")
+ event_capability = str(metadata.get("capability") or "unknown")
+ if capability is not None and event_capability != capability:
+ continue
+
adapter = str(metadata.get("adapter") or "unknown")
- key = (capability, adapter)
+ key = (event_capability, adapter)
bucket = by_key.setdefault(
key,
{
- "capability": capability,
+ "capability": event_capability,
"adapter": adapter,
"call_count": 0,
"success_count": 0,
@@ -318,6 +396,8 @@ def _aggregate_provider_events(events: list[GenerationJobEvent]) -> dict[str, An
else:
bucket["failure_count"] += 1
failed_calls += 1
+ reason = str(metadata.get("error") or "unknown_error")
+ failure_reasons[reason] = failure_reasons.get(reason, 0) + 1
by_provider = []
for bucket in by_key.values():
@@ -349,67 +429,243 @@ def _aggregate_provider_events(events: list[GenerationJobEvent]) -> dict[str, An
"avg_latency_ms": round(total_latency / latency_count, 2) if latency_count else None,
"estimated_cost_usd": round(total_cost, 6),
"by_provider": by_provider,
+ "failure_reasons": [
+ {"reason": reason, "count": count}
+ for reason, count in sorted(
+ failure_reasons.items(),
+ key=lambda item: (-item[1], item[0]),
+ )
+ ],
}
+def _provider_events_query(
+ *,
+ user_id: str,
+ story_id: int | None = None,
+ days: int | None = None,
+):
+ query = (
+ select(GenerationJobEvent)
+ .join(GenerationJob, GenerationJobEvent.job_id == GenerationJob.id)
+ .where(
+ GenerationJob.user_id == user_id,
+ GenerationJobEvent.event_type.in_(
+ ["provider_call_succeeded", "provider_call_failed"]
+ ),
+ )
+ )
+
+ if story_id is not None:
+ query = query.where(GenerationJob.story_id == story_id)
+
+ if days is not None:
+ cutoff = datetime.now(timezone.utc) - timedelta(days=days)
+ query = query.where(GenerationJobEvent.created_at >= cutoff)
+
+ return query.order_by(GenerationJobEvent.id)
+
+
async def get_story_provider_stats(
db: AsyncSession,
*,
story_id: int,
user_id: str,
+ days: int | None = None,
+ capability: str | None = None,
) -> dict[str, Any]:
"""Aggregate provider call telemetry from all user-owned jobs for one story."""
events = (
await db.execute(
- select(GenerationJobEvent)
- .join(GenerationJob, GenerationJobEvent.job_id == GenerationJob.id)
- .where(
- GenerationJob.story_id == story_id,
- GenerationJob.user_id == user_id,
- GenerationJobEvent.event_type.in_(
- ["provider_call_succeeded", "provider_call_failed"]
- ),
+ _provider_events_query(
+ user_id=user_id,
+ story_id=story_id,
+ days=days,
)
- .order_by(GenerationJobEvent.id)
)
).scalars().all()
- return {"story_id": story_id, **_aggregate_provider_events(events)}
+ return {
+ "story_id": story_id,
+ "window_days": days,
+ "capability": capability,
+ **_aggregate_provider_events(events, capability=capability),
+ }
async def get_user_provider_analytics(
db: AsyncSession,
*,
user_id: str,
+ days: int | None = None,
+ capability: str | None = None,
) -> dict[str, Any]:
"""Aggregate provider telemetry across all stories owned by one user."""
events = (
await db.execute(
- select(GenerationJobEvent)
- .join(GenerationJob, GenerationJobEvent.job_id == GenerationJob.id)
+ _provider_events_query(
+ user_id=user_id,
+ days=days,
+ )
+ )
+ ).scalars().all()
+ filtered_event_job_ids = {
+ event.job_id
+ for event in events
+ if capability is None
+ or str((event.event_metadata or {}).get("capability") or "unknown") == capability
+ }
+ filtered_story_ids = {
+ event.story_id
+ for event in events
+ if event.story_id is not None
+ and (
+ capability is None
+ or str((event.event_metadata or {}).get("capability") or "unknown") == capability
+ )
+ }
+
+ return {
+ "window_days": days,
+ "capability": capability,
+ **_aggregate_provider_events(events, capability=capability),
+ "job_count": len(filtered_event_job_ids),
+ "story_count": len(filtered_story_ids),
+ }
+
+
+async def get_user_generation_ops_summary(
+ db: AsyncSession,
+ *,
+ user_id: str,
+ hours: int = 24,
+ recent_failure_limit: int = 5,
+) -> dict[str, Any]:
+ """Summarize recent generation health for one user."""
+
+ stale_after_minutes = settings.generation_job_stale_minutes
+ recent_cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
+
+ running_jobs = (
+ await db.execute(
+ select(GenerationJob)
.where(
GenerationJob.user_id == user_id,
- GenerationJobEvent.event_type.in_(
- ["provider_call_succeeded", "provider_call_failed"]
- ),
+ GenerationJob.status == "running",
)
- .order_by(GenerationJobEvent.id)
+ .order_by(desc(GenerationJob.updated_at), desc(GenerationJob.id))
)
).scalars().all()
- job_count, story_count = (
+ recent_jobs = (
await db.execute(
- select(
- func.count(GenerationJob.id),
- func.count(distinct(GenerationJob.story_id)),
- ).where(GenerationJob.user_id == user_id)
+ select(GenerationJob, Story.title)
+ .outerjoin(Story, Story.id == GenerationJob.story_id)
+ .where(
+ GenerationJob.user_id == user_id,
+ GenerationJob.updated_at >= recent_cutoff,
+ )
+ .order_by(desc(GenerationJob.updated_at), desc(GenerationJob.id))
)
- ).one()
+ ).all()
+
+ recent_failures: list[dict[str, Any]] = []
+ failed_jobs = 0
+ degraded_jobs = 0
+ asset_retry_jobs = 0
+
+ for job, story_title in recent_jobs:
+ if job.status == "failed":
+ failed_jobs += 1
+ if len(recent_failures) < recent_failure_limit:
+ recent_failures.append(
+ {
+ "job_id": job.id,
+ "story_id": job.story_id,
+ "story_title": story_title,
+ "output_mode": job.output_mode,
+ "current_step": job.current_step,
+ "error_message": job.error_message,
+ "failure_label": _failure_label(job),
+ "updated_at": job.updated_at,
+ }
+ )
+ elif job.status == "degraded_completed":
+ degraded_jobs += 1
+
+ if job.output_mode in {"asset_retry", "asset_generation"}:
+ asset_retry_jobs += 1
return {
- **_aggregate_provider_events(events),
- "job_count": job_count,
- "story_count": story_count,
+ "window_hours": hours,
+ "stale_threshold_minutes": stale_after_minutes,
+ "active_jobs": len(running_jobs),
+ "stale_running_jobs": sum(
+ 1 for job in running_jobs if _is_stale_job(job, stale_after_minutes=stale_after_minutes)
+ ),
+ "failed_jobs": failed_jobs,
+ "degraded_jobs": degraded_jobs,
+ "asset_retry_jobs": asset_retry_jobs,
+ "recent_failures": recent_failures,
+ }
+
+
+async def mark_stale_generation_jobs(
+ db: AsyncSession,
+ *,
+ stale_after_minutes: int | None = None,
+) -> dict[str, int]:
+ """Mark long-running generation jobs as failed so they no longer appear stuck forever."""
+
+ threshold = stale_after_minutes or settings.generation_job_stale_minutes
+ running_jobs = (
+ await db.execute(
+ select(GenerationJob)
+ .where(GenerationJob.status == "running")
+ .order_by(GenerationJob.updated_at, GenerationJob.id)
+ )
+ ).scalars().all()
+
+ marked_stale = 0
+
+ for job in running_jobs:
+ if not _is_stale_job(job, stale_after_minutes=threshold):
+ continue
+
+ story = None
+ if job.story_id is not None:
+ story = (
+ await db.execute(
+ select(Story).where(
+ Story.id == job.story_id,
+ Story.user_id == job.user_id,
+ )
+ )
+ ).scalar_one_or_none()
+
+ await finish_generation_job(
+ db,
+ job=job,
+ story=story,
+ status="failed",
+ current_step="generation_stale_failed",
+ error_message=f"Generation job exceeded {threshold} minutes without progress.",
+ message="Generation job was marked failed after exceeding the stale threshold.",
+ metadata={"stale_after_minutes": threshold},
+ )
+ marked_stale += 1
+ logger.warning(
+ "generation_job_marked_stale",
+ job_id=job.id,
+ story_id=job.story_id,
+ output_mode=job.output_mode,
+ stale_after_minutes=threshold,
+ )
+
+ return {
+ "running": len(running_jobs),
+ "marked_stale": marked_stale,
+ "stale_after_minutes": threshold,
}
diff --git a/backend/app/services/story_service.py b/backend/app/services/story_service.py
index 2eecfd9..7ec8714 100644
--- a/backend/app/services/story_service.py
+++ b/backend/app/services/story_service.py
@@ -2,6 +2,7 @@
import asyncio
from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
from typing import Literal
from fastapi import HTTPException
@@ -9,6 +10,7 @@ from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
+from app.core.config import settings
from app.core.logging import get_logger
from app.db.models import ChildProfile, Story, StoryUniverse
from app.schemas.story_schemas import (
@@ -32,6 +34,7 @@ from app.services.audio_storage import (
)
from app.services.generation_jobs import (
create_generation_job,
+ ensure_no_active_story_generation_job,
finish_generation_job,
record_generation_event,
)
@@ -1369,6 +1372,7 @@ async def retry_story_assets(
db: AsyncSession,
) -> Story:
"""Retry selected assets through one workflow-level endpoint."""
+ await ensure_no_active_story_generation_job(db, story_id=story_id, user_id=user_id)
requested_assets = list(dict.fromkeys(assets))
job = await create_generation_job(
db,
@@ -1443,6 +1447,7 @@ async def generate_story_cover(
db: AsyncSession,
) -> str:
"""Generate cover image for an existing story."""
+ await ensure_no_active_story_generation_job(db, story_id=story_id, user_id=user_id)
job = await create_generation_job(
db,
user_id=user_id,
@@ -1495,6 +1500,7 @@ async def generate_story_audio(
db: AsyncSession,
) -> bytes:
"""Generate audio for a story."""
+ await ensure_no_active_story_generation_job(db, story_id=story_id, user_id=user_id)
job = await create_generation_job(
db,
user_id=user_id,
@@ -1597,6 +1603,50 @@ async def clear_story_audio_cache(
return await get_story_audio_status(story_id, user_id, db)
+async def prune_story_audio_cache(db: AsyncSession) -> dict[str, int]:
+ """Prune expired audio cache files and repair story metadata."""
+
+ ttl_days = max(1, settings.story_audio_cache_ttl_days)
+ cutoff = datetime.now(timezone.utc) - timedelta(days=ttl_days)
+ result = await db.execute(select(Story).where(Story.audio_path.is_not(None)))
+ stories = result.scalars().all()
+
+ scanned = 0
+ pruned = 0
+ repaired = 0
+
+ for story in stories:
+ scanned += 1
+ metadata = get_audio_cache_metadata(story.audio_path)
+
+ if not metadata.exists:
+ story.audio_path = None
+ if story.audio_status == StoryAssetStatus.READY.value:
+ sync_story_status(story, audio_status=StoryAssetStatus.NOT_REQUESTED)
+ repaired += 1
+ continue
+
+ if metadata.updated_at and metadata.updated_at < cutoff:
+ delete_audio_cache(story.audio_path)
+ story.audio_path = None
+ sync_story_status(
+ story,
+ audio_status=StoryAssetStatus.NOT_REQUESTED,
+ last_error=None,
+ )
+ pruned += 1
+
+ await db.commit()
+ logger.info(
+ "story_audio_cache_pruned",
+ scanned=scanned,
+ pruned=pruned,
+ repaired=repaired,
+ ttl_days=ttl_days,
+ )
+ return {"scanned": scanned, "pruned": pruned, "repaired": repaired}
+
+
async def get_story_achievements(
story_id: int,
user_id: str,
diff --git a/backend/app/tasks/audio_cache.py b/backend/app/tasks/audio_cache.py
new file mode 100644
index 0000000..73fb084
--- /dev/null
+++ b/backend/app/tasks/audio_cache.py
@@ -0,0 +1,29 @@
+"""Celery tasks for story audio cache maintenance."""
+
+import asyncio
+
+from app.core.celery_app import celery_app
+from app.core.logging import get_logger
+from app.db.database import _get_session_factory
+from app.services.story_service import prune_story_audio_cache
+
+logger = get_logger(__name__)
+
+
+@celery_app.task
+def prune_story_audio_cache_task():
+ """Daily task to prune expired story audio cache files."""
+ logger.info("prune_story_audio_cache_task_started")
+
+ async def _run():
+ session_factory = _get_session_factory()
+ async with session_factory() as session:
+ return await prune_story_audio_cache(session)
+
+ try:
+ result = asyncio.run(_run())
+ logger.info("prune_story_audio_cache_task_completed", **result)
+ return result
+ except Exception as exc:
+ logger.error("prune_story_audio_cache_task_failed", error=str(exc))
+ raise
diff --git a/backend/app/tasks/generation_maintenance.py b/backend/app/tasks/generation_maintenance.py
new file mode 100644
index 0000000..fa82396
--- /dev/null
+++ b/backend/app/tasks/generation_maintenance.py
@@ -0,0 +1,30 @@
+"""Generation job maintenance tasks."""
+
+import asyncio
+
+from app.core.celery_app import celery_app
+from app.core.logging import get_logger
+from app.db.database import _get_session_factory
+from app.services.generation_jobs import mark_stale_generation_jobs
+
+logger = get_logger(__name__)
+
+
+@celery_app.task
+def prune_stale_generation_jobs_task():
+ """Periodically mark stale running generation jobs as failed."""
+
+ logger.info("prune_stale_generation_jobs_task_started")
+
+ async def _run():
+ session_factory = _get_session_factory()
+ async with session_factory() as session:
+ return await mark_stale_generation_jobs(session)
+
+ try:
+ result = asyncio.run(_run())
+ logger.info("prune_stale_generation_jobs_task_completed", **result)
+ return result
+ except Exception as exc:
+ logger.error("prune_stale_generation_jobs_task_failed", error=str(exc))
+ raise
diff --git a/backend/tests/test_audio_cache.py b/backend/tests/test_audio_cache.py
new file mode 100644
index 0000000..a9c6c61
--- /dev/null
+++ b/backend/tests/test_audio_cache.py
@@ -0,0 +1,65 @@
+"""Story audio cache maintenance tests."""
+
+import os
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+
+import pytest
+from fastapi import HTTPException
+from sqlalchemy import select
+
+from app.core.config import settings
+from app.db.models import Story
+from app.services.generation_jobs import create_generation_job
+from app.services.story_service import generate_story_audio, prune_story_audio_cache
+
+pytestmark = pytest.mark.asyncio
+
+
+async def test_prune_story_audio_cache_removes_expired_audio(
+ db_session,
+ test_story,
+ mock_tts_provider,
+ monkeypatch,
+):
+ await generate_story_audio(test_story.id, test_story.user_id, db_session)
+
+ cached_audio_path = Path(settings.story_audio_cache_dir) / f"story-{test_story.id}.mp3"
+ assert cached_audio_path.is_file()
+
+ old_time = datetime.now(timezone.utc) - timedelta(days=10)
+ timestamp = old_time.timestamp()
+ os.utime(cached_audio_path, (timestamp, timestamp))
+ monkeypatch.setattr(settings, "story_audio_cache_ttl_days", 7)
+
+ result = await prune_story_audio_cache(db_session)
+
+ assert result == {"scanned": 1, "pruned": 1, "repaired": 0}
+ assert not cached_audio_path.exists()
+
+ story = (
+ await db_session.execute(select(Story).where(Story.id == test_story.id))
+ ).scalar_one()
+ assert story.audio_path is None
+ assert story.audio_status == "not_requested"
+ assert story.generation_status == "partial_ready"
+
+
+async def test_generate_story_audio_rejects_when_story_has_active_job(
+ db_session,
+ test_story,
+):
+ await create_generation_job(
+ db_session,
+ user_id=test_story.user_id,
+ output_mode="asset_retry",
+ input_type="audio",
+ request_payload={"story_id": test_story.id},
+ story_id=test_story.id,
+ )
+
+ with pytest.raises(HTTPException) as exc_info:
+ await generate_story_audio(test_story.id, test_story.user_id, db_session)
+
+ assert exc_info.value.status_code == 409
+ assert "已有运行中的任务" in str(exc_info.value.detail)
diff --git a/backend/tests/test_generation_jobs.py b/backend/tests/test_generation_jobs.py
index 367816d..89a1d16 100644
--- a/backend/tests/test_generation_jobs.py
+++ b/backend/tests/test_generation_jobs.py
@@ -1,5 +1,6 @@
"""Generation job tracking tests."""
+from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
@@ -12,7 +13,11 @@ from app.main import app
from app.services.adapters import AdapterConfig
from app.services.adapters.storybook.primary import Storybook, StorybookPage
from app.services.adapters.text.models import StoryOutput
-from app.services.generation_jobs import create_generation_job, record_generation_event
+from app.services.generation_jobs import (
+ create_generation_job,
+ mark_stale_generation_jobs,
+ record_generation_event,
+)
pytestmark = pytest.mark.asyncio
@@ -520,6 +525,7 @@ async def test_user_provider_analytics_aggregate_across_stories(
assert data["failed_calls"] == 1
assert data["avg_latency_ms"] == 60.0
assert data["estimated_cost_usd"] == 0.013
+ assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]
assert data["by_provider"] == [
{
"capability": "image",
@@ -551,3 +557,249 @@ async def test_user_provider_analytics_aggregate_across_stories(
]
finally:
app.dependency_overrides.clear()
+
+
+async def test_provider_analytics_support_days_and_capability_filters(
+ db_session,
+ auth_token,
+ degraded_story_with_text,
+ test_story,
+):
+ async def override_get_db():
+ yield db_session
+
+ app.dependency_overrides[get_db] = override_get_db
+
+ image_job = await create_generation_job(
+ db_session,
+ user_id=degraded_story_with_text.user_id,
+ output_mode="asset_retry",
+ input_type="image",
+ request_payload={"assets": ["image"]},
+ story_id=degraded_story_with_text.id,
+ )
+ old_event = await record_generation_event(
+ db_session,
+ job=image_job,
+ story_id=degraded_story_with_text.id,
+ event_type="provider_call_failed",
+ status="failed",
+ metadata={
+ "capability": "image",
+ "adapter": "cqtai",
+ "strategy": "priority",
+ "latency_ms": 120,
+ "error": "timeout",
+ },
+ )
+ old_event.created_at = datetime.now(timezone.utc) - timedelta(days=10)
+ await db_session.commit()
+
+ tts_job = await create_generation_job(
+ db_session,
+ user_id=test_story.user_id,
+ output_mode="asset_retry",
+ input_type="audio",
+ request_payload={"assets": ["audio"]},
+ story_id=test_story.id,
+ )
+ await record_generation_event(
+ db_session,
+ job=tts_job,
+ story_id=test_story.id,
+ event_type="provider_call_succeeded",
+ status="succeeded",
+ metadata={
+ "capability": "tts",
+ "adapter": "edge_tts",
+ "strategy": "priority",
+ "latency_ms": 18,
+ "estimated_cost_usd": 0.003,
+ },
+ )
+
+ transport = ASGITransport(app=app)
+ try:
+ async with AsyncClient(transport=transport, base_url="http://test") as client:
+ client.cookies.set("access_token", auth_token)
+
+ response = await client.get("/api/generations/provider-analytics?days=7")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["window_days"] == 7
+ assert data["total_calls"] == 1
+ assert data["job_count"] == 1
+ assert data["story_count"] == 1
+ assert data["failure_reasons"] == []
+
+ response = await client.get(
+ "/api/generations/provider-analytics?capability=image"
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert data["capability"] == "image"
+ assert data["total_calls"] == 1
+ assert data["failed_calls"] == 1
+ assert data["job_count"] == 1
+ assert data["story_count"] == 1
+ assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]
+
+ response = await client.get(
+ f"/api/generations/{degraded_story_with_text.id}/provider-stats?capability=image"
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert data["capability"] == "image"
+ assert data["failure_reasons"] == [{"reason": "timeout", "count": 1}]
+ finally:
+ app.dependency_overrides.clear()
+
+
+async def test_generation_ops_summary_exposes_running_stale_and_recent_failures(
+ db_session,
+ auth_token,
+ degraded_story_with_text,
+ test_story,
+):
+ async def override_get_db():
+ yield db_session
+
+ app.dependency_overrides[get_db] = override_get_db
+
+ running_job = await create_generation_job(
+ db_session,
+ user_id=test_story.user_id,
+ output_mode="story",
+ input_type="keywords",
+ request_payload={"data": "星星"},
+ story_id=test_story.id,
+ )
+ stale_job = await create_generation_job(
+ db_session,
+ user_id=degraded_story_with_text.user_id,
+ output_mode="asset_generation",
+ input_type="image",
+ request_payload={"story_id": degraded_story_with_text.id},
+ story_id=degraded_story_with_text.id,
+ )
+ failed_job = await create_generation_job(
+ db_session,
+ user_id=degraded_story_with_text.user_id,
+ output_mode="asset_retry",
+ input_type="image",
+ request_payload={"assets": ["image"]},
+ story_id=degraded_story_with_text.id,
+ )
+ degraded_job = await create_generation_job(
+ db_session,
+ user_id=test_story.user_id,
+ output_mode="storybook",
+ input_type="keywords",
+ request_payload={"data": "月亮"},
+ story_id=test_story.id,
+ )
+
+ stale_job.updated_at = datetime.now(timezone.utc) - timedelta(hours=3)
+ failed_job.status = "failed"
+ failed_job.current_step = "asset_retry_failed"
+ failed_job.error_message = "image timeout"
+ failed_job.updated_at = datetime.now(timezone.utc) - timedelta(hours=1)
+ degraded_job.status = "degraded_completed"
+ degraded_job.current_step = "generation_completed"
+ degraded_job.updated_at = datetime.now(timezone.utc) - timedelta(minutes=30)
+ running_job.updated_at = datetime.now(timezone.utc) - timedelta(minutes=10)
+ await db_session.commit()
+
+ transport = ASGITransport(app=app)
+ try:
+ async with AsyncClient(transport=transport, base_url="http://test") as client:
+ client.cookies.set("access_token", auth_token)
+
+ response = await client.get("/api/generations/ops-summary?hours=48")
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["window_hours"] == 48
+ assert data["active_jobs"] == 2
+ assert data["stale_running_jobs"] == 1
+ assert data["failed_jobs"] == 1
+ assert data["degraded_jobs"] == 1
+ assert data["asset_retry_jobs"] == 2
+ assert len(data["recent_failures"]) == 1
+ assert data["recent_failures"][0]["job_id"] == failed_job.id
+ assert data["recent_failures"][0]["story_title"] == degraded_story_with_text.title
+ assert data["recent_failures"][0]["failure_label"] == "资源重试失败"
+ finally:
+ app.dependency_overrides.clear()
+
+
+async def test_mark_stale_generation_jobs_marks_old_running_jobs_failed(
+ db_session,
+ degraded_story_with_text,
+):
+ stale_job = await create_generation_job(
+ db_session,
+ user_id=degraded_story_with_text.user_id,
+ output_mode="story",
+ input_type="keywords",
+ request_payload={"data": "超时任务"},
+ story_id=degraded_story_with_text.id,
+ )
+ stale_job.updated_at = datetime.now(timezone.utc) - timedelta(hours=2)
+ await db_session.commit()
+
+ result = await mark_stale_generation_jobs(db_session, stale_after_minutes=30)
+
+ assert result == {"running": 1, "marked_stale": 1, "stale_after_minutes": 30}
+
+ refreshed_job = (
+ await db_session.execute(select(GenerationJob).where(GenerationJob.id == stale_job.id))
+ ).scalar_one()
+ assert refreshed_job.status == "failed"
+ assert refreshed_job.current_step == "generation_stale_failed"
+ assert refreshed_job.error_message == "Generation job exceeded 30 minutes without progress."
+
+ events = (
+ await db_session.execute(
+ select(GenerationJobEvent)
+ .where(GenerationJobEvent.job_id == stale_job.id)
+ .order_by(GenerationJobEvent.id)
+ )
+ ).scalars().all()
+ assert events[-1].event_type == "generation_stale_failed"
+ assert events[-1].event_metadata["stale_after_minutes"] == 30
+
+
+async def test_retry_assets_rejects_when_story_has_active_job(
+ db_session,
+ auth_token,
+ degraded_story_with_text,
+):
+ async def override_get_db():
+ yield db_session
+
+ app.dependency_overrides[get_db] = override_get_db
+
+ await create_generation_job(
+ db_session,
+ user_id=degraded_story_with_text.user_id,
+ output_mode="asset_generation",
+ input_type="image",
+ request_payload={"story_id": degraded_story_with_text.id},
+ story_id=degraded_story_with_text.id,
+ )
+
+ transport = ASGITransport(app=app)
+ try:
+ async with AsyncClient(transport=transport, base_url="http://test") as client:
+ client.cookies.set("access_token", auth_token)
+
+ response = await client.post(
+ f"/api/generations/{degraded_story_with_text.id}/retry-assets",
+ json={"assets": ["image"]},
+ )
+
+ assert response.status_code == 409
+ assert "已有运行中的任务" in response.json()["detail"]
+ finally:
+ app.dependency_overrides.clear()
diff --git a/docs/planning/demo-validation-log.md b/docs/planning/demo-validation-log.md
index ea85a31..3c95a44 100644
--- a/docs/planning/demo-validation-log.md
+++ b/docs/planning/demo-validation-log.md
@@ -16,6 +16,8 @@
- `./scripts/demo_smoke.sh` 已覆盖音频缓存状态查询。
- Week 4 Demo 包装已完成:新增架构说明、Demo 包装文档、Week 4 sprint review,用户端和管理端绘本阅读器支持阅读位置恢复。
- Week 4 最终回归通过:后端全量测试 85 passed,ruff 通过,用户端/管理端构建通过,`docker compose up -d --build` 和 `./scripts/demo_smoke.sh` 通过。
+- 继续优化后再次验证:Provider analytics 已支持时间窗口与能力筛选、失败原因摘要;音频缓存已加入 TTL 配置和后台 prune 任务。
+- 新一轮优化验证通过:新增 `GET /api/generations/ops-summary`,故事库已展示最近失败与卡住任务摘要;生成任务已支持 stale 自动收敛和重复资产任务保护。
- 后端新增 `partial_ready`、`text_status` 与迁移 `0012_story_text_status` 后,`backend/.venv/bin/python -m pytest backend/tests -q` 通过,82 个测试通过。
- `backend/.venv/bin/python -m ruff check backend/app backend/tests backend/alembic/versions/0012_add_story_text_status_and_partial_ready.py` 通过。
- 用户端与管理端 `npm run build` 均通过。
diff --git a/docs/planning/week-2-to-4-execution-backlog.md b/docs/planning/week-2-to-4-execution-backlog.md
index fed9e10..4fe50e4 100644
--- a/docs/planning/week-2-to-4-execution-backlog.md
+++ b/docs/planning/week-2-to-4-execution-backlog.md
@@ -66,6 +66,11 @@ Week 2 已完成演示闭环、统一生成工作流、generation job/event、
| W4-03 | Demo | 求职版 Demo 包装 | `docs/planning/demo-package.md` | P0 | Done |
| W4-04 | QA | 全量回归与验证记录 | pytest、ruff、前端 build、Docker smoke | P0 | Done |
| W4-05 | Product | 项目复盘与下一阶段路线 | `docs/planning/week-4-sprint-review.md` | P1 | Done |
+| W4-06 | Ops | Provider analytics 支持时间窗口与失败原因 | `days` / `capability` 筛选 + failure reason summary | P1 | Done |
+| W4-07 | Ops | 音频缓存后台清理 | TTL 配置 + Celery beat prune task | P1 | Done |
+| W4-08 | Ops | 任务运行概览与失败摘要 | `GET /api/generations/ops-summary` + 最近失败列表 | P1 | Done |
+| W4-09 | Workflow | 卡住任务自动收敛 | `GENERATION_JOB_STALE_MINUTES` + Celery beat stale job maintenance | P1 | Done |
+| W4-10 | Workflow | 防止重复资产任务 | 运行中故事拒绝重复封面/音频/资产重试请求 | P1 | Done |
---
diff --git a/docs/planning/week-4-sprint-review.md b/docs/planning/week-4-sprint-review.md
index deaa098..20fa8de 100644
--- a/docs/planning/week-4-sprint-review.md
+++ b/docs/planning/week-4-sprint-review.md
@@ -34,6 +34,7 @@ DreamWeaver 已经具备求职演示所需的完整闭环:
- generation job/event
- Provider failover 和聚合指标
- 跨故事 Provider analytics
+- 任务运行概览、最近失败摘要与卡住任务收敛
- 前端生成轨迹和自动轮询形态
---
@@ -42,7 +43,7 @@ DreamWeaver 已经具备求职演示所需的完整闭环:
最近一轮验证包括:
-- 后端全量测试:85 passed
+- 后端全量测试:91 passed
- 后端 ruff:通过
- 用户端生产构建:通过
- 管理端生产构建:通过
@@ -56,10 +57,9 @@ DreamWeaver 已经具备求职演示所需的完整闭环:
| Priority | Task | Why |
| --- | --- | --- |
| P0 | 将同步生成迁移到 Celery worker | 支持真实长任务、断点恢复和后台进度 |
-| P0 | Provider analytics 加入时间窗口和失败原因 | 让运营分析可用于成本与稳定性决策 |
-| P1 | 音频缓存过期策略与后台清理 | 控制磁盘占用和缓存生命周期 |
| P1 | 生成任务取消与重试队列 | 防止重复任务和用户误触造成浪费 |
-| P1 | 监控告警与结构化 dashboard | 上线前需要可观测性闭环 |
+| P1 | 跨用户 / 跨环境 Provider dashboard | 当前已支持单用户摘要,后续要支持运营视角 |
+| P1 | 监控告警与结构化 dashboard | 目前已有故事库级概览,后续要接入更完整观测体系 |
| P2 | 更细粒度叙事风格与音色策略 | 扩展体验,但不影响当前求职版主线 |
---
diff --git a/docs/product/unified-generation-workflow-prd.md b/docs/product/unified-generation-workflow-prd.md
index c6d5416..92b022b 100644
--- a/docs/product/unified-generation-workflow-prd.md
+++ b/docs/product/unified-generation-workflow-prd.md
@@ -64,6 +64,10 @@ DreamWeaver 当前同时支持普通故事生成、完整故事生成和绘本
- Provider 调用已可按故事聚合为成功率、平均耗时、预估成本和 adapter 明细
- generation job 响应已提供 `progress_percent`、`progress_label` 和 `is_terminal`,前端可直接用于进度条和轮询
- 已新增跨故事 Provider 运营摘要 `GET /api/generations/provider-analytics`,故事库可展示总调用、成功率、平均耗时、预估成本和任务/故事覆盖数
+- 跨故事 Provider 运营摘要已支持按时间窗口和 capability 筛选,并聚合失败原因
+- 已新增任务运行概览 `GET /api/generations/ops-summary`,故事库可展示最近失败、运行中任务和超时待收敛任务
+- 重复资产任务已加入保护:同一故事存在运行中 job 时,不再重复触发封面、音频或统一资产重试
+- Celery beat 已支持定时收敛卡住的 generation job,避免任务长期停在 running
- 用户端与管理端生成轨迹组件会在任务未终止时自动轮询,为后续后台 worker 进度流保留前端形态
- `POST /api/generations` 响应已返回 `generation_job_id`,smoke 脚本会验证 job 查询与 story job history
- 用户端与管理端的故事详情页、绘本阅读页已接入生成轨迹,展示生成/重试任务、关键事件、Provider 调用结果和聚合指标
@@ -74,7 +78,7 @@ DreamWeaver 当前同时支持普通故事生成、完整故事生成和绘本
- 普通故事、完整生成、绘本生成已有统一外部入口,内部 workflow 仍可继续减少兼容层分支
- 统一资产重试入口已覆盖普通故事封面、绘本缺失插图和故事音频,后续可继续扩展更细的资产级审计
-- 后台异步 worker 执行、断点续跑、跨时间窗口筛选和更完整的 Provider 运营分析仍属于后续生产化增强
+- 后台异步 worker 执行、断点续跑、跨用户/跨环境 Provider 分析,以及真正的取消/重试队列仍属于后续生产化增强
### What This Means
diff --git a/docs/technical/generation-job-state.md b/docs/technical/generation-job-state.md
index 62cd783..68ada97 100644
--- a/docs/technical/generation-job-state.md
+++ b/docs/technical/generation-job-state.md
@@ -19,9 +19,10 @@
- `GET /api/generations/jobs/{job_id}`:查询单次生成/补全任务及其事件流。
- `GET /api/generations/{story_id}/jobs`:查询某个故事或绘本的生成与重试历史。
- `GET /api/generations/{story_id}/provider-stats`:按故事聚合 Provider 调用成功率、平均耗时、预估成本和 adapter 明细。
-- `GET /api/generations/provider-analytics`:按当前用户聚合跨故事 Provider 调用、任务数、故事数、成功率、平均耗时和预估成本。
+- `GET /api/generations/provider-analytics`:按当前用户聚合跨故事 Provider 调用、任务数、故事数、成功率、平均耗时和预估成本,并支持 `days` / `capability` 筛选。
+- `GET /api/generations/ops-summary`:按当前用户聚合最近任务健康度,包括运行中数量、超时阈值、卡住任务数和最近失败摘要。
-job 响应会返回 `progress_percent`、`progress_label` 和 `is_terminal`,用户端与管理端已经消费这些查询入口,在故事详情页和绘本阅读页展示最近任务、任务历史、事件时间线、进度条和 Provider 聚合指标;当任务未终止时,前端会自动轮询,为后台 worker 进度流预留体验形态。
+job 响应会返回 `progress_percent`、`progress_label` 和 `is_terminal`,用户端与管理端已经消费这些查询入口,在故事详情页和绘本阅读页展示最近任务、任务历史、事件时间线、进度条和 Provider 聚合指标;当任务未终止时,前端会自动轮询,为后台 worker 进度流预留体验形态。当前 analytics 还会聚合失败原因,便于快速解释“最近为什么失败”;ops summary 会额外把“哪些任务卡住了、最近哪些任务失败了”压缩成故事库首页能直接看的摘要。
## 现有状态模型
@@ -37,6 +38,12 @@ job 响应会返回 `progress_percent`、`progress_label` 和 `is_terminal`,
这些字段足够支撑前端展示、smoke 检查、失败降级、资产重试和生成轨迹解释。
+## 当前维护策略
+
+- 音频缓存由 `STORY_AUDIO_CACHE_TTL_DAYS` 控制过期时间,Celery beat 会每日清理。
+- 生成任务由 `GENERATION_JOB_STALE_MINUTES` 控制卡住阈值,Celery beat 会每 30 分钟扫描一次,将超时运行中的任务标记为 `generation_stale_failed`。
+- 当某个故事已经有运行中的 job 时,封面补全、音频生成和统一资产重试会直接拒绝重复请求,避免用户连点造成重复成本。
+
## 什么时候需要落库 job
如果后续进入真实生产化,需要扩展当前 job/event 模型:
@@ -52,7 +59,7 @@ job 响应会返回 `progress_percent`、`progress_label` 和 `is_terminal`,
当前已有两层记录,未来可以继续扩展字段和事件颗粒度:
- 将同步生成请求迁移到真正异步 worker 后,继续复用现有 job 查询和前端轮询进度条。
-- 将当前跨故事 provider 指标扩展为跨时间窗口、跨用户和失败原因维度的运营分析。
+- 将当前跨故事 provider 指标扩展为跨用户、跨环境和更细颗粒度的失败原因维度分析。
## 面试表达
diff --git a/frontend/src/types/generation.ts b/frontend/src/types/generation.ts
index 9e6a9f3..ff6e80a 100644
--- a/frontend/src/types/generation.ts
+++ b/frontend/src/types/generation.ts
@@ -42,15 +42,23 @@ export interface GenerationProviderStat {
export interface GenerationProviderStats {
story_id: number
+ window_days: number | null
+ capability: string | null
total_calls: number
successful_calls: number
failed_calls: number
avg_latency_ms: number | null
estimated_cost_usd: number
by_provider: GenerationProviderStat[]
+ failure_reasons: Array<{
+ reason: string
+ count: number
+ }>
}
export interface GenerationProviderAnalytics {
+ window_days: number | null
+ capability: string | null
total_calls: number
successful_calls: number
failed_calls: number
@@ -59,4 +67,30 @@ export interface GenerationProviderAnalytics {
job_count: number
story_count: number
by_provider: GenerationProviderStat[]
+ failure_reasons: Array<{
+ reason: string
+ count: number
+ }>
+}
+
+export interface GenerationRecentFailure {
+ job_id: string
+ story_id: number | null
+ story_title: string | null
+ output_mode: string
+ current_step: string
+ error_message: string | null
+ failure_label: string
+ updated_at: string
+}
+
+export interface GenerationOpsSummary {
+ window_hours: number
+ stale_threshold_minutes: number
+ active_jobs: number
+ stale_running_jobs: number
+ failed_jobs: number
+ degraded_jobs: number
+ asset_retry_jobs: number
+ recent_failures: GenerationRecentFailure[]
}
diff --git a/frontend/src/views/MyStories.vue b/frontend/src/views/MyStories.vue
index 77847ba..f45e245 100644
--- a/frontend/src/views/MyStories.vue
+++ b/frontend/src/views/MyStories.vue
@@ -1,5 +1,5 @@
@@ -191,6 +234,18 @@ onMounted(() => {
最近生成和资源补全留下的供应商调用轨迹。
+
+
+
+
+
+
+
+
+
+
+
+