diff --git a/backend/app/services/harness/__init__.py b/backend/app/services/harness/__init__.py
new file mode 100644
index 0000000..00c33ae
--- /dev/null
+++ b/backend/app/services/harness/__init__.py
@@ -0,0 +1,2 @@
+"""Generation harness runtime support."""
+
diff --git a/backend/app/services/harness/artifacts.py b/backend/app/services/harness/artifacts.py
new file mode 100644
index 0000000..2ec1be9
--- /dev/null
+++ b/backend/app/services/harness/artifacts.py
@@ -0,0 +1,37 @@
+"""Artifact result types for generation harness workflows."""
+
+from dataclasses import dataclass
+from typing import Literal
+
+from app.services.story_status import StoryAssetStatus
+
+AssetCompletionKind = Literal["cover_image", "storybook_images", "audio"]
+
+
+@dataclass(frozen=True)
+class AssetCompletionResult:
+ """Service-level result for a generated asset completion attempt."""
+
+ asset: AssetCompletionKind
+ status: StoryAssetStatus
+ value: str | bytes | None = None
+ error: str | None = None
+ blocks_main_result: bool = False
+
+ @property
+ def succeeded(self) -> bool:
+ """Whether the asset reached a usable ready state."""
+
+ return self.status == StoryAssetStatus.READY and self.error is None
+
+
+def asset_result_metadata(result: AssetCompletionResult) -> dict:
+ """Build JSON-safe metadata for asset workflow events."""
+
+ return {
+ "asset": result.asset,
+ "status": result.status.value,
+ "error": result.error,
+ "blocks_main_result": result.blocks_main_result,
+ }
+
diff --git a/backend/app/services/harness/asset_workflows.py b/backend/app/services/harness/asset_workflows.py
new file mode 100644
index 0000000..b73b3a8
--- /dev/null
+++ b/backend/app/services/harness/asset_workflows.py
@@ -0,0 +1,468 @@
+"""Artifact completion workflows for the generation harness runtime."""
+
+from collections.abc import Awaitable, Callable
+
+from fastapi import HTTPException
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.logging import get_logger
+from app.db.models import Story
+from app.services.harness.artifacts import AssetCompletionResult, asset_result_metadata
+from app.services.harness.control import ExecutionControl
+from app.services.harness.trace import TraceRecorder
+from app.services.story_status import StoryAssetStatus, sync_story_status
+
+logger = get_logger(__name__)
+
+ImageGenerator = Callable[..., Awaitable[str]]
+TTSGenerator = Callable[..., Awaitable[bytes]]
+AudioCacheExists = Callable[[str], bool]
+AudioCacheReader = Callable[[str], bytes]
+AudioCacheWriter = Callable[[int, bytes], str]
+
+
+async def complete_cover_image_asset(
+ story: Story,
+ db: AsyncSession,
+ *,
+ generate_image_func: ImageGenerator,
+ raise_on_failure: bool = False,
+ last_error_prefix: str | None = None,
+ log_event: str = "cover_asset_generation_failed",
+ job=None,
+) -> AssetCompletionResult:
+ """Generate or retry a text story cover through one asset workflow."""
+
+ if not story.cover_prompt:
+ raise HTTPException(status_code=400, detail="Story has no cover prompt")
+
+ sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
+ await db.commit()
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="cover_image_started",
+ status="running",
+ message="Cover image generation started.",
+ metadata={"asset": "image", "cover_prompt_present": True},
+ )
+
+ try:
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
+ image_url = await generate_image_func(
+ story.cover_prompt,
+ db=db,
+ user_id=story.user_id,
+ generation_job=job,
+ story_id=story.id,
+ )
+ story.image_url = image_url
+ sync_story_status(story, image_status=StoryAssetStatus.READY)
+ await db.commit()
+ result = AssetCompletionResult(
+ asset="cover_image",
+ status=StoryAssetStatus.READY,
+ value=image_url,
+ blocks_main_result=raise_on_failure,
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="cover_image_succeeded",
+ status="succeeded",
+ message="Cover image was generated.",
+ metadata=asset_result_metadata(result),
+ )
+ return result
+ except Exception as exc:
+ provider_error = str(exc)
+ last_error = (
+ f"{last_error_prefix}: {provider_error}"
+ if last_error_prefix
+ else provider_error
+ )
+ sync_story_status(
+ story,
+ image_status=StoryAssetStatus.FAILED,
+ last_error=last_error,
+ )
+ await db.commit()
+ logger.warning(log_event, story_id=story.id, error=provider_error)
+
+ result = AssetCompletionResult(
+ asset="cover_image",
+ status=StoryAssetStatus.FAILED,
+ error=provider_error,
+ blocks_main_result=raise_on_failure,
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="cover_image_failed",
+ status="failed",
+ message="Cover image generation failed.",
+ metadata=asset_result_metadata(result),
+ )
+ if raise_on_failure:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Image generation failed: {provider_error}",
+ ) from exc
+
+ return result
+
+
+async def read_cached_audio_asset(
+ story: Story,
+ db: AsyncSession,
+ *,
+ audio_cache_exists_func: AudioCacheExists,
+ read_audio_cache_func: AudioCacheReader,
+) -> bytes | None:
+ """Read cached audio or repair stale audio cache metadata."""
+
+ if story.audio_path and audio_cache_exists_func(story.audio_path):
+ if story.audio_status != StoryAssetStatus.READY.value:
+ sync_story_status(story, audio_status=StoryAssetStatus.READY)
+ await db.commit()
+ return read_audio_cache_func(story.audio_path)
+
+ if story.audio_path and not audio_cache_exists_func(story.audio_path):
+ logger.warning(
+ "story_audio_cache_missing",
+ story_id=story.id,
+ audio_path=story.audio_path,
+ )
+ story.audio_path = None
+ if story.audio_status == StoryAssetStatus.READY.value:
+ sync_story_status(story, audio_status=StoryAssetStatus.NOT_REQUESTED)
+ await db.commit()
+
+ return None
+
+
+async def complete_audio_asset(
+ story: Story,
+ db: AsyncSession,
+ *,
+ text_to_speech_func: TTSGenerator,
+ audio_cache_exists_func: AudioCacheExists,
+ read_audio_cache_func: AudioCacheReader,
+ write_story_audio_cache_func: AudioCacheWriter,
+ raise_on_failure: bool = True,
+ job=None,
+) -> AssetCompletionResult:
+ """Complete TTS audio generation through one asset workflow."""
+
+ if not story.story_text:
+ raise HTTPException(status_code=400, detail="Story has no text")
+
+ cached_audio = await read_cached_audio_asset(
+ story,
+ db,
+ audio_cache_exists_func=audio_cache_exists_func,
+ read_audio_cache_func=read_audio_cache_func,
+ )
+ if cached_audio is not None:
+ result = AssetCompletionResult(
+ asset="audio",
+ status=StoryAssetStatus.READY,
+ value=cached_audio,
+ blocks_main_result=raise_on_failure,
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="audio_cache_hit",
+ status="succeeded",
+ message="Cached story audio was reused.",
+ metadata=asset_result_metadata(result),
+ )
+ return result
+
+ sync_story_status(story, audio_status=StoryAssetStatus.GENERATING)
+ await db.commit()
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="audio_started",
+ status="running",
+ message="Story audio generation started.",
+ metadata={"asset": "audio"},
+ )
+
+ try:
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
+ audio_data = await text_to_speech_func(
+ story.story_text,
+ db=db,
+ user_id=story.user_id,
+ generation_job=job,
+ story_id=story.id,
+ )
+ story.audio_path = write_story_audio_cache_func(story.id, audio_data)
+ sync_story_status(
+ story,
+ audio_status=StoryAssetStatus.READY,
+ )
+ await db.commit()
+ result = AssetCompletionResult(
+ asset="audio",
+ status=StoryAssetStatus.READY,
+ value=audio_data,
+ blocks_main_result=raise_on_failure,
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="audio_succeeded",
+ status="succeeded",
+ message="Story audio was generated and cached.",
+ metadata=asset_result_metadata(result),
+ )
+ return result
+ except Exception as exc:
+ provider_error = str(exc)
+ story.audio_path = None
+ sync_story_status(
+ story,
+ audio_status=StoryAssetStatus.FAILED,
+ last_error=provider_error,
+ )
+ await db.commit()
+ logger.error("audio_generation_failed", story_id=story.id, error=provider_error)
+
+ result = AssetCompletionResult(
+ asset="audio",
+ status=StoryAssetStatus.FAILED,
+ error=provider_error,
+ blocks_main_result=raise_on_failure,
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="audio_failed",
+ status="failed",
+ message="Story audio generation failed.",
+ metadata=asset_result_metadata(result),
+ )
+ if raise_on_failure:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Audio generation failed: {provider_error}",
+ ) from exc
+
+ return result
+
+
+def get_storybook_pages_data(story: Story) -> list[dict]:
+ """Return mutable storybook page data from the persisted JSON field."""
+
+ return [dict(page) for page in story.pages or [] if isinstance(page, dict)]
+
+
+def build_storybook_error_message(
+ *,
+ cover_failed: bool,
+ failed_pages: list[int],
+) -> str | None:
+ """Summarize storybook image generation errors for the latest attempt."""
+
+ parts: list[str] = []
+ if cover_failed:
+ parts.append("封面生成失败")
+ if failed_pages:
+ pages = "、".join(str(page) for page in sorted(failed_pages))
+ parts.append(f"第 {pages} 页插图生成失败")
+ return ";".join(parts) if parts else None
+
+
+def resolve_storybook_image_status(
+ *,
+ generate_images: bool,
+ cover_prompt: str | None,
+ cover_url: str | None,
+ pages_data: list[dict],
+) -> StoryAssetStatus:
+ """Resolve the persisted image status for a storybook."""
+
+ if not generate_images:
+ return StoryAssetStatus.NOT_REQUESTED
+
+ expected_assets = 0
+ ready_assets = 0
+
+ if cover_prompt or cover_url:
+ expected_assets += 1
+ if cover_url:
+ ready_assets += 1
+
+ for page in pages_data:
+ if not page.get("image_prompt") and not page.get("image_url"):
+ continue
+ expected_assets += 1
+ if page.get("image_url"):
+ ready_assets += 1
+
+ if expected_assets == 0:
+ return StoryAssetStatus.NOT_REQUESTED
+
+ if ready_assets == expected_assets:
+ return StoryAssetStatus.READY
+
+ return StoryAssetStatus.FAILED
+
+
+async def complete_storybook_image_assets(
+ story: Story,
+ db: AsyncSession,
+ *,
+ generate_image_func: ImageGenerator,
+ job=None,
+) -> AssetCompletionResult:
+ """Complete missing cover/page images for a persisted storybook."""
+
+ pages_data = get_storybook_pages_data(story)
+ has_image_prompt = bool(story.cover_prompt) or any(
+ page.get("image_prompt") for page in pages_data
+ )
+ if not has_image_prompt:
+ raise HTTPException(status_code=400, detail="Storybook has no image prompts")
+
+ sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
+ await db.commit()
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="storybook_images_started",
+ status="running",
+ message="Storybook missing image completion started.",
+ metadata={"asset": "image"},
+ )
+
+ cover_failed = False
+ failed_pages: list[int] = []
+ completed_pages: list[int] = []
+
+ if story.cover_prompt and not story.image_url:
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
+ try:
+ story.image_url = await generate_image_func(
+ story.cover_prompt,
+ db=db,
+ user_id=story.user_id,
+ generation_job=job,
+ story_id=story.id,
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="storybook_cover_image_succeeded",
+ status="succeeded",
+ message="Storybook cover image was generated.",
+ metadata={"asset": "image", "scope": "cover"},
+ )
+ except Exception as exc:
+ cover_failed = True
+ logger.warning(
+ "storybook_cover_asset_completion_failed",
+ story_id=story.id,
+ error=str(exc),
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="storybook_cover_image_failed",
+ status="failed",
+ message="Storybook cover image generation failed.",
+ metadata={"asset": "image", "scope": "cover", "error": str(exc)},
+ )
+
+ for page in pages_data:
+ if not page.get("image_prompt") or page.get("image_url"):
+ continue
+
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
+ try:
+ page["image_url"] = await generate_image_func(
+ page["image_prompt"],
+ db=db,
+ user_id=story.user_id,
+ generation_job=job,
+ story_id=story.id,
+ )
+ page_number = page.get("page_number")
+ if isinstance(page_number, int):
+ completed_pages.append(page_number)
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="storybook_page_image_succeeded",
+ status="succeeded",
+ message="Storybook page image was generated.",
+ metadata={"asset": "image", "scope": "page", "page_number": page_number},
+ )
+ except Exception as exc:
+ page_number = page.get("page_number")
+ if isinstance(page_number, int):
+ failed_pages.append(page_number)
+ logger.warning(
+ "storybook_page_asset_completion_failed",
+ story_id=story.id,
+ page=page_number,
+ error=str(exc),
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="storybook_page_image_failed",
+ status="failed",
+ message="Storybook page image generation failed.",
+ metadata={
+ "asset": "image",
+ "scope": "page",
+ "page_number": page_number,
+ "error": str(exc),
+ },
+ )
+
+ story.pages = pages_data
+ error_message = build_storybook_error_message(
+ cover_failed=cover_failed,
+ failed_pages=failed_pages,
+ )
+ image_status = resolve_storybook_image_status(
+ generate_images=True,
+ cover_prompt=story.cover_prompt,
+ cover_url=story.image_url,
+ pages_data=pages_data,
+ )
+ sync_story_status(
+ story,
+ image_status=image_status,
+ last_error=error_message,
+ )
+ await db.commit()
+
+ result = AssetCompletionResult(
+ asset="storybook_images",
+ status=image_status,
+ value=story.image_url,
+ error=error_message,
+ )
+ await TraceRecorder(db).record_step(
+ job=job,
+ story_id=story.id,
+ event_type="storybook_images_completed",
+ status="failed" if error_message else "succeeded",
+ message="Storybook image completion finished.",
+ metadata={
+ **asset_result_metadata(result),
+ "completed_pages": sorted(completed_pages),
+ "failed_pages": sorted(failed_pages),
+ },
+ )
+ return result
diff --git a/backend/app/services/harness/control.py b/backend/app/services/harness/control.py
new file mode 100644
index 0000000..6789b5e
--- /dev/null
+++ b/backend/app/services/harness/control.py
@@ -0,0 +1,48 @@
+"""Execution control helpers for generation harness workflows."""
+
+from typing import TYPE_CHECKING
+
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.generation_jobs import finish_generation_job
+
+if TYPE_CHECKING:
+ from app.db.models import GenerationJob, Story
+
+
+class GenerationJobCanceledError(Exception):
+ """Raised when a running worker job has been canceled by the user."""
+
+
+class ExecutionControl:
+ """Runtime control surface for cancelable generation workflows."""
+
+ def __init__(self, db: AsyncSession):
+ self.db = db
+
+ async def stop_if_cancel_requested(
+ self,
+ *,
+ job: "GenerationJob | None",
+ story: "Story | None" = None,
+ ) -> None:
+ """Stop a worker-owned job at the next safe checkpoint after cancellation."""
+
+ if job is None:
+ return
+
+ await self.db.refresh(job)
+ if job.current_step != "cancel_requested":
+ return
+
+ await finish_generation_job(
+ self.db,
+ job=job,
+ story=story,
+ status="canceled",
+ current_step="generation_canceled",
+ error_message="Generation canceled by user.",
+ message="Generation job was canceled after a user request.",
+ )
+ raise GenerationJobCanceledError()
+
diff --git a/backend/app/services/harness/plans.py b/backend/app/services/harness/plans.py
new file mode 100644
index 0000000..9163ca7
--- /dev/null
+++ b/backend/app/services/harness/plans.py
@@ -0,0 +1,237 @@
+"""Workflow plan builders for generation harness workflows."""
+
+from dataclasses import dataclass
+from enum import StrEnum
+from typing import Any
+
+from app.services.harness.types import ArtifactKind, WorkflowStep
+
+
+class WorkflowMode(StrEnum):
+ """Supported executable workflow modes."""
+
+ STORY = "story"
+ STORY_WITH_ASSETS = "story_with_assets"
+ STORYBOOK = "storybook"
+ ASSET_GENERATION = "asset_generation"
+ ASSET_RETRY = "asset_retry"
+
+
+@dataclass(frozen=True)
+class WorkflowTask:
+ """One planned step in a generation workflow."""
+
+ key: str
+ step: WorkflowStep
+ artifact: ArtifactKind
+ required: bool = True
+ recoverable: bool = False
+
+ def to_snapshot(self) -> dict[str, Any]:
+ """Return a JSON-safe snapshot for tests and trace metadata."""
+
+ return {
+ "key": self.key,
+ "step": self.step.value,
+ "artifact": self.artifact.value,
+ "required": self.required,
+ "recoverable": self.recoverable,
+ }
+
+
+@dataclass(frozen=True)
+class WorkflowPlan:
+ """Declarative shape of a generation workflow before execution."""
+
+ mode: WorkflowMode
+ tasks: tuple[WorkflowTask, ...]
+
+ def to_snapshot(self) -> dict[str, Any]:
+ """Return a JSON-safe snapshot for tests and trace metadata."""
+
+ return {
+ "mode": self.mode.value,
+ "tasks": [task.to_snapshot() for task in self.tasks],
+ }
+
+
+def build_story_plan(*, generate_images: bool) -> WorkflowPlan:
+ """Build a plan for a text story generation request."""
+
+ tasks = [
+ WorkflowTask(
+ key="prepare_context",
+ step=WorkflowStep.CONTEXT_PREPARATION,
+ artifact=ArtifactKind.NONE,
+ ),
+ WorkflowTask(
+ key="generate_narrative",
+ step=WorkflowStep.NARRATIVE_GENERATION,
+ artifact=ArtifactKind.STORY_TEXT,
+ ),
+ WorkflowTask(
+ key="persist_story",
+ step=WorkflowStep.STORY_PERSISTENCE,
+ artifact=ArtifactKind.STORY_TEXT,
+ ),
+ ]
+
+ if generate_images:
+ tasks.append(
+ WorkflowTask(
+ key="generate_cover_image",
+ step=WorkflowStep.IMAGE_GENERATION,
+ artifact=ArtifactKind.COVER_IMAGE,
+ required=False,
+ recoverable=True,
+ )
+ )
+
+ tasks.extend(
+ [
+ WorkflowTask(
+ key="queue_postprocessing",
+ step=WorkflowStep.POSTPROCESSING,
+ artifact=ArtifactKind.ACHIEVEMENT_MEMORY,
+ required=False,
+ recoverable=True,
+ ),
+ WorkflowTask(
+ key="complete_generation",
+ step=WorkflowStep.COMPLETION,
+ artifact=ArtifactKind.NONE,
+ ),
+ ]
+ )
+
+ return WorkflowPlan(
+ mode=WorkflowMode.STORY_WITH_ASSETS if generate_images else WorkflowMode.STORY,
+ tasks=tuple(tasks),
+ )
+
+
+def build_storybook_plan(*, generate_images: bool) -> WorkflowPlan:
+ """Build a plan for a storybook generation request."""
+
+ tasks = [
+ WorkflowTask(
+ key="prepare_context",
+ step=WorkflowStep.CONTEXT_PREPARATION,
+ artifact=ArtifactKind.NONE,
+ ),
+ WorkflowTask(
+ key="generate_storybook_pages",
+ step=WorkflowStep.NARRATIVE_GENERATION,
+ artifact=ArtifactKind.STORYBOOK_PAGES,
+ ),
+ ]
+
+ if generate_images:
+ tasks.append(
+ WorkflowTask(
+ key="generate_storybook_images",
+ step=WorkflowStep.IMAGE_GENERATION,
+ artifact=ArtifactKind.IMAGE,
+ required=False,
+ recoverable=True,
+ )
+ )
+
+ tasks.extend(
+ [
+ WorkflowTask(
+ key="persist_storybook",
+ step=WorkflowStep.STORY_PERSISTENCE,
+ artifact=ArtifactKind.STORYBOOK_PAGES,
+ ),
+ WorkflowTask(
+ key="queue_postprocessing",
+ step=WorkflowStep.POSTPROCESSING,
+ artifact=ArtifactKind.ACHIEVEMENT_MEMORY,
+ required=False,
+ recoverable=True,
+ ),
+ WorkflowTask(
+ key="complete_generation",
+ step=WorkflowStep.COMPLETION,
+ artifact=ArtifactKind.NONE,
+ ),
+ ]
+ )
+
+ return WorkflowPlan(mode=WorkflowMode.STORYBOOK, tasks=tuple(tasks))
+
+
+def build_asset_plan(*, output_mode: str, assets: list[str]) -> WorkflowPlan:
+ """Build a plan for asset generation or retry jobs."""
+
+ mode = (
+ WorkflowMode.ASSET_RETRY
+ if output_mode == WorkflowMode.ASSET_RETRY.value
+ else WorkflowMode.ASSET_GENERATION
+ )
+ initial_step = (
+ WorkflowStep.ASSET_RETRY
+ if mode == WorkflowMode.ASSET_RETRY
+ else WorkflowStep.ASSET_GENERATION
+ )
+ initial_key = (
+ "start_asset_retry"
+ if mode == WorkflowMode.ASSET_RETRY
+ else "start_asset_generation"
+ )
+ completion_key = (
+ "complete_asset_retry"
+ if mode == WorkflowMode.ASSET_RETRY
+ else "complete_asset_generation"
+ )
+
+ tasks = [
+ WorkflowTask(
+ key=initial_key,
+ step=initial_step,
+ artifact=ArtifactKind.NONE,
+ )
+ ]
+
+ for asset in dict.fromkeys(assets):
+ if asset == "image":
+ tasks.append(
+ WorkflowTask(
+ key="complete_image_asset",
+ step=WorkflowStep.IMAGE_GENERATION,
+ artifact=ArtifactKind.IMAGE,
+ required=False,
+ recoverable=True,
+ )
+ )
+ elif asset == "audio":
+ tasks.append(
+ WorkflowTask(
+ key="complete_audio_asset",
+ step=WorkflowStep.AUDIO_GENERATION,
+ artifact=ArtifactKind.AUDIO,
+ required=False,
+ recoverable=True,
+ )
+ )
+ else:
+ tasks.append(
+ WorkflowTask(
+ key=f"complete_{asset}_asset",
+ step=WorkflowStep.UNKNOWN,
+ artifact=ArtifactKind.UNKNOWN,
+ required=False,
+ recoverable=True,
+ )
+ )
+
+ tasks.append(
+ WorkflowTask(
+ key=completion_key,
+ step=initial_step,
+ artifact=ArtifactKind.NONE,
+ )
+ )
+
+ return WorkflowPlan(mode=mode, tasks=tuple(tasks))
diff --git a/backend/app/services/harness/quality_gates.py b/backend/app/services/harness/quality_gates.py
new file mode 100644
index 0000000..2a423e0
--- /dev/null
+++ b/backend/app/services/harness/quality_gates.py
@@ -0,0 +1,191 @@
+"""Deterministic quality gates for generated child-facing content."""
+
+from dataclasses import dataclass
+from enum import StrEnum
+
+from app.services.adapters.storybook.primary import Storybook
+from app.services.adapters.text.models import StoryOutput
+from app.services.harness.types import FailureCategory
+
+
+class QualityGateCode(StrEnum):
+ """Stable issue codes emitted by deterministic quality gates."""
+
+ MISSING_TITLE = "missing_title"
+ MISSING_STORY_TEXT = "missing_story_text"
+ MISSING_COVER_PROMPT = "missing_cover_prompt"
+ MISSING_STORYBOOK_PAGE = "missing_storybook_page"
+ INVALID_STORYBOOK_PAGE_NUMBER = "invalid_storybook_page_number"
+ MISSING_STORYBOOK_PAGE_TEXT = "missing_storybook_page_text"
+ UNSAFE_CHILD_CONTENT = "unsafe_child_content"
+
+
+@dataclass(frozen=True)
+class QualityGateIssue:
+ """One deterministic quality gate issue."""
+
+ code: QualityGateCode
+ message: str
+ failure_category: FailureCategory = FailureCategory.SCHEMA_ERROR
+ field: str | None = None
+
+ def to_metadata(self) -> dict:
+ """Return a JSON-safe metadata payload."""
+
+ return {
+ "code": self.code.value,
+ "message": self.message,
+ "failure_category": self.failure_category.value,
+ "field": self.field,
+ }
+
+
+class QualityGateError(ValueError):
+ """Raised when generated content fails deterministic quality gates."""
+
+ def __init__(self, issues: list[QualityGateIssue]):
+ self.issues = issues
+ message = ";".join(issue.message for issue in issues)
+ super().__init__(message)
+
+ def to_metadata(self) -> dict:
+ """Return a JSON-safe metadata payload."""
+
+ return {"issues": [issue.to_metadata() for issue in self.issues]}
+
+
+UNSAFE_CHILD_TERMS = (
+ "自杀",
+ "自残",
+ "血腥",
+ "虐待",
+ "毒品",
+ "色情",
+)
+
+
+def _is_blank(value: str | None) -> bool:
+ return not value or not value.strip()
+
+
+def _unsafe_issue_if_present(text: str, *, field: str) -> QualityGateIssue | None:
+ for term in UNSAFE_CHILD_TERMS:
+ if term in text:
+ return QualityGateIssue(
+ code=QualityGateCode.UNSAFE_CHILD_CONTENT,
+ message="生成内容包含不适合 3-8 岁儿童的明显风险词。",
+ failure_category=FailureCategory.SAFETY_ERROR,
+ field=field,
+ )
+ return None
+
+
+def validate_story_output(output: StoryOutput) -> None:
+ """Validate generated text story output before persistence."""
+
+ issues: list[QualityGateIssue] = []
+
+ if _is_blank(output.title):
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.MISSING_TITLE,
+ message="故事标题为空。",
+ field="title",
+ )
+ )
+
+ if _is_blank(output.story_text):
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.MISSING_STORY_TEXT,
+ message="故事正文为空。",
+ field="story_text",
+ )
+ )
+
+ if _is_blank(output.cover_prompt_suggestion):
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.MISSING_COVER_PROMPT,
+ message="封面提示词为空。",
+ field="cover_prompt_suggestion",
+ )
+ )
+
+ unsafe_issue = _unsafe_issue_if_present(
+ " ".join([output.title or "", output.story_text or ""]),
+ field="story_text",
+ )
+ if unsafe_issue is not None:
+ issues.append(unsafe_issue)
+
+ if issues:
+ raise QualityGateError(issues)
+
+
+def validate_storybook_output(output: Storybook) -> None:
+ """Validate generated storybook output before persistence."""
+
+ issues: list[QualityGateIssue] = []
+
+ if _is_blank(output.title):
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.MISSING_TITLE,
+ message="绘本标题为空。",
+ field="title",
+ )
+ )
+
+ if not output.pages:
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.MISSING_STORYBOOK_PAGE,
+ message="绘本至少需要一页内容。",
+ field="pages",
+ )
+ )
+
+ seen_page_numbers: set[int] = set()
+ page_texts: list[str] = []
+ for index, page in enumerate(output.pages, start=1):
+ if not isinstance(page.page_number, int) or page.page_number <= 0:
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.INVALID_STORYBOOK_PAGE_NUMBER,
+ message=f"绘本第 {index} 个页面页码无效。",
+ field=f"pages[{index - 1}].page_number",
+ )
+ )
+ elif page.page_number in seen_page_numbers:
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.INVALID_STORYBOOK_PAGE_NUMBER,
+ message=f"绘本页码 {page.page_number} 重复。",
+ field=f"pages[{index - 1}].page_number",
+ )
+ )
+ else:
+ seen_page_numbers.add(page.page_number)
+
+ if _is_blank(page.text):
+ issues.append(
+ QualityGateIssue(
+ code=QualityGateCode.MISSING_STORYBOOK_PAGE_TEXT,
+ message=f"绘本第 {index} 页正文为空。",
+ field=f"pages[{index - 1}].text",
+ )
+ )
+ else:
+ page_texts.append(page.text)
+
+ unsafe_issue = _unsafe_issue_if_present(
+ " ".join([output.title or "", *page_texts]),
+ field="pages",
+ )
+ if unsafe_issue is not None:
+ issues.append(unsafe_issue)
+
+ if issues:
+ raise QualityGateError(issues)
+
diff --git a/backend/app/services/harness/trace.py b/backend/app/services/harness/trace.py
new file mode 100644
index 0000000..4206bfe
--- /dev/null
+++ b/backend/app/services/harness/trace.py
@@ -0,0 +1,64 @@
+"""Trace recording helpers for generation harness workflows."""
+
+from typing import TYPE_CHECKING, Any
+
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.services.generation_jobs import record_generation_event
+from app.services.harness.types import (
+ ArtifactKind,
+ FailureCategory,
+ WorkflowStep,
+ normalize_trace_metadata,
+)
+
+if TYPE_CHECKING:
+ from app.db.models import GenerationJob
+
+
+class TraceRecorder:
+ """Append workflow events with standard harness trace metadata."""
+
+ def __init__(self, db: AsyncSession):
+ self.db = db
+
+ async def record_step(
+ self,
+ *,
+ job: "GenerationJob | None",
+ event_type: str,
+ status: str,
+ story_id: int | None = None,
+ message: str | None = None,
+ metadata: dict[str, Any] | None = None,
+ step: WorkflowStep | str | None = None,
+ artifact: ArtifactKind | str | None = None,
+ failure_category: FailureCategory | str | None = None,
+ retryable: bool | None = None,
+ blocks_main_result: bool | None = None,
+ commit: bool = True,
+ ):
+ """Append a workflow event when the caller is running under a tracked job."""
+
+ if job is None:
+ return None
+
+ return await record_generation_event(
+ self.db,
+ job=job,
+ story_id=story_id,
+ event_type=event_type,
+ status=status,
+ message=message,
+ metadata=normalize_trace_metadata(
+ event_type,
+ metadata,
+ step=step,
+ artifact=artifact,
+ failure_category=failure_category,
+ retryable=retryable,
+ blocks_main_result=blocks_main_result,
+ ),
+ commit=commit,
+ )
+
diff --git a/backend/app/services/harness/types.py b/backend/app/services/harness/types.py
new file mode 100644
index 0000000..174a44d
--- /dev/null
+++ b/backend/app/services/harness/types.py
@@ -0,0 +1,169 @@
+"""Shared types for the generation harness runtime."""
+
+from enum import StrEnum
+from typing import Any
+
+
+class WorkflowStep(StrEnum):
+ """Standard product-level steps for generation workflows."""
+
+ REQUEST_ACCEPTANCE = "request_acceptance"
+ WORKER_START = "worker_start"
+ CONTEXT_PREPARATION = "context_preparation"
+ NARRATIVE_GENERATION = "narrative_generation"
+ STORY_PERSISTENCE = "story_persistence"
+ PROVIDER_INVOCATION = "provider_invocation"
+ IMAGE_GENERATION = "image_generation"
+ AUDIO_GENERATION = "audio_generation"
+ ASSET_RETRY = "asset_retry"
+ ASSET_GENERATION = "asset_generation"
+ POSTPROCESSING = "postprocessing"
+ COMPLETION = "completion"
+ CANCELLATION = "cancellation"
+ STALE_RECOVERY = "stale_recovery"
+ UNKNOWN = "unknown"
+
+
+class ArtifactKind(StrEnum):
+ """Artifacts produced or completed by generation workflows."""
+
+ STORY_TEXT = "story_text"
+ STORYBOOK_PAGES = "storybook_pages"
+ COVER_IMAGE = "cover_image"
+ PAGE_IMAGE = "page_image"
+ IMAGE = "image"
+ AUDIO = "audio"
+ ACHIEVEMENT_MEMORY = "achievement_memory"
+ NONE = "none"
+ UNKNOWN = "unknown"
+
+
+class FailureCategory(StrEnum):
+ """Coarse failure categories for trace and analytics metadata."""
+
+ PROVIDER_ERROR = "provider_error"
+ SCHEMA_ERROR = "schema_error"
+ SAFETY_ERROR = "safety_error"
+ TIMEOUT = "timeout"
+ CANCELED = "canceled"
+ STALE_JOB = "stale_job"
+ STORAGE_ERROR = "storage_error"
+ VALIDATION_ERROR = "validation_error"
+ UNKNOWN_ERROR = "unknown_error"
+
+
+class StepStatus(StrEnum):
+ """Standard status values for a workflow step."""
+
+ QUEUED = "queued"
+ RUNNING = "running"
+ SUCCEEDED = "succeeded"
+ FAILED = "failed"
+ CANCELED = "canceled"
+
+
+EVENT_STEP_MAP: dict[str, WorkflowStep] = {
+ "request_accepted": WorkflowStep.REQUEST_ACCEPTANCE,
+ "retry_queued": WorkflowStep.REQUEST_ACCEPTANCE,
+ "worker_started": WorkflowStep.WORKER_START,
+ "context_prepared": WorkflowStep.CONTEXT_PREPARATION,
+ "narrative_generated": WorkflowStep.NARRATIVE_GENERATION,
+ "story_saved": WorkflowStep.STORY_PERSISTENCE,
+ "provider_call_started": WorkflowStep.PROVIDER_INVOCATION,
+ "provider_call_succeeded": WorkflowStep.PROVIDER_INVOCATION,
+ "provider_call_failed": WorkflowStep.PROVIDER_INVOCATION,
+ "quality_gate_failed": WorkflowStep.NARRATIVE_GENERATION,
+ "cover_image_started": WorkflowStep.IMAGE_GENERATION,
+ "cover_image_succeeded": WorkflowStep.IMAGE_GENERATION,
+ "cover_image_failed": WorkflowStep.IMAGE_GENERATION,
+ "storybook_images_started": WorkflowStep.IMAGE_GENERATION,
+ "storybook_cover_image_succeeded": WorkflowStep.IMAGE_GENERATION,
+ "storybook_cover_image_failed": WorkflowStep.IMAGE_GENERATION,
+ "storybook_page_image_succeeded": WorkflowStep.IMAGE_GENERATION,
+ "storybook_page_image_failed": WorkflowStep.IMAGE_GENERATION,
+ "storybook_images_completed": WorkflowStep.IMAGE_GENERATION,
+ "audio_started": WorkflowStep.AUDIO_GENERATION,
+ "audio_cache_hit": WorkflowStep.AUDIO_GENERATION,
+ "audio_succeeded": WorkflowStep.AUDIO_GENERATION,
+ "audio_failed": WorkflowStep.AUDIO_GENERATION,
+ "asset_retry_started": WorkflowStep.ASSET_RETRY,
+ "asset_retry_completed": WorkflowStep.ASSET_RETRY,
+ "asset_generation_completed": WorkflowStep.ASSET_GENERATION,
+ "postprocessing_queued": WorkflowStep.POSTPROCESSING,
+ "generation_completed": WorkflowStep.COMPLETION,
+ "generation_failed": WorkflowStep.COMPLETION,
+ "generation_canceled": WorkflowStep.CANCELLATION,
+ "cancel_requested": WorkflowStep.CANCELLATION,
+ "generation_stale_failed": WorkflowStep.STALE_RECOVERY,
+}
+
+EVENT_ARTIFACT_MAP: dict[str, ArtifactKind] = {
+ "narrative_generated": ArtifactKind.STORY_TEXT,
+ "quality_gate_failed": ArtifactKind.STORY_TEXT,
+ "cover_image_started": ArtifactKind.COVER_IMAGE,
+ "cover_image_succeeded": ArtifactKind.COVER_IMAGE,
+ "cover_image_failed": ArtifactKind.COVER_IMAGE,
+ "storybook_images_started": ArtifactKind.IMAGE,
+ "storybook_cover_image_succeeded": ArtifactKind.COVER_IMAGE,
+ "storybook_cover_image_failed": ArtifactKind.COVER_IMAGE,
+ "storybook_page_image_succeeded": ArtifactKind.PAGE_IMAGE,
+ "storybook_page_image_failed": ArtifactKind.PAGE_IMAGE,
+ "storybook_images_completed": ArtifactKind.IMAGE,
+ "audio_started": ArtifactKind.AUDIO,
+ "audio_cache_hit": ArtifactKind.AUDIO,
+ "audio_succeeded": ArtifactKind.AUDIO,
+ "audio_failed": ArtifactKind.AUDIO,
+ "postprocessing_queued": ArtifactKind.ACHIEVEMENT_MEMORY,
+}
+
+
+def step_for_event(event_type: str) -> WorkflowStep:
+ """Return the standard workflow step for a persisted event type."""
+
+ return EVENT_STEP_MAP.get(event_type, WorkflowStep.UNKNOWN)
+
+
+def artifact_for_event(event_type: str) -> ArtifactKind:
+ """Return the standard artifact for a persisted event type."""
+
+ return EVENT_ARTIFACT_MAP.get(event_type, ArtifactKind.NONE)
+
+
+def normalize_trace_metadata(
+ event_type: str,
+ metadata: dict[str, Any] | None = None,
+ *,
+ step: WorkflowStep | str | None = None,
+ artifact: ArtifactKind | str | None = None,
+ failure_category: FailureCategory | str | None = None,
+ retryable: bool | None = None,
+ blocks_main_result: bool | None = None,
+) -> dict[str, Any]:
+ """Merge legacy metadata with standard harness trace fields."""
+
+ normalized: dict[str, Any] = dict(metadata or {})
+
+ resolved_step = str(step or normalized.get("step") or step_for_event(event_type))
+ resolved_artifact = str(
+ artifact or normalized.get("artifact") or artifact_for_event(event_type)
+ )
+
+ normalized["step"] = resolved_step
+ normalized["artifact"] = resolved_artifact
+
+ if failure_category is not None:
+ normalized["failure_category"] = str(failure_category)
+ elif "failure_category" not in normalized:
+ normalized["failure_category"] = None
+
+ if retryable is not None:
+ normalized["retryable"] = retryable
+ elif "retryable" not in normalized:
+ normalized["retryable"] = False
+
+ if blocks_main_result is not None:
+ normalized["blocks_main_result"] = blocks_main_result
+ elif "blocks_main_result" not in normalized:
+ normalized["blocks_main_result"] = False
+
+ return normalized
diff --git a/backend/app/services/provider_router.py b/backend/app/services/provider_router.py
index 6a529aa..a548fae 100644
--- a/backend/app/services/provider_router.py
+++ b/backend/app/services/provider_router.py
@@ -10,7 +10,7 @@ from app.core.logging import get_logger
from app.services.adapters import AdapterConfig, AdapterRegistry
from app.services.adapters.text.models import StoryOutput
from app.services.cost_tracker import cost_tracker
-from app.services.generation_jobs import record_generation_event
+from app.services.harness.trace import TraceRecorder
from app.services.provider_cache import get_providers
from app.services.provider_metrics import health_checker, metrics_collector
from app.services.provider_policy import (
@@ -67,8 +67,7 @@ async def _record_provider_event_if_present(
if db is None or job is None:
return
- await record_generation_event(
- db,
+ await TraceRecorder(db).record_step(
job=job,
story_id=story_id,
event_type=event_type,
diff --git a/backend/app/services/story_service.py b/backend/app/services/story_service.py
index 73bdf94..6869b91 100644
--- a/backend/app/services/story_service.py
+++ b/backend/app/services/story_service.py
@@ -1,9 +1,7 @@
"""Story business logic service."""
import asyncio
-from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
-from typing import Literal
from fastapi import HTTPException
from sqlalchemy import desc, select
@@ -42,6 +40,29 @@ from app.services.generation_jobs import (
get_generation_job_for_user,
record_generation_event,
)
+from app.services.harness.artifacts import (
+ AssetCompletionResult,
+ asset_result_metadata,
+)
+from app.services.harness.asset_workflows import (
+ build_storybook_error_message,
+ complete_audio_asset,
+ complete_cover_image_asset,
+ complete_storybook_image_assets,
+ get_storybook_pages_data,
+ read_cached_audio_asset,
+ resolve_storybook_image_status,
+)
+from app.services.harness.control import (
+ ExecutionControl,
+ GenerationJobCanceledError,
+)
+from app.services.harness.quality_gates import (
+ QualityGateError,
+ validate_story_output,
+ validate_storybook_output,
+)
+from app.services.harness.trace import TraceRecorder
from app.services.memory_service import build_enhanced_memory_context
from app.services.provider_router import (
generate_image,
@@ -56,29 +77,6 @@ from app.tasks.achievements import extract_story_achievements
logger = get_logger(__name__)
-AssetCompletionKind = Literal["cover_image", "storybook_images", "audio"]
-
-
-@dataclass(frozen=True)
-class AssetCompletionResult:
- """Service-level result for a generated asset completion attempt."""
-
- asset: AssetCompletionKind
- status: StoryAssetStatus
- value: str | bytes | None = None
- error: str | None = None
- blocks_main_result: bool = False
-
- @property
- def succeeded(self) -> bool:
- """Whether the asset reached a usable ready state."""
-
- return self.status == StoryAssetStatus.READY and self.error is None
-
-
-class GenerationJobCanceledError(Exception):
- """Raised when a running worker job has been canceled by the user."""
-
async def _record_job_event_if_present(
db: AsyncSession,
@@ -92,11 +90,7 @@ async def _record_job_event_if_present(
) -> None:
"""Append a workflow event when the caller is running under a tracked job."""
- if job is None:
- return
-
- await record_generation_event(
- db,
+ await TraceRecorder(db).record_step(
job=job,
story_id=story_id,
event_type=event_type,
@@ -114,34 +108,31 @@ async def _stop_if_job_cancel_requested(
) -> None:
"""Stop a worker-owned job at the next safe checkpoint after cancellation."""
- if job is None:
- return
+ await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story)
- await db.refresh(job)
- if job.current_step != "cancel_requested":
- return
- await finish_generation_job(
+async def _record_quality_gate_failure_if_present(
+ db: AsyncSession,
+ *,
+ job,
+ error: QualityGateError,
+) -> None:
+ """Append a quality gate failure event for tracked worker jobs."""
+
+ await _record_job_event_if_present(
db,
job=job,
- story=story,
- status="canceled",
- current_step="generation_canceled",
- error_message="Generation canceled by user.",
- message="Generation job was canceled after a user request.",
+ event_type="quality_gate_failed",
+ status="failed",
+ message="Generated content failed deterministic quality gates.",
+ metadata=error.to_metadata(),
)
- raise GenerationJobCanceledError()
def _asset_result_metadata(result: AssetCompletionResult) -> dict:
"""Build JSON-safe metadata for asset workflow events."""
- return {
- "asset": result.asset,
- "status": result.status.value,
- "error": result.error,
- "blocks_main_result": result.blocks_main_result,
- }
+ return asset_result_metadata(result)
def _build_storybook_error_message(
@@ -151,13 +142,10 @@ def _build_storybook_error_message(
) -> str | None:
"""Summarize storybook image generation errors for the latest attempt."""
- parts: list[str] = []
- if cover_failed:
- parts.append("封面生成失败")
- if failed_pages:
- pages = "、".join(str(page) for page in sorted(failed_pages))
- parts.append(f"第 {pages} 页插图生成失败")
- return ";".join(parts) if parts else None
+ return build_storybook_error_message(
+ cover_failed=cover_failed,
+ failed_pages=failed_pages,
+ )
def _resolve_storybook_image_status(
@@ -169,31 +157,12 @@ def _resolve_storybook_image_status(
) -> StoryAssetStatus:
"""Resolve the persisted image status for a storybook."""
- if not generate_images:
- return StoryAssetStatus.NOT_REQUESTED
-
- expected_assets = 0
- ready_assets = 0
-
- if cover_prompt or cover_url:
- expected_assets += 1
- if cover_url:
- ready_assets += 1
-
- for page in pages_data:
- if not page.get("image_prompt") and not page.get("image_url"):
- continue
- expected_assets += 1
- if page.get("image_url"):
- ready_assets += 1
-
- if expected_assets == 0:
- return StoryAssetStatus.NOT_REQUESTED
-
- if ready_assets == expected_assets:
- return StoryAssetStatus.READY
-
- return StoryAssetStatus.FAILED
+ return resolve_storybook_image_status(
+ generate_images=generate_images,
+ cover_prompt=cover_prompt,
+ cover_url=cover_url,
+ pages_data=pages_data,
+ )
async def _prepare_generation_context(
@@ -539,93 +508,21 @@ async def _complete_cover_image_asset(
) -> AssetCompletionResult:
"""Generate or retry a text story cover through one asset workflow."""
- if not story.cover_prompt:
- raise HTTPException(status_code=400, detail="Story has no cover prompt")
-
- sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
- await db.commit()
- await _stop_if_job_cancel_requested(db, job=job, story=story)
- await _record_job_event_if_present(
+ return await complete_cover_image_asset(
+ story,
db,
+ generate_image_func=generate_image,
+ raise_on_failure=raise_on_failure,
+ last_error_prefix=last_error_prefix,
+ log_event=log_event,
job=job,
- story_id=story.id,
- event_type="cover_image_started",
- status="running",
- message="Cover image generation started.",
- metadata={"asset": "image", "cover_prompt_present": True},
)
- try:
- await _stop_if_job_cancel_requested(db, job=job, story=story)
- image_url = await generate_image(
- story.cover_prompt,
- db=db,
- user_id=story.user_id,
- generation_job=job,
- story_id=story.id,
- )
- story.image_url = image_url
- sync_story_status(story, image_status=StoryAssetStatus.READY)
- await db.commit()
- result = AssetCompletionResult(
- asset="cover_image",
- status=StoryAssetStatus.READY,
- value=image_url,
- blocks_main_result=raise_on_failure,
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="cover_image_succeeded",
- status="succeeded",
- message="Cover image was generated.",
- metadata=_asset_result_metadata(result),
- )
- return result
- except Exception as exc:
- provider_error = str(exc)
- last_error = (
- f"{last_error_prefix}: {provider_error}"
- if last_error_prefix
- else provider_error
- )
- sync_story_status(
- story,
- image_status=StoryAssetStatus.FAILED,
- last_error=last_error,
- )
- await db.commit()
- logger.warning(log_event, story_id=story.id, error=provider_error)
-
- result = AssetCompletionResult(
- asset="cover_image",
- status=StoryAssetStatus.FAILED,
- error=provider_error,
- blocks_main_result=raise_on_failure,
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="cover_image_failed",
- status="failed",
- message="Cover image generation failed.",
- metadata=_asset_result_metadata(result),
- )
- if raise_on_failure:
- raise HTTPException(
- status_code=500,
- detail=f"Image generation failed: {provider_error}",
- ) from exc
-
- return result
-
def _get_storybook_pages_data(story: Story) -> list[dict]:
"""Return mutable storybook page data from the persisted JSON field."""
- return [dict(page) for page in story.pages or [] if isinstance(page, dict)]
+ return get_storybook_pages_data(story)
async def _complete_storybook_image_assets(
@@ -636,176 +533,23 @@ async def _complete_storybook_image_assets(
) -> AssetCompletionResult:
"""Complete missing cover/page images for a persisted storybook."""
- pages_data = _get_storybook_pages_data(story)
- has_image_prompt = bool(story.cover_prompt) or any(
- page.get("image_prompt") for page in pages_data
- )
- if not has_image_prompt:
- raise HTTPException(status_code=400, detail="Storybook has no image prompts")
-
- sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
- await db.commit()
- await _stop_if_job_cancel_requested(db, job=job, story=story)
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="storybook_images_started",
- status="running",
- message="Storybook missing image completion started.",
- metadata={"asset": "image"},
- )
-
- cover_failed = False
- failed_pages: list[int] = []
- completed_pages: list[int] = []
-
- if story.cover_prompt and not story.image_url:
- await _stop_if_job_cancel_requested(db, job=job, story=story)
- try:
- story.image_url = await generate_image(
- story.cover_prompt,
- db=db,
- user_id=story.user_id,
- generation_job=job,
- story_id=story.id,
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="storybook_cover_image_succeeded",
- status="succeeded",
- message="Storybook cover image was generated.",
- metadata={"asset": "image", "scope": "cover"},
- )
- except Exception as exc:
- cover_failed = True
- logger.warning(
- "storybook_cover_asset_completion_failed",
- story_id=story.id,
- error=str(exc),
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="storybook_cover_image_failed",
- status="failed",
- message="Storybook cover image generation failed.",
- metadata={"asset": "image", "scope": "cover", "error": str(exc)},
- )
-
- for page in pages_data:
- if not page.get("image_prompt") or page.get("image_url"):
- continue
-
- await _stop_if_job_cancel_requested(db, job=job, story=story)
- try:
- page["image_url"] = await generate_image(
- page["image_prompt"],
- db=db,
- user_id=story.user_id,
- generation_job=job,
- story_id=story.id,
- )
- page_number = page.get("page_number")
- if isinstance(page_number, int):
- completed_pages.append(page_number)
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="storybook_page_image_succeeded",
- status="succeeded",
- message="Storybook page image was generated.",
- metadata={"asset": "image", "scope": "page", "page_number": page_number},
- )
- except Exception as exc:
- page_number = page.get("page_number")
- if isinstance(page_number, int):
- failed_pages.append(page_number)
- logger.warning(
- "storybook_page_asset_completion_failed",
- story_id=story.id,
- page=page_number,
- error=str(exc),
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="storybook_page_image_failed",
- status="failed",
- message="Storybook page image generation failed.",
- metadata={
- "asset": "image",
- "scope": "page",
- "page_number": page_number,
- "error": str(exc),
- },
- )
-
- story.pages = pages_data
- error_message = _build_storybook_error_message(
- cover_failed=cover_failed,
- failed_pages=failed_pages,
- )
- image_status = _resolve_storybook_image_status(
- generate_images=True,
- cover_prompt=story.cover_prompt,
- cover_url=story.image_url,
- pages_data=pages_data,
- )
- sync_story_status(
+ return await complete_storybook_image_assets(
story,
- image_status=image_status,
- last_error=error_message,
- )
- await db.commit()
- result = AssetCompletionResult(
- asset="storybook_images",
- status=image_status,
- value=story.image_url,
- error=error_message,
- )
- await _record_job_event_if_present(
db,
+ generate_image_func=generate_image,
job=job,
- story_id=story.id,
- event_type="storybook_images_completed",
- status="failed" if error_message else "succeeded",
- message="Storybook image completion finished.",
- metadata={
- **_asset_result_metadata(result),
- "completed_pages": sorted(completed_pages),
- "failed_pages": sorted(failed_pages),
- },
)
- return result
async def _read_cached_audio_asset(story: Story, db: AsyncSession) -> bytes | None:
"""Read cached audio or repair stale audio cache metadata."""
- if story.audio_path and audio_cache_exists(story.audio_path):
- if story.audio_status != StoryAssetStatus.READY.value:
- sync_story_status(story, audio_status=StoryAssetStatus.READY)
- await db.commit()
- return read_audio_cache(story.audio_path)
-
- if story.audio_path and not audio_cache_exists(story.audio_path):
- logger.warning(
- "story_audio_cache_missing",
- story_id=story.id,
- audio_path=story.audio_path,
- )
- story.audio_path = None
- if story.audio_status == StoryAssetStatus.READY.value:
- sync_story_status(story, audio_status=StoryAssetStatus.NOT_REQUESTED)
- await db.commit()
-
- return None
+ return await read_cached_audio_asset(
+ story,
+ db,
+ audio_cache_exists_func=audio_cache_exists,
+ read_audio_cache_func=read_audio_cache,
+ )
async def _complete_audio_asset(
@@ -817,107 +561,18 @@ async def _complete_audio_asset(
) -> AssetCompletionResult:
"""Complete TTS audio generation through one asset workflow."""
- if not story.story_text:
- raise HTTPException(status_code=400, detail="Story has no text")
-
- cached_audio = await _read_cached_audio_asset(story, db)
- if cached_audio is not None:
- result = AssetCompletionResult(
- asset="audio",
- status=StoryAssetStatus.READY,
- value=cached_audio,
- blocks_main_result=raise_on_failure,
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="audio_cache_hit",
- status="succeeded",
- message="Cached story audio was reused.",
- metadata=_asset_result_metadata(result),
- )
- return result
-
from app.services.provider_router import text_to_speech
- sync_story_status(story, audio_status=StoryAssetStatus.GENERATING)
- await db.commit()
- await _stop_if_job_cancel_requested(db, job=job, story=story)
- await _record_job_event_if_present(
+ return await complete_audio_asset(
+ story,
db,
+ text_to_speech_func=text_to_speech,
+ audio_cache_exists_func=audio_cache_exists,
+ read_audio_cache_func=read_audio_cache,
+ write_story_audio_cache_func=write_story_audio_cache,
+ raise_on_failure=raise_on_failure,
job=job,
- story_id=story.id,
- event_type="audio_started",
- status="running",
- message="Story audio generation started.",
- metadata={"asset": "audio"},
)
-
- try:
- await _stop_if_job_cancel_requested(db, job=job, story=story)
- audio_data = await text_to_speech(
- story.story_text,
- db=db,
- user_id=story.user_id,
- generation_job=job,
- story_id=story.id,
- )
- story.audio_path = write_story_audio_cache(story.id, audio_data)
- sync_story_status(
- story,
- audio_status=StoryAssetStatus.READY,
- )
- await db.commit()
- result = AssetCompletionResult(
- asset="audio",
- status=StoryAssetStatus.READY,
- value=audio_data,
- blocks_main_result=raise_on_failure,
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="audio_succeeded",
- status="succeeded",
- message="Story audio was generated and cached.",
- metadata=_asset_result_metadata(result),
- )
- return result
- except Exception as exc:
- provider_error = str(exc)
- story.audio_path = None
- sync_story_status(
- story,
- audio_status=StoryAssetStatus.FAILED,
- last_error=provider_error,
- )
- await db.commit()
- logger.error("audio_generation_failed", story_id=story.id, error=provider_error)
-
- result = AssetCompletionResult(
- asset="audio",
- status=StoryAssetStatus.FAILED,
- error=provider_error,
- blocks_main_result=raise_on_failure,
- )
- await _record_job_event_if_present(
- db,
- job=job,
- story_id=story.id,
- event_type="audio_failed",
- status="failed",
- message="Story audio generation failed.",
- metadata=_asset_result_metadata(result),
- )
- if raise_on_failure:
- raise HTTPException(
- status_code=500,
- detail=f"Audio generation failed: {provider_error}",
- ) from exc
-
- return result
async def validate_profile_and_universe(
@@ -988,6 +643,13 @@ async def generate_and_save_story(
user_id=user_id,
generation_job=job,
)
+ validate_story_output(result)
+ except QualityGateError as exc:
+ await _record_quality_gate_failure_if_present(db, job=job, error=exc)
+ raise HTTPException(
+ status_code=502,
+ detail="Story generation failed quality checks, please try again.",
+ ) from exc
except Exception as exc:
raise HTTPException(
status_code=502,
@@ -1096,6 +758,10 @@ async def generate_storybook_service(
user_id=user_id,
generation_job=job,
)
+ validate_storybook_output(storybook)
+ except QualityGateError as exc:
+ await _record_quality_gate_failure_if_present(db, job=job, error=exc)
+ raise HTTPException(status_code=500, detail=f"故事书质量检查失败: {exc}") from exc
except Exception as e:
logger.error("storybook_generation_failed", error=str(e))
raise HTTPException(status_code=500, detail=f"故事书生成失败: {e}")
diff --git a/backend/tests/test_generation_jobs.py b/backend/tests/test_generation_jobs.py
index 8d2631a..8b4c311 100644
--- a/backend/tests/test_generation_jobs.py
+++ b/backend/tests/test_generation_jobs.py
@@ -165,6 +165,70 @@ async def test_unified_generation_is_queued_then_worker_persists_story_and_event
app.dependency_overrides.clear()
+async def test_generation_worker_records_quality_gate_failure_without_persisting_story(
+ db_session,
+ test_user,
+):
+ invalid_output = StoryOutput(
+ mode="generated",
+ title="空白故事",
+ story_text="",
+ cover_prompt_suggestion="A blank cover",
+ )
+ job = await create_generation_job(
+ db_session,
+ user_id=test_user.id,
+ output_mode="story",
+ input_type="keywords",
+ request_payload={
+ "output_mode": "story",
+ "type": "keywords",
+ "data": "小兔子",
+ "generate_images": False,
+ },
+ )
+
+ with patch(
+ "app.services.story_service.generate_story_content",
+ new_callable=AsyncMock,
+ ) as mock_generate_story_content:
+ mock_generate_story_content.return_value = invalid_output
+
+ with pytest.raises(Exception):
+ await run_generation_job_service(job.id, db_session)
+
+ refreshed_job = (
+ await db_session.execute(select(GenerationJob).where(GenerationJob.id == job.id))
+ ).scalar_one()
+ assert refreshed_job.status == "failed"
+ assert refreshed_job.story_id is None
+ assert refreshed_job.current_step == "generation_failed"
+ assert "quality checks" in refreshed_job.error_message
+
+ stories = (
+ await db_session.execute(select(Story).where(Story.user_id == test_user.id))
+ ).scalars().all()
+ assert stories == []
+
+ events = (
+ await db_session.execute(
+ select(GenerationJobEvent)
+ .where(GenerationJobEvent.job_id == job.id)
+ .order_by(GenerationJobEvent.id)
+ )
+ ).scalars().all()
+ assert [event.event_type for event in events] == [
+ "request_accepted",
+ "worker_started",
+ "context_prepared",
+ "quality_gate_failed",
+ "generation_failed",
+ ]
+ quality_event = events[3]
+ assert quality_event.event_metadata["step"] == "narrative_generation"
+ assert quality_event.event_metadata["issues"][0]["code"] == "missing_story_text"
+
+
async def test_asset_retry_records_job_events_and_updates_retryable_assets(
db_session,
test_user,
diff --git a/backend/tests/test_harness_runtime.py b/backend/tests/test_harness_runtime.py
new file mode 100644
index 0000000..a58ddaa
--- /dev/null
+++ b/backend/tests/test_harness_runtime.py
@@ -0,0 +1,374 @@
+"""Tests for generation harness runtime support."""
+
+import pytest
+from sqlalchemy import select
+
+from app.db.models import GenerationJob, GenerationJobEvent
+from app.services.adapters.storybook.primary import Storybook, StorybookPage
+from app.services.adapters.text.models import StoryOutput
+from app.services.generation_jobs import create_generation_job, record_generation_event
+from app.services.harness.control import ExecutionControl, GenerationJobCanceledError
+from app.services.harness.plans import (
+ build_asset_plan,
+ build_story_plan,
+ build_storybook_plan,
+)
+from app.services.harness.quality_gates import (
+ QualityGateError,
+ validate_story_output,
+ validate_storybook_output,
+)
+from app.services.harness.trace import TraceRecorder
+from app.services.harness.types import (
+ ArtifactKind,
+ FailureCategory,
+ WorkflowStep,
+ artifact_for_event,
+ normalize_trace_metadata,
+ step_for_event,
+)
+
+
+def test_event_type_maps_to_standard_workflow_step():
+ assert step_for_event("request_accepted") == WorkflowStep.REQUEST_ACCEPTANCE
+ assert step_for_event("context_prepared") == WorkflowStep.CONTEXT_PREPARATION
+ assert step_for_event("narrative_generated") == WorkflowStep.NARRATIVE_GENERATION
+ assert step_for_event("story_saved") == WorkflowStep.STORY_PERSISTENCE
+ assert step_for_event("provider_call_succeeded") == WorkflowStep.PROVIDER_INVOCATION
+ assert step_for_event("quality_gate_failed") == WorkflowStep.NARRATIVE_GENERATION
+ assert step_for_event("cover_image_failed") == WorkflowStep.IMAGE_GENERATION
+ assert step_for_event("audio_succeeded") == WorkflowStep.AUDIO_GENERATION
+ assert step_for_event("generation_canceled") == WorkflowStep.CANCELLATION
+ assert step_for_event("generation_stale_failed") == WorkflowStep.STALE_RECOVERY
+ assert step_for_event("future_event") == WorkflowStep.UNKNOWN
+
+
+def test_event_type_maps_to_standard_artifact():
+ assert artifact_for_event("narrative_generated") == ArtifactKind.STORY_TEXT
+ assert artifact_for_event("quality_gate_failed") == ArtifactKind.STORY_TEXT
+ assert artifact_for_event("cover_image_succeeded") == ArtifactKind.COVER_IMAGE
+ assert artifact_for_event("storybook_page_image_failed") == ArtifactKind.PAGE_IMAGE
+ assert artifact_for_event("audio_cache_hit") == ArtifactKind.AUDIO
+ assert artifact_for_event("postprocessing_queued") == ArtifactKind.ACHIEVEMENT_MEMORY
+ assert artifact_for_event("request_accepted") == ArtifactKind.NONE
+
+
+def test_trace_metadata_adds_standard_fields_without_dropping_legacy_values():
+ metadata = normalize_trace_metadata(
+ "provider_call_failed",
+ {
+ "capability": "text",
+ "adapter": "demo",
+ "error": "timeout",
+ },
+ failure_category=FailureCategory.TIMEOUT,
+ retryable=True,
+ )
+
+ assert metadata["capability"] == "text"
+ assert metadata["adapter"] == "demo"
+ assert metadata["error"] == "timeout"
+ assert metadata["step"] == "provider_invocation"
+ assert metadata["artifact"] == "none"
+ assert metadata["failure_category"] == "timeout"
+ assert metadata["retryable"] is True
+ assert metadata["blocks_main_result"] is False
+
+
+def test_trace_metadata_respects_explicit_step_and_artifact():
+ metadata = normalize_trace_metadata(
+ "narrative_generated",
+ {"title": "小兔子的冒险"},
+ step=WorkflowStep.NARRATIVE_GENERATION,
+ artifact=ArtifactKind.STORYBOOK_PAGES,
+ blocks_main_result=True,
+ )
+
+ assert metadata["title"] == "小兔子的冒险"
+ assert metadata["step"] == "narrative_generation"
+ assert metadata["artifact"] == "storybook_pages"
+ assert metadata["blocks_main_result"] is True
+
+
+def test_story_plan_without_assets_snapshot():
+ assert build_story_plan(generate_images=False).to_snapshot() == {
+ "mode": "story",
+ "tasks": [
+ {
+ "key": "prepare_context",
+ "step": "context_preparation",
+ "artifact": "none",
+ "required": True,
+ "recoverable": False,
+ },
+ {
+ "key": "generate_narrative",
+ "step": "narrative_generation",
+ "artifact": "story_text",
+ "required": True,
+ "recoverable": False,
+ },
+ {
+ "key": "persist_story",
+ "step": "story_persistence",
+ "artifact": "story_text",
+ "required": True,
+ "recoverable": False,
+ },
+ {
+ "key": "queue_postprocessing",
+ "step": "postprocessing",
+ "artifact": "achievement_memory",
+ "required": False,
+ "recoverable": True,
+ },
+ {
+ "key": "complete_generation",
+ "step": "completion",
+ "artifact": "none",
+ "required": True,
+ "recoverable": False,
+ },
+ ],
+ }
+
+
+def test_story_plan_with_assets_marks_cover_recoverable():
+ plan = build_story_plan(generate_images=True).to_snapshot()
+
+ assert plan["mode"] == "story_with_assets"
+ assert plan["tasks"][3] == {
+ "key": "generate_cover_image",
+ "step": "image_generation",
+ "artifact": "cover_image",
+ "required": False,
+ "recoverable": True,
+ }
+
+
+def test_storybook_plan_with_images_marks_storybook_images_recoverable():
+ plan = build_storybook_plan(generate_images=True).to_snapshot()
+
+ assert plan["mode"] == "storybook"
+ assert [task["key"] for task in plan["tasks"]] == [
+ "prepare_context",
+ "generate_storybook_pages",
+ "generate_storybook_images",
+ "persist_storybook",
+ "queue_postprocessing",
+ "complete_generation",
+ ]
+ assert plan["tasks"][2]["artifact"] == "image"
+ assert plan["tasks"][2]["recoverable"] is True
+
+
+def test_asset_retry_plan_deduplicates_assets():
+ plan = build_asset_plan(output_mode="asset_retry", assets=["image", "audio", "image"])
+
+ assert plan.to_snapshot() == {
+ "mode": "asset_retry",
+ "tasks": [
+ {
+ "key": "start_asset_retry",
+ "step": "asset_retry",
+ "artifact": "none",
+ "required": True,
+ "recoverable": False,
+ },
+ {
+ "key": "complete_image_asset",
+ "step": "image_generation",
+ "artifact": "image",
+ "required": False,
+ "recoverable": True,
+ },
+ {
+ "key": "complete_audio_asset",
+ "step": "audio_generation",
+ "artifact": "audio",
+ "required": False,
+ "recoverable": True,
+ },
+ {
+ "key": "complete_asset_retry",
+ "step": "asset_retry",
+ "artifact": "none",
+ "required": True,
+ "recoverable": False,
+ },
+ ],
+ }
+
+
+def test_story_quality_gate_accepts_complete_child_safe_story():
+ validate_story_output(
+ StoryOutput(
+ mode="generated",
+ title="小兔子的月光花园",
+ story_text="小兔子在花园里学会了和朋友轮流分享水壶。",
+ cover_prompt_suggestion="A gentle moonlit garden with a rabbit",
+ )
+ )
+
+
+def test_story_quality_gate_rejects_missing_story_text():
+ output = StoryOutput(
+ mode="generated",
+ title="空白故事",
+ story_text="",
+ cover_prompt_suggestion="A cover",
+ )
+
+ try:
+ validate_story_output(output)
+ except QualityGateError as exc:
+ assert [issue.code.value for issue in exc.issues] == ["missing_story_text"]
+ assert exc.to_metadata()["issues"][0]["field"] == "story_text"
+ else:
+ raise AssertionError("Expected QualityGateError")
+
+
+def test_story_quality_gate_rejects_obviously_unsafe_child_content():
+ output = StoryOutput(
+ mode="generated",
+ title="危险词测试",
+ story_text="这个故事包含血腥场景。",
+ cover_prompt_suggestion="A cover",
+ )
+
+ try:
+ validate_story_output(output)
+ except QualityGateError as exc:
+ assert [issue.code.value for issue in exc.issues] == ["unsafe_child_content"]
+ assert exc.to_metadata()["issues"][0]["failure_category"] == "safety_error"
+ else:
+ raise AssertionError("Expected QualityGateError")
+
+
+def test_storybook_quality_gate_rejects_duplicate_page_number():
+ storybook = Storybook(
+ title="森林绘本",
+ main_character="小兔子",
+ art_style="水彩",
+ cover_prompt="A forest cover",
+ pages=[
+ StorybookPage(page_number=1, text="第一页。", image_prompt="page 1"),
+ StorybookPage(page_number=1, text="第二页。", image_prompt="page 2"),
+ ],
+ )
+
+ try:
+ validate_storybook_output(storybook)
+ except QualityGateError as exc:
+ assert [issue.code.value for issue in exc.issues] == [
+ "invalid_storybook_page_number"
+ ]
+ assert exc.to_metadata()["issues"][0]["field"] == "pages[1].page_number"
+ else:
+ raise AssertionError("Expected QualityGateError")
+
+
+@pytest.mark.asyncio
+async def test_trace_recorder_persists_standard_metadata(db_session, test_user):
+ job = await create_generation_job(
+ db_session,
+ user_id=test_user.id,
+ output_mode="story",
+ input_type="keywords",
+ request_payload={"data": "小兔子"},
+ )
+
+ event = await TraceRecorder(db_session).record_step(
+ job=job,
+ event_type="provider_call_failed",
+ status="failed",
+ metadata={
+ "capability": "text",
+ "adapter": "demo",
+ "error": "timeout",
+ },
+ failure_category=FailureCategory.TIMEOUT,
+ retryable=True,
+ )
+
+ assert event is not None
+ events = (
+ await db_session.execute(
+ select(GenerationJobEvent)
+ .where(GenerationJobEvent.job_id == job.id)
+ .order_by(GenerationJobEvent.id)
+ )
+ ).scalars().all()
+
+ assert [item.event_type for item in events] == [
+ "request_accepted",
+ "provider_call_failed",
+ ]
+ metadata = events[1].event_metadata
+ assert metadata["capability"] == "text"
+ assert metadata["adapter"] == "demo"
+ assert metadata["step"] == "provider_invocation"
+ assert metadata["artifact"] == "none"
+ assert metadata["failure_category"] == "timeout"
+ assert metadata["retryable"] is True
+
+
+@pytest.mark.asyncio
+async def test_trace_recorder_ignores_missing_job(db_session):
+ event = await TraceRecorder(db_session).record_step(
+ job=None,
+ event_type="context_prepared",
+ status="succeeded",
+ )
+
+ assert event is None
+
+
+@pytest.mark.asyncio
+async def test_execution_control_cancels_job_at_safe_checkpoint(
+ db_session,
+ test_user,
+ test_story,
+):
+ job = await create_generation_job(
+ db_session,
+ user_id=test_user.id,
+ output_mode="story",
+ input_type="keywords",
+ request_payload={"data": "小兔子"},
+ story_id=test_story.id,
+ )
+ await record_generation_event(
+ db_session,
+ job=job,
+ story_id=test_story.id,
+ event_type="cancel_requested",
+ status="running",
+ message="Cancellation requested.",
+ )
+
+ with pytest.raises(GenerationJobCanceledError):
+ await ExecutionControl(db_session).stop_if_cancel_requested(
+ job=job,
+ story=test_story,
+ )
+
+ refreshed_job = (
+ await db_session.execute(select(GenerationJob).where(GenerationJob.id == job.id))
+ ).scalar_one()
+ assert refreshed_job.status == "canceled"
+ assert refreshed_job.current_step == "generation_canceled"
+ assert refreshed_job.error_message == "Generation canceled by user."
+
+ events = (
+ await db_session.execute(
+ select(GenerationJobEvent)
+ .where(GenerationJobEvent.job_id == job.id)
+ .order_by(GenerationJobEvent.id)
+ )
+ ).scalars().all()
+ assert [item.event_type for item in events] == [
+ "request_accepted",
+ "cancel_requested",
+ "generation_canceled",
+ ]
diff --git a/docs/planning/harness-stage-0-report.md b/docs/planning/harness-stage-0-report.md
new file mode 100644
index 0000000..c4801cb
--- /dev/null
+++ b/docs/planning/harness-stage-0-report.md
@@ -0,0 +1,111 @@
+# Harness Engineering 改造阶段 0 报告
+
+**阶段**: 0 - 设计与基线
+**日期**: 2026-06-21
+**状态**: 已完成
+**范围**: 技术设计、阶段拆解、最小任务、验收标准
+
+---
+
+## 1. 本阶段目标
+
+本阶段不修改业务代码,目标是先建立 harness engineering 改造的执行基线:
+
+- 明确这次改造不是引入外部工作流引擎,也不是重写项目。
+- 确认现有统一生成工作流的能力边界。
+- 设计 Generation Harness Runtime 的目标架构。
+- 把后续工作拆成可执行、可验证、可报告的阶段。
+
+## 2. 已完成工作
+
+- 阅读并对齐现有统一生成 PRD:`docs/product/unified-generation-workflow-prd.md`
+- 阅读现有架构说明:`docs/technical/architecture.md`
+- 阅读现有 job/event 说明:`docs/technical/generation-job-state.md`
+- 阅读 Provider 路由说明:`docs/technical/provider-routing.md`
+- 检查当前生成链路实现:
+ - `backend/app/services/story_service.py`
+ - `backend/app/services/generation_jobs.py`
+ - `backend/app/services/provider_router.py`
+ - `backend/app/tasks/generation_workflow.py`
+- 检查当前关键测试:
+ - `backend/tests/test_generation_jobs.py`
+ - `backend/tests/test_stories.py`
+- 新增技术设计文档:
+ - `docs/technical/harness-engineering-modernization.md`
+
+## 3. 核心结论
+
+DreamWeaver 已经具备 harness engineering 的雏形,不需要从零开始。
+
+当前最有价值的改造路径是:
+
+1. 先抽出 harness 基础类型、trace recorder 和 execution control。
+2. 再拆资产工作流。
+3. 然后引入显式 workflow plan。
+4. 最后补 quality gates、trace analytics 和前端增量展示。
+
+第一阶段应避免大改数据库、API 和前端,先保证内部边界更清楚,并让现有测试继续通过。
+
+## 4. 发现的现状问题
+
+| 问题 | 影响 | 后续阶段 |
+| --- | --- | --- |
+| `story_service` 同时负责业务流程、事件记录、取消检查、资产补全和响应构造 | 文件职责偏重,后续扩展容易继续堆叠 | 阶段 1、2、3 |
+| event type 已经丰富,但缺少标准 step/artifact/failure category | 可观测性有数据,但分析语义还不稳定 | 阶段 1、5 |
+| Provider trace 已落库,但还没有纳入统一 runtime 语义 | 调用层和产品步骤之间缺少统一映射 | 阶段 1、5 |
+| 输出质量主要依赖 adapter 和 schema | 儿童内容质量、结构完整性和安全门还不够显式 | 阶段 4 |
+| 资产工作流 helper 已抽出一部分,但仍在 `story_service` 内 | 重试、后台补全、同步兼容路径仍有重复风险 | 阶段 2 |
+
+## 5. 阶段 1 入口标准
+
+可以进入阶段 1,入口条件已满足:
+
+- 技术设计已存在。
+- 最小任务已经拆解。
+- 阶段 1 不需要产品澄清。
+- 阶段 1 不需要数据库迁移。
+- 阶段 1 有明确验证命令。
+
+阶段 1 第一批任务:
+
+| ID | 任务 |
+| --- | --- |
+| H1-1 | 新增 `app/services/harness/__init__.py` |
+| H1-2 | 新增 `types.py` 枚举和 event type 映射 |
+| H1-3 | 新增 `trace.py` 封装 job event 写入 |
+| H1-4 | 新增 `control.py` 封装取消检查 |
+| H1-5 | 替换 `story_service` 内部 helper 实现 |
+| H1-6 | 补 `tests/test_harness_runtime.py` |
+
+## 6. 验证
+
+本阶段为文档阶段,验证方式是文档审查和路径确认。
+
+已确认:
+
+- 设计文档放在 `docs/technical/`
+- 阶段报告放在 `docs/planning/`
+- 后续阶段有明确测试命令
+- 改造策略与现有统一生成 PRD 不冲突
+
+## 7. 风险
+
+| 风险 | 等级 | 处理 |
+| --- | --- | --- |
+| 过早拆 workflow 导致行为回归 | 高 | 阶段 1 不拆主流程,只抽基础支撑件 |
+| metadata 标准化影响前端 | 中 | 只新增字段,不删除旧字段 |
+| 文档太大但实现不跟进 | 中 | 每个阶段都产出报告并更新状态 |
+
+## 8. 下一步
+
+进入阶段 1:Harness 基础类型与事件封装。
+
+优先顺序:
+
+1. 新增 harness 包和纯类型定义。
+2. 增加单测锁定 event type 到 step 的映射。
+3. 新增 trace recorder,保持旧事件行为。
+4. 新增 execution control,保持取消行为。
+5. 替换 `story_service` 内部 helper 为代理调用。
+6. 运行阶段 1 验证命令并产出阶段 1 报告。
+
diff --git a/docs/planning/harness-stage-1-report.md b/docs/planning/harness-stage-1-report.md
new file mode 100644
index 0000000..86bbcab
--- /dev/null
+++ b/docs/planning/harness-stage-1-report.md
@@ -0,0 +1,122 @@
+# Harness Engineering 改造阶段 1 报告
+
+**阶段**: 1 - Harness 基础类型与事件封装
+**日期**: 2026-06-21
+**状态**: 已完成
+**范围**: 后端 harness 包、标准类型、trace recorder、execution control、定向测试
+
+---
+
+## 1. 本阶段目标
+
+本阶段目标是先建立 Generation Harness Runtime 的最低可用支撑件,不重排主生成流程,不修改数据库结构,不破坏现有 API。
+
+目标包括:
+
+- 新增标准 workflow step、artifact、failure category 类型。
+- 将现有 event type 映射到标准 step/artifact。
+- 封装 job event 写入,统一补齐标准 trace metadata。
+- 封装取消检查,保留当前安全检查点语义。
+- 增加单元测试,确保新支撑件可独立验证。
+
+## 2. 已完成工作
+
+### 新增文件
+
+- `backend/app/services/harness/__init__.py`
+- `backend/app/services/harness/types.py`
+- `backend/app/services/harness/trace.py`
+- `backend/app/services/harness/control.py`
+- `backend/tests/test_harness_runtime.py`
+
+### 修改文件
+
+- `backend/app/services/story_service.py`
+ - 保留 `_record_job_event_if_present` 和 `_stop_if_job_cancel_requested` 原函数名。
+ - 内部改为代理到 `TraceRecorder` 和 `ExecutionControl`。
+ - 将 `GenerationJobCanceledError` 移入 harness control 模块。
+
+- `backend/app/services/provider_router.py`
+ - Provider 调用事件改为通过 `TraceRecorder` 写入。
+ - 保留原有 metadata 字段,例如 capability、adapter、strategy、latency、estimated cost、error。
+
+- `docs/technical/harness-engineering-modernization.md`
+ - 更新阶段 1 状态。
+
+## 3. 行为兼容性
+
+本阶段采用“只新增标准字段,不删除旧字段”的策略。
+
+新增写入到 `generation_job_events.event_metadata` 的标准字段包括:
+
+- `step`
+- `artifact`
+- `failure_category`
+- `retryable`
+- `blocks_main_result`
+
+现有事件顺序、event type、status、message 和既有 metadata 字段保持兼容。
+
+## 4. 验证结果
+
+已执行:
+
+```bash
+cd backend
+.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py
+.venv/bin/python -m ruff check app tests
+```
+
+结果:
+
+- `24 passed`
+- `ruff`: `All checks passed!`
+
+覆盖到的关键行为:
+
+- event type 到标准 workflow step 的映射。
+- event type 到 artifact 的映射。
+- trace metadata 不丢失旧字段。
+- TraceRecorder 能写入标准 metadata。
+- job 为 `None` 时 TraceRecorder 安全跳过。
+- ExecutionControl 能在 `cancel_requested` checkpoint 将 job 收敛为 `canceled`。
+- 现有 generation job、取消、重试、Provider 统计测试继续通过。
+
+## 5. 自审结论
+
+本阶段改动符合阶段 1 设计:
+
+- 没有引入外部框架。
+- 没有修改数据库迁移。
+- 没有修改 API schema。
+- 没有重排现有生成 workflow。
+- 没有删除旧 metadata 字段。
+- `story_service` 仍保留旧 helper 入口,降低后续阶段风险。
+
+## 6. 已知限制
+
+- 当前只有通过 `TraceRecorder` 写入的事件会自动带标准 metadata。直接调用 `record_generation_event` 的旧代码路径暂未全量迁移。
+- `failure_category` 目前只在显式传入时有具体值,Provider 错误自动分类留到后续阶段。
+- `AssetCompletionResult` 仍在 `story_service.py`,资产工作流拆分留到阶段 2。
+- `WorkflowPlan` 和执行器尚未实现,阶段 1 只完成运行时支撑件。
+
+## 7. 风险与处理
+
+| 风险 | 等级 | 当前处理 |
+| --- | --- | --- |
+| 新 metadata 影响前端 | 低 | 只新增字段,不删除字段 |
+| 取消语义回归 | 低 | `tests/test_generation_jobs.py` 已通过 |
+| Provider 聚合误算 | 低 | Provider 统计测试已通过 |
+| 直接调用 `record_generation_event` 的路径未标准化 | 中 | 后续阶段逐步迁移或在底层统一补齐 |
+
+## 8. 下一阶段建议
+
+进入阶段 2:资产工作流边界抽取。
+
+建议先做最小切片:
+
+1. 将 `AssetCompletionResult` 移到 harness 或 artifact workflow 模块,并保留兼容 import。
+2. 抽普通故事封面补全工作流,保持测试不变。
+3. 抽音频补全工作流,先覆盖缓存命中、生成成功、生成失败。
+4. 最后抽绘本图片工作流,因为它涉及并发生成和逐页事件顺序,风险略高。
+
diff --git a/docs/planning/harness-stage-2-report.md b/docs/planning/harness-stage-2-report.md
new file mode 100644
index 0000000..f5b94b9
--- /dev/null
+++ b/docs/planning/harness-stage-2-report.md
@@ -0,0 +1,121 @@
+# Harness Engineering 改造阶段 2 报告
+
+**阶段**: 2 - 资产工作流边界抽取
+**日期**: 2026-06-21
+**状态**: 已完成主要目标
+**范围**: 封面、音频、持久化绘本缺失图片补全工作流抽取
+
+---
+
+## 1. 本阶段目标
+
+本阶段目标是将资产补全职责从 `story_service.py` 中抽出,迁入 harness runtime 的 artifact workflow 层,同时保留原有函数签名和外部行为。
+
+阶段 2 不修改数据库结构,不修改 API schema,不改变前端行为。
+
+## 2. 已完成工作
+
+### 新增和扩展文件
+
+- `backend/app/services/harness/artifacts.py`
+ - 新增 `AssetCompletionResult`
+ - 新增 `asset_result_metadata`
+
+- `backend/app/services/harness/asset_workflows.py`
+ - 新增 `complete_cover_image_asset`
+ - 新增 `read_cached_audio_asset`
+ - 新增 `complete_audio_asset`
+ - 新增 `get_storybook_pages_data`
+ - 新增 `build_storybook_error_message`
+ - 新增 `resolve_storybook_image_status`
+ - 新增 `complete_storybook_image_assets`
+
+### 修改文件
+
+- `backend/app/services/story_service.py`
+ - 移除本地 `AssetCompletionResult` 定义,改为从 harness artifacts 引入。
+ - `_complete_cover_image_asset` 改为代理到 harness asset workflow。
+ - `_read_cached_audio_asset` 改为代理到 harness asset workflow。
+ - `_complete_audio_asset` 改为代理到 harness asset workflow。
+ - `_complete_storybook_image_assets` 改为代理到 harness asset workflow。
+ - 绘本错误信息和图片状态推导 helper 改为代理到 harness asset workflow。
+
+## 3. 行为兼容性
+
+本阶段保留了 `story_service.py` 内原有私有 helper 名称,因此调用方不需要调整。
+
+保持兼容的行为包括:
+
+- 普通故事封面生成成功和失败语义。
+- 封面失败时主内容仍可读,并进入可重试状态。
+- 音频缓存命中、缓存缺失修复、TTS 成功和 TTS 失败语义。
+- 音频失败时可选择阻塞或非阻塞,取决于 `raise_on_failure`。
+- 持久化绘本缺失封面/分页插图补全语义。
+- 绘本逐页图片事件和完成事件 metadata。
+- `retryable_assets` 行为。
+
+## 4. 验证结果
+
+已执行:
+
+```bash
+cd backend
+.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py tests/test_stories.py tests/test_audio_cache.py
+.venv/bin/python -m ruff check app tests
+```
+
+结果:
+
+- `72 passed`
+- `ruff`: `All checks passed!`
+
+覆盖到的关键行为:
+
+- 统一生成 job 队列和 worker 持久化。
+- 资产重试 job 事件。
+- 普通故事封面生成与重试。
+- 绘本分页图片重试。
+- 音频缓存、生成、失败和清理。
+- Provider 调用事件和聚合。
+- job 取消、重试和卡住任务收敛。
+
+## 5. 自审结论
+
+本阶段符合设计目标:
+
+- 资产补全职责已从 `story_service` 主体中显著抽离。
+- 外部 API 和数据库模型未变。
+- 当前主要测试通过。
+- harness 层开始承载 artifact workflow,但仍通过依赖注入函数调用 Provider 和文件缓存,便于测试与后续替换。
+
+## 6. 保留到后续阶段的内容
+
+首次绘本生成前的并发图片生成函数 `_generate_storybook_image_assets` 仍保留在 `story_service.py`。
+
+保留原因:
+
+- 它发生在绘本主记录持久化之前。
+- 它与“生成绘本结构 -> 可选并发生成图片 -> 持久化故事”的执行计划强相关。
+- 更适合在阶段 3 引入 `WorkflowPlan` 时一起整理,而不是在阶段 2 单独迁移。
+
+## 7. 风险与处理
+
+| 风险 | 等级 | 当前处理 |
+| --- | --- | --- |
+| 资产工作流迁移改变事件顺序 | 低 | generation job 和 story 测试已通过 |
+| 音频缓存修复逻辑回归 | 低 | `test_audio_cache.py` 已通过 |
+| 绘本图片补全状态误判 | 低 | 绘本重试测试已通过 |
+| 首次绘本并发图片仍在 service 内 | 中 | 阶段 3 处理 |
+
+## 8. 下一阶段建议
+
+进入阶段 3:Workflow Plan 与执行器。
+
+建议切片:
+
+1. 定义 `WorkflowPlan`、`WorkflowTask` 和模式枚举,不接入主流程。
+2. 为普通故事、完整故事、绘本、资产任务生成 plan 快照测试。
+3. 将 `_generate_generation_service_with_job` 的分支逐步迁移到 plan 构建。
+4. 处理首次绘本并发图片生成,把它纳入 storybook plan 的 asset step。
+5. 保持 `/api/generations` 和现有 job event 顺序兼容。
+
diff --git a/docs/planning/harness-stage-3-report.md b/docs/planning/harness-stage-3-report.md
new file mode 100644
index 0000000..8048d6a
--- /dev/null
+++ b/docs/planning/harness-stage-3-report.md
@@ -0,0 +1,121 @@
+# Harness Engineering 改造阶段 3 报告
+
+**阶段**: 3 - Workflow Plan 与执行器
+**日期**: 2026-06-21
+**状态**: 已完成计划建模基线,执行器接管未启用
+**范围**: 纯 WorkflowPlan 建模、计划快照测试
+
+---
+
+## 1. 本阶段目标
+
+阶段 3 的完整目标是引入显式 `WorkflowPlan`,逐步减少 `_generate_generation_service_with_job` 中的分支逻辑。
+
+本次完成了最小安全切片:
+
+- 定义 plan 类型和 task 类型。
+- 为普通故事、带封面故事、绘本、资产任务生成计划。
+- 用快照测试锁定计划形状。
+- 暂不改变实际执行流,避免事件顺序和前端时间线发生非必要变化。
+
+## 2. 已完成工作
+
+### 新增文件
+
+- `backend/app/services/harness/plans.py`
+
+### 新增能力
+
+- `WorkflowMode`
+ - `story`
+ - `story_with_assets`
+ - `storybook`
+ - `asset_generation`
+ - `asset_retry`
+
+- `WorkflowTask`
+ - `key`
+ - `step`
+ - `artifact`
+ - `required`
+ - `recoverable`
+
+- `WorkflowPlan`
+ - `mode`
+ - `tasks`
+ - `to_snapshot()`
+
+- plan builder
+ - `build_story_plan(generate_images=...)`
+ - `build_storybook_plan(generate_images=...)`
+ - `build_asset_plan(output_mode=..., assets=...)`
+
+### 修改文件
+
+- `backend/tests/test_harness_runtime.py`
+ - 增加普通故事计划快照。
+ - 增加带封面故事计划快照。
+ - 增加绘本带图片计划快照。
+ - 增加资产重试计划去重测试。
+
+## 3. 为什么没有接入执行器
+
+本阶段有意没有新增运行时事件,例如 `workflow_planned`,也没有让 plan 接管 `_generate_generation_service_with_job`。
+
+原因:
+
+- 新 event type 会改变前端生成轨迹时间线,需要同步前端 label 和 progress 映射。
+- 当前生成 job 测试已经严格断言事件顺序。
+- 直接接管执行器会同时触碰 story、storybook、asset_generation、asset_retry 四条路径,风险偏高。
+- 先稳定 plan snapshot,可以让后续迁移按任务级别逐步推进。
+
+## 4. 验证结果
+
+已执行:
+
+```bash
+cd backend
+.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py
+.venv/bin/python -m ruff check app tests
+```
+
+结果:
+
+- `28 passed`
+- `ruff`: `All checks passed!`
+
+阶段 3 的计划建模未改变业务执行流,因此完整后端行为仍由阶段 2 的完整测试结果兜底。
+
+## 5. 自审结论
+
+本阶段符合“小步可验证”原则:
+
+- 新增模块不依赖数据库、FastAPI 或 Provider。
+- plan 只描述 workflow 形状,不执行副作用。
+- 所有任务均可 JSON snapshot,后续可写入 trace metadata 或用于执行器。
+- 没有影响现有 API、job event 顺序或前端。
+
+## 6. 保留到后续的内容
+
+| 内容 | 建议处理 |
+| --- | --- |
+| 执行器接管 `_generate_generation_service_with_job` | 分 story、storybook、asset 三次迁移 |
+| 首次绘本生成前并发图片生成 | 跟 storybook plan 的 image task 一起迁移 |
+| `workflow_planned` 事件 | 等前端 label 和 progress 映射准备好后再加入 |
+| plan 与 trace metadata 关联 | 先在 execution context 内部使用,再决定是否落库 |
+
+## 7. 下一阶段建议
+
+下一步有两条可选路线:
+
+1. **继续阶段 3B:执行器小步接管**
+ - 先让普通故事不带图片路径使用 plan。
+ - 再迁移普通故事带图片路径。
+ - 最后迁移绘本和资产任务。
+
+2. **进入阶段 4:Quality Gates**
+ - 在不改变执行器的前提下,为 Provider 输出增加确定性校验。
+ - 这条路线风险更低,对儿童内容质量收益更直接。
+
+建议优先做阶段 4 的低风险质量门,然后再回来做阶段 3B 的执行器迁移。
+
diff --git a/docs/planning/harness-stage-4-report.md b/docs/planning/harness-stage-4-report.md
new file mode 100644
index 0000000..128b01f
--- /dev/null
+++ b/docs/planning/harness-stage-4-report.md
@@ -0,0 +1,140 @@
+# Harness Engineering 改造阶段 4 报告
+
+**阶段**: 4 - Quality Gates 与输出验证
+**日期**: 2026-06-21
+**状态**: 已完成确定性质量门
+**范围**: 文本故事和绘本结构输出校验、质量门失败事件、测试验证
+
+---
+
+## 1. 本阶段目标
+
+阶段 4 的目标是在 Provider 输出进入持久化之前增加低成本、确定性的质量门。
+
+本阶段不调用额外 AI 模型,不增加外部服务成本,只做结构完整性和明显儿童安全风险检查。
+
+## 2. 已完成工作
+
+### 新增文件
+
+- `backend/app/services/harness/quality_gates.py`
+
+### 新增能力
+
+- `QualityGateCode`
+ - `missing_title`
+ - `missing_story_text`
+ - `missing_cover_prompt`
+ - `missing_storybook_page`
+ - `invalid_storybook_page_number`
+ - `missing_storybook_page_text`
+ - `unsafe_child_content`
+
+- `QualityGateIssue`
+ - 稳定 code
+ - 中文 message
+ - `failure_category`
+ - field
+
+- `QualityGateError`
+ - 聚合多个 issue
+ - 可输出 JSON-safe metadata
+
+- `validate_story_output`
+ - 检查标题
+ - 检查正文
+ - 检查封面 prompt
+ - 检查明显不适合 3-8 岁儿童的风险词
+
+- `validate_storybook_output`
+ - 检查标题
+ - 检查至少一页
+ - 检查页码有效且不重复
+ - 检查每页正文
+ - 检查明显不适合 3-8 岁儿童的风险词
+
+### 修改文件
+
+- `backend/app/services/story_service.py`
+ - 文本故事 Provider 输出后、持久化前执行 `validate_story_output`。
+ - 绘本 Provider 输出后、图片生成和持久化前执行 `validate_storybook_output`。
+ - 质量门失败会写入 `quality_gate_failed` job event。
+ - 质量门失败不会落库故事主记录。
+
+- `backend/app/services/harness/types.py`
+ - `quality_gate_failed` 映射到 `narrative_generation` step。
+ - `quality_gate_failed` 映射到 `story_text` artifact。
+
+- `backend/tests/test_harness_runtime.py`
+ - 增加质量门纯函数测试。
+
+- `backend/tests/test_generation_jobs.py`
+ - 增加 worker 质量门失败测试,确认 story 不落库、job failed、事件可解释。
+
+## 3. 行为语义
+
+质量门失败属于生成失败,而不是降级完成。
+
+原因:
+
+- 文本故事正文或绘本页结构是 blocking artifact。
+- 如果主内容本身不合格,系统不能保存为可读故事。
+- 图片、音频等 recoverable artifact 失败仍按原有 `degraded_completed` 或可重试逻辑处理。
+
+## 4. 验证结果
+
+已执行:
+
+```bash
+cd backend
+.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py
+.venv/bin/python -m ruff check app tests
+.venv/bin/python -m pytest
+```
+
+结果:
+
+- 定向测试:`33 passed`
+- 完整后端测试:`138 passed`
+- `ruff`: `All checks passed!`
+
+覆盖到的关键行为:
+
+- 质量门接受完整、安全的儿童故事。
+- 质量门拒绝空正文。
+- 质量门拒绝明显不适合儿童的风险词。
+- 质量门拒绝绘本重复页码。
+- worker 中质量门失败会写入 `quality_gate_failed`。
+- 质量门失败不会持久化 story。
+- 现有所有后端测试继续通过。
+
+## 5. 自审结论
+
+本阶段符合设计目标:
+
+- 没有引入额外 AI 调用。
+- 没有引入新依赖。
+- 没有改变 API schema。
+- 没有改变图片、音频资产失败降级语义。
+- 对儿童内容质量和结构完整性有了第一层确定性保护。
+
+## 6. 已知限制
+
+| 限制 | 后续建议 |
+| --- | --- |
+| 儿童安全词表很保守,只覆盖明显风险词 | 后续可接入可配置词表或轻量安全审核 Provider |
+| 当前 `quality_gate_failed` artifact 固定映射到 `story_text` | 后续可根据 story/storybook mode 写入更精确 artifact |
+| 质量门失败文案目前偏技术 | 后续可为前端增加更友好的用户提示 |
+| 未做模型评审式质量评分 | 先保留,避免增加成本和不稳定性 |
+
+## 7. 下一阶段建议
+
+进入阶段 5:Trace Analytics 与前端增量展示。
+
+建议切片:
+
+1. 后端 Provider/Job 聚合支持 `failure_category` 统计。
+2. 前端生成轨迹显示 `step` 和 `artifact` 的中文标签。
+3. 管理端 Provider dashboard 展示 failure category 聚合。
+4. 更新 smoke 脚本检查标准 metadata。
+
diff --git a/docs/technical/harness-engineering-modernization.md b/docs/technical/harness-engineering-modernization.md
new file mode 100644
index 0000000..f0e170c
--- /dev/null
+++ b/docs/technical/harness-engineering-modernization.md
@@ -0,0 +1,493 @@
+# Harness Engineering 架构改造技术设计
+
+**项目**: DreamWeaver 梦语织机
+**版本**: 0.1
+**日期**: 2026-06-21
+**状态**: 阶段 0 已建立设计基线
+**作者**: Codex
+
+---
+
+## 1. 背景
+
+DreamWeaver 当前已经完成统一生成工作流的第一轮落地:`POST /api/generations` 会创建 `generation_jobs`,后台 worker 执行故事或绘本生成,`generation_job_events` 记录关键过程,前端通过 job 查询和故事状态展示进度、失败与重试入口。
+
+现有架构已经具备 harness engineering 的雏形:
+
+- `story_service` 负责上下文准备、主内容生成、故事落库、资产补全、状态同步和任务收敛。
+- `generation_jobs` 负责 job/event、进度摘要、取消、重试和 Provider 聚合。
+- `provider_router` 负责 Provider 选择、failover、熔断、成本和调用事件。
+- `story_status` 负责把文本、图片、音频状态推导为统一故事状态。
+
+当前主要问题不是缺少能力,而是生成运行时控制逻辑分散在多个 service 内。取消检查、事件记录、资产恢复、Provider 轨迹、失败分类和质量校验还没有形成显式的运行时边界。
+
+本设计的目标,是把现有“隐性 harness”升级为可观测、可恢复、可验证、可演进的 Generation Harness Runtime,同时保留当前 API、数据库和前端主行为。
+
+## 2. 目标
+
+### 2.1 产品目标
+
+- 家长提交故事或绘本生成后,能获得稳定、可解释、可恢复的结果。
+- 主内容生成成功后,即使图片或音频失败,也不影响阅读。
+- 用户能清楚看到任务步骤、失败原因和可重试资产。
+- 后续新增语音共创、更多内容模式或新资产类型时,复用同一套运行时模型。
+
+### 2.2 工程目标
+
+- 把 `story_service` 中的运行时控制职责抽到 harness 层。
+- 让 workflow step、artifact、trace、failure category 成为一等概念。
+- 保持 `/api/generations`、旧兼容接口、现有状态字段和主要测试行为不破坏。
+- 优先做渐进式重构,不引入复杂工作流引擎,不进行大爆炸重写。
+- 每个大阶段都产出阶段报告,包含实现、审查、验证和风险。
+
+### 2.3 非目标
+
+- 本轮不引入 Temporal、Dagster、LangGraph 等外部工作流引擎。
+- 本轮不彻底重做数据库结构。
+- 本轮不废弃旧生成接口,只继续保持兼容层指向统一入口。
+- 本轮不把 DreamWeaver 抽象成通用 agent 平台,所有抽象必须服务儿童故事生成业务。
+- 本轮不要求一次性完成所有 eval、质量门和 replay 能力。
+
+## 3. 架构原则
+
+1. **主内容优先可读**
+ 文本故事或绘本结构是 blocking artifact;封面、分页插图、音频是 recoverable artifact。
+
+2. **API 稳定优先**
+ 先重构内部边界,再考虑扩展响应字段。现有前端、smoke、测试不应被第一阶段打断。
+
+3. **事件结构稳定**
+ 继续复用 `generation_job_events`,但逐步标准化 metadata,避免每个调用点随手定义不同结构。
+
+4. **Provider 不等于产品能力**
+ Provider 只是 tool invocation 的实现。产品能力应由 capability、workflow step、artifact 和 recovery policy 共同定义。
+
+5. **小步可验证**
+ 每个最小任务都必须能通过单测、局部测试或文档审查验证。
+
+## 4. 目标架构
+
+```mermaid
+flowchart TB
+ API["FastAPI API 层"] --> COMMAND["Generation Command"]
+ COMMAND --> HARNESS["Generation Harness Runtime"]
+
+ HARNESS --> CTX["Context Builder
档案 / 宇宙 / 记忆 / 输入规范化"]
+ HARNESS --> PLAN["Workflow Plan
story / storybook / asset_generation / asset_retry"]
+ HARNESS --> CONTROL["Execution Control
取消 / 超时收敛 / 安全检查点"]
+ HARNESS --> TRACE["Trace Recorder
job events / step metadata / provider trace"]
+ HARNESS --> ARTIFACT["Artifact Workflows
story_text / storybook_pages / image / audio"]
+ HARNESS --> GUARD["Quality Gates
schema / 儿童安全 / 内容完整性"]
+
+ ARTIFACT --> ROUTER["Provider Router
策略 / failover / 熔断 / 成本"]
+ ROUTER --> ADAPTERS["Provider Adapters"]
+ HARNESS --> STATE["State Store
stories / generation_jobs / generation_job_events"]
+ STATE --> FE["Vue 前端进度 / 详情 / 重试"]
+```
+
+## 5. 运行时核心概念
+
+### 5.1 Generation Command
+
+表示一次用户意图,来源包括:
+
+- `POST /api/generations`
+- `POST /api/generations/{story_id}/retry-assets`
+- `POST /api/generations/jobs/{job_id}/retry`
+- 旧兼容接口产生的同步生成调用
+
+标准字段:
+
+| 字段 | 说明 |
+| --- | --- |
+| `output_mode` | `story`、`storybook`、`asset_generation`、`asset_retry` |
+| `input_type` | `keywords`、`full_story`、`image`、`audio` 或资产组合 |
+| `user_id` | 当前用户 |
+| `story_id` | 已有故事 ID,可为空 |
+| `request_payload` | 原始请求的 JSON 安全快照 |
+
+### 5.2 Workflow Step
+
+第一阶段先将现有 event type 映射为标准 step,不立即改库:
+
+| Step | 当前事件 | 是否阻塞主内容 |
+| --- | --- | --- |
+| `request_acceptance` | `request_accepted`、`retry_queued` | 是 |
+| `worker_start` | `worker_started` | 是 |
+| `context_preparation` | `context_prepared` | 是 |
+| `narrative_generation` | `narrative_generated` | 是 |
+| `story_persistence` | `story_saved` | 是 |
+| `image_generation` | `cover_image_*`、`storybook_*image*` | 否 |
+| `audio_generation` | `audio_*` | 否 |
+| `postprocessing` | `postprocessing_queued` | 否 |
+| `completion` | `generation_completed`、`asset_*_completed` | 是 |
+| `cancellation` | `cancel_requested`、`generation_canceled` | 是 |
+| `stale_recovery` | `generation_stale_failed` | 是 |
+
+### 5.3 Artifact
+
+| Artifact | 来源 | 状态字段 | 恢复策略 |
+| --- | --- | --- | --- |
+| `story_text` | text provider | `text_status` | 失败则 job failed |
+| `storybook_pages` | storybook provider | `text_status` | 失败则 job failed |
+| `cover_image` | image provider | `image_status` | 失败后 `degraded_completed`,可重试 |
+| `page_image` | image provider | `image_status` | 部分页失败后 `degraded_completed`,可重试 |
+| `audio` | tts provider + file cache | `audio_status` | 失败后 `degraded_completed`,可重试 |
+| `achievement_memory` | Celery 后处理 | 事件记录 | 失败不影响主内容 |
+
+### 5.4 Failure Category
+
+第一阶段先在代码中定义枚举和 metadata 规范;后续逐步写入事件:
+
+| Category | 说明 |
+| --- | --- |
+| `provider_error` | 外部 Provider 返回失败或不可用 |
+| `schema_error` | Provider 输出无法解析或字段不完整 |
+| `safety_error` | 儿童安全或内容安全校验失败 |
+| `timeout` | 调用超时或 job 超时 |
+| `canceled` | 用户取消 |
+| `stale_job` | 后台任务卡住后被系统收敛 |
+| `storage_error` | 数据库、文件缓存或持久化失败 |
+| `validation_error` | 用户输入、档案或宇宙校验失败 |
+| `unknown_error` | 未归类错误 |
+
+### 5.5 Trace Metadata
+
+标准 event metadata 应逐步包含:
+
+```json
+{
+ "step": "narrative_generation",
+ "artifact": "story_text",
+ "capability": "text",
+ "adapter": "demo",
+ "provider_id": null,
+ "strategy": "priority",
+ "latency_ms": 42,
+ "estimated_cost_usd": 0.01,
+ "failure_category": null,
+ "error": null,
+ "retryable": false,
+ "blocks_main_result": true
+}
+```
+
+短期兼容要求:
+
+- 不删除现有 metadata 字段。
+- 新增字段必须向后兼容。
+- 前端仍可使用当前 `event_type`、`status`、`message`、`event_metadata`。
+
+## 6. 模块设计
+
+### 6.1 `app/services/harness/types.py`
+
+新增纯类型模块,不依赖数据库。
+
+职责:
+
+- 定义 `WorkflowStep`
+- 定义 `ArtifactKind`
+- 定义 `FailureCategory`
+- 定义 `StepStatus`
+- 定义 `TraceMetadata`
+- 定义从旧 event type 到标准 step 的映射函数
+
+验收:
+
+- 单测覆盖 event type 到 step 的映射。
+- 未知 event type 映射到 `unknown` 或保守默认,不抛出非预期异常。
+
+### 6.2 `app/services/harness/trace.py`
+
+封装 job event 写入。
+
+职责:
+
+- 提供 `TraceRecorder.record_step(...)`
+- 提供 `TraceRecorder.record_provider_call(...)`
+- 统一补齐 `step`、`artifact`、`failure_category`、`retryable`、`blocks_main_result`
+- 内部继续调用现有 `record_generation_event`
+
+验收:
+
+- 现有 `generation_job_events` 行为保持。
+- 新事件 metadata 含标准字段。
+- job 为空时安全跳过,兼容旧同步接口。
+
+### 6.3 `app/services/harness/control.py`
+
+封装执行控制。
+
+职责:
+
+- 提供 `ExecutionControl.stop_if_cancel_requested(...)`
+- 保留现有取消语义和 `GenerationJobCanceledError`
+- 后续可扩展 step timeout、checkpoint、stale recovery
+
+验收:
+
+- 取消中的 job 仍被标记为 `canceled`。
+- 现有取消测试继续通过。
+
+### 6.4 `app/services/harness/artifacts.py`
+
+第二阶段再拆。第一阶段先保留 `story_service` 中的资产函数,只把共享结果类型移动或桥接。
+
+职责:
+
+- 表达 `AssetCompletionResult`
+- 后续承载封面、绘本插图、音频工作流
+
+验收:
+
+- 资产重试行为不变。
+- `retryable_assets` 行为不变。
+
+### 6.5 `story_service` 改造方式
+
+第一阶段仅做低风险替换:
+
+- `_record_job_event_if_present` 代理到 `TraceRecorder`
+- `_stop_if_job_cancel_requested` 代理到 `ExecutionControl`
+- `AssetCompletionResult` 可先留在原文件,第二阶段再移动
+- 不重写 `generate_generation_service`、`run_generation_job_service` 主流程
+
+第二阶段再拆分:
+
+- `story_generation_workflow.py`
+- `storybook_generation_workflow.py`
+- `asset_generation_workflow.py`
+- `asset_retry_workflow.py`
+
+## 7. 阶段计划
+
+### 阶段 0: 设计与基线
+
+目标:
+
+- 产出本技术设计。
+- 产出阶段 0 报告。
+- 明确阶段拆解、验收标准和验证方式。
+
+最小任务:
+
+| ID | 任务 | 验收 |
+| --- | --- | --- |
+| H0-1 | 梳理现有统一生成 PRD、架构文档和测试 | 文档引用现有边界,不重复设计入口 |
+| H0-2 | 定义目标 runtime 架构 | 技术设计包含模块、数据、阶段计划 |
+| H0-3 | 拆分最小可执行任务 | 每个阶段有任务 ID 和验收标准 |
+| H0-4 | 建立阶段报告机制 | `docs/planning/` 下有阶段报告 |
+
+### 阶段 1: Harness 基础类型与事件封装
+
+目标:
+
+- 新增 harness 包。
+- 标准化 workflow step、artifact、failure category。
+- 用 `TraceRecorder` 封装事件写入。
+- 用 `ExecutionControl` 封装取消检查。
+
+最小任务:
+
+| ID | 任务 | 验收 |
+| --- | --- | --- |
+| H1-1 | 新增 `app/services/harness/__init__.py` | 后端 import 正常 |
+| H1-2 | 新增 `types.py` 枚举和映射 | 单测覆盖核心 event type |
+| H1-3 | 新增 `trace.py` | record 后 metadata 含 `step` |
+| H1-4 | 新增 `control.py` | 取消行为与旧逻辑一致 |
+| H1-5 | 替换 `story_service` 中事件和取消 helper 的内部实现 | 现有 API 行为不变 |
+| H1-6 | 补 `tests/test_harness_runtime.py` | 后端测试通过 |
+
+验证命令:
+
+```bash
+cd backend
+pytest tests/test_harness_runtime.py tests/test_generation_jobs.py
+ruff check app tests
+```
+
+阶段报告:
+
+- `docs/planning/harness-stage-1-report.md`
+
+### 阶段 2: 资产工作流边界抽取
+
+目标:
+
+- 将封面、绘本插图、音频补全从 `story_service` 中拆成 artifact workflow。
+- 保持当前 `StoryAssetStatus` 和 `sync_story_status` 语义。
+
+最小任务:
+
+| ID | 任务 | 验收 |
+| --- | --- | --- |
+| H2-1 | 移动或桥接 `AssetCompletionResult` | import 稳定 |
+| H2-2 | 抽普通故事封面工作流 | 封面生成、失败、重试测试通过 |
+| H2-3 | 抽绘本图片工作流 | 绘本逐页事件顺序不破坏 |
+| H2-4 | 抽音频工作流 | 音频缓存和失败状态测试通过 |
+| H2-5 | 补 artifact workflow 单测 | 资产相关测试通过 |
+
+验证命令:
+
+```bash
+cd backend
+pytest tests/test_stories.py tests/test_generation_jobs.py tests/test_audio_cache.py
+ruff check app tests
+```
+
+阶段报告:
+
+- `docs/planning/harness-stage-2-report.md`
+
+### 阶段 3: Workflow Plan 与执行器
+
+目标:
+
+- 用显式 `WorkflowPlan` 表达 story、storybook、asset_generation、asset_retry。
+- 将 `_generate_generation_service_with_job` 的分支压缩到计划构建和执行器。
+
+最小任务:
+
+| ID | 任务 | 验收 |
+| --- | --- | --- |
+| H3-1 | 定义 `WorkflowPlan` 和 `WorkflowTask` | 不依赖 FastAPI schema |
+| H3-2 | 为普通故事构建 plan | 不生成图片时步骤正确 |
+| H3-3 | 为完整故事构建 plan | 图片为 recoverable step |
+| H3-4 | 为绘本构建 plan | 绘本结构和图片步骤可区分 |
+| H3-5 | 为资产任务构建 plan | 重试和后台生成共用计划 |
+| H3-6 | 增加 plan 单测 | 核心模式计划快照稳定 |
+
+验证命令:
+
+```bash
+cd backend
+pytest tests/test_harness_runtime.py tests/test_generation_jobs.py tests/test_stories.py
+ruff check app tests
+```
+
+阶段报告:
+
+- `docs/planning/harness-stage-3-report.md`
+
+### 阶段 4: Quality Gates 与输出验证
+
+目标:
+
+- 在 Provider 输出进入持久化之前,加入低成本、确定性的质量门。
+- 后续可扩展模型评审,但第一轮避免额外 AI 成本。
+
+最小任务:
+
+| ID | 任务 | 验收 |
+| --- | --- | --- |
+| H4-1 | 定义 quality gate 接口 | 不绑定具体 Provider |
+| H4-2 | 文本故事完整性检查 | 标题、正文、封面 prompt 缺失可识别 |
+| H4-3 | 绘本结构检查 | 页数、页码、正文、图片 prompt 可识别 |
+| H4-4 | 儿童内容基础安全检查 | 明显不适龄词或空内容阻断 |
+| H4-5 | gate 失败写入 `failure_category` | job event 可解释 |
+
+验证命令:
+
+```bash
+cd backend
+pytest tests/test_harness_runtime.py tests/test_generation_jobs.py tests/test_stories.py
+ruff check app tests
+```
+
+阶段报告:
+
+- `docs/planning/harness-stage-4-report.md`
+
+### 阶段 5: Trace Analytics 与前端增量展示
+
+目标:
+
+- 前端继续消费现有 job/event,但可展示标准 step、artifact、failure category。
+- 管理端可按 failure category 聚合失败原因。
+
+最小任务:
+
+| ID | 任务 | 验收 |
+| --- | --- | --- |
+| H5-1 | 后端聚合支持标准 step/failure category | 旧字段兼容 |
+| H5-2 | 用户端生成轨迹展示 step 名称 | 移动端不溢出 |
+| H5-3 | 管理端 Provider dashboard 增加 failure category | 构建通过 |
+| H5-4 | 更新 smoke 脚本校验标准 metadata | demo smoke 通过 |
+
+验证命令:
+
+```bash
+cd backend
+pytest
+ruff check app tests
+cd ../frontend
+npm run build
+cd ../admin-frontend
+npm run build
+```
+
+阶段报告:
+
+- `docs/planning/harness-stage-5-report.md`
+
+## 8. 需求与验收
+
+### 功能需求
+
+| ID | 优先级 | 需求 | 验收标准 |
+| --- | --- | --- | --- |
+| FR-001 | MUST | 系统必须保留统一生成 API 行为 | `/api/generations` 测试继续通过 |
+| FR-002 | MUST | 系统必须有标准 workflow step 类型 | 核心 event type 可映射到 step |
+| FR-003 | MUST | 系统必须有标准 artifact 类型 | story、storybook、image、audio 可区分 |
+| FR-004 | MUST | 系统必须标准化 job event metadata | 新事件含 `step`,旧字段不删除 |
+| FR-005 | MUST | 用户取消语义必须保持 | 取消测试继续通过 |
+| FR-006 | SHOULD | Provider 调用应记录标准 trace 字段 | provider event 含 capability、adapter、latency、cost、step |
+| FR-007 | SHOULD | 资产工作流应从主 service 拆出 | `story_service` 行数和职责减少 |
+| FR-008 | SHOULD | 输出验证应在持久化前执行 | schema 缺失可被测试捕获 |
+| FR-009 | COULD | 前端展示标准 step/failure category | 构建通过且无布局溢出 |
+
+### 非功能需求
+
+| ID | 优先级 | 需求 | 验收标准 |
+| --- | --- | --- | --- |
+| NFR-001 | MUST | 向后兼容 | 现有测试和 smoke 主链路不破坏 |
+| NFR-002 | MUST | 可测试 | 每个新增 harness 模块有单测 |
+| NFR-003 | MUST | 可观测 | job/event 可以解释 step、artifact、失败原因 |
+| NFR-004 | SHOULD | 低耦合 | harness 类型模块不依赖 FastAPI 和 SQLAlchemy |
+| NFR-005 | SHOULD | 性能稳定 | 不新增阻塞式外部调用 |
+| NFR-006 | SHOULD | 中文一致性 | 文档、用户可见文案和新增注释使用简体中文 |
+
+## 9. 风险与缓解
+
+| 风险 | 影响 | 缓解 |
+| --- | --- | --- |
+| 过度抽象导致改造变慢 | 中 | 第一阶段只抽类型、trace、control,不拆主流程 |
+| 事件 metadata 变化影响前端 | 中 | 只新增字段,不删除旧字段 |
+| 取消/重试语义被破坏 | 高 | 优先跑 `tests/test_generation_jobs.py` |
+| Provider trace 与 job event 重复 | 低 | 保持 Provider 事件专注调用层,workflow 事件专注产品步骤 |
+| 文档与实现偏离 | 中 | 每个阶段报告必须记录实现偏差 |
+| 质量门误伤内容 | 中 | 第四阶段先做确定性低风险检查,模型评审延后 |
+
+## 10. 审查清单
+
+每个阶段完成前必须检查:
+
+- [ ] 是否保持 `/api/generations` 行为兼容
+- [ ] 是否有对应测试或验证命令
+- [ ] 是否没有引入不必要的外部框架
+- [ ] 是否没有重写无关功能
+- [ ] 是否保留用户已有工作区改动
+- [ ] 是否更新阶段报告
+- [ ] 是否更新本设计中的阶段状态或偏差记录
+
+## 11. 当前状态
+
+| 阶段 | 状态 | 备注 |
+| --- | --- | --- |
+| 阶段 0 | 已完成设计基线 | 已建立本设计与阶段 0 报告 |
+| 阶段 1 | 已完成基础实现 | 已新增 harness 类型、trace recorder、execution control,并通过定向测试 |
+| 阶段 2 | 已完成主要资产补全抽取 | 封面、音频、持久化绘本缺失图片补全已迁入 harness asset workflows |
+| 阶段 3 | 已完成计划建模基线 | 已定义 WorkflowPlan/WorkflowTask 和核心模式计划快照;执行器接管留待后续 |
+| 阶段 4 | 已完成确定性质量门 | 已接入文本故事和绘本结构完整性/儿童安全基础检查 |
+| 阶段 5 | 待执行 | Trace Analytics 与前端展示 |