Files
musicfetch/tests/test_jobs.py
2026-06-08 19:55:57 -07:00

44 lines
1.3 KiB
Python

import time
from server import jobs
def _wait(job_id, status, timeout=2.0):
    """Poll ``jobs.get_job`` until the job reports ``status``.

    Args:
        job_id: Identifier passed to ``jobs.get_job``.
        status: Status value the job must reach.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The object returned by ``jobs.get_job`` once its status matches.

    Raises:
        AssertionError: If the job has not reached ``status`` by the deadline.
    """
    # Monotonic clock: a wall-clock adjustment mid-test must not shorten or
    # stretch the deadline.
    deadline = time.monotonic() + timeout
    while True:
        j = jobs.get_job(job_id)
        if j and j.status == status:
            return j
        # The deadline is checked only after polling, so the job is looked up
        # at least once and once more after the final sleep.
        if time.monotonic() >= deadline:
            raise AssertionError(f"job {job_id} never reached {status}")
        time.sleep(0.01)
def test_create_job_is_queued():
    """A newly created job reports "queued", keeps its hit, and is retrievable by id."""
    created = jobs.create_job(message="queued msg", hit={"artist": "A"})
    expected_hit = {"artist": "A"}

    assert created.status == "queued"
    assert created.hit == expected_hit
    # Lookup must hand back the very same object, not a copy.
    looked_up = jobs.get_job(created.id)
    assert looked_up is created
def test_run_job_success_sets_done():
    """A job whose callable returns normally reaches "done" with result and message set."""
    expected_result = {"path": "/x", "lidarr_album_id": None}
    created = jobs.create_job(hit={}, message="m")

    jobs.run_job(
        created.id,
        lambda: {"path": "/x", "lidarr_album_id": None},
        done_message="done!",
    )

    # Poll until the job reports "done" (the helper raises if it never does).
    finished = _wait(created.id, "done")
    assert finished.result == expected_result
    assert finished.message == "done!"
    assert finished.error is None
def test_run_job_failure_sets_failed():
    """A job whose callable raises reaches "failed" with the error text and fail message."""

    def boom():
        raise RuntimeError("kaboom")

    created = jobs.create_job(hit={}, message="m")
    jobs.run_job(
        created.id,
        boom,
        done_message="done!",
        fail_message="it broke",
    )

    # Poll until the job reports "failed" (the helper raises if it never does).
    failed = _wait(created.id, "failed")
    # The error must be present and mention the exception's message.
    assert failed.error
    assert "kaboom" in failed.error
    assert failed.message == "it broke"
def test_get_unknown_job_returns_none():
    """Looking up the id ``nope`` is expected to return None."""
    missing = jobs.get_job("nope")
    assert missing is None