diff --git a/backend/app/services/story_service.py b/backend/app/services/story_service.py index fa4f1aa..73bdf94 100644 --- a/backend/app/services/story_service.py +++ b/backend/app/services/story_service.py @@ -1241,13 +1241,16 @@ async def execute_generation_job_service( """Execute one previously accepted generation job inside the worker.""" try: - request = GenerationRequest.model_validate(job.request_payload or {}) - response = await _generate_generation_service_with_job( - request, - job.user_id, - db, - job=job, - ) + if job.output_mode == "asset_generation": + response = await _generate_asset_generation_service_with_job(job, db) + else: + request = GenerationRequest.model_validate(job.request_payload or {}) + response = await _generate_generation_service_with_job( + request, + job.user_id, + db, + job=job, + ) except GenerationJobCanceledError: return _build_canceled_generation_response(job) except HTTPException as exc: @@ -1294,6 +1297,39 @@ def _build_canceled_generation_response(job: GenerationJob) -> GenerationRespons ) +def _build_generation_response_from_story( + story: Story, + *, + job_id: str, +) -> GenerationResponse: + """Build a unified generation response from one persisted story record.""" + + pages = None + if story.mode == "storybook": + pages = _storybook_pages_to_response(story.pages or []) + + return GenerationResponse( + id=story.id, + generation_job_id=job_id, + title=story.title, + mode=story.mode, + story_text=story.story_text, + pages=pages, + cover_prompt=story.cover_prompt, + image_url=story.image_url, + cover_url=story.image_url, + audio_ready=story.audio_status == StoryAssetStatus.READY.value, + generation_status=story.generation_status, + text_status=story.text_status, + image_status=story.image_status, + audio_status=story.audio_status, + last_error=story.last_error, + child_profile_id=story.child_profile_id, + universe_id=story.universe_id, + retryable_assets=story.retryable_assets, + ) + + async def run_generation_job_service( job_id: str, db: AsyncSession, @@ -1309,6 +1345,54 @@ async def run_generation_job_service( return job +async def _generate_asset_generation_service_with_job( + job: GenerationJob, + db: AsyncSession, +) -> GenerationResponse: + """Run queued asset generation in the background worker.""" + + payload = job.request_payload or {} + story_id = payload.get("story_id") or job.story_id + requested_assets = list(dict.fromkeys(payload.get("assets") or [])) + if story_id is None: + raise HTTPException(status_code=400, detail="资源任务缺少 story_id。") + if not requested_assets: + raise HTTPException(status_code=400, detail="资源任务缺少 assets。") + + story = await get_story_detail(int(story_id), job.user_id, db) + + if "image" in requested_assets: + if story.mode == "storybook": + await _complete_storybook_image_assets(story, db, job=job) + else: + await _complete_cover_image_asset( + story, + db, + raise_on_failure=True, + log_event="cover_generation_failed", + job=job, + ) + + if "audio" in requested_assets: + await _complete_audio_asset( + story, + db, + raise_on_failure=True, + job=job, + ) + + story = await get_story_detail(story.id, job.user_id, db) + await finish_generation_job( + db, + job=job, + story=story, + current_step="asset_generation_completed", + message="Asset generation completed in the background worker.", + metadata={"assets": requested_assets}, + ) + return _build_generation_response_from_story(story, job_id=job.id) + + async def retry_generation_job_service( job_id: str, user_id: str, @@ -1534,6 +1618,46 @@ async def create_story_from_result( ) +async def queue_story_asset_generation( + story_id: int, + user_id: str, + assets: list[str], + db: AsyncSession, +) -> dict: + """Queue one asset generation job for an already persisted story.""" + + await ensure_no_active_story_generation_job(db, story_id=story_id, user_id=user_id) + requested_assets = list(dict.fromkeys(assets)) + if not requested_assets: + raise HTTPException(status_code=400, detail="至少需要一个待生成资源") + + story = await get_story_detail(story_id, user_id, db) + if "image" in requested_assets: + has_image_prompt = bool(story.cover_prompt) + if story.mode == "storybook": + has_image_prompt = has_image_prompt or any( + isinstance(page, dict) and page.get("image_prompt") + for page in story.pages or [] + ) + if not has_image_prompt: + raise HTTPException(status_code=400, detail="当前故事没有可生成的图片提示词") + + if "audio" in requested_assets and not story.story_text: + raise HTTPException(status_code=400, detail="当前故事没有可生成音频的正文") + + job = await create_generation_job( + db, + user_id=user_id, + output_mode="asset_generation", + input_type=",".join(requested_assets), + request_payload={"story_id": story_id, "assets": requested_assets}, + story_id=story_id, + ) + await _dispatch_generation_job(db, job=job) + await db.refresh(job) + return generation_job_to_summary(job) + + async def _retry_cover_image_asset(story: Story, db: AsyncSession, *, job=None) -> None: """Retry cover generation for a text story.""" diff --git a/backend/app/services/voice_session_service.py b/backend/app/services/voice_session_service.py index a402c2c..2b879d4 100644 --- a/backend/app/services/voice_session_service.py +++ b/backend/app/services/voice_session_service.py @@ -31,7 +31,7 @@ from app.services.memory_service import build_enhanced_memory_context from app.services.provider_router import generate_story_content, text_to_speech from app.services.story_service import ( create_story_from_result, - generate_story_cover, + queue_story_asset_generation, validate_profile_and_universe, ) from app.services.voice_session_safety import ( @@ -1710,15 +1710,21 @@ async def finalize_voice_session_service( generation_job_id: str | None = None if request.generate_cover and story.cover_prompt: try: - await generate_story_cover(story.id, user_id, db) + cover_job = await queue_story_asset_generation( + story.id, + user_id, + ["image"], + db, + ) + generation_job_id = str(cover_job["id"]) await _record_session_event( db, session_id=session.id, turn_id=None, - event_type="session_cover_generation_succeeded", + event_type="session_cover_generation_queued", status="succeeded", - message="Finalized story cover was generated after session save.", - metadata={"story_id": story.id}, + message="Finalized story cover generation was queued after session save.", + metadata={"story_id": story.id, "generation_job_id": generation_job_id}, ) except HTTPException as exc: await _record_session_event( @@ -1727,11 +1733,11 @@ async def finalize_voice_session_service( turn_id=None, event_type="session_cover_generation_failed", status="failed", - message="Finalized story cover generation failed after session save.", + message="Finalized story cover generation failed before the worker could start.", metadata={"story_id": story.id, "error": str(exc.detail)}, ) logger.warning( - "voice_session_finalize_cover_failed", + "voice_session_finalize_cover_queue_failed", session_id=session.id, story_id=story.id, error=str(exc.detail), diff --git a/backend/tests/test_generation_jobs.py b/backend/tests/test_generation_jobs.py index c09a5a0..3c68e36 100644 --- a/backend/tests/test_generation_jobs.py +++ b/backend/tests/test_generation_jobs.py @@ -8,7 +8,7 @@ from httpx import ASGITransport, AsyncClient from sqlalchemy import select from app.db.database import get_db -from app.db.models import GenerationJob, GenerationJobEvent +from app.db.models import GenerationJob, GenerationJobEvent, Story from app.main import app from app.services.adapters import AdapterConfig from app.services.adapters.storybook.primary import Storybook, StorybookPage @@ -20,7 +20,7 @@ from app.services.generation_jobs import ( mark_stale_generation_jobs, record_generation_event, ) -from app.services.story_service import run_generation_job_service +from app.services.story_service import queue_story_asset_generation, run_generation_job_service pytestmark = pytest.mark.asyncio @@ -225,6 +225,86 @@ async def test_asset_retry_records_job_events_and_updates_retryable_assets( app.dependency_overrides.clear() +async def test_queue_story_asset_generation_dispatches_background_job( + db_session, + test_story, +): + task_delay_path = "app.tasks.generation_workflow.run_generation_workflow_task.delay" + + with patch(task_delay_path) as mock_delay: + summary = await queue_story_asset_generation( + test_story.id, + test_story.user_id, + ["image"], + db_session, + ) + + assert summary["output_mode"] == "asset_generation" + assert summary["input_type"] == "image" + assert summary["status"] == "running" + assert summary["current_step"] == "request_accepted" + mock_delay.assert_called_once_with(summary["id"]) + + job = ( + await db_session.execute( + select(GenerationJob).where(GenerationJob.id == summary["id"]) + ) + ).scalar_one() + assert job.story_id == test_story.id + assert job.output_mode == "asset_generation" + + +async def test_asset_generation_job_worker_completes_cover_image( + db_session, + test_story, +): + job = await create_generation_job( + db_session, + user_id=test_story.user_id, + output_mode="asset_generation", + input_type="image", + request_payload={"story_id": test_story.id, "assets": ["image"]}, + story_id=test_story.id, + ) + + with patch( + "app.services.story_service.generate_image", + new_callable=AsyncMock, + ) as mock_generate_image: + mock_generate_image.return_value = "https://example.com/async-cover.png" + + await run_generation_job_service(job.id, db_session) + + refreshed_job = ( + await db_session.execute(select(GenerationJob).where(GenerationJob.id == job.id)) + ).scalar_one() + assert refreshed_job.status == "completed" + assert refreshed_job.current_step == "asset_generation_completed" + assert refreshed_job.result_snapshot["image_status"] == "ready" + + story = ( + await db_session.execute( + select(Story).where(Story.id == test_story.id) + ) + ).scalar_one() + assert story.image_url == "https://example.com/async-cover.png" + + events = ( + await db_session.execute( + select(GenerationJobEvent) + .where(GenerationJobEvent.job_id == job.id) + .order_by(GenerationJobEvent.id) + ) + ).scalars().all() + assert [event.event_type for event in events] == [ + "request_accepted", + "worker_started", + "cover_image_started", + "cover_image_succeeded", + "asset_generation_completed", + ] + + async def test_storybook_generation_is_queued_then_worker_records_page_image_events( db_session, auth_token, diff --git a/backend/tests/test_voice_sessions.py b/backend/tests/test_voice_sessions.py index edd01ff..19f0e75 100644 --- a/backend/tests/test_voice_sessions.py +++ b/backend/tests/test_voice_sessions.py @@ -99,9 +99,9 @@ async def test_voice_session_correct_turn_and_finalize_to_story( new_callable=AsyncMock, ) as mock_tts, patch( - "app.services.voice_session_service.generate_story_cover", + "app.services.voice_session_service.queue_story_asset_generation", new_callable=AsyncMock, - ) as mock_generate_cover, + ) as mock_queue_asset_generation, ): mock_generate.side_effect = [ StoryOutput( @@ -118,7 +118,23 @@ async def test_voice_session_correct_turn_and_finalize_to_story( ), ] mock_tts.side_effect = [b"turn-1-audio", b"turn-2-audio"] - mock_generate_cover.return_value = "https://example.com/voice-cover.png" + mock_queue_asset_generation.return_value = { + "id": "cover-job-123", + "story_id": 1, + "output_mode": "asset_generation", + "input_type": "image", + "status": "running", + "current_step": "request_accepted", + "progress_percent": 5, + "progress_label": "已接收请求", + "is_terminal": False, + "can_cancel": True, + "can_retry": False, + "result_snapshot": {}, + "error_message": None, + "created_at": "2026-04-20T00:00:00Z", + "updated_at": "2026-04-20T00:00:00Z", + } transport = ASGITransport(app=app) try: @@ -156,6 +172,7 @@ async def test_voice_session_correct_turn_and_finalize_to_story( finalize_data = response.json() story_id = finalize_data["story_id"] assert finalize_data["status"] == "completed" + assert finalize_data["generation_job_id"] == "cover-job-123" response = await client.get(f"/api/stories/{story_id}") assert response.status_code == 200 @@ -172,7 +189,7 @@ async def test_voice_session_correct_turn_and_finalize_to_story( assert session_data["final_story_id"] == story_id assert session_data["can_continue"] is False assert session_data["story_state"]["final_summary"] - mock_generate_cover.assert_awaited_once() + mock_queue_asset_generation.assert_awaited_once() finally: app.dependency_overrides.clear() diff --git a/docs/technical/voice-co-creation-phase-a-migration-api-draft.md b/docs/technical/voice-co-creation-phase-a-migration-api-draft.md index b3277a9..d238413 100644 --- a/docs/technical/voice-co-creation-phase-a-migration-api-draft.md +++ b/docs/technical/voice-co-creation-phase-a-migration-api-draft.md @@ -594,6 +594,11 @@ async def resolve_voice_turn_confirmation(...) async def finalize_voice_session(...) ``` +说明: + +- 当 `generate_cover=true` 且已产出 `cover_prompt` 时,finalize 后会额外排队一个 `asset_generation` job +- 前端可通过返回的 `generation_job_id` 继续复用现有 generation trace / job detail 组件 + ### 获取语音共创 analytics ```python diff --git a/docs/technical/voice-co-creation-phase-a-tech-spec.md b/docs/technical/voice-co-creation-phase-a-tech-spec.md index 649c333..24e0834 100644 --- a/docs/technical/voice-co-creation-phase-a-tech-spec.md +++ b/docs/technical/voice-co-creation-phase-a-tech-spec.md @@ -27,8 +27,9 @@ - 前端明确展示“本轮系统理解为”与“建议家长确认后再继续”提示 - 低置信度确认链路已有后端测试覆盖,可作为下一阶段继续接 ASR 与更细确认交互的基础 - 已新增用户转写安全检查、assistant 输出柔性改写与 `safety_flags` 事件记录 -- finalize 会生成更稳定的标题/摘要,并在条件允许时自动衔接封面补全 +- finalize 会生成更稳定的标题/摘要,并在条件允许时自动排队封面补全 job - 已新增 `voice session analytics` 聚合指标,可跟踪 turn 成功率、ASR/TTS 失败、低置信度触发和 finalize 转化率 +- `voice session finalize` 现在会返回可追踪的 `generation_job_id`,让正式 Story 资产补全重新接回现有 generation trace 主干 Phase A 的核心目标不是做“完全实时的语音陪伴”,而是验证以下最小闭环: