Files
dreamweaver/backend/tests/test_harness_runtime.py

645 lines
20 KiB
Python

"""Tests for generation harness runtime support."""
from pathlib import Path
import pytest
from sqlalchemy import select
from app.db.models import GenerationJob, GenerationJobEvent
from app.services.adapters.storybook.primary import Storybook, StorybookPage
from app.services.adapters.text.models import StoryOutput
from app.services.generation_jobs import create_generation_job, record_generation_event
from app.services.harness.artifacts import AssetCompletionResult
from app.services.harness.control import ExecutionControl, GenerationJobCanceledError
from app.services.harness.evaluation_replay import (
EvaluationReplayArtifact,
EvaluationReplayCase,
ExpectedEvaluation,
replay_evaluation_golden_cases,
run_evaluation_replay_cases,
)
from app.services.harness.evaluators import evaluate_story_output, evaluate_storybook_output
from app.services.harness.executor import run_asset_plan
from app.services.harness.plans import (
WorkflowMode,
WorkflowPlan,
WorkflowTask,
build_asset_plan,
build_story_plan,
build_storybook_plan,
)
from app.services.harness.quality_gates import (
QualityGateError,
validate_story_output,
validate_storybook_output,
)
from app.services.harness.trace import TraceRecorder
from app.services.harness.types import (
ArtifactKind,
FailureCategory,
WorkflowStep,
artifact_for_event,
normalize_trace_metadata,
step_for_event,
)
from app.services.story_status import StoryAssetStatus
# Directory holding the harness fixture files, resolved relative to this
# module: two levels above this file, then app/services/harness/fixtures.
FIXTURES_DIR = Path(__file__).parents[1].joinpath(
    "app",
    "services",
    "harness",
    "fixtures",
)
def test_event_type_maps_to_standard_workflow_step():
    """Check that ``step_for_event`` maps each event type to its workflow step.

    The last pair asserts that an event type without a dedicated mapping
    ("future_event") resolves to ``WorkflowStep.UNKNOWN``.
    """
    expected_steps = (
        ("request_accepted", WorkflowStep.REQUEST_ACCEPTANCE),
        ("context_prepared", WorkflowStep.CONTEXT_PREPARATION),
        ("narrative_generated", WorkflowStep.NARRATIVE_GENERATION),
        ("evaluation_completed", WorkflowStep.EVALUATION),
        ("story_saved", WorkflowStep.STORY_PERSISTENCE),
        ("provider_call_succeeded", WorkflowStep.PROVIDER_INVOCATION),
        ("quality_gate_failed", WorkflowStep.NARRATIVE_GENERATION),
        ("cover_image_failed", WorkflowStep.IMAGE_GENERATION),
        ("audio_succeeded", WorkflowStep.AUDIO_GENERATION),
        ("generation_canceled", WorkflowStep.CANCELLATION),
        ("generation_stale_failed", WorkflowStep.STALE_RECOVERY),
        ("future_event", WorkflowStep.UNKNOWN),
    )

    # Checked in declaration order; the first mismatch fails the test.
    for event_type, expected_step in expected_steps:
        assert step_for_event(event_type) == expected_step
def test_event_type_maps_to_standard_artifact():
    """Check that ``artifact_for_event`` maps each event type to its artifact kind.

    The last pair asserts that "request_accepted" resolves to
    ``ArtifactKind.NONE``.
    """
    expected_artifacts = (
        ("narrative_generated", ArtifactKind.STORY_TEXT),
        ("quality_gate_failed", ArtifactKind.STORY_TEXT),
        ("evaluation_completed", ArtifactKind.STORY_TEXT),
        ("cover_image_succeeded", ArtifactKind.COVER_IMAGE),
        ("storybook_page_image_failed", ArtifactKind.PAGE_IMAGE),
        ("audio_cache_hit", ArtifactKind.AUDIO),
        ("postprocessing_queued", ArtifactKind.ACHIEVEMENT_MEMORY),
        ("request_accepted", ArtifactKind.NONE),
    )

    # Checked in declaration order; the first mismatch fails the test.
    for event_type, expected_artifact in expected_artifacts:
        assert artifact_for_event(event_type) == expected_artifact
