Files
dreamweaver/backend/app/services/harness/plans.py
2026-06-21 22:31:38 +08:00

238 lines
6.5 KiB
Python

"""Workflow plan builders for generation harness workflows."""
from dataclasses import dataclass
from enum import StrEnum
from typing import Any
from app.services.harness.types import ArtifactKind, WorkflowStep
class WorkflowMode(StrEnum):
"""Supported executable workflow modes."""
STORY = "story"
STORY_WITH_ASSETS = "story_with_assets"
STORYBOOK = "storybook"
ASSET_GENERATION = "asset_generation"
ASSET_RETRY = "asset_retry"
@dataclass(frozen=True)
class WorkflowTask:
    """Describe a single planned step of a generation workflow.

    Instances are immutable (frozen dataclass).

    Attributes:
        key: Name of the task within its plan, e.g. ``"prepare_context"``.
        step: Workflow step the task is filed under.
        artifact: Kind of artifact associated with the task.
        required: Flag marking the task as required; defaults to ``True``.
        recoverable: Flag marking the task as recoverable; defaults to
            ``False``.
    """

    key: str
    step: WorkflowStep
    artifact: ArtifactKind
    required: bool = True
    recoverable: bool = False

    def to_snapshot(self) -> dict[str, Any]:
        """Serialize the task for tests and trace metadata.

        Returns:
            A dict with the keys ``key``, ``step``, ``artifact``,
            ``required`` and ``recoverable``; the two enum fields are
            replaced by their ``.value`` so the result is JSON-safe.
        """
        step_value = self.step.value
        artifact_value = self.artifact.value
        return {
            "key": self.key,
            "step": step_value,
            "artifact": artifact_value,
            "required": self.required,
            "recoverable": self.recoverable,
        }
@dataclass(frozen=True)
class WorkflowPlan:
    """Hold the declarative shape of a workflow before it is executed.

    Instances are immutable (frozen dataclass).

    Attributes:
        mode: Workflow mode the plan was built for.
        tasks: Planned tasks, in the order they were assembled.
    """

    mode: WorkflowMode
    tasks: tuple[WorkflowTask, ...]

    def to_snapshot(self) -> dict[str, Any]:
        """Serialize the plan for tests and trace metadata.

        Returns:
            A dict with ``mode`` (the mode's string value) and ``tasks``
            (a list holding each task's own snapshot, in plan order).
        """
        mode_value = self.mode.value
        task_snapshots = [entry.to_snapshot() for entry in self.tasks]
        return {"mode": mode_value, "tasks": task_snapshots}
def build_story_plan(*, generate_images: bool) -> WorkflowPlan:
    """Assemble the plan for a text story generation request.

    Task order: prepare context, generate narrative, persist story, then
    (only when ``generate_images`` is truthy) generate the cover image,
    followed by queueing postprocessing and completion.

    Args:
        generate_images: Whether to include the cover image task. It also
            selects the plan mode: ``STORY_WITH_ASSETS`` when truthy,
            ``STORY`` otherwise.

    Returns:
        The assembled ``WorkflowPlan``.
    """
    # The cover image task is the only conditional entry; it is spliced in
    # right after persistence, and is optional and recoverable.
    cover_image_tasks = (
        [
            WorkflowTask(
                key="generate_cover_image",
                step=WorkflowStep.IMAGE_GENERATION,
                artifact=ArtifactKind.COVER_IMAGE,
                required=False,
                recoverable=True,
            )
        ]
        if generate_images
        else []
    )

    planned = (
        WorkflowTask(
            key="prepare_context",
            step=WorkflowStep.CONTEXT_PREPARATION,
            artifact=ArtifactKind.NONE,
        ),
        WorkflowTask(
            key="generate_narrative",
            step=WorkflowStep.NARRATIVE_GENERATION,
            artifact=ArtifactKind.STORY_TEXT,
        ),
        WorkflowTask(
            key="persist_story",
            step=WorkflowStep.STORY_PERSISTENCE,
            artifact=ArtifactKind.STORY_TEXT,
        ),
        *cover_image_tasks,
        WorkflowTask(
            key="queue_postprocessing",
            step=WorkflowStep.POSTPROCESSING,
            artifact=ArtifactKind.ACHIEVEMENT_MEMORY,
            required=False,
            recoverable=True,
        ),
        WorkflowTask(
            key="complete_generation",
            step=WorkflowStep.COMPLETION,
            artifact=ArtifactKind.NONE,
        ),
    )

    if generate_images:
        plan_mode = WorkflowMode.STORY_WITH_ASSETS
    else:
        plan_mode = WorkflowMode.STORY
    return WorkflowPlan(mode=plan_mode, tasks=planned)
