From 35df01f08ed4facb16efc07e115916b124d250b8 Mon Sep 17 00:00:00 2001 From: zebra Date: Mon, 8 Jun 2026 19:55:57 -0700 Subject: [PATCH] feat(server): in-memory async job store with thread-pool worker Co-Authored-By: Claude Sonnet 4.6 --- server/jobs.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++ tests/test_jobs.py | 43 ++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 server/jobs.py create mode 100644 tests/test_jobs.py diff --git a/server/jobs.py b/server/jobs.py new file mode 100644 index 0000000..9122960 --- /dev/null +++ b/server/jobs.py @@ -0,0 +1,61 @@ +"""In-memory async job store. Personal-scale: jobs are lost on restart. +Generic — knows nothing about musicfetch; callers pass a no-arg `fn`.""" +import time +import uuid +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass, field +from typing import Any, Callable, Optional + +_EXECUTOR = ThreadPoolExecutor(max_workers=2) +JOBS: "dict[str, Job]" = {} +_MAX_JOBS = 200 # cap to bound memory + + +@dataclass +class Job: + id: str + status: str # queued | running | done | failed + hit: Any + message: str + result: Optional[dict] = None + error: Optional[str] = None + created_at: float = field(default_factory=time.time) + updated_at: float = field(default_factory=time.time) + + +def _touch(job: "Job", **changes): + for k, v in changes.items(): + setattr(job, k, v) + job.updated_at = time.time() + + +def _evict_if_needed(): + if len(JOBS) <= _MAX_JOBS: + return + for jid in sorted(JOBS, key=lambda j: JOBS[j].created_at)[: len(JOBS) - _MAX_JOBS]: + JOBS.pop(jid, None) + + +def create_job(hit: Any, message: str) -> "Job": + job = Job(id=uuid.uuid4().hex[:8], status="queued", hit=hit, message=message) + JOBS[job.id] = job + _evict_if_needed() + return job + + +def get_job(job_id: str) -> Optional["Job"]: + return JOBS.get(job_id) + + +def run_job(job_id: str, fn: Callable[[], dict], done_message: str, + fail_message: str = "Something went wrong while fetching.") -> None: + def _task(): + job = JOBS[job_id] + _touch(job, status="running") + try: + result = fn() + _touch(job, status="done", result=result, message=done_message) + except Exception as e: # noqa: BLE001 — record any failure on the job + _touch(job, status="failed", error=f"{type(e).__name__}: {e}", + message=fail_message) + _EXECUTOR.submit(_task) diff --git a/tests/test_jobs.py b/tests/test_jobs.py new file mode 100644 index 0000000..e44d900 --- /dev/null +++ b/tests/test_jobs.py @@ -0,0 +1,43 @@ +import time +from server import jobs + + +def _wait(job_id, status, timeout=2.0): + end = time.time() + timeout + while time.time() < end: + j = jobs.get_job(job_id) + if j and j.status == status: + return j + time.sleep(0.01) + raise AssertionError(f"job {job_id} never reached {status}") + + +def test_create_job_is_queued(): + job = jobs.create_job(hit={"artist": "A"}, message="queued msg") + assert job.status == "queued" + assert job.hit == {"artist": "A"} + assert jobs.get_job(job.id) is job + + +def test_run_job_success_sets_done(): + job = jobs.create_job(hit={}, message="m") + jobs.run_job(job.id, lambda: {"path": "/x", "lidarr_album_id": None}, + done_message="done!") + j = _wait(job.id, "done") + assert j.result == {"path": "/x", "lidarr_album_id": None} + assert j.message == "done!" + assert j.error is None + + +def test_run_job_failure_sets_failed(): + job = jobs.create_job(hit={}, message="m") + def boom(): + raise RuntimeError("kaboom") + jobs.run_job(job.id, boom, done_message="done!", fail_message="it broke") + j = _wait(job.id, "failed") + assert j.error and "kaboom" in j.error + assert j.message == "it broke" + + +def test_get_unknown_job_returns_none(): + assert jobs.get_job("nope") is None