def test_trace_metadata_adds_standard_fields_without_dropping_legacy_values():
    """Check that normalization keeps caller-supplied keys and adds standard ones.

    The caller-supplied keys (capability, adapter, error) must still be
    present next to the standard fields: step, artifact, failure_category,
    retryable and blocks_main_result.
    """
    normalized = normalize_trace_metadata(
        "provider_call_failed",
        {"capability": "text", "adapter": "demo", "error": "timeout"},
        failure_category=FailureCategory.TIMEOUT,
        retryable=True,
    )

    # String-valued fields, legacy keys first and standard keys after.
    expected_values = (
        ("capability", "text"),
        ("adapter", "demo"),
        ("error", "timeout"),
        ("step", "provider_invocation"),
        ("artifact", "none"),
        ("failure_category", "timeout"),
    )
    for field, expected in expected_values:
        assert normalized[field] == expected

    # Boolean flags are checked by identity, not just truthiness.
    assert normalized["retryable"] is True
    assert normalized["blocks_main_result"] is False
def test_trace_metadata_respects_explicit_step_and_artifact():
    """Check that explicitly passed step/artifact values appear in the metadata.

    The call passes ``ArtifactKind.STORYBOOK_PAGES`` and
    ``blocks_main_result=True`` for a "narrative_generated" event and expects
    both to be reflected in the normalized result, alongside the supplied title.
    """
    normalized = normalize_trace_metadata(
        "narrative_generated",
        {"title": "小兔子的冒险"},
        step=WorkflowStep.NARRATIVE_GENERATION,
        artifact=ArtifactKind.STORYBOOK_PAGES,
        blocks_main_result=True,
    )

    expected_values = (
        ("title", "小兔子的冒险"),
        ("step", "narrative_generation"),
        ("artifact", "storybook_pages"),
    )
    for field, expected in expected_values:
        assert normalized[field] == expected
    assert normalized["blocks_main_result"] is True
def test_story_plan_without_assets_snapshot():
    """Pin the full snapshot of the story plan built with ``generate_images=False``."""

    def _task(key, step, artifact, required=True, recoverable=False):
        # Build one snapshot row; the defaults describe a required,
        # non-recoverable task.
        return {
            "key": key,
            "step": step,
            "artifact": artifact,
            "required": required,
            "recoverable": recoverable,
        }

    expected_snapshot = {
        "mode": "story",
        "tasks": [
            _task("prepare_context", "context_preparation", "none"),
            _task("generate_narrative", "narrative_generation", "story_text"),
            _task("evaluate_narrative", "evaluation", "story_text"),
            _task("persist_story", "story_persistence", "story_text"),
            _task(
                "queue_postprocessing",
                "postprocessing",
                "achievement_memory",
                required=False,
                recoverable=True,
            ),
            _task("complete_generation", "completion", "none"),
        ],
    }

    snapshot = build_story_plan(generate_images=False).to_snapshot()

    assert snapshot == expected_snapshot
def test_story_plan_with_assets_marks_cover_recoverable():
    """Check the cover-image task of the story plan built with images enabled.

    The plan mode must be "story_with_assets", and the task at index 4 must be
    the optional, recoverable cover-image task.
    """
    snapshot = build_story_plan(generate_images=True).to_snapshot()
    expected_cover_task = {
        "key": "generate_cover_image",
        "step": "image_generation",
        "artifact": "cover_image",
        "required": False,
        "recoverable": True,
    }

    assert snapshot["mode"] == "story_with_assets"
    cover_task = snapshot["tasks"][4]
    assert cover_task == expected_cover_task
def test_storybook_plan_with_images_marks_storybook_images_recoverable():
    """Check task order and the image task of the storybook plan with images.

    The task at index 3 ("generate_storybook_images") must carry the "image"
    artifact and be recoverable.
    """
    snapshot = build_storybook_plan(generate_images=True).to_snapshot()
    expected_keys = [
        "prepare_context",
        "generate_storybook_pages",
        "evaluate_storybook_pages",
        "generate_storybook_images",
        "persist_storybook",
        "queue_postprocessing",
        "complete_generation",
    ]

    assert snapshot["mode"] == "storybook"

    task_keys = [task["key"] for task in snapshot["tasks"]]
    assert task_keys == expected_keys

    images_task = snapshot["tasks"][3]
    assert images_task["artifact"] == "image"
    assert images_task["recoverable"] is True
