"""In-memory async job store. Personal-scale: jobs are lost on restart. Generic — knows nothing about musicfetch; callers pass a no-arg `fn`.""" import time import uuid from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from typing import Any, Callable, Optional _EXECUTOR = ThreadPoolExecutor(max_workers=2) JOBS: "dict[str, Job]" = {} _MAX_JOBS = 200 # cap to bound memory @dataclass class Job: id: str status: str # queued | running | done | failed hit: Any message: str result: Optional[dict] = None error: Optional[str] = None created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) def _touch(job: "Job", **changes): for k, v in changes.items(): setattr(job, k, v) job.updated_at = time.time() def _evict_if_needed(): # Post-condition: len(JOBS) <= _MAX_JOBS (evicts oldest overflow entries). if len(JOBS) <= _MAX_JOBS: return for jid in sorted(JOBS, key=lambda j: JOBS[j].created_at)[: len(JOBS) - _MAX_JOBS]: JOBS.pop(jid, None) def create_job(hit: Any, message: str) -> "Job": job = Job(id=uuid.uuid4().hex[:8], status="queued", hit=hit, message=message) JOBS[job.id] = job _evict_if_needed() return job def get_job(job_id: str) -> Optional["Job"]: return JOBS.get(job_id) def run_job(job_id: str, fn: Callable[[], dict], done_message: str, fail_message: str = "Something went wrong while fetching.") -> None: def _task(): job = JOBS.get(job_id) if job is None: return _touch(job, status="running") try: result = fn() _touch(job, status="done", result=result, message=done_message) except Exception as e: # noqa: BLE001 — record any failure on the job _touch(job, status="failed", error=f"{type(e).__name__}: {e}", message=fail_message) _EXECUTOR.submit(_task)