refactor: consolidate generation workflow helpers

This commit is contained in:
2026-04-18 13:03:23 +08:00
parent e201fa3358
commit ae7bd79267
4 changed files with 361 additions and 248 deletions

View File

@@ -19,6 +19,8 @@ from app.schemas.story_schemas import (
StorybookRequest, StorybookRequest,
StorybookResponse, StorybookResponse,
) )
from app.services.adapters.storybook.primary import Storybook
from app.services.adapters.text.models import StoryOutput
from app.services.audio_storage import ( from app.services.audio_storage import (
audio_cache_exists, audio_cache_exists,
read_audio_cache, read_audio_cache,
@@ -91,6 +93,233 @@ def _resolve_storybook_image_status(
return StoryAssetStatus.FAILED return StoryAssetStatus.FAILED
async def _prepare_generation_context(
*,
profile_id: str | None,
universe_id: str | None,
user_id: str,
db: AsyncSession,
) -> tuple[str | None, str | None, str]:
"""Validate ownership and build the shared generation context."""
resolved_profile_id, resolved_universe_id = await validate_profile_and_universe(
profile_id, universe_id, user_id, db
)
memory_context = await build_enhanced_memory_context(
resolved_profile_id,
resolved_universe_id,
db,
)
return resolved_profile_id, resolved_universe_id, memory_context
def _trigger_story_postprocessing(story: Story) -> None:
"""Trigger non-blocking post-processing for a persisted story."""
if story.universe_id:
extract_story_achievements.delay(story.id, story.universe_id)
async def _persist_text_story_result(
*,
result: StoryOutput,
user_id: str,
profile_id: str | None,
universe_id: str | None,
db: AsyncSession,
) -> Story:
"""Persist generated text content as the unified story record."""
story = Story(
user_id=user_id,
child_profile_id=profile_id,
universe_id=universe_id,
title=result.title,
story_text=result.story_text,
cover_prompt=result.cover_prompt_suggestion,
mode=result.mode,
)
sync_story_status(
story,
image_status=StoryAssetStatus.NOT_REQUESTED,
audio_status=StoryAssetStatus.NOT_REQUESTED,
last_error=None,
)
db.add(story)
await db.commit()
await db.refresh(story)
_trigger_story_postprocessing(story)
return story
def _storybook_pages_to_data(storybook: Storybook) -> list[dict]:
"""Convert generated storybook pages to the persisted JSON shape."""
return [
{
"page_number": page.page_number,
"text": page.text,
"image_prompt": page.image_prompt,
"image_url": page.image_url,
}
for page in storybook.pages
]
def _storybook_pages_to_response(pages_data: list[dict]) -> list[StorybookPageResponse]:
"""Convert persisted storybook page JSON to API response models."""
return [
StorybookPageResponse(
page_number=page["page_number"],
text=page["text"],
image_prompt=page["image_prompt"],
image_url=page.get("image_url"),
)
for page in pages_data
]
async def _generate_storybook_image_assets(
storybook: Storybook,
db: AsyncSession,
) -> tuple[str | None, bool, list[int]]:
"""Generate storybook cover and page images before persistence."""
final_cover_url = storybook.cover_url
cover_failed = False
failed_pages: list[int] = []
logger.info("storybook_parallel_generation_start", page_count=len(storybook.pages))
async def _gen_cover() -> str | None:
nonlocal cover_failed
if storybook.cover_prompt and not storybook.cover_url:
try:
return await generate_image(storybook.cover_prompt, db=db)
except Exception as exc:
cover_failed = True
logger.warning("cover_gen_failed", error=str(exc))
return storybook.cover_url
async def _gen_page(page) -> None:
if not page.image_prompt or page.image_url:
return
try:
page.image_url = await generate_image(page.image_prompt, db=db)
except Exception as exc:
failed_pages.append(page.page_number)
logger.warning("page_gen_failed", page=page.page_number, error=str(exc))
results = await asyncio.gather(
_gen_cover(),
*(_gen_page(page) for page in storybook.pages),
return_exceptions=True,
)
cover_result = results[0]
if isinstance(cover_result, str):
final_cover_url = cover_result
logger.info("storybook_parallel_generation_complete")
return final_cover_url, cover_failed, failed_pages
async def _persist_storybook_result(
*,
storybook: Storybook,
user_id: str,
profile_id: str | None,
universe_id: str | None,
final_cover_url: str | None,
generate_images: bool,
cover_failed: bool,
failed_pages: list[int],
db: AsyncSession,
) -> tuple[Story, list[dict]]:
"""Persist generated storybook content as the unified story record."""
pages_data = _storybook_pages_to_data(storybook)
story = Story(
user_id=user_id,
child_profile_id=profile_id,
universe_id=universe_id,
title=storybook.title,
mode="storybook",
pages=pages_data,
story_text=None,
cover_prompt=storybook.cover_prompt,
image_url=final_cover_url,
)
sync_story_status(
story,
image_status=_resolve_storybook_image_status(
generate_images=generate_images,
cover_prompt=storybook.cover_prompt,
cover_url=final_cover_url,
pages_data=pages_data,
),
audio_status=StoryAssetStatus.NOT_REQUESTED,
last_error=_build_storybook_error_message(
cover_failed=cover_failed,
failed_pages=failed_pages,
),
)
db.add(story)
await db.commit()
await db.refresh(story)
_trigger_story_postprocessing(story)
return story, pages_data
async def _complete_cover_image_asset(
story: Story,
db: AsyncSession,
*,
raise_on_failure: bool = False,
last_error_prefix: str | None = None,
log_event: str = "cover_asset_generation_failed",
) -> tuple[str | None, str | None]:
"""Generate or retry a text story cover through one asset workflow."""
if not story.cover_prompt:
raise HTTPException(status_code=400, detail="Story has no cover prompt")
sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
await db.commit()
try:
image_url = await generate_image(story.cover_prompt, db=db)
story.image_url = image_url
sync_story_status(story, image_status=StoryAssetStatus.READY)
await db.commit()
return image_url, None
except Exception as exc:
provider_error = str(exc)
last_error = (
f"{last_error_prefix}: {provider_error}"
if last_error_prefix
else provider_error
)
sync_story_status(
story,
image_status=StoryAssetStatus.FAILED,
last_error=last_error,
)
await db.commit()
logger.warning(log_event, story_id=story.id, error=provider_error)
if raise_on_failure:
raise HTTPException(
status_code=500,
detail=f"Image generation failed: {provider_error}",
) from exc
return None, provider_error
async def validate_profile_and_universe( async def validate_profile_and_universe(
profile_id: str | None, profile_id: str | None,
universe_id: str | None, universe_id: str | None,
@@ -138,15 +367,13 @@ async def generate_and_save_story(
db: AsyncSession, db: AsyncSession,
) -> Story: ) -> Story:
"""Generate generic story content and save to DB.""" """Generate generic story content and save to DB."""
# 1. Validate profile_id, universe_id, memory_context = await _prepare_generation_context(
profile_id, universe_id = await validate_profile_and_universe( profile_id=request.child_profile_id,
request.child_profile_id, request.universe_id, user_id, db universe_id=request.universe_id,
user_id=user_id,
db=db,
) )
# 2. Build Context
memory_context = await build_enhanced_memory_context(profile_id, universe_id, db)
# 3. Generate
try: try:
result = await generate_story_content( result = await generate_story_content(
input_type=request.type, input_type=request.type,
@@ -161,31 +388,13 @@ async def generate_and_save_story(
detail="Story generation failed, please try again.", detail="Story generation failed, please try again.",
) from exc ) from exc
# 4. Save return await _persist_text_story_result(
story = Story( result=result,
user_id=user_id, user_id=user_id,
child_profile_id=profile_id, profile_id=profile_id,
universe_id=universe_id, universe_id=universe_id,
title=result.title, db=db,
story_text=result.story_text,
cover_prompt=result.cover_prompt_suggestion,
mode=result.mode,
) )
sync_story_status(
story,
image_status=StoryAssetStatus.NOT_REQUESTED,
audio_status=StoryAssetStatus.NOT_REQUESTED,
last_error=None,
)
db.add(story)
await db.commit()
await db.refresh(story)
# 5. Trigger Async Tasks
if universe_id:
extract_story_achievements.delay(story.id, universe_id)
return story
async def generate_full_story_service( async def generate_full_story_service(
@@ -194,35 +403,18 @@ async def generate_full_story_service(
db: AsyncSession, db: AsyncSession,
) -> FullStoryResponse: ) -> FullStoryResponse:
"""Generate story with parallel image generation.""" """Generate story with parallel image generation."""
# 1. Generate text part
# We can reuse logic or call generate_story_content directly if we want finer control
# reusing generate_and_save_story to ensure consistency (it handles validation + saving)
story = await generate_and_save_story(request, user_id, db) story = await generate_and_save_story(request, user_id, db)
# 2. Generate Image (Parallel/Async step in this flow)
image_url: str | None = None image_url: str | None = None
errors: dict[str, str | None] = {} errors: dict[str, str | None] = {}
if story.cover_prompt: if story.cover_prompt:
sync_story_status(story, image_status=StoryAssetStatus.GENERATING) image_url, image_error = await _complete_cover_image_asset(
await db.commit()
try:
image_url = await generate_image(story.cover_prompt, db=db)
story.image_url = image_url
sync_story_status(
story, story,
image_status=StoryAssetStatus.READY, db,
log_event="image_generation_failed",
) )
await db.commit() if image_error:
except Exception as exc: errors["image"] = image_error
errors["image"] = str(exc)
sync_story_status(
story,
image_status=StoryAssetStatus.FAILED,
last_error=str(exc),
)
await db.commit()
logger.warning("image_generation_failed", story_id=story.id, error=str(exc))
return FullStoryResponse( return FullStoryResponse(
id=story.id, id=story.id,
@@ -248,9 +440,11 @@ async def generate_storybook_service(
db: AsyncSession, db: AsyncSession,
) -> StorybookResponse: ) -> StorybookResponse:
"""Generate storybook with parallel image generation for pages.""" """Generate storybook with parallel image generation for pages."""
# 1. Validate profile_id, universe_id, memory_context = await _prepare_generation_context(
profile_id, universe_id = await validate_profile_and_universe( profile_id=request.child_profile_id,
request.child_profile_id, request.universe_id, user_id, db universe_id=request.universe_id,
user_id=user_id,
db=db,
) )
logger.info( logger.info(
@@ -262,10 +456,6 @@ async def generate_storybook_service(
universe_id=universe_id, universe_id=universe_id,
) )
# 2. Context
memory_context = await build_enhanced_memory_context(profile_id, universe_id, db)
# 3. Generate Text Structure
try: try:
storybook = await generate_storybook( storybook = await generate_storybook(
keywords=request.keywords, keywords=request.keywords,
@@ -278,101 +468,30 @@ async def generate_storybook_service(
logger.error("storybook_generation_failed", error=str(e)) logger.error("storybook_generation_failed", error=str(e))
raise HTTPException(status_code=500, detail=f"故事书生成失败: {e}") raise HTTPException(status_code=500, detail=f"故事书生成失败: {e}")
# 4. Parallel Image Generation
final_cover_url = storybook.cover_url final_cover_url = storybook.cover_url
cover_failed = False cover_failed = False
failed_pages: list[int] = [] failed_pages: list[int] = []
if request.generate_images: if request.generate_images:
logger.info("storybook_parallel_generation_start", page_count=len(storybook.pages)) (
final_cover_url,
cover_failed,
failed_pages,
) = await _generate_storybook_image_assets(storybook, db)
tasks = [] story, pages_data = await _persist_storybook_result(
storybook=storybook,
async def _gen_cover():
nonlocal cover_failed
if storybook.cover_prompt and not storybook.cover_url:
try:
return await generate_image(storybook.cover_prompt, db=db)
except Exception as exc:
cover_failed = True
logger.warning("cover_gen_failed", error=str(exc))
return storybook.cover_url
tasks.append(_gen_cover())
async def _gen_page(page):
if page.image_prompt and not page.image_url:
try:
page.image_url = await generate_image(page.image_prompt, db=db)
except Exception as exc:
failed_pages.append(page.page_number)
logger.warning("page_gen_failed", page=page.page_number, error=str(exc))
for page in storybook.pages:
tasks.append(_gen_page(page))
results = await asyncio.gather(*tasks, return_exceptions=True)
cover_res = results[0]
if isinstance(cover_res, str):
final_cover_url = cover_res
logger.info("storybook_parallel_generation_complete")
# 5. Save to DB
pages_data = [
{
"page_number": p.page_number,
"text": p.text,
"image_prompt": p.image_prompt,
"image_url": p.image_url,
}
for p in storybook.pages
]
story = Story(
user_id=user_id, user_id=user_id,
child_profile_id=profile_id, profile_id=profile_id,
universe_id=universe_id, universe_id=universe_id,
title=storybook.title, final_cover_url=final_cover_url,
mode="storybook",
pages=pages_data,
story_text=None,
cover_prompt=storybook.cover_prompt,
image_url=final_cover_url,
)
sync_story_status(
story,
image_status=_resolve_storybook_image_status(
generate_images=request.generate_images, generate_images=request.generate_images,
cover_prompt=storybook.cover_prompt,
cover_url=final_cover_url,
pages_data=pages_data,
),
audio_status=StoryAssetStatus.NOT_REQUESTED,
last_error=_build_storybook_error_message(
cover_failed=cover_failed, cover_failed=cover_failed,
failed_pages=failed_pages, failed_pages=failed_pages,
), db=db,
) )
db.add(story)
await db.commit()
await db.refresh(story)
if universe_id: response_pages = _storybook_pages_to_response(pages_data)
extract_story_achievements.delay(story.id, universe_id)
# 6. Build Response
response_pages = [
StorybookPageResponse(
page_number=p["page_number"],
text=p["text"],
image_prompt=p["image_prompt"],
image_url=p.get("image_url"),
)
for p in pages_data
]
return StorybookResponse( return StorybookResponse(
id=story.id, id=story.id,
@@ -523,59 +642,31 @@ async def delete_story(
async def create_story_from_result( async def create_story_from_result(
result, # StoryOutput result: StoryOutput,
user_id: str, user_id: str,
profile_id: str | None, profile_id: str | None,
universe_id: str | None, universe_id: str | None,
db: AsyncSession, db: AsyncSession,
) -> Story: ) -> Story:
"""Save a generated story to DB (helper for stream endpoint).""" """Save a generated story to DB (helper for stream endpoint)."""
story = Story( return await _persist_text_story_result(
result=result,
user_id=user_id, user_id=user_id,
child_profile_id=profile_id, profile_id=profile_id,
universe_id=universe_id, universe_id=universe_id,
title=result.title, db=db,
story_text=result.story_text,
cover_prompt=result.cover_prompt_suggestion,
mode=result.mode,
) )
sync_story_status(
story,
image_status=StoryAssetStatus.NOT_REQUESTED,
audio_status=StoryAssetStatus.NOT_REQUESTED,
last_error=None,
)
db.add(story)
await db.commit()
await db.refresh(story)
if universe_id:
extract_story_achievements.delay(story.id, universe_id)
return story
async def _retry_cover_image_asset(story: Story, db: AsyncSession) -> None: async def _retry_cover_image_asset(story: Story, db: AsyncSession) -> None:
"""Retry cover generation for a text story.""" """Retry cover generation for a text story."""
if not story.cover_prompt: await _complete_cover_image_asset(
raise HTTPException(status_code=400, detail="Story has no cover prompt")
sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
await db.commit()
try:
story.image_url = await generate_image(story.cover_prompt, db=db)
sync_story_status(story, image_status=StoryAssetStatus.READY)
except Exception as exc:
sync_story_status(
story, story,
image_status=StoryAssetStatus.FAILED, db,
last_error=f"封面生成失败: {exc}", last_error_prefix="封面生成失败",
log_event="cover_asset_retry_failed",
) )
logger.error("cover_asset_retry_failed", story_id=story.id, error=str(exc))
await db.commit()
async def _retry_storybook_image_assets(story: Story, db: AsyncSession) -> None: async def _retry_storybook_image_assets(story: Story, db: AsyncSession) -> None:
@@ -682,30 +773,16 @@ async def generate_story_cover(
"""Generate cover image for an existing story.""" """Generate cover image for an existing story."""
story = await get_story_detail(story_id, user_id, db) story = await get_story_detail(story_id, user_id, db)
if not story.cover_prompt: image_url, _ = await _complete_cover_image_asset(
raise HTTPException(status_code=400, detail="Story has no cover prompt")
sync_story_status(story, image_status=StoryAssetStatus.GENERATING)
await db.commit()
try:
image_url = await generate_image(story.cover_prompt, db=db)
story.image_url = image_url
sync_story_status(
story, story,
image_status=StoryAssetStatus.READY, db,
raise_on_failure=True,
log_event="cover_generation_failed",
) )
await db.commit() if image_url is not None:
return image_url return image_url
except Exception as e:
sync_story_status( raise HTTPException(status_code=500, detail="Image generation failed")
story,
image_status=StoryAssetStatus.FAILED,
last_error=str(e),
)
await db.commit()
logger.error("cover_generation_failed", story_id=story_id, error=str(e))
raise HTTPException(status_code=500, detail=f"Image generation failed: {e}")
async def generate_story_audio( async def generate_story_audio(

View File

@@ -428,6 +428,33 @@ class TestUnifiedGenerations:
assert data["generation_status"] == "narrative_ready" assert data["generation_status"] == "narrative_ready"
assert data["image_status"] == "not_requested" assert data["image_status"] == "not_requested"
def test_create_story_generation_image_failure(
self,
auth_client: TestClient,
mock_text_provider,
):
with patch("app.services.story_service.generate_image", new_callable=AsyncMock) as mock_img:
mock_img.side_effect = Exception("Image API error")
response = auth_client.post(
"/api/generations",
json={
"output_mode": "story",
"type": "keywords",
"data": "小兔子, 森林",
"generate_images": True,
},
)
assert response.status_code == 200
data = response.json()
assert data["image_url"] is None
assert data["generation_status"] == "degraded_completed"
assert data["image_status"] == "failed"
assert data["audio_status"] == "not_requested"
assert "Image API error" in data["errors"]["image"]
assert "Image API error" in data["last_error"]
def test_create_storybook_generation_success(self, auth_client: TestClient): def test_create_storybook_generation_success(self, auth_client: TestClient):
with patch( with patch(
"app.services.story_service.generate_storybook", "app.services.story_service.generate_storybook",

View File

@@ -53,7 +53,7 @@
- 已新增数据库迁移: - 已新增数据库迁移:
- `0009_add_story_generation_statuses.py` - `0009_add_story_generation_statuses.py`
- `0010_add_story_audio_cache_path.py` - `0010_add_story_audio_cache_path.py`
- 已完成一轮后端回归验证:`backend/``pytest -q` 结果为 `63 passed` - 已完成一轮后端回归验证:`backend/``pytest -q` 结果为 `64 passed`
- 已完成全量后端 lint 清理:`ruff check app tests` 可通过 - 已完成全量后端 lint 清理:`ruff check app tests` 可通过
- 已修复 admin-frontend 构建阻塞,主前端与管理端前端均可生产构建 - 已修复 admin-frontend 构建阻塞,主前端与管理端前端均可生产构建
- 已落地首版统一资产重试入口:`POST /api/stories/{story_id}/assets/retry` - 已落地首版统一资产重试入口:`POST /api/stories/{story_id}/assets/retry`
@@ -62,12 +62,17 @@
- `GET /api/generations/{story_id}` - `GET /api/generations/{story_id}`
- `POST /api/generations/{story_id}/retry-assets` - `POST /api/generations/{story_id}/retry-assets`
- 用户前端与 admin 前端创建弹窗已切换到统一生成入口 - 用户前端与 admin 前端创建弹窗已切换到统一生成入口
- service 内部已抽取统一工作流公共步骤:
- 档案/宇宙校验与 memory context 构建
- 文本故事主记录保存
- 绘本主记录保存
- 普通故事封面生成/重试
### What Is In Progress ### What Is In Progress
- 统一状态模型与统一外部 API 已落地,内部 service workflow 仍未真正收束成单一路径 - 统一状态模型与统一外部 API 已落地,内部 service workflow 已开始收束公共步骤
- 旧生成 API 仍保留为兼容层,后续需要继续降低重复实现 - 旧生成 API 仍保留为兼容层,后续需要继续降低重复实现
- 资产补全已经具备统一重试入口首版,但需要继续抽象统一补全过程和 generation job 边界 - 资产补全已经具备统一重试入口首版,但绘本插图与音频还需要继续抽象统一补全过程和 generation job 边界
### What Is Still Pending ### What Is Still Pending

View File

@@ -39,12 +39,16 @@ DreamWeaver 当前同时支持普通故事生成、完整故事生成和绘本
- `GET /api/generations/{story_id}` - `GET /api/generations/{story_id}`
- `POST /api/generations/{story_id}/retry-assets` - `POST /api/generations/{story_id}/retry-assets`
- 用户前端与 admin 前端创建弹窗已切换到 `POST /api/generations` - 用户前端与 admin 前端创建弹窗已切换到 `POST /api/generations`
- service 内部已开始收束统一工作流步骤:
- 上下文准备:档案/宇宙校验 + memory context 构建
- 主记录保存:文本故事与绘本统一持久化入口
- 资产补全:普通故事封面生成/重试统一封装
- 故事详情页封面补全已切换到统一资产重试入口 - 故事详情页封面补全已切换到统一资产重试入口
- 管理端前端构建阻塞已修复,主前端与 admin 前端均可完成生产构建 - 管理端前端构建阻塞已修复,主前端与 admin 前端均可完成生产构建
### Still Missing ### Still Missing
- 普通故事、完整生成、绘本生成已有统一外部入口,内部仍通过兼容 service 路径编排 - 普通故事、完整生成、绘本生成已有统一外部入口,内部 workflow 已开始抽取公共步骤,但旧 service 函数仍作为兼容层保留
- 统一资产重试入口仍是首版:已覆盖普通故事封面、绘本缺失插图、故事音频,但尚未抽象成完整 generation job 模型 - 统一资产重试入口仍是首版:已覆盖普通故事封面、绘本缺失插图、故事音频,但尚未抽象成完整 generation job 模型
- `partial_ready``retryable_assets` 等更细粒度状态仍停留在目标态 - `partial_ready``retryable_assets` 等更细粒度状态仍停留在目标态
@@ -60,11 +64,11 @@ DreamWeaver 当前同时支持普通故事生成、完整故事生成和绘本
DreamWeaver 当前存在以下工作流层面问题: DreamWeaver 当前存在以下工作流层面问题:
1. **生成入口正在统一中** 1. **生成入口已建立,内部路径正在收束**
当前前端已切到 `/api/generations`,旧的 `/api/stories/generate``/api/stories/generate/full``/api/storybook/generate` 仍作为兼容入口保留。下一步重点是继续收束 service 内部路径 当前前端已切到 `/api/generations`,旧的 `/api/stories/generate``/api/stories/generate/full``/api/storybook/generate` 仍作为兼容入口保留。service 内部已抽取上下文准备、主记录保存和封面资产补全 helper下一步重点是继续统一音频与绘本插图补全过程
2. **保存与资产补全过程统一** 2. **保存与资产补全过程正在统一**
有的流程先存文本再补图,有的流程只返回绘本对象并依赖前端 store有的流程不考虑音频状态 文本故事和绘本已拥有更清晰的主记录保存 helper普通故事封面生成与重试已共用资产补全 helper。剩余差异集中在绘本插图补全和音频补全过程还未统一成 generation job
3. **状态表达不统一** 3. **状态表达不统一**
系统缺少标准的“生成中、部分完成、已完成、失败、可重试”等状态定义,导致前端难以做出成熟体验。 系统缺少标准的“生成中、部分完成、已完成、失败、可重试”等状态定义,导致前端难以做出成熟体验。