def test_asset_retry_plan_deduplicates_assets():
    """Check that a repeated asset name yields a single task in the retry plan.

    "image" is requested twice, but the expected snapshot contains one image
    task and one audio task between the start/complete bookends.
    """

    def _task(key, step, artifact, required, recoverable):
        # Build one snapshot row.
        return {
            "key": key,
            "step": step,
            "artifact": artifact,
            "required": required,
            "recoverable": recoverable,
        }

    expected_snapshot = {
        "mode": "asset_retry",
        "tasks": [
            _task("start_asset_retry", "asset_retry", "none", True, False),
            _task("complete_image_asset", "image_generation", "image", False, True),
            _task("complete_audio_asset", "audio_generation", "audio", False, True),
            _task("complete_asset_retry", "asset_retry", "none", True, False),
        ],
    }

    retry_plan = build_asset_plan(
        output_mode="asset_retry",
        assets=["image", "audio", "image"],
    )

    assert retry_plan.to_snapshot() == expected_snapshot
@pytest.mark.asyncio
async def test_run_asset_plan_executes_asset_tasks_in_plan_order():
    """Check that ``run_asset_plan`` runs asset callbacks in the plan's order.

    The plan is built with audio before image, so the audio callback must be
    invoked first; the start/complete bookend tasks are expected to be
    reported as ignored.
    """
    invocations: list[str] = []

    async def fake_image_task() -> AssetCompletionResult:
        invocations.append("image")
        return AssetCompletionResult(
            asset="cover_image",
            status=StoryAssetStatus.READY,
            value="https://example.com/cover.png",
        )

    async def fake_audio_task() -> AssetCompletionResult:
        invocations.append("audio")
        return AssetCompletionResult(
            asset="audio",
            status=StoryAssetStatus.READY,
            value=b"audio",
        )

    asset_plan = build_asset_plan(
        output_mode="asset_generation",
        assets=["audio", "image"],
    )
    outcome = await run_asset_plan(
        asset_plan,
        image_task=fake_image_task,
        audio_task=fake_audio_task,
    )

    assert invocations == ["audio", "image"]
    assert outcome.executed_task_keys == (
        "complete_audio_asset",
        "complete_image_asset",
    )
    assert outcome.ignored_task_keys == (
        "start_asset_generation",
        "complete_asset_generation",
    )
    produced_assets = [item.asset for item in outcome.task_results]
    assert produced_assets == ["audio", "cover_image"]
@pytest.mark.asyncio
async def test_run_asset_plan_ignores_unknown_non_asset_tasks():
    """Check that ``run_asset_plan`` skips tasks it has no callback for.

    The hand-built plan holds two bookend tasks and one task with an unknown
    step and artifact. Only an image callback is supplied, and it must never
    be invoked; all three task keys are expected to be reported as ignored.
    """
    invocations: list[str] = []

    start_task = WorkflowTask(
        key="start_asset_retry",
        step=WorkflowStep.ASSET_RETRY,
        artifact=ArtifactKind.NONE,
    )
    unknown_task = WorkflowTask(
        key="complete_video_asset",
        step=WorkflowStep.UNKNOWN,
        artifact=ArtifactKind.UNKNOWN,
        required=False,
        recoverable=True,
    )
    finish_task = WorkflowTask(
        key="complete_asset_retry",
        step=WorkflowStep.ASSET_RETRY,
        artifact=ArtifactKind.NONE,
    )
    plan = WorkflowPlan(
        mode=WorkflowMode.ASSET_RETRY,
        tasks=(start_task, unknown_task, finish_task),
    )

    async def fake_image_task() -> AssetCompletionResult:
        invocations.append("image")
        return AssetCompletionResult(
            asset="cover_image",
            status=StoryAssetStatus.READY,
        )

    outcome = await run_asset_plan(plan, image_task=fake_image_task)

    assert invocations == []
    assert outcome.task_results == ()
    assert outcome.executed_task_keys == ()
    assert outcome.ignored_task_keys == (
        "start_asset_retry",
        "complete_video_asset",
        "complete_asset_retry",
    )
def test_story_quality_gate_accepts_complete_child_safe_story():
    """Check that ``validate_story_output`` does not raise for a complete story.

    The test passes as long as the call returns without an exception.
    """
    complete_story = StoryOutput(
        mode="generated",
        title="小兔子的月光花园",
        story_text="小兔子在花园里学会了和朋友轮流分享水壶。",
        cover_prompt_suggestion="A gentle moonlit garden with a rabbit",
    )

    validate_story_output(complete_story)