def build_storybook_plan(*, generate_images: bool) -> WorkflowPlan:
    """Assemble the plan for a storybook generation request.

    Task order: prepare context, generate storybook pages, then (only when
    ``generate_images`` is truthy) generate storybook images, followed by
    persistence, queueing postprocessing and completion. Unlike the story
    plan, the image task sits before persistence.

    Args:
        generate_images: Whether to include the storybook image task. The
            plan mode is ``STORYBOOK`` either way.

    Returns:
        The assembled ``WorkflowPlan``.
    """
    # The image task is the only conditional entry; it is optional and
    # recoverable.
    image_tasks = (
        [
            WorkflowTask(
                key="generate_storybook_images",
                step=WorkflowStep.IMAGE_GENERATION,
                artifact=ArtifactKind.IMAGE,
                required=False,
                recoverable=True,
            )
        ]
        if generate_images
        else []
    )

    planned = (
        WorkflowTask(
            key="prepare_context",
            step=WorkflowStep.CONTEXT_PREPARATION,
            artifact=ArtifactKind.NONE,
        ),
        WorkflowTask(
            key="generate_storybook_pages",
            step=WorkflowStep.NARRATIVE_GENERATION,
            artifact=ArtifactKind.STORYBOOK_PAGES,
        ),
        *image_tasks,
        WorkflowTask(
            key="persist_storybook",
            step=WorkflowStep.STORY_PERSISTENCE,
            artifact=ArtifactKind.STORYBOOK_PAGES,
        ),
        WorkflowTask(
            key="queue_postprocessing",
            step=WorkflowStep.POSTPROCESSING,
            artifact=ArtifactKind.ACHIEVEMENT_MEMORY,
            required=False,
            recoverable=True,
        ),
        WorkflowTask(
            key="complete_generation",
            step=WorkflowStep.COMPLETION,
            artifact=ArtifactKind.NONE,
        ),
    )
    return WorkflowPlan(mode=WorkflowMode.STORYBOOK, tasks=planned)
def build_asset_plan(*, output_mode: str, assets: list[str]) -> WorkflowPlan:
    """Assemble the plan for an asset generation or asset retry job.

    The plan is a start task, one optional and recoverable task per
    distinct requested asset, and a completion task.

    Args:
        output_mode: ``"asset_retry"`` selects the retry mode; any other
            value selects the asset generation mode.
        assets: Requested asset names. Duplicates are dropped, keeping
            first-occurrence order. ``"image"`` and ``"audio"`` map to
            their dedicated steps and artifacts; any other name gets the
            ``UNKNOWN`` step and artifact with a key derived from the name.

    Returns:
        The assembled ``WorkflowPlan``.
    """
    # Resolve everything that depends on the mode in one place.
    if output_mode == WorkflowMode.ASSET_RETRY.value:
        mode = WorkflowMode.ASSET_RETRY
        boundary_step = WorkflowStep.ASSET_RETRY
        start_key = "start_asset_retry"
        finish_key = "complete_asset_retry"
    else:
        mode = WorkflowMode.ASSET_GENERATION
        boundary_step = WorkflowStep.ASSET_GENERATION
        start_key = "start_asset_generation"
        finish_key = "complete_asset_generation"

    start_task = WorkflowTask(
        key=start_key,
        step=boundary_step,
        artifact=ArtifactKind.NONE,
    )

    asset_tasks = []
    # dict.fromkeys de-duplicates while preserving first-occurrence order.
    for asset in dict.fromkeys(assets):
        if asset == "image":
            task_key = "complete_image_asset"
            task_step = WorkflowStep.IMAGE_GENERATION
            task_artifact = ArtifactKind.IMAGE
        elif asset == "audio":
            task_key = "complete_audio_asset"
            task_step = WorkflowStep.AUDIO_GENERATION
            task_artifact = ArtifactKind.AUDIO
        else:
            # Unrecognized asset names are still planned, tagged as UNKNOWN.
            task_key = f"complete_{asset}_asset"
            task_step = WorkflowStep.UNKNOWN
            task_artifact = ArtifactKind.UNKNOWN
        asset_tasks.append(
            WorkflowTask(
                key=task_key,
                step=task_step,
                artifact=task_artifact,
                required=False,
                recoverable=True,
            )
        )

    # The completion task reuses the same step as the start task.
    finish_task = WorkflowTask(
        key=finish_key,
        step=boundary_step,
        artifact=ArtifactKind.NONE,
    )
    return WorkflowPlan(mode=mode, tasks=(start_task, *asset_tasks, finish_task))