From 459ca9edef7e7158526b810bd69b7316754f6be7 Mon Sep 17 00:00:00 2001 From: Yuyan Date: Sun, 21 Jun 2026 22:31:38 +0800 Subject: [PATCH] Add generation harness runtime --- backend/app/services/harness/__init__.py | 2 + backend/app/services/harness/artifacts.py | 37 ++ .../app/services/harness/asset_workflows.py | 468 ++++++++++++++++ backend/app/services/harness/control.py | 48 ++ backend/app/services/harness/plans.py | 237 +++++++++ backend/app/services/harness/quality_gates.py | 191 +++++++ backend/app/services/harness/trace.py | 64 +++ backend/app/services/harness/types.py | 169 ++++++ backend/app/services/provider_router.py | 5 +- backend/app/services/story_service.py | 498 +++--------------- backend/tests/test_generation_jobs.py | 64 +++ backend/tests/test_harness_runtime.py | 374 +++++++++++++ docs/planning/harness-stage-0-report.md | 111 ++++ docs/planning/harness-stage-1-report.md | 122 +++++ docs/planning/harness-stage-2-report.md | 121 +++++ docs/planning/harness-stage-3-report.md | 121 +++++ docs/planning/harness-stage-4-report.md | 140 +++++ .../harness-engineering-modernization.md | 493 +++++++++++++++++ 18 files changed, 2846 insertions(+), 419 deletions(-) create mode 100644 backend/app/services/harness/__init__.py create mode 100644 backend/app/services/harness/artifacts.py create mode 100644 backend/app/services/harness/asset_workflows.py create mode 100644 backend/app/services/harness/control.py create mode 100644 backend/app/services/harness/plans.py create mode 100644 backend/app/services/harness/quality_gates.py create mode 100644 backend/app/services/harness/trace.py create mode 100644 backend/app/services/harness/types.py create mode 100644 backend/tests/test_harness_runtime.py create mode 100644 docs/planning/harness-stage-0-report.md create mode 100644 docs/planning/harness-stage-1-report.md create mode 100644 docs/planning/harness-stage-2-report.md create mode 100644 docs/planning/harness-stage-3-report.md create mode 100644 docs/planning/harness-stage-4-report.md create mode 100644 docs/technical/harness-engineering-modernization.md diff --git a/backend/app/services/harness/__init__.py b/backend/app/services/harness/__init__.py new file mode 100644 index 0000000..00c33ae --- /dev/null +++ b/backend/app/services/harness/__init__.py @@ -0,0 +1,2 @@ +"""Generation harness runtime support.""" + diff --git a/backend/app/services/harness/artifacts.py b/backend/app/services/harness/artifacts.py new file mode 100644 index 0000000..2ec1be9 --- /dev/null +++ b/backend/app/services/harness/artifacts.py @@ -0,0 +1,37 @@ +"""Artifact result types for generation harness workflows.""" + +from dataclasses import dataclass +from typing import Literal + +from app.services.story_status import StoryAssetStatus + +AssetCompletionKind = Literal["cover_image", "storybook_images", "audio"] + + +@dataclass(frozen=True) +class AssetCompletionResult: + """Service-level result for a generated asset completion attempt.""" + + asset: AssetCompletionKind + status: StoryAssetStatus + value: str | bytes | None = None + error: str | None = None + blocks_main_result: bool = False + + @property + def succeeded(self) -> bool: + """Whether the asset reached a usable ready state.""" + + return self.status == StoryAssetStatus.READY and self.error is None + + +def asset_result_metadata(result: AssetCompletionResult) -> dict: + """Build JSON-safe metadata for asset workflow events.""" + + return { + "asset": result.asset, + "status": result.status.value, + "error": result.error, + "blocks_main_result": result.blocks_main_result, + } + diff --git a/backend/app/services/harness/asset_workflows.py b/backend/app/services/harness/asset_workflows.py new file mode 100644 index 0000000..b73b3a8 --- /dev/null +++ b/backend/app/services/harness/asset_workflows.py @@ -0,0 +1,468 @@ +"""Artifact completion workflows for the generation harness runtime.""" + +from collections.abc import Awaitable, Callable + +from fastapi import HTTPException +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.logging import get_logger +from app.db.models import Story +from app.services.harness.artifacts import AssetCompletionResult, asset_result_metadata +from app.services.harness.control import ExecutionControl +from app.services.harness.trace import TraceRecorder +from app.services.story_status import StoryAssetStatus, sync_story_status + +logger = get_logger(__name__) + +ImageGenerator = Callable[..., Awaitable[str]] +TTSGenerator = Callable[..., Awaitable[bytes]] +AudioCacheExists = Callable[[str], bool] +AudioCacheReader = Callable[[str], bytes] +AudioCacheWriter = Callable[[int, bytes], str] + + +async def complete_cover_image_asset( + story: Story, + db: AsyncSession, + *, + generate_image_func: ImageGenerator, + raise_on_failure: bool = False, + last_error_prefix: str | None = None, + log_event: str = "cover_asset_generation_failed", + job=None, +) -> AssetCompletionResult: + """Generate or retry a text story cover through one asset workflow.""" + + if not story.cover_prompt: + raise HTTPException(status_code=400, detail="Story has no cover prompt") + + sync_story_status(story, image_status=StoryAssetStatus.GENERATING) + await db.commit() + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="cover_image_started", + status="running", + message="Cover image generation started.", + metadata={"asset": "image", "cover_prompt_present": True}, + ) + + try: + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) + image_url = await generate_image_func( + story.cover_prompt, + db=db, + user_id=story.user_id, + generation_job=job, + story_id=story.id, + ) + story.image_url = image_url + sync_story_status(story, image_status=StoryAssetStatus.READY) + await db.commit() + result = AssetCompletionResult( + asset="cover_image", + status=StoryAssetStatus.READY, + value=image_url, + blocks_main_result=raise_on_failure, + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="cover_image_succeeded", + status="succeeded", + message="Cover image was generated.", + metadata=asset_result_metadata(result), + ) + return result + except Exception as exc: + provider_error = str(exc) + last_error = ( + f"{last_error_prefix}: {provider_error}" + if last_error_prefix + else provider_error + ) + sync_story_status( + story, + image_status=StoryAssetStatus.FAILED, + last_error=last_error, + ) + await db.commit() + logger.warning(log_event, story_id=story.id, error=provider_error) + + result = AssetCompletionResult( + asset="cover_image", + status=StoryAssetStatus.FAILED, + error=provider_error, + blocks_main_result=raise_on_failure, + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="cover_image_failed", + status="failed", + message="Cover image generation failed.", + metadata=asset_result_metadata(result), + ) + if raise_on_failure: + raise HTTPException( + status_code=500, + detail=f"Image generation failed: {provider_error}", + ) from exc + + return result + + +async def read_cached_audio_asset( + story: Story, + db: AsyncSession, + *, + audio_cache_exists_func: AudioCacheExists, + read_audio_cache_func: AudioCacheReader, +) -> bytes | None: + """Read cached audio or repair stale audio cache metadata.""" + + if story.audio_path and audio_cache_exists_func(story.audio_path): + if story.audio_status != StoryAssetStatus.READY.value: + sync_story_status(story, audio_status=StoryAssetStatus.READY) + await db.commit() + return read_audio_cache_func(story.audio_path) + + if story.audio_path and not audio_cache_exists_func(story.audio_path): + logger.warning( + "story_audio_cache_missing", + story_id=story.id, + audio_path=story.audio_path, + ) + story.audio_path = None + if story.audio_status == StoryAssetStatus.READY.value: + sync_story_status(story, audio_status=StoryAssetStatus.NOT_REQUESTED) + await db.commit() + + return None + + +async def complete_audio_asset( + story: Story, + db: AsyncSession, + *, + text_to_speech_func: TTSGenerator, + audio_cache_exists_func: AudioCacheExists, + read_audio_cache_func: AudioCacheReader, + write_story_audio_cache_func: AudioCacheWriter, + raise_on_failure: bool = True, + job=None, +) -> AssetCompletionResult: + """Complete TTS audio generation through one asset workflow.""" + + if not story.story_text: + raise HTTPException(status_code=400, detail="Story has no text") + + cached_audio = await read_cached_audio_asset( + story, + db, + audio_cache_exists_func=audio_cache_exists_func, + read_audio_cache_func=read_audio_cache_func, + ) + if cached_audio is not None: + result = AssetCompletionResult( + asset="audio", + status=StoryAssetStatus.READY, + value=cached_audio, + blocks_main_result=raise_on_failure, + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="audio_cache_hit", + status="succeeded", + message="Cached story audio was reused.", + metadata=asset_result_metadata(result), + ) + return result + + sync_story_status(story, audio_status=StoryAssetStatus.GENERATING) + await db.commit() + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="audio_started", + status="running", + message="Story audio generation started.", + metadata={"asset": "audio"}, + ) + + try: + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) + audio_data = await text_to_speech_func( + story.story_text, + db=db, + user_id=story.user_id, + generation_job=job, + story_id=story.id, + ) + story.audio_path = write_story_audio_cache_func(story.id, audio_data) + sync_story_status( + story, + audio_status=StoryAssetStatus.READY, + ) + await db.commit() + result = AssetCompletionResult( + asset="audio", + status=StoryAssetStatus.READY, + value=audio_data, + blocks_main_result=raise_on_failure, + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="audio_succeeded", + status="succeeded", + message="Story audio was generated and cached.", + metadata=asset_result_metadata(result), + ) + return result + except Exception as exc: + provider_error = str(exc) + story.audio_path = None + sync_story_status( + story, + audio_status=StoryAssetStatus.FAILED, + last_error=provider_error, + ) + await db.commit() + logger.error("audio_generation_failed", story_id=story.id, error=provider_error) + + result = AssetCompletionResult( + asset="audio", + status=StoryAssetStatus.FAILED, + error=provider_error, + blocks_main_result=raise_on_failure, + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="audio_failed", + status="failed", + message="Story audio generation failed.", + metadata=asset_result_metadata(result), + ) + if raise_on_failure: + raise HTTPException( + status_code=500, + detail=f"Audio generation failed: {provider_error}", + ) from exc + + return result + + +def get_storybook_pages_data(story: Story) -> list[dict]: + """Return mutable storybook page data from the persisted JSON field.""" + + return [dict(page) for page in story.pages or [] if isinstance(page, dict)] + + +def build_storybook_error_message( + *, + cover_failed: bool, + failed_pages: list[int], +) -> str | None: + """Summarize storybook image generation errors for the latest attempt.""" + + parts: list[str] = [] + if cover_failed: + parts.append("封面生成失败") + if failed_pages: + pages = "、".join(str(page) for page in sorted(failed_pages)) + parts.append(f"第 {pages} 页插图生成失败") + return ";".join(parts) if parts else None + + +def resolve_storybook_image_status( + *, + generate_images: bool, + cover_prompt: str | None, + cover_url: str | None, + pages_data: list[dict], +) -> StoryAssetStatus: + """Resolve the persisted image status for a storybook.""" + + if not generate_images: + return StoryAssetStatus.NOT_REQUESTED + + expected_assets = 0 + ready_assets = 0 + + if cover_prompt or cover_url: + expected_assets += 1 + if cover_url: + ready_assets += 1 + + for page in pages_data: + if not page.get("image_prompt") and not page.get("image_url"): + continue + expected_assets += 1 + if page.get("image_url"): + ready_assets += 1 + + if expected_assets == 0: + return StoryAssetStatus.NOT_REQUESTED + + if ready_assets == expected_assets: + return StoryAssetStatus.READY + + return StoryAssetStatus.FAILED + + +async def complete_storybook_image_assets( + story: Story, + db: AsyncSession, + *, + generate_image_func: ImageGenerator, + job=None, +) -> AssetCompletionResult: + """Complete missing cover/page images for a persisted storybook.""" + + pages_data = get_storybook_pages_data(story) + has_image_prompt = bool(story.cover_prompt) or any( + page.get("image_prompt") for page in pages_data + ) + if not has_image_prompt: + raise HTTPException(status_code=400, detail="Storybook has no image prompts") + + sync_story_status(story, image_status=StoryAssetStatus.GENERATING) + await db.commit() + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="storybook_images_started", + status="running", + message="Storybook missing image completion started.", + metadata={"asset": "image"}, + ) + + cover_failed = False + failed_pages: list[int] = [] + completed_pages: list[int] = [] + + if story.cover_prompt and not story.image_url: + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) + try: + story.image_url = await generate_image_func( + story.cover_prompt, + db=db, + user_id=story.user_id, + generation_job=job, + story_id=story.id, + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="storybook_cover_image_succeeded", + status="succeeded", + message="Storybook cover image was generated.", + metadata={"asset": "image", "scope": "cover"}, + ) + except Exception as exc: + cover_failed = True + logger.warning( + "storybook_cover_asset_completion_failed", + story_id=story.id, + error=str(exc), + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="storybook_cover_image_failed", + status="failed", + message="Storybook cover image generation failed.", + metadata={"asset": "image", "scope": "cover", "error": str(exc)}, + ) + + for page in pages_data: + if not page.get("image_prompt") or page.get("image_url"): + continue + + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) + try: + page["image_url"] = await generate_image_func( + page["image_prompt"], + db=db, + user_id=story.user_id, + generation_job=job, + story_id=story.id, + ) + page_number = page.get("page_number") + if isinstance(page_number, int): + completed_pages.append(page_number) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="storybook_page_image_succeeded", + status="succeeded", + message="Storybook page image was generated.", + metadata={"asset": "image", "scope": "page", "page_number": page_number}, + ) + except Exception as exc: + page_number = page.get("page_number") + if isinstance(page_number, int): + failed_pages.append(page_number) + logger.warning( + "storybook_page_asset_completion_failed", + story_id=story.id, + page=page_number, + error=str(exc), + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="storybook_page_image_failed", + status="failed", + message="Storybook page image generation failed.", + metadata={ + "asset": "image", + "scope": "page", + "page_number": page_number, + "error": str(exc), + }, + ) + + story.pages = pages_data + error_message = build_storybook_error_message( + cover_failed=cover_failed, + failed_pages=failed_pages, + ) + image_status = resolve_storybook_image_status( + generate_images=True, + cover_prompt=story.cover_prompt, + cover_url=story.image_url, + pages_data=pages_data, + ) + sync_story_status( + story, + image_status=image_status, + last_error=error_message, + ) + await db.commit() + + result = AssetCompletionResult( + asset="storybook_images", + status=image_status, + value=story.image_url, + error=error_message, + ) + await TraceRecorder(db).record_step( + job=job, + story_id=story.id, + event_type="storybook_images_completed", + status="failed" if error_message else "succeeded", + message="Storybook image completion finished.", + metadata={ + **asset_result_metadata(result), + "completed_pages": sorted(completed_pages), + "failed_pages": sorted(failed_pages), + }, + ) + return result diff --git a/backend/app/services/harness/control.py b/backend/app/services/harness/control.py new file mode 100644 index 0000000..6789b5e --- /dev/null +++ b/backend/app/services/harness/control.py @@ -0,0 +1,48 @@ +"""Execution control helpers for generation harness workflows.""" + +from typing import TYPE_CHECKING + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.services.generation_jobs import finish_generation_job + +if TYPE_CHECKING: + from app.db.models import GenerationJob, Story + + +class GenerationJobCanceledError(Exception): + """Raised when a running worker job has been canceled by the user.""" + + +class ExecutionControl: + """Runtime control surface for cancelable generation workflows.""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def stop_if_cancel_requested( + self, + *, + job: "GenerationJob | None", + story: "Story | None" = None, + ) -> None: + """Stop a worker-owned job at the next safe checkpoint after cancellation.""" + + if job is None: + return + + await self.db.refresh(job) + if job.current_step != "cancel_requested": + return + + await finish_generation_job( + self.db, + job=job, + story=story, + status="canceled", + current_step="generation_canceled", + error_message="Generation canceled by user.", + message="Generation job was canceled after a user request.", + ) + raise GenerationJobCanceledError() + diff --git a/backend/app/services/harness/plans.py b/backend/app/services/harness/plans.py new file mode 100644 index 0000000..9163ca7 --- /dev/null +++ b/backend/app/services/harness/plans.py @@ -0,0 +1,237 @@ +"""Workflow plan builders for generation harness workflows.""" + +from dataclasses import dataclass +from enum import StrEnum +from typing import Any + +from app.services.harness.types import ArtifactKind, WorkflowStep + + +class WorkflowMode(StrEnum): + """Supported executable workflow modes.""" + + STORY = "story" + STORY_WITH_ASSETS = "story_with_assets" + STORYBOOK = "storybook" + ASSET_GENERATION = "asset_generation" + ASSET_RETRY = "asset_retry" + + +@dataclass(frozen=True) +class WorkflowTask: + """One planned step in a generation workflow.""" + + key: str + step: WorkflowStep + artifact: ArtifactKind + required: bool = True + recoverable: bool = False + + def to_snapshot(self) -> dict[str, Any]: + """Return a JSON-safe snapshot for tests and trace metadata.""" + + return { + "key": self.key, + "step": self.step.value, + "artifact": self.artifact.value, + "required": self.required, + "recoverable": self.recoverable, + } + + +@dataclass(frozen=True) +class WorkflowPlan: + """Declarative shape of a generation workflow before execution.""" + + mode: WorkflowMode + tasks: tuple[WorkflowTask, ...] + + def to_snapshot(self) -> dict[str, Any]: + """Return a JSON-safe snapshot for tests and trace metadata.""" + + return { + "mode": self.mode.value, + "tasks": [task.to_snapshot() for task in self.tasks], + } + + +def build_story_plan(*, generate_images: bool) -> WorkflowPlan: + """Build a plan for a text story generation request.""" + + tasks = [ + WorkflowTask( + key="prepare_context", + step=WorkflowStep.CONTEXT_PREPARATION, + artifact=ArtifactKind.NONE, + ), + WorkflowTask( + key="generate_narrative", + step=WorkflowStep.NARRATIVE_GENERATION, + artifact=ArtifactKind.STORY_TEXT, + ), + WorkflowTask( + key="persist_story", + step=WorkflowStep.STORY_PERSISTENCE, + artifact=ArtifactKind.STORY_TEXT, + ), + ] + + if generate_images: + tasks.append( + WorkflowTask( + key="generate_cover_image", + step=WorkflowStep.IMAGE_GENERATION, + artifact=ArtifactKind.COVER_IMAGE, + required=False, + recoverable=True, + ) + ) + + tasks.extend( + [ + WorkflowTask( + key="queue_postprocessing", + step=WorkflowStep.POSTPROCESSING, + artifact=ArtifactKind.ACHIEVEMENT_MEMORY, + required=False, + recoverable=True, + ), + WorkflowTask( + key="complete_generation", + step=WorkflowStep.COMPLETION, + artifact=ArtifactKind.NONE, + ), + ] + ) + + return WorkflowPlan( + mode=WorkflowMode.STORY_WITH_ASSETS if generate_images else WorkflowMode.STORY, + tasks=tuple(tasks), + ) + + +def build_storybook_plan(*, generate_images: bool) -> WorkflowPlan: + """Build a plan for a storybook generation request.""" + + tasks = [ + WorkflowTask( + key="prepare_context", + step=WorkflowStep.CONTEXT_PREPARATION, + artifact=ArtifactKind.NONE, + ), + WorkflowTask( + key="generate_storybook_pages", + step=WorkflowStep.NARRATIVE_GENERATION, + artifact=ArtifactKind.STORYBOOK_PAGES, + ), + ] + + if generate_images: + tasks.append( + WorkflowTask( + key="generate_storybook_images", + step=WorkflowStep.IMAGE_GENERATION, + artifact=ArtifactKind.IMAGE, + required=False, + recoverable=True, + ) + ) + + tasks.extend( + [ + WorkflowTask( + key="persist_storybook", + step=WorkflowStep.STORY_PERSISTENCE, + artifact=ArtifactKind.STORYBOOK_PAGES, + ), + WorkflowTask( + key="queue_postprocessing", + step=WorkflowStep.POSTPROCESSING, + artifact=ArtifactKind.ACHIEVEMENT_MEMORY, + required=False, + recoverable=True, + ), + WorkflowTask( + key="complete_generation", + step=WorkflowStep.COMPLETION, + artifact=ArtifactKind.NONE, + ), + ] + ) + + return WorkflowPlan(mode=WorkflowMode.STORYBOOK, tasks=tuple(tasks)) + + +def build_asset_plan(*, output_mode: str, assets: list[str]) -> WorkflowPlan: + """Build a plan for asset generation or retry jobs.""" + + mode = ( + WorkflowMode.ASSET_RETRY + if output_mode == WorkflowMode.ASSET_RETRY.value + else WorkflowMode.ASSET_GENERATION + ) + initial_step = ( + WorkflowStep.ASSET_RETRY + if mode == WorkflowMode.ASSET_RETRY + else WorkflowStep.ASSET_GENERATION + ) + initial_key = ( + "start_asset_retry" + if mode == WorkflowMode.ASSET_RETRY + else "start_asset_generation" + ) + completion_key = ( + "complete_asset_retry" + if mode == WorkflowMode.ASSET_RETRY + else "complete_asset_generation" + ) + + tasks = [ + WorkflowTask( + key=initial_key, + step=initial_step, + artifact=ArtifactKind.NONE, + ) + ] + + for asset in dict.fromkeys(assets): + if asset == "image": + tasks.append( + WorkflowTask( + key="complete_image_asset", + step=WorkflowStep.IMAGE_GENERATION, + artifact=ArtifactKind.IMAGE, + required=False, + recoverable=True, + ) + ) + elif asset == "audio": + tasks.append( + WorkflowTask( + key="complete_audio_asset", + step=WorkflowStep.AUDIO_GENERATION, + artifact=ArtifactKind.AUDIO, + required=False, + recoverable=True, + ) + ) + else: + tasks.append( + WorkflowTask( + key=f"complete_{asset}_asset", + step=WorkflowStep.UNKNOWN, + artifact=ArtifactKind.UNKNOWN, + required=False, + recoverable=True, + ) + ) + + tasks.append( + WorkflowTask( + key=completion_key, + step=initial_step, + artifact=ArtifactKind.NONE, + ) + ) + + return WorkflowPlan(mode=mode, tasks=tuple(tasks)) diff --git a/backend/app/services/harness/quality_gates.py b/backend/app/services/harness/quality_gates.py new file mode 100644 index 0000000..2a423e0 --- /dev/null +++ b/backend/app/services/harness/quality_gates.py @@ -0,0 +1,191 @@ +"""Deterministic quality gates for generated child-facing content.""" + +from dataclasses import dataclass +from enum import StrEnum + +from app.services.adapters.storybook.primary import Storybook +from app.services.adapters.text.models import StoryOutput +from app.services.harness.types import FailureCategory + + +class QualityGateCode(StrEnum): + """Stable issue codes emitted by deterministic quality gates.""" + + MISSING_TITLE = "missing_title" + MISSING_STORY_TEXT = "missing_story_text" + MISSING_COVER_PROMPT = "missing_cover_prompt" + MISSING_STORYBOOK_PAGE = "missing_storybook_page" + INVALID_STORYBOOK_PAGE_NUMBER = "invalid_storybook_page_number" + MISSING_STORYBOOK_PAGE_TEXT = "missing_storybook_page_text" + UNSAFE_CHILD_CONTENT = "unsafe_child_content" + + +@dataclass(frozen=True) +class QualityGateIssue: + """One deterministic quality gate issue.""" + + code: QualityGateCode + message: str + failure_category: FailureCategory = FailureCategory.SCHEMA_ERROR + field: str | None = None + + def to_metadata(self) -> dict: + """Return a JSON-safe metadata payload.""" + + return { + "code": self.code.value, + "message": self.message, + "failure_category": self.failure_category.value, + "field": self.field, + } + + +class QualityGateError(ValueError): + """Raised when generated content fails deterministic quality gates.""" + + def __init__(self, issues: list[QualityGateIssue]): + self.issues = issues + message = ";".join(issue.message for issue in issues) + super().__init__(message) + + def to_metadata(self) -> dict: + """Return a JSON-safe metadata payload.""" + + return {"issues": [issue.to_metadata() for issue in self.issues]} + + +UNSAFE_CHILD_TERMS = ( + "自杀", + "自残", + "血腥", + "虐待", + "毒品", + "色情", +) + + +def _is_blank(value: str | None) -> bool: + return not value or not value.strip() + + +def _unsafe_issue_if_present(text: str, *, field: str) -> QualityGateIssue | None: + for term in UNSAFE_CHILD_TERMS: + if term in text: + return QualityGateIssue( + code=QualityGateCode.UNSAFE_CHILD_CONTENT, + message="生成内容包含不适合 3-8 岁儿童的明显风险词。", + failure_category=FailureCategory.SAFETY_ERROR, + field=field, + ) + return None + + +def validate_story_output(output: StoryOutput) -> None: + """Validate generated text story output before persistence.""" + + issues: list[QualityGateIssue] = [] + + if _is_blank(output.title): + issues.append( + QualityGateIssue( + code=QualityGateCode.MISSING_TITLE, + message="故事标题为空。", + field="title", + ) + ) + + if _is_blank(output.story_text): + issues.append( + QualityGateIssue( + code=QualityGateCode.MISSING_STORY_TEXT, + message="故事正文为空。", + field="story_text", + ) + ) + + if _is_blank(output.cover_prompt_suggestion): + issues.append( + QualityGateIssue( + code=QualityGateCode.MISSING_COVER_PROMPT, + message="封面提示词为空。", + field="cover_prompt_suggestion", + ) + ) + + unsafe_issue = _unsafe_issue_if_present( + " ".join([output.title or "", output.story_text or ""]), + field="story_text", + ) + if unsafe_issue is not None: + issues.append(unsafe_issue) + + if issues: + raise QualityGateError(issues) + + +def validate_storybook_output(output: Storybook) -> None: + """Validate generated storybook output before persistence.""" + + issues: list[QualityGateIssue] = [] + + if _is_blank(output.title): + issues.append( + QualityGateIssue( + code=QualityGateCode.MISSING_TITLE, + message="绘本标题为空。", + field="title", + ) + ) + + if not output.pages: + issues.append( + QualityGateIssue( + code=QualityGateCode.MISSING_STORYBOOK_PAGE, + message="绘本至少需要一页内容。", + field="pages", + ) + ) + + seen_page_numbers: set[int] = set() + page_texts: list[str] = [] + for index, page in enumerate(output.pages, start=1): + if not isinstance(page.page_number, int) or page.page_number <= 0: + issues.append( + QualityGateIssue( + code=QualityGateCode.INVALID_STORYBOOK_PAGE_NUMBER, + message=f"绘本第 {index} 个页面页码无效。", + field=f"pages[{index - 1}].page_number", + ) + ) + elif page.page_number in seen_page_numbers: + issues.append( + QualityGateIssue( + code=QualityGateCode.INVALID_STORYBOOK_PAGE_NUMBER, + message=f"绘本页码 {page.page_number} 重复。", + field=f"pages[{index - 1}].page_number", + ) + ) + else: + seen_page_numbers.add(page.page_number) + + if _is_blank(page.text): + issues.append( + QualityGateIssue( + code=QualityGateCode.MISSING_STORYBOOK_PAGE_TEXT, + message=f"绘本第 {index} 页正文为空。", + field=f"pages[{index - 1}].text", + ) + ) + else: + page_texts.append(page.text) + + unsafe_issue = _unsafe_issue_if_present( + " ".join([output.title or "", *page_texts]), + field="pages", + ) + if unsafe_issue is not None: + issues.append(unsafe_issue) + + if issues: + raise QualityGateError(issues) + diff --git a/backend/app/services/harness/trace.py b/backend/app/services/harness/trace.py new file mode 100644 index 0000000..4206bfe --- /dev/null +++ b/backend/app/services/harness/trace.py @@ -0,0 +1,64 @@ +"""Trace recording helpers for generation harness workflows.""" + +from typing import TYPE_CHECKING, Any + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.services.generation_jobs import record_generation_event +from app.services.harness.types import ( + ArtifactKind, + FailureCategory, + WorkflowStep, + normalize_trace_metadata, +) + +if TYPE_CHECKING: + from app.db.models import GenerationJob + + +class TraceRecorder: + """Append workflow events with standard harness trace metadata.""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def record_step( + self, + *, + job: "GenerationJob | None", + event_type: str, + status: str, + story_id: int | None = None, + message: str | None = None, + metadata: dict[str, Any] | None = None, + step: WorkflowStep | str | None = None, + artifact: ArtifactKind | str | None = None, + failure_category: FailureCategory | str | None = None, + retryable: bool | None = None, + blocks_main_result: bool | None = None, + commit: bool = True, + ): + """Append a workflow event when the caller is running under a tracked job.""" + + if job is None: + return None + + return await record_generation_event( + self.db, + job=job, + story_id=story_id, + event_type=event_type, + status=status, + message=message, + metadata=normalize_trace_metadata( + event_type, + metadata, + step=step, + artifact=artifact, + failure_category=failure_category, + retryable=retryable, + blocks_main_result=blocks_main_result, + ), + commit=commit, + ) + diff --git a/backend/app/services/harness/types.py b/backend/app/services/harness/types.py new file mode 100644 index 0000000..174a44d --- /dev/null +++ b/backend/app/services/harness/types.py @@ -0,0 +1,169 @@ +"""Shared types for the generation harness runtime.""" + +from enum import StrEnum +from typing import Any + + +class WorkflowStep(StrEnum): + """Standard product-level steps for generation workflows.""" + + REQUEST_ACCEPTANCE = "request_acceptance" + WORKER_START = "worker_start" + CONTEXT_PREPARATION = "context_preparation" + NARRATIVE_GENERATION = "narrative_generation" + STORY_PERSISTENCE = "story_persistence" + PROVIDER_INVOCATION = "provider_invocation" + IMAGE_GENERATION = "image_generation" + AUDIO_GENERATION = "audio_generation" + ASSET_RETRY = "asset_retry" + ASSET_GENERATION = "asset_generation" + POSTPROCESSING = "postprocessing" + COMPLETION = "completion" + CANCELLATION = "cancellation" + STALE_RECOVERY = "stale_recovery" + UNKNOWN = "unknown" + + +class ArtifactKind(StrEnum): + """Artifacts produced or completed by generation workflows.""" + + STORY_TEXT = "story_text" + STORYBOOK_PAGES = "storybook_pages" + COVER_IMAGE = "cover_image" + PAGE_IMAGE = "page_image" + IMAGE = "image" + AUDIO = "audio" + ACHIEVEMENT_MEMORY = "achievement_memory" + NONE = "none" + UNKNOWN = "unknown" + + +class FailureCategory(StrEnum): + """Coarse failure categories for trace and analytics metadata.""" + + PROVIDER_ERROR = "provider_error" + SCHEMA_ERROR = "schema_error" + SAFETY_ERROR = "safety_error" + TIMEOUT = "timeout" + CANCELED = "canceled" + STALE_JOB = "stale_job" + STORAGE_ERROR = "storage_error" + VALIDATION_ERROR = "validation_error" + UNKNOWN_ERROR = "unknown_error" + + +class StepStatus(StrEnum): + """Standard status values for a workflow step.""" + + QUEUED = "queued" + RUNNING = "running" + SUCCEEDED = "succeeded" + FAILED = "failed" + CANCELED = "canceled" + + +EVENT_STEP_MAP: dict[str, WorkflowStep] = { + "request_accepted": WorkflowStep.REQUEST_ACCEPTANCE, + "retry_queued": WorkflowStep.REQUEST_ACCEPTANCE, + "worker_started": WorkflowStep.WORKER_START, + "context_prepared": WorkflowStep.CONTEXT_PREPARATION, + "narrative_generated": WorkflowStep.NARRATIVE_GENERATION, + "story_saved": WorkflowStep.STORY_PERSISTENCE, + "provider_call_started": WorkflowStep.PROVIDER_INVOCATION, + "provider_call_succeeded": WorkflowStep.PROVIDER_INVOCATION, + "provider_call_failed": WorkflowStep.PROVIDER_INVOCATION, + "quality_gate_failed": WorkflowStep.NARRATIVE_GENERATION, + "cover_image_started": WorkflowStep.IMAGE_GENERATION, + "cover_image_succeeded": WorkflowStep.IMAGE_GENERATION, + "cover_image_failed": WorkflowStep.IMAGE_GENERATION, + "storybook_images_started": WorkflowStep.IMAGE_GENERATION, + "storybook_cover_image_succeeded": WorkflowStep.IMAGE_GENERATION, + "storybook_cover_image_failed": WorkflowStep.IMAGE_GENERATION, + "storybook_page_image_succeeded": WorkflowStep.IMAGE_GENERATION, + "storybook_page_image_failed": WorkflowStep.IMAGE_GENERATION, + "storybook_images_completed": WorkflowStep.IMAGE_GENERATION, + "audio_started": WorkflowStep.AUDIO_GENERATION, + "audio_cache_hit": WorkflowStep.AUDIO_GENERATION, + "audio_succeeded": WorkflowStep.AUDIO_GENERATION, + "audio_failed": WorkflowStep.AUDIO_GENERATION, + "asset_retry_started": WorkflowStep.ASSET_RETRY, + "asset_retry_completed": WorkflowStep.ASSET_RETRY, + "asset_generation_completed": WorkflowStep.ASSET_GENERATION, + "postprocessing_queued": WorkflowStep.POSTPROCESSING, + "generation_completed": WorkflowStep.COMPLETION, + "generation_failed": WorkflowStep.COMPLETION, + "generation_canceled": WorkflowStep.CANCELLATION, + "cancel_requested": WorkflowStep.CANCELLATION, + "generation_stale_failed": WorkflowStep.STALE_RECOVERY, +} + +EVENT_ARTIFACT_MAP: dict[str, ArtifactKind] = { + "narrative_generated": ArtifactKind.STORY_TEXT, + "quality_gate_failed": ArtifactKind.STORY_TEXT, + "cover_image_started": ArtifactKind.COVER_IMAGE, + "cover_image_succeeded": ArtifactKind.COVER_IMAGE, + "cover_image_failed": ArtifactKind.COVER_IMAGE, + "storybook_images_started": ArtifactKind.IMAGE, + "storybook_cover_image_succeeded": ArtifactKind.COVER_IMAGE, + "storybook_cover_image_failed": ArtifactKind.COVER_IMAGE, + "storybook_page_image_succeeded": ArtifactKind.PAGE_IMAGE, + "storybook_page_image_failed": ArtifactKind.PAGE_IMAGE, + "storybook_images_completed": ArtifactKind.IMAGE, + "audio_started": ArtifactKind.AUDIO, + "audio_cache_hit": ArtifactKind.AUDIO, + "audio_succeeded": ArtifactKind.AUDIO, + "audio_failed": ArtifactKind.AUDIO, + "postprocessing_queued": ArtifactKind.ACHIEVEMENT_MEMORY, +} + + +def step_for_event(event_type: str) -> WorkflowStep: + """Return the standard workflow step for a persisted event type.""" + + return EVENT_STEP_MAP.get(event_type, WorkflowStep.UNKNOWN) + + +def artifact_for_event(event_type: str) -> ArtifactKind: + """Return the standard artifact for a persisted event type.""" + + return EVENT_ARTIFACT_MAP.get(event_type, ArtifactKind.NONE) + + +def normalize_trace_metadata( + event_type: str, + metadata: dict[str, Any] | None = None, + *, + step: WorkflowStep | str | None = None, + artifact: ArtifactKind | str | None = None, + failure_category: FailureCategory | str | None = None, + retryable: bool | None = None, + blocks_main_result: bool | None = None, +) -> dict[str, Any]: + """Merge legacy metadata with standard harness trace fields.""" + + normalized: dict[str, Any] = dict(metadata or {}) + + resolved_step = str(step or normalized.get("step") or step_for_event(event_type)) + resolved_artifact = str( + artifact or normalized.get("artifact") or artifact_for_event(event_type) + ) + + normalized["step"] = resolved_step + normalized["artifact"] = resolved_artifact + + if failure_category is not None: + normalized["failure_category"] = str(failure_category) + elif "failure_category" not in normalized: + normalized["failure_category"] = None + + if retryable is not None: + normalized["retryable"] = retryable + elif "retryable" not in normalized: + normalized["retryable"] = False + + if blocks_main_result is not None: + normalized["blocks_main_result"] = blocks_main_result + elif "blocks_main_result" not in normalized: + normalized["blocks_main_result"] = False + + return normalized diff --git a/backend/app/services/provider_router.py b/backend/app/services/provider_router.py index 6a529aa..a548fae 100644 --- a/backend/app/services/provider_router.py +++ b/backend/app/services/provider_router.py @@ -10,7 +10,7 @@ from app.core.logging import get_logger from app.services.adapters import AdapterConfig, AdapterRegistry from app.services.adapters.text.models import StoryOutput from app.services.cost_tracker import cost_tracker -from app.services.generation_jobs import record_generation_event +from app.services.harness.trace import TraceRecorder from app.services.provider_cache import get_providers from app.services.provider_metrics import health_checker, metrics_collector from app.services.provider_policy import ( @@ -67,8 +67,7 @@ async def _record_provider_event_if_present( if db is None or job is None: return - await record_generation_event( - db, + await TraceRecorder(db).record_step( job=job, story_id=story_id, event_type=event_type, diff --git a/backend/app/services/story_service.py b/backend/app/services/story_service.py index 73bdf94..6869b91 100644 --- a/backend/app/services/story_service.py +++ b/backend/app/services/story_service.py @@ -1,9 +1,7 @@ """Story business logic service.""" import asyncio -from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import Literal from fastapi import HTTPException from sqlalchemy import desc, select @@ -42,6 +40,29 @@ from app.services.generation_jobs import ( get_generation_job_for_user, record_generation_event, ) +from app.services.harness.artifacts import ( + AssetCompletionResult, + asset_result_metadata, +) +from app.services.harness.asset_workflows import ( + build_storybook_error_message, + complete_audio_asset, + complete_cover_image_asset, + complete_storybook_image_assets, + get_storybook_pages_data, + read_cached_audio_asset, + resolve_storybook_image_status, +) +from app.services.harness.control import ( + ExecutionControl, + GenerationJobCanceledError, +) +from app.services.harness.quality_gates import ( + QualityGateError, + validate_story_output, + validate_storybook_output, +) +from app.services.harness.trace import TraceRecorder from app.services.memory_service import build_enhanced_memory_context from app.services.provider_router import ( generate_image, @@ -56,29 +77,6 @@ from app.tasks.achievements import extract_story_achievements logger = get_logger(__name__) -AssetCompletionKind = Literal["cover_image", "storybook_images", "audio"] - - -@dataclass(frozen=True) -class AssetCompletionResult: - """Service-level result for a generated asset completion attempt.""" - - asset: AssetCompletionKind - status: StoryAssetStatus - value: str | bytes | None = None - error: str | None = None - blocks_main_result: bool = False - - @property - def succeeded(self) -> bool: - """Whether the asset reached a usable ready state.""" - - return self.status == StoryAssetStatus.READY and self.error is None - - -class GenerationJobCanceledError(Exception): - """Raised when a running worker job has been canceled by the user.""" - async def _record_job_event_if_present( db: AsyncSession, @@ -92,11 +90,7 @@ async def _record_job_event_if_present( ) -> None: """Append a workflow event when the caller is running under a tracked job.""" - if job is None: - return - - await record_generation_event( - db, + await TraceRecorder(db).record_step( job=job, story_id=story_id, event_type=event_type, @@ -114,34 +108,31 @@ async def _stop_if_job_cancel_requested( ) -> None: """Stop a worker-owned job at the next safe checkpoint after cancellation.""" - if job is None: - return + await ExecutionControl(db).stop_if_cancel_requested(job=job, story=story) - await db.refresh(job) - if job.current_step != "cancel_requested": - return - await finish_generation_job( +async def _record_quality_gate_failure_if_present( + db: AsyncSession, + *, + job, + error: QualityGateError, +) -> None: + """Append a quality gate failure event for tracked worker jobs.""" + + await _record_job_event_if_present( db, job=job, - story=story, - status="canceled", - current_step="generation_canceled", - error_message="Generation canceled by user.", - message="Generation job was canceled after a user request.", + event_type="quality_gate_failed", + status="failed", + message="Generated content failed deterministic quality gates.", + metadata=error.to_metadata(), ) - raise GenerationJobCanceledError() def _asset_result_metadata(result: AssetCompletionResult) -> dict: """Build JSON-safe metadata for asset workflow events.""" - return { - "asset": result.asset, - "status": result.status.value, - "error": result.error, - "blocks_main_result": result.blocks_main_result, - } + return asset_result_metadata(result) def _build_storybook_error_message( @@ -151,13 +142,10 @@ def _build_storybook_error_message( ) -> str | None: """Summarize storybook image generation errors for the latest attempt.""" - parts: list[str] = [] - if cover_failed: - parts.append("封面生成失败") - if failed_pages: - pages = "、".join(str(page) for page in sorted(failed_pages)) - parts.append(f"第 {pages} 页插图生成失败") - return ";".join(parts) if parts else None + return build_storybook_error_message( + cover_failed=cover_failed, + failed_pages=failed_pages, + ) def _resolve_storybook_image_status( @@ -169,31 +157,12 @@ def _resolve_storybook_image_status( ) -> StoryAssetStatus: """Resolve the persisted image status for a storybook.""" - if not generate_images: - return StoryAssetStatus.NOT_REQUESTED - - expected_assets = 0 - ready_assets = 0 - - if cover_prompt or cover_url: - expected_assets += 1 - if cover_url: - ready_assets += 1 - - for page in pages_data: - if not page.get("image_prompt") and not page.get("image_url"): - continue - expected_assets += 1 - if page.get("image_url"): - ready_assets += 1 - - if expected_assets == 0: - return StoryAssetStatus.NOT_REQUESTED - - if ready_assets == expected_assets: - return StoryAssetStatus.READY - - return StoryAssetStatus.FAILED + return resolve_storybook_image_status( + generate_images=generate_images, + cover_prompt=cover_prompt, + cover_url=cover_url, + pages_data=pages_data, + ) async def _prepare_generation_context( @@ -539,93 +508,21 @@ async def _complete_cover_image_asset( ) -> AssetCompletionResult: """Generate or retry a text story cover through one asset workflow.""" - if not story.cover_prompt: - raise HTTPException(status_code=400, detail="Story has no cover prompt") - - sync_story_status(story, image_status=StoryAssetStatus.GENERATING) - await db.commit() - await _stop_if_job_cancel_requested(db, job=job, story=story) - await _record_job_event_if_present( + return await complete_cover_image_asset( + story, db, + generate_image_func=generate_image, + raise_on_failure=raise_on_failure, + last_error_prefix=last_error_prefix, + log_event=log_event, job=job, - story_id=story.id, - event_type="cover_image_started", - status="running", - message="Cover image generation started.", - metadata={"asset": "image", "cover_prompt_present": True}, ) - try: - await _stop_if_job_cancel_requested(db, job=job, story=story) - image_url = await generate_image( - story.cover_prompt, - db=db, - user_id=story.user_id, - generation_job=job, - story_id=story.id, - ) - story.image_url = image_url - sync_story_status(story, image_status=StoryAssetStatus.READY) - await db.commit() - result = AssetCompletionResult( - asset="cover_image", - status=StoryAssetStatus.READY, - value=image_url, - blocks_main_result=raise_on_failure, - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="cover_image_succeeded", - status="succeeded", - message="Cover image was generated.", - metadata=_asset_result_metadata(result), - ) - return result - except Exception as exc: - provider_error = str(exc) - last_error = ( - f"{last_error_prefix}: {provider_error}" - if last_error_prefix - else provider_error - ) - sync_story_status( - story, - image_status=StoryAssetStatus.FAILED, - last_error=last_error, - ) - await db.commit() - logger.warning(log_event, story_id=story.id, error=provider_error) - - result = AssetCompletionResult( - asset="cover_image", - status=StoryAssetStatus.FAILED, - error=provider_error, - blocks_main_result=raise_on_failure, - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="cover_image_failed", - status="failed", - message="Cover image generation failed.", - metadata=_asset_result_metadata(result), - ) - if raise_on_failure: - raise HTTPException( - status_code=500, - detail=f"Image generation failed: {provider_error}", - ) from exc - - return result - def _get_storybook_pages_data(story: Story) -> list[dict]: """Return mutable storybook page data from the persisted JSON field.""" - return [dict(page) for page in story.pages or [] if isinstance(page, dict)] + return get_storybook_pages_data(story) async def _complete_storybook_image_assets( @@ -636,176 +533,23 @@ async def _complete_storybook_image_assets( ) -> AssetCompletionResult: """Complete missing cover/page images for a persisted storybook.""" - pages_data = _get_storybook_pages_data(story) - has_image_prompt = bool(story.cover_prompt) or any( - page.get("image_prompt") for page in pages_data - ) - if not has_image_prompt: - raise HTTPException(status_code=400, detail="Storybook has no image prompts") - - sync_story_status(story, image_status=StoryAssetStatus.GENERATING) - await db.commit() - await _stop_if_job_cancel_requested(db, job=job, story=story) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="storybook_images_started", - status="running", - message="Storybook missing image completion started.", - metadata={"asset": "image"}, - ) - - cover_failed = False - failed_pages: list[int] = [] - completed_pages: list[int] = [] - - if story.cover_prompt and not story.image_url: - await _stop_if_job_cancel_requested(db, job=job, story=story) - try: - story.image_url = await generate_image( - story.cover_prompt, - db=db, - user_id=story.user_id, - generation_job=job, - story_id=story.id, - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="storybook_cover_image_succeeded", - status="succeeded", - message="Storybook cover image was generated.", - metadata={"asset": "image", "scope": "cover"}, - ) - except Exception as exc: - cover_failed = True - logger.warning( - "storybook_cover_asset_completion_failed", - story_id=story.id, - error=str(exc), - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="storybook_cover_image_failed", - status="failed", - message="Storybook cover image generation failed.", - metadata={"asset": "image", "scope": "cover", "error": str(exc)}, - ) - - for page in pages_data: - if not page.get("image_prompt") or page.get("image_url"): - continue - - await _stop_if_job_cancel_requested(db, job=job, story=story) - try: - page["image_url"] = await generate_image( - page["image_prompt"], - db=db, - user_id=story.user_id, - generation_job=job, - story_id=story.id, - ) - page_number = page.get("page_number") - if isinstance(page_number, int): - completed_pages.append(page_number) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="storybook_page_image_succeeded", - status="succeeded", - message="Storybook page image was generated.", - metadata={"asset": "image", "scope": "page", "page_number": page_number}, - ) - except Exception as exc: - page_number = page.get("page_number") - if isinstance(page_number, int): - failed_pages.append(page_number) - logger.warning( - "storybook_page_asset_completion_failed", - story_id=story.id, - page=page_number, - error=str(exc), - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="storybook_page_image_failed", - status="failed", - message="Storybook page image generation failed.", - metadata={ - "asset": "image", - "scope": "page", - "page_number": page_number, - "error": str(exc), - }, - ) - - story.pages = pages_data - error_message = _build_storybook_error_message( - cover_failed=cover_failed, - failed_pages=failed_pages, - ) - image_status = _resolve_storybook_image_status( - generate_images=True, - cover_prompt=story.cover_prompt, - cover_url=story.image_url, - pages_data=pages_data, - ) - sync_story_status( + return await complete_storybook_image_assets( story, - image_status=image_status, - last_error=error_message, - ) - await db.commit() - result = AssetCompletionResult( - asset="storybook_images", - status=image_status, - value=story.image_url, - error=error_message, - ) - await _record_job_event_if_present( db, + generate_image_func=generate_image, job=job, - story_id=story.id, - event_type="storybook_images_completed", - status="failed" if error_message else "succeeded", - message="Storybook image completion finished.", - metadata={ - **_asset_result_metadata(result), - "completed_pages": sorted(completed_pages), - "failed_pages": sorted(failed_pages), - }, ) - return result async def _read_cached_audio_asset(story: Story, db: AsyncSession) -> bytes | None: """Read cached audio or repair stale audio cache metadata.""" - if story.audio_path and audio_cache_exists(story.audio_path): - if story.audio_status != StoryAssetStatus.READY.value: - sync_story_status(story, audio_status=StoryAssetStatus.READY) - await db.commit() - return read_audio_cache(story.audio_path) - - if story.audio_path and not audio_cache_exists(story.audio_path): - logger.warning( - "story_audio_cache_missing", - story_id=story.id, - audio_path=story.audio_path, - ) - story.audio_path = None - if story.audio_status == StoryAssetStatus.READY.value: - sync_story_status(story, audio_status=StoryAssetStatus.NOT_REQUESTED) - await db.commit() - - return None + return await read_cached_audio_asset( + story, + db, + audio_cache_exists_func=audio_cache_exists, + read_audio_cache_func=read_audio_cache, + ) async def _complete_audio_asset( @@ -817,107 +561,18 @@ async def _complete_audio_asset( ) -> AssetCompletionResult: """Complete TTS audio generation through one asset workflow.""" - if not story.story_text: - raise HTTPException(status_code=400, detail="Story has no text") - - cached_audio = await _read_cached_audio_asset(story, db) - if cached_audio is not None: - result = AssetCompletionResult( - asset="audio", - status=StoryAssetStatus.READY, - value=cached_audio, - blocks_main_result=raise_on_failure, - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="audio_cache_hit", - status="succeeded", - message="Cached story audio was reused.", - metadata=_asset_result_metadata(result), - ) - return result - from app.services.provider_router import text_to_speech - sync_story_status(story, audio_status=StoryAssetStatus.GENERATING) - await db.commit() - await _stop_if_job_cancel_requested(db, job=job, story=story) - await _record_job_event_if_present( + return await complete_audio_asset( + story, db, + text_to_speech_func=text_to_speech, + audio_cache_exists_func=audio_cache_exists, + read_audio_cache_func=read_audio_cache, + write_story_audio_cache_func=write_story_audio_cache, + raise_on_failure=raise_on_failure, job=job, - story_id=story.id, - event_type="audio_started", - status="running", - message="Story audio generation started.", - metadata={"asset": "audio"}, ) - - try: - await _stop_if_job_cancel_requested(db, job=job, story=story) - audio_data = await text_to_speech( - story.story_text, - db=db, - user_id=story.user_id, - generation_job=job, - story_id=story.id, - ) - story.audio_path = write_story_audio_cache(story.id, audio_data) - sync_story_status( - story, - audio_status=StoryAssetStatus.READY, - ) - await db.commit() - result = AssetCompletionResult( - asset="audio", - status=StoryAssetStatus.READY, - value=audio_data, - blocks_main_result=raise_on_failure, - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="audio_succeeded", - status="succeeded", - message="Story audio was generated and cached.", - metadata=_asset_result_metadata(result), - ) - return result - except Exception as exc: - provider_error = str(exc) - story.audio_path = None - sync_story_status( - story, - audio_status=StoryAssetStatus.FAILED, - last_error=provider_error, - ) - await db.commit() - logger.error("audio_generation_failed", story_id=story.id, error=provider_error) - - result = AssetCompletionResult( - asset="audio", - status=StoryAssetStatus.FAILED, - error=provider_error, - blocks_main_result=raise_on_failure, - ) - await _record_job_event_if_present( - db, - job=job, - story_id=story.id, - event_type="audio_failed", - status="failed", - message="Story audio generation failed.", - metadata=_asset_result_metadata(result), - ) - if raise_on_failure: - raise HTTPException( - status_code=500, - detail=f"Audio generation failed: {provider_error}", - ) from exc - - return result async def validate_profile_and_universe( @@ -988,6 +643,13 @@ async def generate_and_save_story( user_id=user_id, generation_job=job, ) + validate_story_output(result) + except QualityGateError as exc: + await _record_quality_gate_failure_if_present(db, job=job, error=exc) + raise HTTPException( + status_code=502, + detail="Story generation failed quality checks, please try again.", + ) from exc except Exception as exc: raise HTTPException( status_code=502, @@ -1096,6 +758,10 @@ async def generate_storybook_service( user_id=user_id, generation_job=job, ) + validate_storybook_output(storybook) + except QualityGateError as exc: + await _record_quality_gate_failure_if_present(db, job=job, error=exc) + raise HTTPException(status_code=500, detail=f"故事书质量检查失败: {exc}") from exc except Exception as e: logger.error("storybook_generation_failed", error=str(e)) raise HTTPException(status_code=500, detail=f"故事书生成失败: {e}") diff --git a/backend/tests/test_generation_jobs.py b/backend/tests/test_generation_jobs.py index 8d2631a..8b4c311 100644 --- a/backend/tests/test_generation_jobs.py +++ b/backend/tests/test_generation_jobs.py @@ -165,6 +165,70 @@ async def test_unified_generation_is_queued_then_worker_persists_story_and_event app.dependency_overrides.clear() +async def test_generation_worker_records_quality_gate_failure_without_persisting_story( + db_session, + test_user, +): + invalid_output = StoryOutput( + mode="generated", + title="空白故事", + story_text="", + cover_prompt_suggestion="A blank cover", + ) + job = await create_generation_job( + db_session, + user_id=test_user.id, + output_mode="story", + input_type="keywords", + request_payload={ + "output_mode": "story", + "type": "keywords", + "data": "小兔子", + "generate_images": False, + }, + ) + + with patch( + "app.services.story_service.generate_story_content", + new_callable=AsyncMock, + ) as mock_generate_story_content: + mock_generate_story_content.return_value = invalid_output + + with pytest.raises(Exception): + await run_generation_job_service(job.id, db_session) + + refreshed_job = ( + await db_session.execute(select(GenerationJob).where(GenerationJob.id == job.id)) + ).scalar_one() + assert refreshed_job.status == "failed" + assert refreshed_job.story_id is None + assert refreshed_job.current_step == "generation_failed" + assert "quality checks" in refreshed_job.error_message + + stories = ( + await db_session.execute(select(Story).where(Story.user_id == test_user.id)) + ).scalars().all() + assert stories == [] + + events = ( + await db_session.execute( + select(GenerationJobEvent) + .where(GenerationJobEvent.job_id == job.id) + .order_by(GenerationJobEvent.id) + ) + ).scalars().all() + assert [event.event_type for event in events] == [ + "request_accepted", + "worker_started", + "context_prepared", + "quality_gate_failed", + "generation_failed", + ] + quality_event = events[3] + assert quality_event.event_metadata["step"] == "narrative_generation" + assert quality_event.event_metadata["issues"][0]["code"] == "missing_story_text" + + async def test_asset_retry_records_job_events_and_updates_retryable_assets( db_session, test_user, diff --git a/backend/tests/test_harness_runtime.py b/backend/tests/test_harness_runtime.py new file mode 100644 index 0000000..a58ddaa --- /dev/null +++ b/backend/tests/test_harness_runtime.py @@ -0,0 +1,374 @@ +"""Tests for generation harness runtime support.""" + +import pytest +from sqlalchemy import select + +from app.db.models import GenerationJob, GenerationJobEvent +from app.services.adapters.storybook.primary import Storybook, StorybookPage +from app.services.adapters.text.models import StoryOutput +from app.services.generation_jobs import create_generation_job, record_generation_event +from app.services.harness.control import ExecutionControl, GenerationJobCanceledError +from app.services.harness.plans import ( + build_asset_plan, + build_story_plan, + build_storybook_plan, +) +from app.services.harness.quality_gates import ( + QualityGateError, + validate_story_output, + validate_storybook_output, +) +from app.services.harness.trace import TraceRecorder +from app.services.harness.types import ( + ArtifactKind, + FailureCategory, + WorkflowStep, + artifact_for_event, + normalize_trace_metadata, + step_for_event, +) + + +def test_event_type_maps_to_standard_workflow_step(): + assert step_for_event("request_accepted") == WorkflowStep.REQUEST_ACCEPTANCE + assert step_for_event("context_prepared") == WorkflowStep.CONTEXT_PREPARATION + assert step_for_event("narrative_generated") == WorkflowStep.NARRATIVE_GENERATION + assert step_for_event("story_saved") == WorkflowStep.STORY_PERSISTENCE + assert step_for_event("provider_call_succeeded") == WorkflowStep.PROVIDER_INVOCATION + assert step_for_event("quality_gate_failed") == WorkflowStep.NARRATIVE_GENERATION + assert step_for_event("cover_image_failed") == WorkflowStep.IMAGE_GENERATION + assert step_for_event("audio_succeeded") == WorkflowStep.AUDIO_GENERATION + assert step_for_event("generation_canceled") == WorkflowStep.CANCELLATION + assert step_for_event("generation_stale_failed") == WorkflowStep.STALE_RECOVERY + assert step_for_event("future_event") == WorkflowStep.UNKNOWN + + +def test_event_type_maps_to_standard_artifact(): + assert artifact_for_event("narrative_generated") == ArtifactKind.STORY_TEXT + assert artifact_for_event("quality_gate_failed") == ArtifactKind.STORY_TEXT + assert artifact_for_event("cover_image_succeeded") == ArtifactKind.COVER_IMAGE + assert artifact_for_event("storybook_page_image_failed") == ArtifactKind.PAGE_IMAGE + assert artifact_for_event("audio_cache_hit") == ArtifactKind.AUDIO + assert artifact_for_event("postprocessing_queued") == ArtifactKind.ACHIEVEMENT_MEMORY + assert artifact_for_event("request_accepted") == ArtifactKind.NONE + + +def test_trace_metadata_adds_standard_fields_without_dropping_legacy_values(): + metadata = normalize_trace_metadata( + "provider_call_failed", + { + "capability": "text", + "adapter": "demo", + "error": "timeout", + }, + failure_category=FailureCategory.TIMEOUT, + retryable=True, + ) + + assert metadata["capability"] == "text" + assert metadata["adapter"] == "demo" + assert metadata["error"] == "timeout" + assert metadata["step"] == "provider_invocation" + assert metadata["artifact"] == "none" + assert metadata["failure_category"] == "timeout" + assert metadata["retryable"] is True + assert metadata["blocks_main_result"] is False + + +def test_trace_metadata_respects_explicit_step_and_artifact(): + metadata = normalize_trace_metadata( + "narrative_generated", + {"title": "小兔子的冒险"}, + step=WorkflowStep.NARRATIVE_GENERATION, + artifact=ArtifactKind.STORYBOOK_PAGES, + blocks_main_result=True, + ) + + assert metadata["title"] == "小兔子的冒险" + assert metadata["step"] == "narrative_generation" + assert metadata["artifact"] == "storybook_pages" + assert metadata["blocks_main_result"] is True + + +def test_story_plan_without_assets_snapshot(): + assert build_story_plan(generate_images=False).to_snapshot() == { + "mode": "story", + "tasks": [ + { + "key": "prepare_context", + "step": "context_preparation", + "artifact": "none", + "required": True, + "recoverable": False, + }, + { + "key": "generate_narrative", + "step": "narrative_generation", + "artifact": "story_text", + "required": True, + "recoverable": False, + }, + { + "key": "persist_story", + "step": "story_persistence", + "artifact": "story_text", + "required": True, + "recoverable": False, + }, + { + "key": "queue_postprocessing", + "step": "postprocessing", + "artifact": "achievement_memory", + "required": False, + "recoverable": True, + }, + { + "key": "complete_generation", + "step": "completion", + "artifact": "none", + "required": True, + "recoverable": False, + }, + ], + } + + +def test_story_plan_with_assets_marks_cover_recoverable(): + plan = build_story_plan(generate_images=True).to_snapshot() + + assert plan["mode"] == "story_with_assets" + assert plan["tasks"][3] == { + "key": "generate_cover_image", + "step": "image_generation", + "artifact": "cover_image", + "required": False, + "recoverable": True, + } + + +def test_storybook_plan_with_images_marks_storybook_images_recoverable(): + plan = build_storybook_plan(generate_images=True).to_snapshot() + + assert plan["mode"] == "storybook" + assert [task["key"] for task in plan["tasks"]] == [ + "prepare_context", + "generate_storybook_pages", + "generate_storybook_images", + "persist_storybook", + "queue_postprocessing", + "complete_generation", + ] + assert plan["tasks"][2]["artifact"] == "image" + assert plan["tasks"][2]["recoverable"] is True + + +def test_asset_retry_plan_deduplicates_assets(): + plan = build_asset_plan(output_mode="asset_retry", assets=["image", "audio", "image"]) + + assert plan.to_snapshot() == { + "mode": "asset_retry", + "tasks": [ + { + "key": "start_asset_retry", + "step": "asset_retry", + "artifact": "none", + "required": True, + "recoverable": False, + }, + { + "key": "complete_image_asset", + "step": "image_generation", + "artifact": "image", + "required": False, + "recoverable": True, + }, + { + "key": "complete_audio_asset", + "step": "audio_generation", + "artifact": "audio", + "required": False, + "recoverable": True, + }, + { + "key": "complete_asset_retry", + "step": "asset_retry", + "artifact": "none", + "required": True, + "recoverable": False, + }, + ], + } + + +def test_story_quality_gate_accepts_complete_child_safe_story(): + validate_story_output( + StoryOutput( + mode="generated", + title="小兔子的月光花园", + story_text="小兔子在花园里学会了和朋友轮流分享水壶。", + cover_prompt_suggestion="A gentle moonlit garden with a rabbit", + ) + ) + + +def test_story_quality_gate_rejects_missing_story_text(): + output = StoryOutput( + mode="generated", + title="空白故事", + story_text="", + cover_prompt_suggestion="A cover", + ) + + try: + validate_story_output(output) + except QualityGateError as exc: + assert [issue.code.value for issue in exc.issues] == ["missing_story_text"] + assert exc.to_metadata()["issues"][0]["field"] == "story_text" + else: + raise AssertionError("Expected QualityGateError") + + +def test_story_quality_gate_rejects_obviously_unsafe_child_content(): + output = StoryOutput( + mode="generated", + title="危险词测试", + story_text="这个故事包含血腥场景。", + cover_prompt_suggestion="A cover", + ) + + try: + validate_story_output(output) + except QualityGateError as exc: + assert [issue.code.value for issue in exc.issues] == ["unsafe_child_content"] + assert exc.to_metadata()["issues"][0]["failure_category"] == "safety_error" + else: + raise AssertionError("Expected QualityGateError") + + +def test_storybook_quality_gate_rejects_duplicate_page_number(): + storybook = Storybook( + title="森林绘本", + main_character="小兔子", + art_style="水彩", + cover_prompt="A forest cover", + pages=[ + StorybookPage(page_number=1, text="第一页。", image_prompt="page 1"), + StorybookPage(page_number=1, text="第二页。", image_prompt="page 2"), + ], + ) + + try: + validate_storybook_output(storybook) + except QualityGateError as exc: + assert [issue.code.value for issue in exc.issues] == [ + "invalid_storybook_page_number" + ] + assert exc.to_metadata()["issues"][0]["field"] == "pages[1].page_number" + else: + raise AssertionError("Expected QualityGateError") + + +@pytest.mark.asyncio +async def test_trace_recorder_persists_standard_metadata(db_session, test_user): + job = await create_generation_job( + db_session, + user_id=test_user.id, + output_mode="story", + input_type="keywords", + request_payload={"data": "小兔子"}, + ) + + event = await TraceRecorder(db_session).record_step( + job=job, + event_type="provider_call_failed", + status="failed", + metadata={ + "capability": "text", + "adapter": "demo", + "error": "timeout", + }, + failure_category=FailureCategory.TIMEOUT, + retryable=True, + ) + + assert event is not None + events = ( + await db_session.execute( + select(GenerationJobEvent) + .where(GenerationJobEvent.job_id == job.id) + .order_by(GenerationJobEvent.id) + ) + ).scalars().all() + + assert [item.event_type for item in events] == [ + "request_accepted", + "provider_call_failed", + ] + metadata = events[1].event_metadata + assert metadata["capability"] == "text" + assert metadata["adapter"] == "demo" + assert metadata["step"] == "provider_invocation" + assert metadata["artifact"] == "none" + assert metadata["failure_category"] == "timeout" + assert metadata["retryable"] is True + + +@pytest.mark.asyncio +async def test_trace_recorder_ignores_missing_job(db_session): + event = await TraceRecorder(db_session).record_step( + job=None, + event_type="context_prepared", + status="succeeded", + ) + + assert event is None + + +@pytest.mark.asyncio +async def test_execution_control_cancels_job_at_safe_checkpoint( + db_session, + test_user, + test_story, +): + job = await create_generation_job( + db_session, + user_id=test_user.id, + output_mode="story", + input_type="keywords", + request_payload={"data": "小兔子"}, + story_id=test_story.id, + ) + await record_generation_event( + db_session, + job=job, + story_id=test_story.id, + event_type="cancel_requested", + status="running", + message="Cancellation requested.", + ) + + with pytest.raises(GenerationJobCanceledError): + await ExecutionControl(db_session).stop_if_cancel_requested( + job=job, + story=test_story, + ) + + refreshed_job = ( + await db_session.execute(select(GenerationJob).where(GenerationJob.id == job.id)) + ).scalar_one() + assert refreshed_job.status == "canceled" + assert refreshed_job.current_step == "generation_canceled" + assert refreshed_job.error_message == "Generation canceled by user." + + events = ( + await db_session.execute( + select(GenerationJobEvent) + .where(GenerationJobEvent.job_id == job.id) + .order_by(GenerationJobEvent.id) + ) + ).scalars().all() + assert [item.event_type for item in events] == [ + "request_accepted", + "cancel_requested", + "generation_canceled", + ] diff --git a/docs/planning/harness-stage-0-report.md b/docs/planning/harness-stage-0-report.md new file mode 100644 index 0000000..c4801cb --- /dev/null +++ b/docs/planning/harness-stage-0-report.md @@ -0,0 +1,111 @@ +# Harness Engineering 改造阶段 0 报告 + +**阶段**: 0 - 设计与基线 +**日期**: 2026-06-21 +**状态**: 已完成 +**范围**: 技术设计、阶段拆解、最小任务、验收标准 + +--- + +## 1. 本阶段目标 + +本阶段不修改业务代码,目标是先建立 harness engineering 改造的执行基线: + +- 明确这次改造不是引入外部工作流引擎,也不是重写项目。 +- 确认现有统一生成工作流的能力边界。 +- 设计 Generation Harness Runtime 的目标架构。 +- 把后续工作拆成可执行、可验证、可报告的阶段。 + +## 2. 已完成工作 + +- 阅读并对齐现有统一生成 PRD:`docs/product/unified-generation-workflow-prd.md` +- 阅读现有架构说明:`docs/technical/architecture.md` +- 阅读现有 job/event 说明:`docs/technical/generation-job-state.md` +- 阅读 Provider 路由说明:`docs/technical/provider-routing.md` +- 检查当前生成链路实现: + - `backend/app/services/story_service.py` + - `backend/app/services/generation_jobs.py` + - `backend/app/services/provider_router.py` + - `backend/app/tasks/generation_workflow.py` +- 检查当前关键测试: + - `backend/tests/test_generation_jobs.py` + - `backend/tests/test_stories.py` +- 新增技术设计文档: + - `docs/technical/harness-engineering-modernization.md` + +## 3. 核心结论 + +DreamWeaver 已经具备 harness engineering 的雏形,不需要从零开始。 + +当前最有价值的改造路径是: + +1. 先抽出 harness 基础类型、trace recorder 和 execution control。 +2. 再拆资产工作流。 +3. 然后引入显式 workflow plan。 +4. 最后补 quality gates、trace analytics 和前端增量展示。 + +第一阶段应避免大改数据库、API 和前端,先保证内部边界更清楚,并让现有测试继续通过。 + +## 4. 发现的现状问题 + +| 问题 | 影响 | 后续阶段 | +| --- | --- | --- | +| `story_service` 同时负责业务流程、事件记录、取消检查、资产补全和响应构造 | 文件职责偏重,后续扩展容易继续堆叠 | 阶段 1、2、3 | +| event type 已经丰富,但缺少标准 step/artifact/failure category | 可观测性有数据,但分析语义还不稳定 | 阶段 1、5 | +| Provider trace 已落库,但还没有纳入统一 runtime 语义 | 调用层和产品步骤之间缺少统一映射 | 阶段 1、5 | +| 输出质量主要依赖 adapter 和 schema | 儿童内容质量、结构完整性和安全门还不够显式 | 阶段 4 | +| 资产工作流 helper 已抽出一部分,但仍在 `story_service` 内 | 重试、后台补全、同步兼容路径仍有重复风险 | 阶段 2 | + +## 5. 阶段 1 入口标准 + +可以进入阶段 1,入口条件已满足: + +- 技术设计已存在。 +- 最小任务已经拆解。 +- 阶段 1 不需要产品澄清。 +- 阶段 1 不需要数据库迁移。 +- 阶段 1 有明确验证命令。 + +阶段 1 第一批任务: + +| ID | 任务 | +| --- | --- | +| H1-1 | 新增 `app/services/harness/__init__.py` | +| H1-2 | 新增 `types.py` 枚举和 event type 映射 | +| H1-3 | 新增 `trace.py` 封装 job event 写入 | +| H1-4 | 新增 `control.py` 封装取消检查 | +| H1-5 | 替换 `story_service` 内部 helper 实现 | +| H1-6 | 补 `tests/test_harness_runtime.py` | + +## 6. 验证 + +本阶段为文档阶段,验证方式是文档审查和路径确认。 + +已确认: + +- 设计文档放在 `docs/technical/` +- 阶段报告放在 `docs/planning/` +- 后续阶段有明确测试命令 +- 改造策略与现有统一生成 PRD 不冲突 + +## 7. 风险 + +| 风险 | 等级 | 处理 | +| --- | --- | --- | +| 过早拆 workflow 导致行为回归 | 高 | 阶段 1 不拆主流程,只抽基础支撑件 | +| metadata 标准化影响前端 | 中 | 只新增字段,不删除旧字段 | +| 文档太大但实现不跟进 | 中 | 每个阶段都产出报告并更新状态 | + +## 8. 下一步 + +进入阶段 1:Harness 基础类型与事件封装。 + +优先顺序: + +1. 新增 harness 包和纯类型定义。 +2. 增加单测锁定 event type 到 step 的映射。 +3. 新增 trace recorder,保持旧事件行为。 +4. 新增 execution control,保持取消行为。 +5. 替换 `story_service` 内部 helper 为代理调用。 +6. 运行阶段 1 验证命令并产出阶段 1 报告。 + diff --git a/docs/planning/harness-stage-1-report.md b/docs/planning/harness-stage-1-report.md new file mode 100644 index 0000000..86bbcab --- /dev/null +++ b/docs/planning/harness-stage-1-report.md @@ -0,0 +1,122 @@ +# Harness Engineering 改造阶段 1 报告 + +**阶段**: 1 - Harness 基础类型与事件封装 +**日期**: 2026-06-21 +**状态**: 已完成 +**范围**: 后端 harness 包、标准类型、trace recorder、execution control、定向测试 + +--- + +## 1. 本阶段目标 + +本阶段目标是先建立 Generation Harness Runtime 的最低可用支撑件,不重排主生成流程,不修改数据库结构,不破坏现有 API。 + +目标包括: + +- 新增标准 workflow step、artifact、failure category 类型。 +- 将现有 event type 映射到标准 step/artifact。 +- 封装 job event 写入,统一补齐标准 trace metadata。 +- 封装取消检查,保留当前安全检查点语义。 +- 增加单元测试,确保新支撑件可独立验证。 + +## 2. 已完成工作 + +### 新增文件 + +- `backend/app/services/harness/__init__.py` +- `backend/app/services/harness/types.py` +- `backend/app/services/harness/trace.py` +- `backend/app/services/harness/control.py` +- `backend/tests/test_harness_runtime.py` + +### 修改文件 + +- `backend/app/services/story_service.py` + - 保留 `_record_job_event_if_present` 和 `_stop_if_job_cancel_requested` 原函数名。 + - 内部改为代理到 `TraceRecorder` 和 `ExecutionControl`。 + - 将 `GenerationJobCanceledError` 移入 harness control 模块。 + +- `backend/app/services/provider_router.py` + - Provider 调用事件改为通过 `TraceRecorder` 写入。 + - 保留原有 metadata 字段,例如 capability、adapter、strategy、latency、estimated cost、error。 + +- `docs/technical/harness-engineering-modernization.md` + - 更新阶段 1 状态。 + +## 3. 行为兼容性 + +本阶段采用“只新增标准字段,不删除旧字段”的策略。 + +新增写入到 `generation_job_events.event_metadata` 的标准字段包括: + +- `step` +- `artifact` +- `failure_category` +- `retryable` +- `blocks_main_result` + +现有事件顺序、event type、status、message 和既有 metadata 字段保持兼容。 + +## 4. 验证结果 + +已执行: + +```bash +cd backend +.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py +.venv/bin/python -m ruff check app tests +``` + +结果: + +- `24 passed` +- `ruff`: `All checks passed!` + +覆盖到的关键行为: + +- event type 到标准 workflow step 的映射。 +- event type 到 artifact 的映射。 +- trace metadata 不丢失旧字段。 +- TraceRecorder 能写入标准 metadata。 +- job 为 `None` 时 TraceRecorder 安全跳过。 +- ExecutionControl 能在 `cancel_requested` checkpoint 将 job 收敛为 `canceled`。 +- 现有 generation job、取消、重试、Provider 统计测试继续通过。 + +## 5. 自审结论 + +本阶段改动符合阶段 1 设计: + +- 没有引入外部框架。 +- 没有修改数据库迁移。 +- 没有修改 API schema。 +- 没有重排现有生成 workflow。 +- 没有删除旧 metadata 字段。 +- `story_service` 仍保留旧 helper 入口,降低后续阶段风险。 + +## 6. 已知限制 + +- 当前只有通过 `TraceRecorder` 写入的事件会自动带标准 metadata。直接调用 `record_generation_event` 的旧代码路径暂未全量迁移。 +- `failure_category` 目前只在显式传入时有具体值,Provider 错误自动分类留到后续阶段。 +- `AssetCompletionResult` 仍在 `story_service.py`,资产工作流拆分留到阶段 2。 +- `WorkflowPlan` 和执行器尚未实现,阶段 1 只完成运行时支撑件。 + +## 7. 风险与处理 + +| 风险 | 等级 | 当前处理 | +| --- | --- | --- | +| 新 metadata 影响前端 | 低 | 只新增字段,不删除字段 | +| 取消语义回归 | 低 | `tests/test_generation_jobs.py` 已通过 | +| Provider 聚合误算 | 低 | Provider 统计测试已通过 | +| 直接调用 `record_generation_event` 的路径未标准化 | 中 | 后续阶段逐步迁移或在底层统一补齐 | + +## 8. 下一阶段建议 + +进入阶段 2:资产工作流边界抽取。 + +建议先做最小切片: + +1. 将 `AssetCompletionResult` 移到 harness 或 artifact workflow 模块,并保留兼容 import。 +2. 抽普通故事封面补全工作流,保持测试不变。 +3. 抽音频补全工作流,先覆盖缓存命中、生成成功、生成失败。 +4. 最后抽绘本图片工作流,因为它涉及并发生成和逐页事件顺序,风险略高。 + diff --git a/docs/planning/harness-stage-2-report.md b/docs/planning/harness-stage-2-report.md new file mode 100644 index 0000000..f5b94b9 --- /dev/null +++ b/docs/planning/harness-stage-2-report.md @@ -0,0 +1,121 @@ +# Harness Engineering 改造阶段 2 报告 + +**阶段**: 2 - 资产工作流边界抽取 +**日期**: 2026-06-21 +**状态**: 已完成主要目标 +**范围**: 封面、音频、持久化绘本缺失图片补全工作流抽取 + +--- + +## 1. 本阶段目标 + +本阶段目标是将资产补全职责从 `story_service.py` 中抽出,迁入 harness runtime 的 artifact workflow 层,同时保留原有函数签名和外部行为。 + +阶段 2 不修改数据库结构,不修改 API schema,不改变前端行为。 + +## 2. 已完成工作 + +### 新增和扩展文件 + +- `backend/app/services/harness/artifacts.py` + - 新增 `AssetCompletionResult` + - 新增 `asset_result_metadata` + +- `backend/app/services/harness/asset_workflows.py` + - 新增 `complete_cover_image_asset` + - 新增 `read_cached_audio_asset` + - 新增 `complete_audio_asset` + - 新增 `get_storybook_pages_data` + - 新增 `build_storybook_error_message` + - 新增 `resolve_storybook_image_status` + - 新增 `complete_storybook_image_assets` + +### 修改文件 + +- `backend/app/services/story_service.py` + - 移除本地 `AssetCompletionResult` 定义,改为从 harness artifacts 引入。 + - `_complete_cover_image_asset` 改为代理到 harness asset workflow。 + - `_read_cached_audio_asset` 改为代理到 harness asset workflow。 + - `_complete_audio_asset` 改为代理到 harness asset workflow。 + - `_complete_storybook_image_assets` 改为代理到 harness asset workflow。 + - 绘本错误信息和图片状态推导 helper 改为代理到 harness asset workflow。 + +## 3. 行为兼容性 + +本阶段保留了 `story_service.py` 内原有私有 helper 名称,因此调用方不需要调整。 + +保持兼容的行为包括: + +- 普通故事封面生成成功和失败语义。 +- 封面失败时主内容仍可读,并进入可重试状态。 +- 音频缓存命中、缓存缺失修复、TTS 成功和 TTS 失败语义。 +- 音频失败时可选择阻塞或非阻塞,取决于 `raise_on_failure`。 +- 持久化绘本缺失封面/分页插图补全语义。 +- 绘本逐页图片事件和完成事件 metadata。 +- `retryable_assets` 行为。 + +## 4. 验证结果 + +已执行: + +```bash +cd backend +.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py tests/test_stories.py tests/test_audio_cache.py +.venv/bin/python -m ruff check app tests +``` + +结果: + +- `72 passed` +- `ruff`: `All checks passed!` + +覆盖到的关键行为: + +- 统一生成 job 队列和 worker 持久化。 +- 资产重试 job 事件。 +- 普通故事封面生成与重试。 +- 绘本分页图片重试。 +- 音频缓存、生成、失败和清理。 +- Provider 调用事件和聚合。 +- job 取消、重试和卡住任务收敛。 + +## 5. 自审结论 + +本阶段符合设计目标: + +- 资产补全职责已从 `story_service` 主体中显著抽离。 +- 外部 API 和数据库模型未变。 +- 当前主要测试通过。 +- harness 层开始承载 artifact workflow,但仍通过依赖注入函数调用 Provider 和文件缓存,便于测试与后续替换。 + +## 6. 保留到后续阶段的内容 + +首次绘本生成前的并发图片生成函数 `_generate_storybook_image_assets` 仍保留在 `story_service.py`。 + +保留原因: + +- 它发生在绘本主记录持久化之前。 +- 它与“生成绘本结构 -> 可选并发生成图片 -> 持久化故事”的执行计划强相关。 +- 更适合在阶段 3 引入 `WorkflowPlan` 时一起整理,而不是在阶段 2 单独迁移。 + +## 7. 风险与处理 + +| 风险 | 等级 | 当前处理 | +| --- | --- | --- | +| 资产工作流迁移改变事件顺序 | 低 | generation job 和 story 测试已通过 | +| 音频缓存修复逻辑回归 | 低 | `test_audio_cache.py` 已通过 | +| 绘本图片补全状态误判 | 低 | 绘本重试测试已通过 | +| 首次绘本并发图片仍在 service 内 | 中 | 阶段 3 处理 | + +## 8. 下一阶段建议 + +进入阶段 3:Workflow Plan 与执行器。 + +建议切片: + +1. 定义 `WorkflowPlan`、`WorkflowTask` 和模式枚举,不接入主流程。 +2. 为普通故事、完整故事、绘本、资产任务生成 plan 快照测试。 +3. 将 `_generate_generation_service_with_job` 的分支逐步迁移到 plan 构建。 +4. 处理首次绘本并发图片生成,把它纳入 storybook plan 的 asset step。 +5. 保持 `/api/generations` 和现有 job event 顺序兼容。 + diff --git a/docs/planning/harness-stage-3-report.md b/docs/planning/harness-stage-3-report.md new file mode 100644 index 0000000..8048d6a --- /dev/null +++ b/docs/planning/harness-stage-3-report.md @@ -0,0 +1,121 @@ +# Harness Engineering 改造阶段 3 报告 + +**阶段**: 3 - Workflow Plan 与执行器 +**日期**: 2026-06-21 +**状态**: 已完成计划建模基线,执行器接管未启用 +**范围**: 纯 WorkflowPlan 建模、计划快照测试 + +--- + +## 1. 本阶段目标 + +阶段 3 的完整目标是引入显式 `WorkflowPlan`,逐步减少 `_generate_generation_service_with_job` 中的分支逻辑。 + +本次完成了最小安全切片: + +- 定义 plan 类型和 task 类型。 +- 为普通故事、带封面故事、绘本、资产任务生成计划。 +- 用快照测试锁定计划形状。 +- 暂不改变实际执行流,避免事件顺序和前端时间线发生非必要变化。 + +## 2. 已完成工作 + +### 新增文件 + +- `backend/app/services/harness/plans.py` + +### 新增能力 + +- `WorkflowMode` + - `story` + - `story_with_assets` + - `storybook` + - `asset_generation` + - `asset_retry` + +- `WorkflowTask` + - `key` + - `step` + - `artifact` + - `required` + - `recoverable` + +- `WorkflowPlan` + - `mode` + - `tasks` + - `to_snapshot()` + +- plan builder + - `build_story_plan(generate_images=...)` + - `build_storybook_plan(generate_images=...)` + - `build_asset_plan(output_mode=..., assets=...)` + +### 修改文件 + +- `backend/tests/test_harness_runtime.py` + - 增加普通故事计划快照。 + - 增加带封面故事计划快照。 + - 增加绘本带图片计划快照。 + - 增加资产重试计划去重测试。 + +## 3. 为什么没有接入执行器 + +本阶段有意没有新增运行时事件,例如 `workflow_planned`,也没有让 plan 接管 `_generate_generation_service_with_job`。 + +原因: + +- 新 event type 会改变前端生成轨迹时间线,需要同步前端 label 和 progress 映射。 +- 当前生成 job 测试已经严格断言事件顺序。 +- 直接接管执行器会同时触碰 story、storybook、asset_generation、asset_retry 四条路径,风险偏高。 +- 先稳定 plan snapshot,可以让后续迁移按任务级别逐步推进。 + +## 4. 验证结果 + +已执行: + +```bash +cd backend +.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py +.venv/bin/python -m ruff check app tests +``` + +结果: + +- `28 passed` +- `ruff`: `All checks passed!` + +阶段 3 的计划建模未改变业务执行流,因此完整后端行为仍由阶段 2 的完整测试结果兜底。 + +## 5. 自审结论 + +本阶段符合“小步可验证”原则: + +- 新增模块不依赖数据库、FastAPI 或 Provider。 +- plan 只描述 workflow 形状,不执行副作用。 +- 所有任务均可 JSON snapshot,后续可写入 trace metadata 或用于执行器。 +- 没有影响现有 API、job event 顺序或前端。 + +## 6. 保留到后续的内容 + +| 内容 | 建议处理 | +| --- | --- | +| 执行器接管 `_generate_generation_service_with_job` | 分 story、storybook、asset 三次迁移 | +| 首次绘本生成前并发图片生成 | 跟 storybook plan 的 image task 一起迁移 | +| `workflow_planned` 事件 | 等前端 label 和 progress 映射准备好后再加入 | +| plan 与 trace metadata 关联 | 先在 execution context 内部使用,再决定是否落库 | + +## 7. 下一阶段建议 + +下一步有两条可选路线: + +1. **继续阶段 3B:执行器小步接管** + - 先让普通故事不带图片路径使用 plan。 + - 再迁移普通故事带图片路径。 + - 最后迁移绘本和资产任务。 + +2. **进入阶段 4:Quality Gates** + - 在不改变执行器的前提下,为 Provider 输出增加确定性校验。 + - 这条路线风险更低,对儿童内容质量收益更直接。 + +建议优先做阶段 4 的低风险质量门,然后再回来做阶段 3B 的执行器迁移。 + diff --git a/docs/planning/harness-stage-4-report.md b/docs/planning/harness-stage-4-report.md new file mode 100644 index 0000000..128b01f --- /dev/null +++ b/docs/planning/harness-stage-4-report.md @@ -0,0 +1,140 @@ +# Harness Engineering 改造阶段 4 报告 + +**阶段**: 4 - Quality Gates 与输出验证 +**日期**: 2026-06-21 +**状态**: 已完成确定性质量门 +**范围**: 文本故事和绘本结构输出校验、质量门失败事件、测试验证 + +--- + +## 1. 本阶段目标 + +阶段 4 的目标是在 Provider 输出进入持久化之前增加低成本、确定性的质量门。 + +本阶段不调用额外 AI 模型,不增加外部服务成本,只做结构完整性和明显儿童安全风险检查。 + +## 2. 已完成工作 + +### 新增文件 + +- `backend/app/services/harness/quality_gates.py` + +### 新增能力 + +- `QualityGateCode` + - `missing_title` + - `missing_story_text` + - `missing_cover_prompt` + - `missing_storybook_page` + - `invalid_storybook_page_number` + - `missing_storybook_page_text` + - `unsafe_child_content` + +- `QualityGateIssue` + - 稳定 code + - 中文 message + - `failure_category` + - field + +- `QualityGateError` + - 聚合多个 issue + - 可输出 JSON-safe metadata + +- `validate_story_output` + - 检查标题 + - 检查正文 + - 检查封面 prompt + - 检查明显不适合 3-8 岁儿童的风险词 + +- `validate_storybook_output` + - 检查标题 + - 检查至少一页 + - 检查页码有效且不重复 + - 检查每页正文 + - 检查明显不适合 3-8 岁儿童的风险词 + +### 修改文件 + +- `backend/app/services/story_service.py` + - 文本故事 Provider 输出后、持久化前执行 `validate_story_output`。 + - 绘本 Provider 输出后、图片生成和持久化前执行 `validate_storybook_output`。 + - 质量门失败会写入 `quality_gate_failed` job event。 + - 质量门失败不会落库故事主记录。 + +- `backend/app/services/harness/types.py` + - `quality_gate_failed` 映射到 `narrative_generation` step。 + - `quality_gate_failed` 映射到 `story_text` artifact。 + +- `backend/tests/test_harness_runtime.py` + - 增加质量门纯函数测试。 + +- `backend/tests/test_generation_jobs.py` + - 增加 worker 质量门失败测试,确认 story 不落库、job failed、事件可解释。 + +## 3. 行为语义 + +质量门失败属于生成失败,而不是降级完成。 + +原因: + +- 文本故事正文或绘本页结构是 blocking artifact。 +- 如果主内容本身不合格,系统不能保存为可读故事。 +- 图片、音频等 recoverable artifact 失败仍按原有 `degraded_completed` 或可重试逻辑处理。 + +## 4. 验证结果 + +已执行: + +```bash +cd backend +.venv/bin/python -m pytest tests/test_harness_runtime.py tests/test_generation_jobs.py +.venv/bin/python -m ruff check app tests +.venv/bin/python -m pytest +``` + +结果: + +- 定向测试:`33 passed` +- 完整后端测试:`138 passed` +- `ruff`: `All checks passed!` + +覆盖到的关键行为: + +- 质量门接受完整、安全的儿童故事。 +- 质量门拒绝空正文。 +- 质量门拒绝明显不适合儿童的风险词。 +- 质量门拒绝绘本重复页码。 +- worker 中质量门失败会写入 `quality_gate_failed`。 +- 质量门失败不会持久化 story。 +- 现有所有后端测试继续通过。 + +## 5. 自审结论 + +本阶段符合设计目标: + +- 没有引入额外 AI 调用。 +- 没有引入新依赖。 +- 没有改变 API schema。 +- 没有改变图片、音频资产失败降级语义。 +- 对儿童内容质量和结构完整性有了第一层确定性保护。 + +## 6. 已知限制 + +| 限制 | 后续建议 | +| --- | --- | +| 儿童安全词表很保守,只覆盖明显风险词 | 后续可接入可配置词表或轻量安全审核 Provider | +| 当前 `quality_gate_failed` artifact 固定映射到 `story_text` | 后续可根据 story/storybook mode 写入更精确 artifact | +| 质量门失败文案目前偏技术 | 后续可为前端增加更友好的用户提示 | +| 未做模型评审式质量评分 | 先保留,避免增加成本和不稳定性 | + +## 7. 下一阶段建议 + +进入阶段 5:Trace Analytics 与前端增量展示。 + +建议切片: + +1. 后端 Provider/Job 聚合支持 `failure_category` 统计。 +2. 前端生成轨迹显示 `step` 和 `artifact` 的中文标签。 +3. 管理端 Provider dashboard 展示 failure category 聚合。 +4. 更新 smoke 脚本检查标准 metadata。 + diff --git a/docs/technical/harness-engineering-modernization.md b/docs/technical/harness-engineering-modernization.md new file mode 100644 index 0000000..f0e170c --- /dev/null +++ b/docs/technical/harness-engineering-modernization.md @@ -0,0 +1,493 @@ +# Harness Engineering 架构改造技术设计 + +**项目**: DreamWeaver 梦语织机 +**版本**: 0.1 +**日期**: 2026-06-21 +**状态**: 阶段 0 已建立设计基线 +**作者**: Codex + +--- + +## 1. 背景 + +DreamWeaver 当前已经完成统一生成工作流的第一轮落地:`POST /api/generations` 会创建 `generation_jobs`,后台 worker 执行故事或绘本生成,`generation_job_events` 记录关键过程,前端通过 job 查询和故事状态展示进度、失败与重试入口。 + +现有架构已经具备 harness engineering 的雏形: + +- `story_service` 负责上下文准备、主内容生成、故事落库、资产补全、状态同步和任务收敛。 +- `generation_jobs` 负责 job/event、进度摘要、取消、重试和 Provider 聚合。 +- `provider_router` 负责 Provider 选择、failover、熔断、成本和调用事件。 +- `story_status` 负责把文本、图片、音频状态推导为统一故事状态。 + +当前主要问题不是缺少能力,而是生成运行时控制逻辑分散在多个 service 内。取消检查、事件记录、资产恢复、Provider 轨迹、失败分类和质量校验还没有形成显式的运行时边界。 + +本设计的目标,是把现有“隐性 harness”升级为可观测、可恢复、可验证、可演进的 Generation Harness Runtime,同时保留当前 API、数据库和前端主行为。 + +## 2. 目标 + +### 2.1 产品目标 + +- 家长提交故事或绘本生成后,能获得稳定、可解释、可恢复的结果。 +- 主内容生成成功后,即使图片或音频失败,也不影响阅读。 +- 用户能清楚看到任务步骤、失败原因和可重试资产。 +- 后续新增语音共创、更多内容模式或新资产类型时,复用同一套运行时模型。 + +### 2.2 工程目标 + +- 把 `story_service` 中的运行时控制职责抽到 harness 层。 +- 让 workflow step、artifact、trace、failure category 成为一等概念。 +- 保持 `/api/generations`、旧兼容接口、现有状态字段和主要测试行为不破坏。 +- 优先做渐进式重构,不引入复杂工作流引擎,不进行大爆炸重写。 +- 每个大阶段都产出阶段报告,包含实现、审查、验证和风险。 + +### 2.3 非目标 + +- 本轮不引入 Temporal、Dagster、LangGraph 等外部工作流引擎。 +- 本轮不彻底重做数据库结构。 +- 本轮不废弃旧生成接口,只继续保持兼容层指向统一入口。 +- 本轮不把 DreamWeaver 抽象成通用 agent 平台,所有抽象必须服务儿童故事生成业务。 +- 本轮不要求一次性完成所有 eval、质量门和 replay 能力。 + +## 3. 架构原则 + +1. **主内容优先可读** + 文本故事或绘本结构是 blocking artifact;封面、分页插图、音频是 recoverable artifact。 + +2. **API 稳定优先** + 先重构内部边界,再考虑扩展响应字段。现有前端、smoke、测试不应被第一阶段打断。 + +3. **事件结构稳定** + 继续复用 `generation_job_events`,但逐步标准化 metadata,避免每个调用点随手定义不同结构。 + +4. **Provider 不等于产品能力** + Provider 只是 tool invocation 的实现。产品能力应由 capability、workflow step、artifact 和 recovery policy 共同定义。 + +5. **小步可验证** + 每个最小任务都必须能通过单测、局部测试或文档审查验证。 + +## 4. 目标架构 + +```mermaid +flowchart TB + API["FastAPI API 层"] --> COMMAND["Generation Command"] + COMMAND --> HARNESS["Generation Harness Runtime"] + + HARNESS --> CTX["Context Builder
档案 / 宇宙 / 记忆 / 输入规范化"] + HARNESS --> PLAN["Workflow Plan
story / storybook / asset_generation / asset_retry"] + HARNESS --> CONTROL["Execution Control
取消 / 超时收敛 / 安全检查点"] + HARNESS --> TRACE["Trace Recorder
job events / step metadata / provider trace"] + HARNESS --> ARTIFACT["Artifact Workflows
story_text / storybook_pages / image / audio"] + HARNESS --> GUARD["Quality Gates
schema / 儿童安全 / 内容完整性"] + + ARTIFACT --> ROUTER["Provider Router
策略 / failover / 熔断 / 成本"] + ROUTER --> ADAPTERS["Provider Adapters"] + HARNESS --> STATE["State Store
stories / generation_jobs / generation_job_events"] + STATE --> FE["Vue 前端进度 / 详情 / 重试"] +``` + +## 5. 运行时核心概念 + +### 5.1 Generation Command + +表示一次用户意图,来源包括: + +- `POST /api/generations` +- `POST /api/generations/{story_id}/retry-assets` +- `POST /api/generations/jobs/{job_id}/retry` +- 旧兼容接口产生的同步生成调用 + +标准字段: + +| 字段 | 说明 | +| --- | --- | +| `output_mode` | `story`、`storybook`、`asset_generation`、`asset_retry` | +| `input_type` | `keywords`、`full_story`、`image`、`audio` 或资产组合 | +| `user_id` | 当前用户 | +| `story_id` | 已有故事 ID,可为空 | +| `request_payload` | 原始请求的 JSON 安全快照 | + +### 5.2 Workflow Step + +第一阶段先将现有 event type 映射为标准 step,不立即改库: + +| Step | 当前事件 | 是否阻塞主内容 | +| --- | --- | --- | +| `request_acceptance` | `request_accepted`、`retry_queued` | 是 | +| `worker_start` | `worker_started` | 是 | +| `context_preparation` | `context_prepared` | 是 | +| `narrative_generation` | `narrative_generated` | 是 | +| `story_persistence` | `story_saved` | 是 | +| `image_generation` | `cover_image_*`、`storybook_*image*` | 否 | +| `audio_generation` | `audio_*` | 否 | +| `postprocessing` | `postprocessing_queued` | 否 | +| `completion` | `generation_completed`、`asset_*_completed` | 是 | +| `cancellation` | `cancel_requested`、`generation_canceled` | 是 | +| `stale_recovery` | `generation_stale_failed` | 是 | + +### 5.3 Artifact + +| Artifact | 来源 | 状态字段 | 恢复策略 | +| --- | --- | --- | --- | +| `story_text` | text provider | `text_status` | 失败则 job failed | +| `storybook_pages` | storybook provider | `text_status` | 失败则 job failed | +| `cover_image` | image provider | `image_status` | 失败后 `degraded_completed`,可重试 | +| `page_image` | image provider | `image_status` | 部分页失败后 `degraded_completed`,可重试 | +| `audio` | tts provider + file cache | `audio_status` | 失败后 `degraded_completed`,可重试 | +| `achievement_memory` | Celery 后处理 | 事件记录 | 失败不影响主内容 | + +### 5.4 Failure Category + +第一阶段先在代码中定义枚举和 metadata 规范;后续逐步写入事件: + +| Category | 说明 | +| --- | --- | +| `provider_error` | 外部 Provider 返回失败或不可用 | +| `schema_error` | Provider 输出无法解析或字段不完整 | +| `safety_error` | 儿童安全或内容安全校验失败 | +| `timeout` | 调用超时或 job 超时 | +| `canceled` | 用户取消 | +| `stale_job` | 后台任务卡住后被系统收敛 | +| `storage_error` | 数据库、文件缓存或持久化失败 | +| `validation_error` | 用户输入、档案或宇宙校验失败 | +| `unknown_error` | 未归类错误 | + +### 5.5 Trace Metadata + +标准 event metadata 应逐步包含: + +```json +{ + "step": "narrative_generation", + "artifact": "story_text", + "capability": "text", + "adapter": "demo", + "provider_id": null, + "strategy": "priority", + "latency_ms": 42, + "estimated_cost_usd": 0.01, + "failure_category": null, + "error": null, + "retryable": false, + "blocks_main_result": true +} +``` + +短期兼容要求: + +- 不删除现有 metadata 字段。 +- 新增字段必须向后兼容。 +- 前端仍可使用当前 `event_type`、`status`、`message`、`event_metadata`。 + +## 6. 模块设计 + +### 6.1 `app/services/harness/types.py` + +新增纯类型模块,不依赖数据库。 + +职责: + +- 定义 `WorkflowStep` +- 定义 `ArtifactKind` +- 定义 `FailureCategory` +- 定义 `StepStatus` +- 定义 `TraceMetadata` +- 定义从旧 event type 到标准 step 的映射函数 + +验收: + +- 单测覆盖 event type 到 step 的映射。 +- 未知 event type 映射到 `unknown` 或保守默认,不抛出非预期异常。 + +### 6.2 `app/services/harness/trace.py` + +封装 job event 写入。 + +职责: + +- 提供 `TraceRecorder.record_step(...)` +- 提供 `TraceRecorder.record_provider_call(...)` +- 统一补齐 `step`、`artifact`、`failure_category`、`retryable`、`blocks_main_result` +- 内部继续调用现有 `record_generation_event` + +验收: + +- 现有 `generation_job_events` 行为保持。 +- 新事件 metadata 含标准字段。 +- job 为空时安全跳过,兼容旧同步接口。 + +### 6.3 `app/services/harness/control.py` + +封装执行控制。 + +职责: + +- 提供 `ExecutionControl.stop_if_cancel_requested(...)` +- 保留现有取消语义和 `GenerationJobCanceledError` +- 后续可扩展 step timeout、checkpoint、stale recovery + +验收: + +- 取消中的 job 仍被标记为 `canceled`。 +- 现有取消测试继续通过。 + +### 6.4 `app/services/harness/artifacts.py` + +第二阶段再拆。第一阶段先保留 `story_service` 中的资产函数,只把共享结果类型移动或桥接。 + +职责: + +- 表达 `AssetCompletionResult` +- 后续承载封面、绘本插图、音频工作流 + +验收: + +- 资产重试行为不变。 +- `retryable_assets` 行为不变。 + +### 6.5 `story_service` 改造方式 + +第一阶段仅做低风险替换: + +- `_record_job_event_if_present` 代理到 `TraceRecorder` +- `_stop_if_job_cancel_requested` 代理到 `ExecutionControl` +- `AssetCompletionResult` 可先留在原文件,第二阶段再移动 +- 不重写 `generate_generation_service`、`run_generation_job_service` 主流程 + +第二阶段再拆分: + +- `story_generation_workflow.py` +- `storybook_generation_workflow.py` +- `asset_generation_workflow.py` +- `asset_retry_workflow.py` + +## 7. 阶段计划 + +### 阶段 0: 设计与基线 + +目标: + +- 产出本技术设计。 +- 产出阶段 0 报告。 +- 明确阶段拆解、验收标准和验证方式。 + +最小任务: + +| ID | 任务 | 验收 | +| --- | --- | --- | +| H0-1 | 梳理现有统一生成 PRD、架构文档和测试 | 文档引用现有边界,不重复设计入口 | +| H0-2 | 定义目标 runtime 架构 | 技术设计包含模块、数据、阶段计划 | +| H0-3 | 拆分最小可执行任务 | 每个阶段有任务 ID 和验收标准 | +| H0-4 | 建立阶段报告机制 | `docs/planning/` 下有阶段报告 | + +### 阶段 1: Harness 基础类型与事件封装 + +目标: + +- 新增 harness 包。 +- 标准化 workflow step、artifact、failure category。 +- 用 `TraceRecorder` 封装事件写入。 +- 用 `ExecutionControl` 封装取消检查。 + +最小任务: + +| ID | 任务 | 验收 | +| --- | --- | --- | +| H1-1 | 新增 `app/services/harness/__init__.py` | 后端 import 正常 | +| H1-2 | 新增 `types.py` 枚举和映射 | 单测覆盖核心 event type | +| H1-3 | 新增 `trace.py` | record 后 metadata 含 `step` | +| H1-4 | 新增 `control.py` | 取消行为与旧逻辑一致 | +| H1-5 | 替换 `story_service` 中事件和取消 helper 的内部实现 | 现有 API 行为不变 | +| H1-6 | 补 `tests/test_harness_runtime.py` | 后端测试通过 | + +验证命令: + +```bash +cd backend +pytest tests/test_harness_runtime.py tests/test_generation_jobs.py +ruff check app tests +``` + +阶段报告: + +- `docs/planning/harness-stage-1-report.md` + +### 阶段 2: 资产工作流边界抽取 + +目标: + +- 将封面、绘本插图、音频补全从 `story_service` 中拆成 artifact workflow。 +- 保持当前 `StoryAssetStatus` 和 `sync_story_status` 语义。 + +最小任务: + +| ID | 任务 | 验收 | +| --- | --- | --- | +| H2-1 | 移动或桥接 `AssetCompletionResult` | import 稳定 | +| H2-2 | 抽普通故事封面工作流 | 封面生成、失败、重试测试通过 | +| H2-3 | 抽绘本图片工作流 | 绘本逐页事件顺序不破坏 | +| H2-4 | 抽音频工作流 | 音频缓存和失败状态测试通过 | +| H2-5 | 补 artifact workflow 单测 | 资产相关测试通过 | + +验证命令: + +```bash +cd backend +pytest tests/test_stories.py tests/test_generation_jobs.py tests/test_audio_cache.py +ruff check app tests +``` + +阶段报告: + +- `docs/planning/harness-stage-2-report.md` + +### 阶段 3: Workflow Plan 与执行器 + +目标: + +- 用显式 `WorkflowPlan` 表达 story、storybook、asset_generation、asset_retry。 +- 将 `_generate_generation_service_with_job` 的分支压缩到计划构建和执行器。 + +最小任务: + +| ID | 任务 | 验收 | +| --- | --- | --- | +| H3-1 | 定义 `WorkflowPlan` 和 `WorkflowTask` | 不依赖 FastAPI schema | +| H3-2 | 为普通故事构建 plan | 不生成图片时步骤正确 | +| H3-3 | 为完整故事构建 plan | 图片为 recoverable step | +| H3-4 | 为绘本构建 plan | 绘本结构和图片步骤可区分 | +| H3-5 | 为资产任务构建 plan | 重试和后台生成共用计划 | +| H3-6 | 增加 plan 单测 | 核心模式计划快照稳定 | + +验证命令: + +```bash +cd backend +pytest tests/test_harness_runtime.py tests/test_generation_jobs.py tests/test_stories.py +ruff check app tests +``` + +阶段报告: + +- `docs/planning/harness-stage-3-report.md` + +### 阶段 4: Quality Gates 与输出验证 + +目标: + +- 在 Provider 输出进入持久化之前,加入低成本、确定性的质量门。 +- 后续可扩展模型评审,但第一轮避免额外 AI 成本。 + +最小任务: + +| ID | 任务 | 验收 | +| --- | --- | --- | +| H4-1 | 定义 quality gate 接口 | 不绑定具体 Provider | +| H4-2 | 文本故事完整性检查 | 标题、正文、封面 prompt 缺失可识别 | +| H4-3 | 绘本结构检查 | 页数、页码、正文、图片 prompt 可识别 | +| H4-4 | 儿童内容基础安全检查 | 明显不适龄词或空内容阻断 | +| H4-5 | gate 失败写入 `failure_category` | job event 可解释 | + +验证命令: + +```bash +cd backend +pytest tests/test_harness_runtime.py tests/test_generation_jobs.py tests/test_stories.py +ruff check app tests +``` + +阶段报告: + +- `docs/planning/harness-stage-4-report.md` + +### 阶段 5: Trace Analytics 与前端增量展示 + +目标: + +- 前端继续消费现有 job/event,但可展示标准 step、artifact、failure category。 +- 管理端可按 failure category 聚合失败原因。 + +最小任务: + +| ID | 任务 | 验收 | +| --- | --- | --- | +| H5-1 | 后端聚合支持标准 step/failure category | 旧字段兼容 | +| H5-2 | 用户端生成轨迹展示 step 名称 | 移动端不溢出 | +| H5-3 | 管理端 Provider dashboard 增加 failure category | 构建通过 | +| H5-4 | 更新 smoke 脚本校验标准 metadata | demo smoke 通过 | + +验证命令: + +```bash +cd backend +pytest +ruff check app tests +cd ../frontend +npm run build +cd ../admin-frontend +npm run build +``` + +阶段报告: + +- `docs/planning/harness-stage-5-report.md` + +## 8. 需求与验收 + +### 功能需求 + +| ID | 优先级 | 需求 | 验收标准 | +| --- | --- | --- | --- | +| FR-001 | MUST | 系统必须保留统一生成 API 行为 | `/api/generations` 测试继续通过 | +| FR-002 | MUST | 系统必须有标准 workflow step 类型 | 核心 event type 可映射到 step | +| FR-003 | MUST | 系统必须有标准 artifact 类型 | story、storybook、image、audio 可区分 | +| FR-004 | MUST | 系统必须标准化 job event metadata | 新事件含 `step`,旧字段不删除 | +| FR-005 | MUST | 用户取消语义必须保持 | 取消测试继续通过 | +| FR-006 | SHOULD | Provider 调用应记录标准 trace 字段 | provider event 含 capability、adapter、latency、cost、step | +| FR-007 | SHOULD | 资产工作流应从主 service 拆出 | `story_service` 行数和职责减少 | +| FR-008 | SHOULD | 输出验证应在持久化前执行 | schema 缺失可被测试捕获 | +| FR-009 | COULD | 前端展示标准 step/failure category | 构建通过且无布局溢出 | + +### 非功能需求 + +| ID | 优先级 | 需求 | 验收标准 | +| --- | --- | --- | --- | +| NFR-001 | MUST | 向后兼容 | 现有测试和 smoke 主链路不破坏 | +| NFR-002 | MUST | 可测试 | 每个新增 harness 模块有单测 | +| NFR-003 | MUST | 可观测 | job/event 可以解释 step、artifact、失败原因 | +| NFR-004 | SHOULD | 低耦合 | harness 类型模块不依赖 FastAPI 和 SQLAlchemy | +| NFR-005 | SHOULD | 性能稳定 | 不新增阻塞式外部调用 | +| NFR-006 | SHOULD | 中文一致性 | 文档、用户可见文案和新增注释使用简体中文 | + +## 9. 风险与缓解 + +| 风险 | 影响 | 缓解 | +| --- | --- | --- | +| 过度抽象导致改造变慢 | 中 | 第一阶段只抽类型、trace、control,不拆主流程 | +| 事件 metadata 变化影响前端 | 中 | 只新增字段,不删除旧字段 | +| 取消/重试语义被破坏 | 高 | 优先跑 `tests/test_generation_jobs.py` | +| Provider trace 与 job event 重复 | 低 | 保持 Provider 事件专注调用层,workflow 事件专注产品步骤 | +| 文档与实现偏离 | 中 | 每个阶段报告必须记录实现偏差 | +| 质量门误伤内容 | 中 | 第四阶段先做确定性低风险检查,模型评审延后 | + +## 10. 审查清单 + +每个阶段完成前必须检查: + +- [ ] 是否保持 `/api/generations` 行为兼容 +- [ ] 是否有对应测试或验证命令 +- [ ] 是否没有引入不必要的外部框架 +- [ ] 是否没有重写无关功能 +- [ ] 是否保留用户已有工作区改动 +- [ ] 是否更新阶段报告 +- [ ] 是否更新本设计中的阶段状态或偏差记录 + +## 11. 当前状态 + +| 阶段 | 状态 | 备注 | +| --- | --- | --- | +| 阶段 0 | 已完成设计基线 | 已建立本设计与阶段 0 报告 | +| 阶段 1 | 已完成基础实现 | 已新增 harness 类型、trace recorder、execution control,并通过定向测试 | +| 阶段 2 | 已完成主要资产补全抽取 | 封面、音频、持久化绘本缺失图片补全已迁入 harness asset workflows | +| 阶段 3 | 已完成计划建模基线 | 已定义 WorkflowPlan/WorkflowTask 和核心模式计划快照;执行器接管留待后续 | +| 阶段 4 | 已完成确定性质量门 | 已接入文本故事和绘本结构完整性/儿童安全基础检查 | +| 阶段 5 | 待执行 | Trace Analytics 与前端展示 |