def test_story_evaluator_scores_complete_child_safe_story():
    """Check that a complete story on the requested theme passes evaluation.

    Expects a passing, non-blocking result with an overall score of at least
    0.9, and "structure" as the first scored dimension in the metadata.
    """
    story = StoryOutput(
        mode="generated",
        title="小兔子的月光花园",
        story_text="小兔子在花园里学会了和朋友轮流分享水壶,也学会了复盘今天的努力。",
        cover_prompt_suggestion="A gentle moonlit garden with a rabbit",
    )

    evaluation = evaluate_story_output(story, education_theme="复盘")

    assert evaluation.passed is True
    assert evaluation.blocking is False
    assert evaluation.overall_score >= 0.9
    first_score = evaluation.to_metadata()["scores"][0]
    assert first_score["dimension"] == "structure"
def test_story_evaluator_blocks_quality_gate_failure():
    """Check that a story with empty text produces a blocking evaluation.

    Expects a failed, blocking result with score 0.0, a populated
    ``gate_error``, and "missing_story_text" as the first reported issue code.
    """
    empty_story = StoryOutput(
        mode="generated",
        title="空白故事",
        story_text="",
        cover_prompt_suggestion="A cover",
    )

    evaluation = evaluate_story_output(empty_story)

    assert evaluation.passed is False
    assert evaluation.blocking is True
    assert evaluation.overall_score == 0.0
    assert evaluation.gate_error is not None
    first_issue = evaluation.to_metadata()["quality_gate"]["issues"][0]
    assert first_issue["code"] == "missing_story_text"
def test_storybook_evaluator_scores_complete_child_safe_storybook():
    """Check that a two-page storybook on the requested theme passes evaluation.

    Expects a passing, non-blocking result with an overall score of at
    least 0.9.
    """
    first_page = StorybookPage(
        page_number=1,
        text="露露在森林里发现一颗会提醒她复盘的小星星。",
        image_prompt="Lulu finds a star",
    )
    second_page = StorybookPage(
        page_number=2,
        text="她回想今天的努力,学会下次先和朋友商量。",
        image_prompt="Lulu thinking with friends",
    )
    storybook = Storybook(
        title="森林里的复盘星星",
        main_character="小兔子露露",
        art_style="温暖水彩",
        cover_prompt="A warm watercolor forest cover",
        pages=[first_page, second_page],
    )

    evaluation = evaluate_storybook_output(storybook, education_theme="复盘")

    assert evaluation.passed is True
    assert evaluation.blocking is False
    assert evaluation.overall_score >= 0.9
def test_storybook_evaluator_blocks_quality_gate_failure():
    """Check that a storybook with a repeated page number blocks evaluation.

    Both pages use ``page_number=1``. Expects a failed, blocking result with
    a populated ``gate_error`` and "invalid_storybook_page_number" as the
    first reported issue code.
    """
    pages = [
        StorybookPage(page_number=1, text="第一页。", image_prompt="page 1"),
        StorybookPage(page_number=1, text="第二页。", image_prompt="page 2"),
    ]
    storybook = Storybook(
        title="森林绘本",
        main_character="小兔子",
        art_style="水彩",
        cover_prompt="A forest cover",
        pages=pages,
    )

    evaluation = evaluate_storybook_output(storybook)

    assert evaluation.passed is False
    assert evaluation.blocking is True
    assert evaluation.gate_error is not None
    first_issue = evaluation.to_metadata()["quality_gate"]["issues"][0]
    assert first_issue["code"] == "invalid_storybook_page_number"
def test_evaluation_golden_cases_replay_successfully():
    """Replay the golden-case fixture and expect every case to pass.

    Also pins the fixture size (11 cases) and requires that both the STORY
    and STORYBOOK artifact types are represented.
    """
    golden_path = FIXTURES_DIR / "evaluation_golden_cases.json"

    replay = replay_evaluation_golden_cases(golden_path)

    # The failure report is attached so a failing replay explains itself.
    assert replay.passed is True, replay.failure_report()
    assert replay.failed_case_ids == ()
    assert len(replay.cases) == 11

    artifacts_seen = {case.artifact for case in replay.cases}
    assert artifacts_seen == {
        EvaluationReplayArtifact.STORY,
        EvaluationReplayArtifact.STORYBOOK,
    }
