feat: add generation trace and partial-ready workflow status

This commit is contained in:
2026-04-18 21:53:55 +08:00
parent 96dfc677e2
commit e99a7fbe14
36 changed files with 2597 additions and 144 deletions

View File

@@ -67,6 +67,43 @@ class AssetCompletionResult:
return self.status == StoryAssetStatus.READY and self.error is None
async def _record_job_event_if_present(
db: AsyncSession,
*,
job,
event_type: str,
status: str,
story_id: int | None = None,
message: str | None = None,
metadata: dict | None = None,
) -> None:
"""Append a workflow event when the caller is running under a tracked job."""
if job is None:
return
await record_generation_event(
db,
job=job,
story_id=story_id,
event_type=event_type,
status=status,
message=message,
metadata=metadata,
)
def _asset_result_metadata(result: AssetCompletionResult) -> dict:
"""Build JSON-safe metadata for asset workflow events."""
return {
"asset": result.asset,
"status": result.status.value,
"error": result.error,
"blocks_main_result": result.blocks_main_result,
}
def _build_storybook_error_message(
*,
cover_failed: bool,
@@ -125,6 +162,7 @@ async def _prepare_generation_context(
universe_id: str | None,
user_id: str,
db: AsyncSession,
job=None,
) -> tuple[str | None, str | None, str]:
"""Validate ownership and build the shared generation context."""
@@ -136,6 +174,18 @@ async def _prepare_generation_context(
resolved_universe_id,
db,
)
await _record_job_event_if_present(
db,
job=job,
event_type="context_prepared",
status="succeeded",
message="Profile, universe, and memory context were prepared.",
metadata={
"profile_id": resolved_profile_id,
"universe_id": resolved_universe_id,
"has_memory_context": bool(memory_context),
},
)
return resolved_profile_id, resolved_universe_id, memory_context
@@ -173,6 +223,7 @@ async def _persist_text_story_result(
profile_id: str | None,
universe_id: str | None,
db: AsyncSession,
job=None,
) -> Story:
"""Persist generated text content as the unified story record."""
@@ -195,6 +246,20 @@ async def _persist_text_story_result(
await db.commit()
await db.refresh(story)
_trigger_story_postprocessing(story)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="story_saved",
status="succeeded",
message="Readable story record was saved.",
metadata={
"mode": story.mode,
"generation_status": story.generation_status,
"image_status": story.image_status,
"audio_status": story.audio_status,
},
)
return story
@@ -229,21 +294,47 @@ def _storybook_pages_to_response(pages_data: list[dict]) -> list[StorybookPageRe
async def _generate_storybook_image_assets(
storybook: Storybook,
db: AsyncSession,
*,
user_id: str,
job=None,
) -> tuple[str | None, bool, list[int]]:
"""Generate storybook cover and page images before persistence."""
final_cover_url = storybook.cover_url
cover_failed = False
failed_pages: list[int] = []
completed_pages: list[int] = []
attempted_cover = bool(storybook.cover_prompt and not storybook.cover_url)
attempted_pages = [
page.page_number
for page in storybook.pages
if page.image_prompt and not page.image_url
]
logger.info("storybook_parallel_generation_start", page_count=len(storybook.pages))
await _record_job_event_if_present(
db,
job=job,
event_type="storybook_images_started",
status="running",
message="Storybook cover and page image generation started.",
metadata={
"attempted_cover": attempted_cover,
"attempted_pages": attempted_pages,
},
)
async def _gen_cover() -> str | None:
nonlocal cover_failed
if storybook.cover_prompt and not storybook.cover_url:
try:
return await generate_image(storybook.cover_prompt, db=db)
return await generate_image(
storybook.cover_prompt,
db=db,
user_id=user_id,
generation_job=job,
)
except Exception as exc:
cover_failed = True
logger.warning("cover_gen_failed", error=str(exc))
@@ -254,7 +345,13 @@ async def _generate_storybook_image_assets(
return
try:
page.image_url = await generate_image(page.image_prompt, db=db)
page.image_url = await generate_image(
page.image_prompt,
db=db,
user_id=user_id,
generation_job=job,
)
completed_pages.append(page.page_number)
except Exception as exc:
failed_pages.append(page.page_number)
logger.warning("page_gen_failed", page=page.page_number, error=str(exc))
@@ -270,6 +367,57 @@ async def _generate_storybook_image_assets(
final_cover_url = cover_result
logger.info("storybook_parallel_generation_complete")
if attempted_cover:
await _record_job_event_if_present(
db,
job=job,
event_type=(
"storybook_cover_image_failed"
if cover_failed
else "storybook_cover_image_succeeded"
),
status="failed" if cover_failed else "succeeded",
message=(
"Storybook cover image generation failed."
if cover_failed
else "Storybook cover image was generated."
),
metadata={"asset": "image", "scope": "cover"},
)
for page_number in sorted(completed_pages):
await _record_job_event_if_present(
db,
job=job,
event_type="storybook_page_image_succeeded",
status="succeeded",
message="Storybook page image was generated.",
metadata={"asset": "image", "scope": "page", "page_number": page_number},
)
for page_number in sorted(failed_pages):
await _record_job_event_if_present(
db,
job=job,
event_type="storybook_page_image_failed",
status="failed",
message="Storybook page image generation failed.",
metadata={"asset": "image", "scope": "page", "page_number": page_number},
)
await _record_job_event_if_present(
db,
job=job,
event_type="storybook_images_completed",
status="failed" if cover_failed or failed_pages else "succeeded",
message="Storybook image generation finished.",
metadata={
"asset": "image",
"attempted_cover": attempted_cover,
"completed_pages": sorted(completed_pages),
"failed_pages": sorted(failed_pages),
},
)
return final_cover_url, cover_failed, failed_pages
@@ -284,6 +432,7 @@ async def _persist_storybook_result(
cover_failed: bool,
failed_pages: list[int],
db: AsyncSession,
job=None,
) -> tuple[Story, list[dict]]:
"""Persist generated storybook content as the unified story record."""
@@ -317,6 +466,21 @@ async def _persist_storybook_result(
await db.commit()
await db.refresh(story)
_trigger_story_postprocessing(story)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="story_saved",
status="succeeded",
message="Storybook record was saved.",
metadata={
"mode": story.mode,
"page_count": len(pages_data),
"generation_status": story.generation_status,
"image_status": story.image_status,
"audio_status": story.audio_status,
},
)
return story, pages_data
@@ -327,6 +491,7 @@ async def _complete_cover_image_asset(
raise_on_failure: bool = False,
last_error_prefix: str | None = None,
log_event: str = "cover_asset_generation_failed",
job=None,
) -> AssetCompletionResult:
"""Generate or retry a text story cover through one asset workflow."""
@@ -335,18 +500,43 @@ async def _complete_cover_image_asset(
sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
await db.commit()
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="cover_image_started",
status="running",
message="Cover image generation started.",
metadata={"asset": "image", "cover_prompt_present": True},
)
try:
image_url = await generate_image(story.cover_prompt, db=db)
image_url = await generate_image(
story.cover_prompt,
db=db,
user_id=story.user_id,
generation_job=job,
story_id=story.id,
)
story.image_url = image_url
sync_story_status(story, image_status=StoryAssetStatus.READY)
await db.commit()
return AssetCompletionResult(
result = AssetCompletionResult(
asset="cover_image",
status=StoryAssetStatus.READY,
value=image_url,
blocks_main_result=raise_on_failure,
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="cover_image_succeeded",
status="succeeded",
message="Cover image was generated.",
metadata=_asset_result_metadata(result),
)
return result
except Exception as exc:
provider_error = str(exc)
last_error = (
@@ -362,18 +552,28 @@ async def _complete_cover_image_asset(
await db.commit()
logger.warning(log_event, story_id=story.id, error=provider_error)
result = AssetCompletionResult(
asset="cover_image",
status=StoryAssetStatus.FAILED,
error=provider_error,
blocks_main_result=raise_on_failure,
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="cover_image_failed",
status="failed",
message="Cover image generation failed.",
metadata=_asset_result_metadata(result),
)
if raise_on_failure:
raise HTTPException(
status_code=500,
detail=f"Image generation failed: {provider_error}",
) from exc
return AssetCompletionResult(
asset="cover_image",
status=StoryAssetStatus.FAILED,
error=provider_error,
blocks_main_result=raise_on_failure,
)
return result
def _get_storybook_pages_data(story: Story) -> list[dict]:
@@ -385,6 +585,8 @@ def _get_storybook_pages_data(story: Story) -> list[dict]:
async def _complete_storybook_image_assets(
story: Story,
db: AsyncSession,
*,
job=None,
) -> AssetCompletionResult:
"""Complete missing cover/page images for a persisted storybook."""
@@ -397,13 +599,38 @@ async def _complete_storybook_image_assets(
sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
await db.commit()
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="storybook_images_started",
status="running",
message="Storybook missing image completion started.",
metadata={"asset": "image"},
)
cover_failed = False
failed_pages: list[int] = []
completed_pages: list[int] = []
if story.cover_prompt and not story.image_url:
try:
story.image_url = await generate_image(story.cover_prompt, db=db)
story.image_url = await generate_image(
story.cover_prompt,
db=db,
user_id=story.user_id,
generation_job=job,
story_id=story.id,
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="storybook_cover_image_succeeded",
status="succeeded",
message="Storybook cover image was generated.",
metadata={"asset": "image", "scope": "cover"},
)
except Exception as exc:
cover_failed = True
logger.warning(
@@ -411,13 +638,40 @@ async def _complete_storybook_image_assets(
story_id=story.id,
error=str(exc),
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="storybook_cover_image_failed",
status="failed",
message="Storybook cover image generation failed.",
metadata={"asset": "image", "scope": "cover", "error": str(exc)},
)
for page in pages_data:
if not page.get("image_prompt") or page.get("image_url"):
continue
try:
page["image_url"] = await generate_image(page["image_prompt"], db=db)
page["image_url"] = await generate_image(
page["image_prompt"],
db=db,
user_id=story.user_id,
generation_job=job,
story_id=story.id,
)
page_number = page.get("page_number")
if isinstance(page_number, int):
completed_pages.append(page_number)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="storybook_page_image_succeeded",
status="succeeded",
message="Storybook page image was generated.",
metadata={"asset": "image", "scope": "page", "page_number": page_number},
)
except Exception as exc:
page_number = page.get("page_number")
if isinstance(page_number, int):
@@ -428,6 +682,20 @@ async def _complete_storybook_image_assets(
page=page_number,
error=str(exc),
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="storybook_page_image_failed",
status="failed",
message="Storybook page image generation failed.",
metadata={
"asset": "image",
"scope": "page",
"page_number": page_number,
"error": str(exc),
},
)
story.pages = pages_data
error_message = _build_storybook_error_message(
@@ -446,12 +714,26 @@ async def _complete_storybook_image_assets(
last_error=error_message,
)
await db.commit()
return AssetCompletionResult(
result = AssetCompletionResult(
asset="storybook_images",
status=image_status,
value=story.image_url,
error=error_message,
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="storybook_images_completed",
status="failed" if error_message else "succeeded",
message="Storybook image completion finished.",
metadata={
**_asset_result_metadata(result),
"completed_pages": sorted(completed_pages),
"failed_pages": sorted(failed_pages),
},
)
return result
async def _read_cached_audio_asset(story: Story, db: AsyncSession) -> bytes | None:
@@ -482,6 +764,7 @@ async def _complete_audio_asset(
db: AsyncSession,
*,
raise_on_failure: bool = True,
job=None,
) -> AssetCompletionResult:
"""Complete TTS audio generation through one asset workflow."""
@@ -490,32 +773,67 @@ async def _complete_audio_asset(
cached_audio = await _read_cached_audio_asset(story, db)
if cached_audio is not None:
return AssetCompletionResult(
result = AssetCompletionResult(
asset="audio",
status=StoryAssetStatus.READY,
value=cached_audio,
blocks_main_result=raise_on_failure,
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="audio_cache_hit",
status="succeeded",
message="Cached story audio was reused.",
metadata=_asset_result_metadata(result),
)
return result
from app.services.provider_router import text_to_speech
sync_story_status(story, audio_status=StoryAssetStatus.GENERATING)
await db.commit()
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="audio_started",
status="running",
message="Story audio generation started.",
metadata={"asset": "audio"},
)
try:
audio_data = await text_to_speech(story.story_text, db=db)
audio_data = await text_to_speech(
story.story_text,
db=db,
user_id=story.user_id,
generation_job=job,
story_id=story.id,
)
story.audio_path = write_story_audio_cache(story.id, audio_data)
sync_story_status(
story,
audio_status=StoryAssetStatus.READY,
)
await db.commit()
return AssetCompletionResult(
result = AssetCompletionResult(
asset="audio",
status=StoryAssetStatus.READY,
value=audio_data,
blocks_main_result=raise_on_failure,
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="audio_succeeded",
status="succeeded",
message="Story audio was generated and cached.",
metadata=_asset_result_metadata(result),
)
return result
except Exception as exc:
provider_error = str(exc)
story.audio_path = None
@@ -527,18 +845,28 @@ async def _complete_audio_asset(
await db.commit()
logger.error("audio_generation_failed", story_id=story.id, error=provider_error)
result = AssetCompletionResult(
asset="audio",
status=StoryAssetStatus.FAILED,
error=provider_error,
blocks_main_result=raise_on_failure,
)
await _record_job_event_if_present(
db,
job=job,
story_id=story.id,
event_type="audio_failed",
status="failed",
message="Story audio generation failed.",
metadata=_asset_result_metadata(result),
)
if raise_on_failure:
raise HTTPException(
status_code=500,
detail=f"Audio generation failed: {provider_error}",
) from exc
return AssetCompletionResult(
asset="audio",
status=StoryAssetStatus.FAILED,
error=provider_error,
blocks_main_result=raise_on_failure,
)
return result
async def validate_profile_and_universe(
@@ -586,6 +914,8 @@ async def generate_and_save_story(
request: GenerateRequest,
user_id: str,
db: AsyncSession,
*,
job=None,
) -> Story:
"""Generate generic story content and save to DB."""
profile_id, universe_id, memory_context = await _prepare_generation_context(
@@ -593,21 +923,32 @@ async def generate_and_save_story(
universe_id=request.universe_id,
user_id=user_id,
db=db,
job=job,
)
try:
result = await generate_story_content(
input_type=request.type,
data=request.data,
education_theme=request.education_theme,
memory_context=memory_context,
db=db,
)
education_theme=request.education_theme,
memory_context=memory_context,
db=db,
user_id=user_id,
generation_job=job,
)
except Exception as exc:
raise HTTPException(
status_code=502,
detail="Story generation failed, please try again.",
) from exc
await _record_job_event_if_present(
db,
job=job,
event_type="narrative_generated",
status="succeeded",
message="Story narrative was generated.",
metadata={"mode": result.mode, "title": result.title},
)
return await _persist_text_story_result(
result=result,
@@ -615,6 +956,7 @@ async def generate_and_save_story(
profile_id=profile_id,
universe_id=universe_id,
db=db,
job=job,
)
@@ -622,9 +964,11 @@ async def generate_full_story_service(
request: GenerateRequest,
user_id: str,
db: AsyncSession,
*,
job=None,
) -> FullStoryResponse:
"""Generate story with parallel image generation."""
story = await generate_and_save_story(request, user_id, db)
story = await generate_and_save_story(request, user_id, db, job=job)
image_url: str | None = None
errors: dict[str, str | None] = {}
@@ -633,6 +977,7 @@ async def generate_full_story_service(
story,
db,
log_event="image_generation_failed",
job=job,
)
if image_result.succeeded and isinstance(image_result.value, str):
image_url = image_result.value
@@ -651,6 +996,7 @@ async def generate_full_story_service(
child_profile_id=story.child_profile_id,
universe_id=story.universe_id,
generation_status=story.generation_status,
text_status=story.text_status,
image_status=story.image_status,
audio_status=story.audio_status,
last_error=story.last_error,
@@ -662,6 +1008,8 @@ async def generate_storybook_service(
request: StorybookRequest,
user_id: str,
db: AsyncSession,
*,
job=None,
) -> StorybookResponse:
"""Generate storybook with parallel image generation for pages."""
profile_id, universe_id, memory_context = await _prepare_generation_context(
@@ -669,6 +1017,7 @@ async def generate_storybook_service(
universe_id=request.universe_id,
user_id=user_id,
db=db,
job=job,
)
logger.info(
@@ -684,13 +1033,27 @@ async def generate_storybook_service(
storybook = await generate_storybook(
keywords=request.keywords,
page_count=request.page_count,
education_theme=request.education_theme,
memory_context=memory_context,
db=db,
)
except Exception as e:
education_theme=request.education_theme,
memory_context=memory_context,
db=db,
user_id=user_id,
generation_job=job,
)
except Exception as e:
logger.error("storybook_generation_failed", error=str(e))
raise HTTPException(status_code=500, detail=f"故事书生成失败: {e}")
await _record_job_event_if_present(
db,
job=job,
event_type="narrative_generated",
status="succeeded",
message="Storybook narrative and page plan were generated.",
metadata={
"mode": "storybook",
"title": storybook.title,
"page_count": len(storybook.pages),
},
)
final_cover_url = storybook.cover_url
cover_failed = False
@@ -701,7 +1064,12 @@ async def generate_storybook_service(
final_cover_url,
cover_failed,
failed_pages,
) = await _generate_storybook_image_assets(storybook, db)
) = await _generate_storybook_image_assets(
storybook,
db,
user_id=user_id,
job=job,
)
story, pages_data = await _persist_storybook_result(
storybook=storybook,
@@ -713,6 +1081,7 @@ async def generate_storybook_service(
cover_failed=cover_failed,
failed_pages=failed_pages,
db=db,
job=job,
)
response_pages = _storybook_pages_to_response(pages_data)
@@ -726,6 +1095,7 @@ async def generate_storybook_service(
cover_prompt=storybook.cover_prompt,
cover_url=final_cover_url,
generation_status=story.generation_status,
text_status=story.text_status,
image_status=story.image_status,
audio_status=story.audio_status,
last_error=story.last_error,
@@ -797,6 +1167,7 @@ async def _generate_generation_service_with_job(
),
user_id,
db,
job=job,
)
if storybook.id is None:
raise HTTPException(status_code=500, detail="Storybook generation did not persist.")
@@ -812,6 +1183,7 @@ async def _generate_generation_service_with_job(
)
return GenerationResponse(
id=storybook.id,
generation_job_id=job.id,
title=storybook.title,
mode="storybook",
pages=storybook.pages,
@@ -821,6 +1193,7 @@ async def _generate_generation_service_with_job(
main_character=storybook.main_character,
art_style=storybook.art_style,
generation_status=storybook.generation_status,
text_status=saved_story.text_status,
image_status=storybook.image_status,
audio_status=storybook.audio_status,
last_error=storybook.last_error,
@@ -838,7 +1211,7 @@ async def _generate_generation_service_with_job(
)
if request.generate_images:
story = await generate_full_story_service(generate_request, user_id, db)
story = await generate_full_story_service(generate_request, user_id, db, job=job)
saved_story = await get_story_detail(story.id, user_id, db)
await _record_postprocessing_event_if_needed(db, job=job, story=saved_story)
await finish_generation_job(
@@ -850,6 +1223,7 @@ async def _generate_generation_service_with_job(
)
return GenerationResponse(
id=story.id,
generation_job_id=job.id,
title=story.title,
mode=story.mode,
story_text=story.story_text,
@@ -859,6 +1233,7 @@ async def _generate_generation_service_with_job(
audio_ready=story.audio_ready,
errors=story.errors,
generation_status=story.generation_status,
text_status=saved_story.text_status,
image_status=story.image_status,
audio_status=story.audio_status,
last_error=story.last_error,
@@ -867,7 +1242,7 @@ async def _generate_generation_service_with_job(
retryable_assets=saved_story.retryable_assets,
)
story = await generate_and_save_story(generate_request, user_id, db)
story = await generate_and_save_story(generate_request, user_id, db, job=job)
await _record_postprocessing_event_if_needed(db, job=job, story=story)
await finish_generation_job(
db,
@@ -878,6 +1253,7 @@ async def _generate_generation_service_with_job(
)
return GenerationResponse(
id=story.id,
generation_job_id=job.id,
title=story.title,
mode=story.mode,
story_text=story.story_text,
@@ -885,6 +1261,7 @@ async def _generate_generation_service_with_job(
image_url=story.image_url,
cover_url=story.image_url,
generation_status=story.generation_status,
text_status=story.text_status,
image_status=story.image_status,
audio_status=story.audio_status,
last_error=story.last_error,
@@ -954,7 +1331,7 @@ async def create_story_from_result(
)
async def _retry_cover_image_asset(story: Story, db: AsyncSession) -> None:
async def _retry_cover_image_asset(story: Story, db: AsyncSession, *, job=None) -> None:
"""Retry cover generation for a text story."""
await _complete_cover_image_asset(
@@ -962,19 +1339,25 @@ async def _retry_cover_image_asset(story: Story, db: AsyncSession) -> None:
db,
last_error_prefix="封面生成失败",
log_event="cover_asset_retry_failed",
job=job,
)
async def _retry_storybook_image_assets(story: Story, db: AsyncSession) -> None:
async def _retry_storybook_image_assets(
story: Story,
db: AsyncSession,
*,
job=None,
) -> None:
"""Retry missing storybook cover/page images."""
await _complete_storybook_image_assets(story, db)
await _complete_storybook_image_assets(story, db, job=job)
async def _retry_audio_asset(story: Story, db: AsyncSession) -> None:
async def _retry_audio_asset(story: Story, db: AsyncSession, *, job=None) -> None:
"""Retry audio generation while preserving persisted status on provider failure."""
await _complete_audio_asset(story, db, raise_on_failure=False)
await _complete_audio_asset(story, db, raise_on_failure=False, job=job)
async def retry_story_assets(
@@ -1009,12 +1392,12 @@ async def retry_story_assets(
if "image" in requested_assets:
if story.mode == "storybook":
await _retry_storybook_image_assets(story, db)
await _retry_storybook_image_assets(story, db, job=job)
else:
await _retry_cover_image_asset(story, db)
await _retry_cover_image_asset(story, db, job=job)
if "audio" in requested_assets:
await _retry_audio_asset(story, db)
await _retry_audio_asset(story, db, job=job)
story = await get_story_detail(story_id, user_id, db)
await finish_generation_job(
@@ -1075,6 +1458,7 @@ async def generate_story_cover(
db,
raise_on_failure=True,
log_event="cover_generation_failed",
job=job,
)
story = await get_story_detail(story_id, user_id, db)
await finish_generation_job(
@@ -1121,7 +1505,12 @@ async def generate_story_audio(
try:
story = await get_story_detail(story_id, user_id, db)
audio_result = await _complete_audio_asset(story, db, raise_on_failure=True)
audio_result = await _complete_audio_asset(
story,
db,
raise_on_failure=True,
job=job,
)
story = await get_story_detail(story_id, user_id, db)
await finish_generation_job(
db,