feat: track generation jobs
This commit is contained in:
133
backend/app/services/generation_jobs.py
Normal file
133
backend/app/services/generation_jobs.py
Normal file
@@ -0,0 +1,133 @@
|
||||
"""Lightweight generation job/event tracking."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db.models import GenerationJob, GenerationJobEvent, Story
|
||||
|
||||
|
||||
def _story_snapshot(story: Story | None) -> dict[str, Any]:
|
||||
if story is None:
|
||||
return {}
|
||||
|
||||
return {
|
||||
"story_id": story.id,
|
||||
"mode": story.mode,
|
||||
"generation_status": story.generation_status,
|
||||
"image_status": story.image_status,
|
||||
"audio_status": story.audio_status,
|
||||
"retryable_assets": story.retryable_assets,
|
||||
"last_error": story.last_error,
|
||||
}
|
||||
|
||||
|
||||
def _job_status_from_story(story: Story) -> str:
|
||||
if story.generation_status == "failed":
|
||||
return "failed"
|
||||
if story.generation_status == "degraded_completed":
|
||||
return "degraded_completed"
|
||||
return "completed"
|
||||
|
||||
|
||||
async def create_generation_job(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
user_id: str,
|
||||
output_mode: str,
|
||||
input_type: str,
|
||||
request_payload: dict[str, Any],
|
||||
story_id: int | None = None,
|
||||
) -> GenerationJob:
|
||||
"""Create a generation job and record its first event."""
|
||||
|
||||
job = GenerationJob(
|
||||
user_id=user_id,
|
||||
story_id=story_id,
|
||||
output_mode=output_mode,
|
||||
input_type=input_type,
|
||||
status="running",
|
||||
current_step="request_accepted",
|
||||
request_payload=request_payload,
|
||||
result_snapshot={},
|
||||
)
|
||||
db.add(job)
|
||||
await db.flush()
|
||||
await record_generation_event(
|
||||
db,
|
||||
job=job,
|
||||
story_id=story_id,
|
||||
event_type="request_accepted",
|
||||
status="succeeded",
|
||||
message="Generation request accepted.",
|
||||
metadata={"output_mode": output_mode, "input_type": input_type},
|
||||
commit=False,
|
||||
)
|
||||
await db.commit()
|
||||
await db.refresh(job)
|
||||
return job
|
||||
|
||||
|
||||
async def record_generation_event(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
job: GenerationJob,
|
||||
event_type: str,
|
||||
status: str,
|
||||
story_id: int | None = None,
|
||||
message: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
commit: bool = True,
|
||||
) -> GenerationJobEvent:
|
||||
"""Append one event to an existing generation job."""
|
||||
|
||||
event = GenerationJobEvent(
|
||||
job_id=job.id,
|
||||
story_id=story_id if story_id is not None else job.story_id,
|
||||
event_type=event_type,
|
||||
status=status,
|
||||
message=message,
|
||||
event_metadata=metadata or {},
|
||||
)
|
||||
db.add(event)
|
||||
if commit:
|
||||
await db.commit()
|
||||
return event
|
||||
|
||||
|
||||
async def finish_generation_job(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
job: GenerationJob,
|
||||
story: Story | None,
|
||||
status: str | None = None,
|
||||
current_step: str,
|
||||
error_message: str | None = None,
|
||||
message: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
) -> GenerationJob:
|
||||
"""Mark a generation job as completed/degraded/failed and append a final event."""
|
||||
|
||||
job.story_id = story.id if story is not None else job.story_id
|
||||
job.status = status or (_job_status_from_story(story) if story is not None else "failed")
|
||||
job.current_step = current_step
|
||||
job.error_message = error_message
|
||||
job.result_snapshot = _story_snapshot(story)
|
||||
await record_generation_event(
|
||||
db,
|
||||
job=job,
|
||||
story_id=job.story_id,
|
||||
event_type=current_step,
|
||||
status=job.status,
|
||||
message=message,
|
||||
metadata={
|
||||
**(metadata or {}),
|
||||
"result_snapshot": job.result_snapshot,
|
||||
},
|
||||
commit=False,
|
||||
)
|
||||
await db.commit()
|
||||
await db.refresh(job)
|
||||
return job
|
||||
@@ -28,6 +28,11 @@ from app.services.audio_storage import (
|
||||
read_audio_cache,
|
||||
write_story_audio_cache,
|
||||
)
|
||||
from app.services.generation_jobs import (
|
||||
create_generation_job,
|
||||
finish_generation_job,
|
||||
record_generation_event,
|
||||
)
|
||||
from app.services.memory_service import build_enhanced_memory_context
|
||||
from app.services.provider_router import (
|
||||
generate_image,
|
||||
@@ -141,6 +146,26 @@ def _trigger_story_postprocessing(story: Story) -> None:
|
||||
extract_story_achievements.delay(story.id, story.universe_id)
|
||||
|
||||
|
||||
async def _record_postprocessing_event_if_needed(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
job,
|
||||
story: Story,
|
||||
) -> None:
|
||||
if not story.universe_id:
|
||||
return
|
||||
|
||||
await record_generation_event(
|
||||
db,
|
||||
job=job,
|
||||
story_id=story.id,
|
||||
event_type="postprocessing_queued",
|
||||
status="queued",
|
||||
message="Achievement extraction queued after the main story record was saved.",
|
||||
metadata={"universe_id": story.universe_id},
|
||||
)
|
||||
|
||||
|
||||
async def _persist_text_story_result(
|
||||
*,
|
||||
result: StoryOutput,
|
||||
@@ -629,6 +654,7 @@ async def generate_full_story_service(
|
||||
image_status=story.image_status,
|
||||
audio_status=story.audio_status,
|
||||
last_error=story.last_error,
|
||||
retryable_assets=story.retryable_assets,
|
||||
)
|
||||
|
||||
|
||||
@@ -703,6 +729,7 @@ async def generate_storybook_service(
|
||||
image_status=story.image_status,
|
||||
audio_status=story.audio_status,
|
||||
last_error=story.last_error,
|
||||
retryable_assets=story.retryable_assets,
|
||||
)
|
||||
|
||||
|
||||
@@ -713,6 +740,51 @@ async def generate_generation_service(
|
||||
) -> GenerationResponse:
|
||||
"""Unified generation workflow entry point for stories and storybooks."""
|
||||
|
||||
job = await create_generation_job(
|
||||
db,
|
||||
user_id=user_id,
|
||||
output_mode=request.output_mode,
|
||||
input_type=request.type,
|
||||
request_payload=request.model_dump(mode="json"),
|
||||
)
|
||||
|
||||
try:
|
||||
response = await _generate_generation_service_with_job(request, user_id, db, job=job)
|
||||
except HTTPException as exc:
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=None,
|
||||
status="failed",
|
||||
current_step="generation_failed",
|
||||
error_message=str(exc.detail),
|
||||
message="Generation failed before a durable story result was available.",
|
||||
)
|
||||
raise
|
||||
except Exception as exc:
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=None,
|
||||
status="failed",
|
||||
current_step="generation_failed",
|
||||
error_message=str(exc),
|
||||
message="Generation failed before a durable story result was available.",
|
||||
)
|
||||
raise
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def _generate_generation_service_with_job(
|
||||
request: GenerationRequest,
|
||||
user_id: str,
|
||||
db: AsyncSession,
|
||||
*,
|
||||
job,
|
||||
) -> GenerationResponse:
|
||||
"""Run the unified generation workflow after the tracking job has been created."""
|
||||
|
||||
if request.output_mode == "storybook":
|
||||
storybook = await generate_storybook_service(
|
||||
StorybookRequest(
|
||||
@@ -730,6 +802,14 @@ async def generate_generation_service(
|
||||
raise HTTPException(status_code=500, detail="Storybook generation did not persist.")
|
||||
|
||||
saved_story = await get_story_detail(storybook.id, user_id, db)
|
||||
await _record_postprocessing_event_if_needed(db, job=job, story=saved_story)
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=saved_story,
|
||||
current_step="generation_completed",
|
||||
message="Storybook generation completed with persisted text and current asset states.",
|
||||
)
|
||||
return GenerationResponse(
|
||||
id=storybook.id,
|
||||
title=storybook.title,
|
||||
@@ -746,6 +826,7 @@ async def generate_generation_service(
|
||||
last_error=storybook.last_error,
|
||||
child_profile_id=saved_story.child_profile_id,
|
||||
universe_id=saved_story.universe_id,
|
||||
retryable_assets=saved_story.retryable_assets,
|
||||
)
|
||||
|
||||
generate_request = GenerateRequest(
|
||||
@@ -758,6 +839,15 @@ async def generate_generation_service(
|
||||
|
||||
if request.generate_images:
|
||||
story = await generate_full_story_service(generate_request, user_id, db)
|
||||
saved_story = await get_story_detail(story.id, user_id, db)
|
||||
await _record_postprocessing_event_if_needed(db, job=job, story=saved_story)
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=saved_story,
|
||||
current_step="generation_completed",
|
||||
message="Story generation completed with persisted text and current asset states.",
|
||||
)
|
||||
return GenerationResponse(
|
||||
id=story.id,
|
||||
title=story.title,
|
||||
@@ -774,9 +864,18 @@ async def generate_generation_service(
|
||||
last_error=story.last_error,
|
||||
child_profile_id=story.child_profile_id,
|
||||
universe_id=story.universe_id,
|
||||
retryable_assets=saved_story.retryable_assets,
|
||||
)
|
||||
|
||||
story = await generate_and_save_story(generate_request, user_id, db)
|
||||
await _record_postprocessing_event_if_needed(db, job=job, story=story)
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
current_step="generation_completed",
|
||||
message="Story generation completed with a persisted readable narrative.",
|
||||
)
|
||||
return GenerationResponse(
|
||||
id=story.id,
|
||||
title=story.title,
|
||||
@@ -791,6 +890,7 @@ async def generate_generation_service(
|
||||
last_error=story.last_error,
|
||||
child_profile_id=story.child_profile_id,
|
||||
universe_id=story.universe_id,
|
||||
retryable_assets=story.retryable_assets,
|
||||
)
|
||||
|
||||
|
||||
@@ -884,20 +984,72 @@ async def retry_story_assets(
|
||||
db: AsyncSession,
|
||||
) -> Story:
|
||||
"""Retry selected assets through one workflow-level endpoint."""
|
||||
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
requested_assets = list(dict.fromkeys(assets))
|
||||
job = await create_generation_job(
|
||||
db,
|
||||
user_id=user_id,
|
||||
output_mode="asset_retry",
|
||||
input_type=",".join(requested_assets),
|
||||
request_payload={"story_id": story_id, "assets": requested_assets},
|
||||
story_id=story_id,
|
||||
)
|
||||
story: Story | None = None
|
||||
|
||||
if "image" in requested_assets:
|
||||
if story.mode == "storybook":
|
||||
await _retry_storybook_image_assets(story, db)
|
||||
else:
|
||||
await _retry_cover_image_asset(story, db)
|
||||
try:
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
await record_generation_event(
|
||||
db,
|
||||
job=job,
|
||||
story_id=story.id,
|
||||
event_type="asset_retry_started",
|
||||
status="running",
|
||||
message="Asset retry started.",
|
||||
metadata={"assets": requested_assets},
|
||||
)
|
||||
|
||||
if "audio" in requested_assets:
|
||||
await _retry_audio_asset(story, db)
|
||||
if "image" in requested_assets:
|
||||
if story.mode == "storybook":
|
||||
await _retry_storybook_image_assets(story, db)
|
||||
else:
|
||||
await _retry_cover_image_asset(story, db)
|
||||
|
||||
return await get_story_detail(story_id, user_id, db)
|
||||
if "audio" in requested_assets:
|
||||
await _retry_audio_asset(story, db)
|
||||
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
current_step="asset_retry_completed",
|
||||
message="Asset retry completed with persisted status updates.",
|
||||
metadata={"assets": requested_assets},
|
||||
)
|
||||
return story
|
||||
except HTTPException as exc:
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
status="failed",
|
||||
current_step="asset_retry_failed",
|
||||
error_message=str(exc.detail),
|
||||
message="Asset retry failed.",
|
||||
metadata={"assets": requested_assets},
|
||||
)
|
||||
raise
|
||||
except Exception as exc:
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
status="failed",
|
||||
current_step="asset_retry_failed",
|
||||
error_message=str(exc),
|
||||
message="Asset retry failed.",
|
||||
metadata={"assets": requested_assets},
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def generate_story_cover(
|
||||
@@ -906,16 +1058,47 @@ async def generate_story_cover(
|
||||
db: AsyncSession,
|
||||
) -> str:
|
||||
"""Generate cover image for an existing story."""
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
|
||||
image_result = await _complete_cover_image_asset(
|
||||
story,
|
||||
job = await create_generation_job(
|
||||
db,
|
||||
raise_on_failure=True,
|
||||
log_event="cover_generation_failed",
|
||||
user_id=user_id,
|
||||
output_mode="asset_generation",
|
||||
input_type="image",
|
||||
request_payload={"story_id": story_id, "assets": ["image"]},
|
||||
story_id=story_id,
|
||||
)
|
||||
if image_result.succeeded and isinstance(image_result.value, str):
|
||||
return image_result.value
|
||||
story: Story | None = None
|
||||
|
||||
try:
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
image_result = await _complete_cover_image_asset(
|
||||
story,
|
||||
db,
|
||||
raise_on_failure=True,
|
||||
log_event="cover_generation_failed",
|
||||
)
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
current_step="asset_generation_completed",
|
||||
message="Cover image generation completed.",
|
||||
metadata={"assets": ["image"]},
|
||||
)
|
||||
if image_result.succeeded and isinstance(image_result.value, str):
|
||||
return image_result.value
|
||||
except HTTPException as exc:
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
status="failed",
|
||||
current_step="asset_generation_failed",
|
||||
error_message=str(exc.detail),
|
||||
message="Cover image generation failed.",
|
||||
metadata={"assets": ["image"]},
|
||||
)
|
||||
raise
|
||||
|
||||
raise HTTPException(status_code=500, detail="Image generation failed")
|
||||
|
||||
@@ -926,11 +1109,42 @@ async def generate_story_audio(
|
||||
db: AsyncSession,
|
||||
) -> bytes:
|
||||
"""Generate audio for a story."""
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
job = await create_generation_job(
|
||||
db,
|
||||
user_id=user_id,
|
||||
output_mode="asset_generation",
|
||||
input_type="audio",
|
||||
request_payload={"story_id": story_id, "assets": ["audio"]},
|
||||
story_id=story_id,
|
||||
)
|
||||
story: Story | None = None
|
||||
|
||||
audio_result = await _complete_audio_asset(story, db, raise_on_failure=True)
|
||||
if audio_result.succeeded and isinstance(audio_result.value, bytes):
|
||||
return audio_result.value
|
||||
try:
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
audio_result = await _complete_audio_asset(story, db, raise_on_failure=True)
|
||||
story = await get_story_detail(story_id, user_id, db)
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
current_step="asset_generation_completed",
|
||||
message="Story audio generation completed.",
|
||||
metadata={"assets": ["audio"]},
|
||||
)
|
||||
if audio_result.succeeded and isinstance(audio_result.value, bytes):
|
||||
return audio_result.value
|
||||
except HTTPException as exc:
|
||||
await finish_generation_job(
|
||||
db,
|
||||
job=job,
|
||||
story=story,
|
||||
status="failed",
|
||||
current_step="asset_generation_failed",
|
||||
error_message=str(exc.detail),
|
||||
message="Story audio generation failed.",
|
||||
metadata={"assets": ["audio"]},
|
||||
)
|
||||
raise
|
||||
|
||||
raise HTTPException(status_code=500, detail="Audio generation failed")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user