def test_evaluation_golden_cases_report_internal_coverage_summary():
    """Pin the coverage summary computed from the golden-case fixture.

    Covers the per-dimension counts (artifact, age_band, risk_area, outcome)
    and a selection of tag counts.
    """
    replay = replay_evaluation_golden_cases(
        FIXTURES_DIR / "evaluation_golden_cases.json"
    )
    summary = replay.coverage_summary()

    expected_counts = (
        ("artifact", {"storybook": 5, "story": 6}),
        ("age_band", {"3-4": 4, "5-6": 4, "unknown": 2, "7-8": 1}),
        (
            "risk_area",
            {
                "schema_error": 4,
                "happy_path": 2,
                "readability_warning": 2,
                "safety_error": 2,
                "length_boundary": 1,
            },
        ),
        ("outcome", {"blocked": 8, "passed": 3}),
    )
    for dimension, counts in expected_counts:
        assert summary[dimension] == counts

    # Only these tags are pinned; other tags in the summary are not checked.
    expected_tag_counts = (
        ("story", 6),
        ("storybook", 5),
        ("blocking", 6),
        ("threshold_block", 2),
    )
    for tag, count in expected_tag_counts:
        assert summary["tags"][tag] == count
def test_evaluation_replay_reports_expectation_mismatch():
    """Check that a case whose expectation is not met is reported as failed.

    The single case demands ``min_overall_score=0.99``. The replay is expected
    to fail, list the case id, and mention the score expectation in its
    failure report.
    """
    expectation = ExpectedEvaluation(
        passed=True,
        blocking=False,
        min_overall_score=0.99,
    )
    story_payload = {
        "mode": "generated",
        "title": "小兔子的花园",
        "story_text": "小兔子学会了和朋友分享水壶。",
        "cover_prompt_suggestion": "A rabbit sharing a watering can",
    }
    mismatching_case = EvaluationReplayCase(
        case_id="expectation-mismatch",
        artifact=EvaluationReplayArtifact.STORY,
        input_payload={"keywords": "小兔子"},
        output_payload=story_payload,
        expected=expectation,
    )

    replay = run_evaluation_replay_cases([mismatching_case])

    assert replay.passed is False
    assert replay.failed_case_ids == ("expectation-mismatch",)
    assert "expected overall_score >=" in replay.failure_report()
def test_story_quality_gate_rejects_missing_story_text():
    """Check that the story quality gate rejects output with empty ``story_text``.

    Expects ``validate_story_output`` to raise ``QualityGateError`` with the
    single issue code "missing_story_text", reported against the
    "story_text" field.
    """
    output = StoryOutput(
        mode="generated",
        title="空白故事",
        story_text="",
        cover_prompt_suggestion="A cover",
    )

    # pytest.raises fails the test itself when nothing is raised, so the
    # manual try/except/else with an explicit AssertionError is not needed.
    with pytest.raises(QualityGateError) as exc_info:
        validate_story_output(output)

    error = exc_info.value
    assert [issue.code.value for issue in error.issues] == ["missing_story_text"]
    assert error.to_metadata()["issues"][0]["field"] == "story_text"
def test_story_quality_gate_rejects_obviously_unsafe_child_content():
    """Check that the story quality gate rejects the unsafe sample story text.

    Expects ``validate_story_output`` to raise ``QualityGateError`` with the
    single issue code "unsafe_child_content", whose metadata carries the
    "safety_error" failure category.
    """
    output = StoryOutput(
        mode="generated",
        title="危险词测试",
        story_text="这个故事包含血腥场景。",
        cover_prompt_suggestion="A cover",
    )

    # pytest.raises fails the test itself when nothing is raised, so the
    # manual try/except/else with an explicit AssertionError is not needed.
    with pytest.raises(QualityGateError) as exc_info:
        validate_story_output(output)

    error = exc_info.value
    assert [issue.code.value for issue in error.issues] == ["unsafe_child_content"]
    assert error.to_metadata()["issues"][0]["failure_category"] == "safety_error"
