"""Celery tasks for push notifications.""" import asyncio from datetime import datetime, time from zoneinfo import ZoneInfo from sqlalchemy import select from app.core.celery_app import celery_app from app.core.logging import get_logger from app.db.database import _get_session_factory from app.db.models import PushConfig, PushEvent logger = get_logger(__name__) LOCAL_TZ = ZoneInfo("Asia/Shanghai") QUIET_HOURS_START = time(21, 0) QUIET_HOURS_END = time(9, 0) TRIGGER_WINDOW_MINUTES = 30 @celery_app.task def check_push_notifications() -> None: """Check push configs and create push events.""" asyncio.run(_check_push_notifications()) def _is_quiet_hours(current: time) -> bool: if QUIET_HOURS_START < QUIET_HOURS_END: return QUIET_HOURS_START <= current < QUIET_HOURS_END return current >= QUIET_HOURS_START or current < QUIET_HOURS_END def _within_window(current: time, target: time) -> bool: current_minutes = current.hour * 60 + current.minute target_minutes = target.hour * 60 + target.minute return 0 <= current_minutes - target_minutes < TRIGGER_WINDOW_MINUTES async def _already_sent_today( session, child_profile_id: str, now: datetime, ) -> bool: start = now.replace(hour=0, minute=0, second=0, microsecond=0) end = now.replace(hour=23, minute=59, second=59, microsecond=999999) result = await session.execute( select(PushEvent.id).where( PushEvent.child_profile_id == child_profile_id, PushEvent.status == "sent", PushEvent.sent_at >= start, PushEvent.sent_at <= end, ) ) return result.scalar_one_or_none() is not None async def _check_push_notifications() -> None: session_factory = _get_session_factory() now = datetime.now(LOCAL_TZ) current_day = now.weekday() current_time = now.time() async with session_factory() as session: result = await session.execute( select(PushConfig).where(PushConfig.enabled.is_(True)) ) configs = result.scalars().all() for config in configs: if not config.push_time: continue if config.push_days and current_day not in config.push_days: continue if not _within_window(current_time, config.push_time): continue if _is_quiet_hours(current_time): session.add( PushEvent( user_id=config.user_id, child_profile_id=config.child_profile_id, trigger_type="time", status="suppressed", reason="quiet_hours", sent_at=now, ) ) continue if await _already_sent_today(session, config.child_profile_id, now): continue session.add( PushEvent( user_id=config.user_id, child_profile_id=config.child_profile_id, trigger_type="time", status="sent", reason=None, sent_at=now, ) ) logger.info( "push_event_sent", child_profile_id=config.child_profile_id, user_id=config.user_id, ) await session.commit()