Files
musicfetch/server/jobs.py
2026-06-08 23:54:49 -07:00

67 lines
2.1 KiB
Python

"""In-memory async job store. Personal-scale: jobs are lost on restart.
Generic — knows nothing about musicfetch; callers pass a no-arg `fn`."""
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
_EXECUTOR = ThreadPoolExecutor(max_workers=2)
JOBS: "dict[str, Job]" = {}
_MAX_JOBS = 200 # cap to bound memory
@dataclass
class Job:
id: str
status: str # queued | running | done | failed
hit: Any
message: str
result: Optional[dict] = None
error: Optional[str] = None
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
def _touch(job: "Job", **changes):
for k, v in changes.items():
setattr(job, k, v)
job.updated_at = time.time()
def _evict_if_needed():
# Post-condition: len(JOBS) <= _MAX_JOBS (evicts oldest overflow entries).
if len(JOBS) <= _MAX_JOBS:
return
for jid in sorted(JOBS, key=lambda j: JOBS[j].created_at)[: len(JOBS) - _MAX_JOBS]:
JOBS.pop(jid, None)
def create_job(hit: Any, message: str) -> "Job":
job = Job(id=uuid.uuid4().hex[:8], status="queued", hit=hit, message=message)
JOBS[job.id] = job
_evict_if_needed()
return job
def get_job(job_id: str) -> Optional["Job"]:
return JOBS.get(job_id)
def run_job(job_id: str, fn: Callable[[], dict],
done_message: "str | Callable[[dict], str]",
fail_message: str = "Something went wrong while fetching.") -> None:
def _task():
job = JOBS.get(job_id)
if job is None:
return
_touch(job, status="running")
try:
result = fn()
msg = done_message(result) if callable(done_message) else done_message
_touch(job, status="done", result=result, message=msg)
except Exception as e: # noqa: BLE001 — record any failure on the job
_touch(job, status="failed", error=f"{type(e).__name__}: {e}",
message=fail_message)
_EXECUTOR.submit(_task)