def test_storybook_quality_gate_rejects_duplicate_page_number():
    """Check that the storybook quality gate rejects a repeated page number.

    Both pages use ``page_number=1``. Expects ``validate_storybook_output``
    to raise ``QualityGateError`` with the single issue code
    "invalid_storybook_page_number", reported against the second page
    ("pages[1].page_number").
    """
    storybook = Storybook(
        title="森林绘本",
        main_character="小兔子",
        art_style="水彩",
        cover_prompt="A forest cover",
        pages=[
            StorybookPage(page_number=1, text="第一页。", image_prompt="page 1"),
            StorybookPage(page_number=1, text="第二页。", image_prompt="page 2"),
        ],
    )

    # pytest.raises fails the test itself when nothing is raised, so the
    # manual try/except/else with an explicit AssertionError is not needed.
    with pytest.raises(QualityGateError) as exc_info:
        validate_storybook_output(storybook)

    error = exc_info.value
    assert [issue.code.value for issue in error.issues] == [
        "invalid_storybook_page_number"
    ]
    assert error.to_metadata()["issues"][0]["field"] == "pages[1].page_number"
@pytest.mark.asyncio
async def test_trace_recorder_persists_standard_metadata(db_session, test_user):
    """Check that ``TraceRecorder.record_step`` stores normalized metadata.

    After recording a failed provider call, the job's events (ordered by id)
    are expected to be "request_accepted" followed by "provider_call_failed",
    with the second event carrying both the supplied and the standard
    metadata fields.
    """
    job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="story",
        input_type="keywords",
        request_payload={"data": "小兔子"},
    )
    recorder = TraceRecorder(db_session)

    recorded = await recorder.record_step(
        job=job,
        event_type="provider_call_failed",
        status="failed",
        metadata={
            "capability": "text",
            "adapter": "demo",
            "error": "timeout",
        },
        failure_category=FailureCategory.TIMEOUT,
        retryable=True,
    )
    assert recorded is not None

    # Read the events back from the database in insertion (id) order.
    events_query = (
        select(GenerationJobEvent)
        .where(GenerationJobEvent.job_id == job.id)
        .order_by(GenerationJobEvent.id)
    )
    query_result = await db_session.execute(events_query)
    stored_events = query_result.scalars().all()

    stored_types = [item.event_type for item in stored_events]
    assert stored_types == ["request_accepted", "provider_call_failed"]

    stored_metadata = stored_events[1].event_metadata
    expected_values = (
        ("capability", "text"),
        ("adapter", "demo"),
        ("step", "provider_invocation"),
        ("artifact", "none"),
        ("failure_category", "timeout"),
    )
    for field, expected in expected_values:
        assert stored_metadata[field] == expected
    assert stored_metadata["retryable"] is True
@pytest.mark.asyncio
async def test_trace_recorder_ignores_missing_job(db_session):
    """Check that ``record_step`` returns ``None`` when called with ``job=None``."""
    recorder = TraceRecorder(db_session)

    recorded = await recorder.record_step(
        job=None,
        event_type="context_prepared",
        status="succeeded",
    )

    assert recorded is None
@pytest.mark.asyncio
async def test_execution_control_cancels_job_at_safe_checkpoint(
    db_session,
    test_user,
    test_story,
):
    """Check that a pending cancel request stops the job at the checkpoint.

    After a "cancel_requested" event is recorded, ``stop_if_cancel_requested``
    must raise ``GenerationJobCanceledError``. The reloaded job is expected to
    be marked canceled, and the event log to end with "generation_canceled".
    """
    job = await create_generation_job(
        db_session,
        user_id=test_user.id,
        output_mode="story",
        input_type="keywords",
        request_payload={"data": "小兔子"},
        story_id=test_story.id,
    )
    await record_generation_event(
        db_session,
        job=job,
        story_id=test_story.id,
        event_type="cancel_requested",
        status="running",
        message="Cancellation requested.",
    )

    with pytest.raises(GenerationJobCanceledError):
        await ExecutionControl(db_session).stop_if_cancel_requested(
            job=job,
            story=test_story,
        )

    # Reload the job from the database and check its stored state.
    job_query = select(GenerationJob).where(GenerationJob.id == job.id)
    job_result = await db_session.execute(job_query)
    reloaded_job = job_result.scalar_one()
    assert reloaded_job.status == "canceled"
    assert reloaded_job.current_step == "generation_canceled"
    assert reloaded_job.error_message == "Generation canceled by user."

    # The event log, in id order, must end with the cancellation event.
    events_query = (
        select(GenerationJobEvent)
        .where(GenerationJobEvent.job_id == job.id)
        .order_by(GenerationJobEvent.id)
    )
    events_result = await db_session.execute(events_query)
    stored_events = events_result.scalars().all()
    stored_types = [item.event_type for item in stored_events]
    assert stored_types == [
        "request_accepted",
        "cancel_requested",
        "generation_canceled",
    ]