56 lines
1.5 KiB
Python
56 lines
1.5 KiB
Python
import time
|
|
from server import jobs
|
|
|
|
|
|
def _wait(job_id, status, timeout=2.0):
    """Poll ``jobs.get_job`` until the job reports ``status``.

    Args:
        job_id: Identifier passed to ``jobs.get_job``.
        status: Status value the job is expected to reach.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The job object as returned by ``jobs.get_job`` once its ``status``
        attribute equals ``status``.

    Raises:
        AssertionError: If the job does not reach ``status`` before the
            timeout elapses.
    """
    # Use a monotonic clock so system clock adjustments cannot shorten or
    # extend the wait.
    deadline = time.monotonic() + timeout
    while True:
        # Poll *before* checking the deadline so the job always gets at
        # least one look, including one final look after the last sleep.
        j = jobs.get_job(job_id)
        if j and j.status == status:
            return j
        if time.monotonic() >= deadline:
            break
        time.sleep(0.01)
    raise AssertionError(f"job {job_id} never reached {status}")
|
|
|
|
|
|
def test_create_job_is_queued():
    """A newly created job is queued, keeps its hit, and can be looked up."""
    created = jobs.create_job(hit={"artist": "A"}, message="queued msg")

    # The lookup must hand back the very same object, not a copy.
    looked_up = jobs.get_job(created.id)

    assert created.status == "queued"
    assert created.hit == {"artist": "A"}
    assert looked_up is created
|
|
|
|
|
|
def test_run_job_success_sets_done():
    """A callable that returns normally leaves the job "done" with its result."""
    created = jobs.create_job(hit={}, message="m")

    jobs.run_job(
        created.id,
        lambda: {"path": "/x", "lidarr_album_id": None},
        done_message="done!",
    )

    # Poll until the job reports "done" (raises if it never does).
    finished = _wait(created.id, "done")

    assert finished.result == {"path": "/x", "lidarr_album_id": None}
    assert finished.message == "done!"
    assert finished.error is None
|
|
|
|
|
|
def test_run_job_failure_sets_failed():
    """A callable that raises leaves the job "failed" with error and message."""

    def boom():
        raise RuntimeError("kaboom")

    created = jobs.create_job(hit={}, message="m")
    jobs.run_job(
        created.id,
        boom,
        done_message="done!",
        fail_message="it broke",
    )

    # Poll until the job reports "failed" (raises if it never does).
    failed = _wait(created.id, "failed")

    # The error must be non-empty and mention the exception text.
    assert failed.error
    assert "kaboom" in failed.error
    assert failed.message == "it broke"
|
|
|
|
|
|
def test_get_unknown_job_returns_none():
    """Looking up an id that was never created yields ``None``."""
    missing = jobs.get_job("nope")
    assert missing is None
|
|
|
|
|
|
def test_eviction_keeps_within_cap():
    """Creating more than ``jobs._MAX_JOBS`` jobs keeps ``jobs.JOBS`` within the cap."""
    # Start from an empty registry so the count reflects only the jobs
    # created by this test.
    jobs.JOBS.clear()
    try:
        for i in range(jobs._MAX_JOBS + 25):
            jobs.create_job(hit={"i": i}, message="m")
        assert len(jobs.JOBS) <= jobs._MAX_JOBS
    finally:
        # Always empty the registry, even when the assertion fails, so a
        # failure here does not leave it overfilled.
        jobs.JOBS.clear()
|
|
|
|
|
|
def teardown_module():
    """Empty ``jobs.JOBS`` once this module's tests have finished.

    pytest calls a module-level function with this name after the last
    test in the module.
    """
    jobs.JOBS.clear()
|