"""Workflow plan builders for generation harness workflows.""" from dataclasses import dataclass from enum import StrEnum from typing import Any from app.services.harness.types import ArtifactKind, WorkflowStep class WorkflowMode(StrEnum): """Supported executable workflow modes.""" STORY = "story" STORY_WITH_ASSETS = "story_with_assets" STORYBOOK = "storybook" ASSET_GENERATION = "asset_generation" ASSET_RETRY = "asset_retry" @dataclass(frozen=True) class WorkflowTask: """One planned step in a generation workflow.""" key: str step: WorkflowStep artifact: ArtifactKind required: bool = True recoverable: bool = False def to_snapshot(self) -> dict[str, Any]: """Return a JSON-safe snapshot for tests and trace metadata.""" return { "key": self.key, "step": self.step.value, "artifact": self.artifact.value, "required": self.required, "recoverable": self.recoverable, } @dataclass(frozen=True) class WorkflowPlan: """Declarative shape of a generation workflow before execution.""" mode: WorkflowMode tasks: tuple[WorkflowTask, ...] def to_snapshot(self) -> dict[str, Any]: """Return a JSON-safe snapshot for tests and trace metadata.""" return { "mode": self.mode.value, "tasks": [task.to_snapshot() for task in self.tasks], } def build_story_plan(*, generate_images: bool) -> WorkflowPlan: """Build a plan for a text story generation request.""" tasks = [ WorkflowTask( key="prepare_context", step=WorkflowStep.CONTEXT_PREPARATION, artifact=ArtifactKind.NONE, ), WorkflowTask( key="generate_narrative", step=WorkflowStep.NARRATIVE_GENERATION, artifact=ArtifactKind.STORY_TEXT, ), WorkflowTask( key="evaluate_narrative", step=WorkflowStep.EVALUATION, artifact=ArtifactKind.STORY_TEXT, ), WorkflowTask( key="persist_story", step=WorkflowStep.STORY_PERSISTENCE, artifact=ArtifactKind.STORY_TEXT, ), ] if generate_images: tasks.append( WorkflowTask( key="generate_cover_image", step=WorkflowStep.IMAGE_GENERATION, artifact=ArtifactKind.COVER_IMAGE, required=False, recoverable=True, ) ) tasks.extend( [ WorkflowTask( key="queue_postprocessing", step=WorkflowStep.POSTPROCESSING, artifact=ArtifactKind.ACHIEVEMENT_MEMORY, required=False, recoverable=True, ), WorkflowTask( key="complete_generation", step=WorkflowStep.COMPLETION, artifact=ArtifactKind.NONE, ), ] ) return WorkflowPlan( mode=WorkflowMode.STORY_WITH_ASSETS if generate_images else WorkflowMode.STORY, tasks=tuple(tasks), ) def build_storybook_plan(*, generate_images: bool) -> WorkflowPlan: """Build a plan for a storybook generation request.""" tasks = [ WorkflowTask( key="prepare_context", step=WorkflowStep.CONTEXT_PREPARATION, artifact=ArtifactKind.NONE, ), WorkflowTask( key="generate_storybook_pages", step=WorkflowStep.NARRATIVE_GENERATION, artifact=ArtifactKind.STORYBOOK_PAGES, ), WorkflowTask( key="evaluate_storybook_pages", step=WorkflowStep.EVALUATION, artifact=ArtifactKind.STORYBOOK_PAGES, ), ] if generate_images: tasks.append( WorkflowTask( key="generate_storybook_images", step=WorkflowStep.IMAGE_GENERATION, artifact=ArtifactKind.IMAGE, required=False, recoverable=True, ) ) tasks.extend( [ WorkflowTask( key="persist_storybook", step=WorkflowStep.STORY_PERSISTENCE, artifact=ArtifactKind.STORYBOOK_PAGES, ), WorkflowTask( key="queue_postprocessing", step=WorkflowStep.POSTPROCESSING, artifact=ArtifactKind.ACHIEVEMENT_MEMORY, required=False, recoverable=True, ), WorkflowTask( key="complete_generation", step=WorkflowStep.COMPLETION, artifact=ArtifactKind.NONE, ), ] ) return WorkflowPlan(mode=WorkflowMode.STORYBOOK, tasks=tuple(tasks)) def build_asset_plan(*, output_mode: str, assets: list[str]) -> WorkflowPlan: """Build a plan for asset generation or retry jobs.""" mode = ( WorkflowMode.ASSET_RETRY if output_mode == WorkflowMode.ASSET_RETRY.value else WorkflowMode.ASSET_GENERATION ) initial_step = ( WorkflowStep.ASSET_RETRY if mode == WorkflowMode.ASSET_RETRY else WorkflowStep.ASSET_GENERATION ) initial_key = ( "start_asset_retry" if mode == WorkflowMode.ASSET_RETRY else "start_asset_generation" ) completion_key = ( "complete_asset_retry" if mode == WorkflowMode.ASSET_RETRY else "complete_asset_generation" ) tasks = [ WorkflowTask( key=initial_key, step=initial_step, artifact=ArtifactKind.NONE, ) ] for asset in dict.fromkeys(assets): if asset == "image": tasks.append( WorkflowTask( key="complete_image_asset", step=WorkflowStep.IMAGE_GENERATION, artifact=ArtifactKind.IMAGE, required=False, recoverable=True, ) ) elif asset == "audio": tasks.append( WorkflowTask( key="complete_audio_asset", step=WorkflowStep.AUDIO_GENERATION, artifact=ArtifactKind.AUDIO, required=False, recoverable=True, ) ) else: tasks.append( WorkflowTask( key=f"complete_{asset}_asset", step=WorkflowStep.UNKNOWN, artifact=ArtifactKind.UNKNOWN, required=False, recoverable=True, ) ) tasks.append( WorkflowTask( key=completion_key, step=initial_step, artifact=ArtifactKind.NONE, ) ) return WorkflowPlan(mode=mode, tasks=tuple(tasks))