Files
dreamweaver/backend/app/tasks/utils.py

18 lines
398 B
Python

"""Shared helpers for Celery tasks."""
from collections.abc import Awaitable
from typing import TypeVar
from app.db.database import dispose_engine
T = TypeVar("T")
async def run_with_disposed_engine(awaitable: Awaitable[T]) -> T:
"""Run async task work and drop DB pools before the event loop closes."""
try:
return await awaitable
finally:
await dispose_engine()