Compare commits
32 Commits
feat/rest-
...
fix/yt-tag
| Author | SHA1 | Date | |
|---|---|---|---|
| 0347a638cf | |||
| 33ca743a34 | |||
| 7951c436dd | |||
| 8b881c14bf | |||
| 530b5b0406 | |||
| 0a4e6d474a | |||
| c0503187c5 | |||
| a6aa469084 | |||
| f071158c10 | |||
| c6bde6958a | |||
| 7ea3ad2538 | |||
| 9af7f91a25 | |||
| 567d7578ad | |||
| c6e28a4f75 | |||
| 1a81f64cc3 | |||
| fdc3cc84a5 | |||
| 74eb63b243 | |||
| 6730f1f141 | |||
| f103b6c253 | |||
| 7309ad3a29 | |||
| 90b9a01872 | |||
| 0f7ddd7697 | |||
| ca36d2bb27 | |||
| aa9d177ed1 | |||
| 3ee49b17bd | |||
| 6e6bec7a0d | |||
| a24c894c61 | |||
| a424fbfd2f | |||
| b99e5eb9cb | |||
| 1661cb1742 | |||
| 18f72a5626 | |||
| babbd84fda |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,2 +1,4 @@
|
||||
__pycache__/
|
||||
*.pyc
|
||||
|
||||
server/log.txt
|
||||
|
||||
51
README.md
51
README.md
@@ -94,6 +94,9 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
||||
| `--yt-only` | Skip Lidarr. |
|
||||
| `-o`, `--root PATH` | Output root folder (default `/media/music`). |
|
||||
| `--search-all` | Search all albums when adding an artist to Lidarr. |
|
||||
| `--repair` | Re-tag existing downloads under `--root` from source metadata (see below). |
|
||||
| `--retag-from-path` | Offline: re-tag artist/title from folder + filename (see below). |
|
||||
| `-x`, `--exclude NAME` | Folder under `--root` to skip during `--repair`/`--retag-from-path` (repeatable). |
|
||||
| `--debug` | Verbose output. |
|
||||
|
||||
### Examples
|
||||
@@ -115,8 +118,48 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
||||
# YouTube only, lossless preferred
|
||||
./musicfetch --yt-only -q flac "Bonobo - Kerala"
|
||||
|
||||
# Download by URL (YouTube Music URL preferred for correct art)
|
||||
# Download by URL (single track or playlist/set/album, any yt-dlp site)
|
||||
./musicfetch "https://music.youtube.com/watch?v=xxxxxxxxxxx"
|
||||
./musicfetch "https://soundcloud.com/artist/sets/my-mix"
|
||||
```
|
||||
|
||||
### 🔧 Repair existing tags
|
||||
|
||||
`--repair` walks `<root>/<artist>/<source>/` (the `youtube`/`soundcloud`/… download
|
||||
folders — Lidarr album folders are skipped), re-fetches authoritative metadata for each
|
||||
file using the `[id]` in its filename, and fixes tags. Useful when downloads landed with
|
||||
missing album or wrong year.
|
||||
|
||||
It is deliberately **conservative**: it overwrites **album** and **year** (the usual
|
||||
breakage), and fills in **artist**/**title** when they are missing *or* a known-bogus
|
||||
placeholder (`NA`, `Unknown Album`, `Unknown Artist` — left behind by older buggy tagging) —
|
||||
but it never overwrites a genuine existing artist/title with a channel name or decorated video
|
||||
title. A bogus `NA [<id>].<ext>` filename is renamed to the recovered title, and a literal
|
||||
`NA` album with no source album is normalised to `Unknown Album`.
|
||||
|
||||
It re-queries the source over the network, so run it occasionally, not constantly. Requires
|
||||
`mutagen` (a yt-dlp dependency, usually already present). CLI-only — not exposed via the REST API.
|
||||
|
||||
```bash
|
||||
# Preview what would change (writes nothing)
|
||||
./musicfetch --repair -d
|
||||
|
||||
# Apply fixes under a specific root
|
||||
./musicfetch --repair -o /media/music
|
||||
```
|
||||
|
||||
**`--retag-from-path`** is an offline companion: it derives **artist** and **title** purely
|
||||
from the folder name + filename (stripping `(Official Video)` / `(Lyrics)`-style decorations,
|
||||
and treating an `Artist - Title` filename correctly), with no network. Use it to undo bad
|
||||
tags — e.g. titles/artists clobbered by an earlier `--repair` on music videos. It overwrites
|
||||
artist/title and leaves album/year alone.
|
||||
|
||||
```bash
|
||||
./musicfetch --retag-from-path -d # preview
|
||||
./musicfetch --retag-from-path -o /media/music
|
||||
|
||||
# Skip folders (e.g. hand-curated playlists you don't want re-tagged)
|
||||
./musicfetch --repair -x Unsorted -x playlists
|
||||
```
|
||||
|
||||
### 📁 Output Structure
|
||||
@@ -124,8 +167,10 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
||||
```text
|
||||
<root>/
|
||||
├── Artist Name/
|
||||
│ ├── Album Name/ (managed by Lidarr)
|
||||
│ └── youtube/ (yt-dlp downloads / fallbacks)
|
||||
│ ├── Album Name/ (managed by Lidarr)
|
||||
│ ├── youtube/ (YouTube / YouTube Music downloads)
|
||||
│ ├── soundcloud/ (SoundCloud downloads)
|
||||
│ └── <source>/ (one folder per yt-dlp source)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
@@ -0,0 +1,645 @@
|
||||
# Playlists + Profile Hardening Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** (1) Harden Lidarr profile selection (pick metadata + quality profile by name with env overrides, not by array position). (2) Add YouTube playlist support that downloads each track to its own per-artist folder, via both the CLI and the REST API (one playlist = one job, `done` if ≥1 track succeeds).
|
||||
|
||||
**Architecture:** New helpers in the single-file `musicfetch` binary (profile lookup by name; `is_playlist_url`/`expand_playlist`/`download_playlist`/`download_single`; `yt_download`/`act_youtube` return success bools). `server/mf.py` re-exports the new URL helpers; `server/jobs.py` gains callable `done_message` (so a batch can report `N/M`); `server/actions.py` + `server/app.py` route URL/playlist `q` to a download job. Tests import the binary via the existing `server.mf` loader (`musicfetch_core`).
|
||||
|
||||
**Tech Stack:** Python 3.10+, stdlib `urllib.parse`, `requests`/`ytmusicapi`/`yt-dlp` (already deps), FastAPI, pytest+monkeypatch. No new deps.
|
||||
|
||||
---
|
||||
|
||||
## Context for the implementer
|
||||
|
||||
Work from `/home/zhering/Documents/musicfetch` on branch `feat/playlists-profiles` (already checked out). The `musicfetch` binary (no `.py` ext) already has, verified at these locations:
|
||||
- `get_default_metadata_profile_id()` (line ~447): returns `profiles[0]["id"]` — to be replaced.
|
||||
- `add_artist()` (line ~457): payload hardcodes `"qualityProfileId": 1` (line ~466) and calls `get_default_metadata_profile_id()` (line ~467).
|
||||
- `yt_download(url_or_query, target_folder, quality, dry_run, hit=None)` (line ~579): builds the yt-dlp cmd, `subprocess.run(cmd)` at the end, returns None. `--no-playlist` is in the cmd.
|
||||
- `act_youtube(hit, root, quality, dry_run)` (line ~611): builds `music.youtube` URL + per-first-artist folder, calls `yt_download`, returns None.
|
||||
- `run_yt_dlp_get_metadata(url)` (line ~623), `get_artist_from_metadata(meta)` (line ~635), `handle_url(url, root, quality, dry_run)` (line ~644).
|
||||
- `is_url(s)` (early), `Hit` dataclass, `_ytm_artists(item)` (in YouTube-search section), module-level `YTMusic` (None if not installed), `subprocess`, `json`, `os`, `requests`, `RequestException`, `dbg`, `err`, `lidarr_get`, `lidarr_post`.
|
||||
|
||||
`server/mf.py` re-exports a fixed symbol list + `__all__`; `server/jobs.py` has `run_job(job_id, fn, done_message, fail_message=...)` where `done_message` is currently a str; `server/app.py` `fetch()` treats `q` only as a search term; `server/actions.py` has `perform_fetch`, `started_message`, `done_message`, `failed_message`.
|
||||
|
||||
Tests: `import server.mf # noqa: F401` then `import musicfetch_core as mf`; monkeypatch `mf.lidarr_get`, `mf.act_youtube`, `mf.subprocess`, `mf.YTMusic`, and `monkeypatch.setenv`.
|
||||
|
||||
Add to the top imports block of `musicfetch` (Task 2): `from urllib.parse import urlparse, parse_qs`.
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Lidarr profile hardening
|
||||
|
||||
**Files:**
|
||||
- Modify: `musicfetch` (replace `get_default_metadata_profile_id`; add `_profile_id_by_name` and `get_quality_profile_id`; change `add_artist` payload)
|
||||
- Test: `tests/test_profiles.py`
|
||||
|
||||
- [ ] **Step 1: Write the failing test**
|
||||
|
||||
Create `tests/test_profiles.py`:
|
||||
```python
|
||||
import server.mf # noqa: F401
|
||||
import musicfetch_core as mf
|
||||
|
||||
META = [{"id": 1, "name": "Standard"}, {"id": 2, "name": "None"}, {"id": 3, "name": "OST"}]
|
||||
QUAL = [{"id": 1, "name": "Any"}, {"id": 2, "name": "Lossless"}]
|
||||
|
||||
|
||||
def test_metadata_profile_default_standard_by_name(monkeypatch):
|
||||
monkeypatch.delenv("LIDARR_METADATA_PROFILE", raising=False)
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: META)
|
||||
assert mf.get_default_metadata_profile_id() == 1 # "Standard", not position-luck
|
||||
|
||||
|
||||
def test_metadata_profile_env_override(monkeypatch):
|
||||
monkeypatch.setenv("LIDARR_METADATA_PROFILE", "OST")
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: META)
|
||||
assert mf.get_default_metadata_profile_id() == 3
|
||||
|
||||
|
||||
def test_metadata_profile_unknown_name_falls_back_to_first(monkeypatch):
|
||||
monkeypatch.setenv("LIDARR_METADATA_PROFILE", "Nonexistent")
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: META)
|
||||
assert mf.get_default_metadata_profile_id() == 1
|
||||
|
||||
|
||||
def test_quality_profile_default_any_by_name(monkeypatch):
|
||||
monkeypatch.delenv("LIDARR_QUALITY_PROFILE", raising=False)
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: QUAL)
|
||||
assert mf.get_quality_profile_id() == 1
|
||||
|
||||
|
||||
def test_quality_profile_env_override(monkeypatch):
|
||||
monkeypatch.setenv("LIDARR_QUALITY_PROFILE", "Lossless")
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: QUAL)
|
||||
assert mf.get_quality_profile_id() == 2
|
||||
|
||||
|
||||
def test_profile_fetch_error_returns_one(monkeypatch):
|
||||
def boom(path, timeout=10):
|
||||
raise mf.RequestException("down")
|
||||
monkeypatch.setattr(mf, "lidarr_get", boom)
|
||||
assert mf.get_default_metadata_profile_id() == 1
|
||||
assert mf.get_quality_profile_id() == 1
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `pytest tests/test_profiles.py -v`
|
||||
Expected: FAIL — `AttributeError: ... 'get_quality_profile_id'` and metadata env tests fail.
|
||||
|
||||
- [ ] **Step 3: Implement**
|
||||
|
||||
In `musicfetch`, replace `get_default_metadata_profile_id` with:
|
||||
```python
|
||||
def _profile_id_by_name(path: str, env_var: str, default_name: str) -> int:
|
||||
"""Return the id of the profile whose name matches env_var (default
|
||||
default_name, case-insensitive). Fall back to the first profile, then 1."""
|
||||
name = os.environ.get(env_var, default_name)
|
||||
try:
|
||||
profiles = lidarr_get(path, timeout=10)
|
||||
except RequestException as e:
|
||||
dbg(f"{path} fetch failed: {e}")
|
||||
return 1
|
||||
if not profiles:
|
||||
return 1
|
||||
for p in profiles:
|
||||
if p.get("name", "").casefold() == name.casefold():
|
||||
return p["id"]
|
||||
dbg(f"profile '{name}' not found at {path}; using first ('{profiles[0].get('name')}')")
|
||||
return profiles[0]["id"]
|
||||
|
||||
|
||||
def get_default_metadata_profile_id() -> int:
|
||||
return _profile_id_by_name("/api/v1/metadataprofile", "LIDARR_METADATA_PROFILE", "Standard")
|
||||
|
||||
|
||||
def get_quality_profile_id() -> int:
|
||||
return _profile_id_by_name("/api/v1/qualityprofile", "LIDARR_QUALITY_PROFILE", "Any")
|
||||
```
|
||||
In `add_artist`, change the payload line `"qualityProfileId": 1,` to:
|
||||
```python
|
||||
"qualityProfileId": get_quality_profile_id(),
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `pytest tests/test_profiles.py -v`
|
||||
Expected: PASS (6 passed)
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add musicfetch tests/test_profiles.py
|
||||
git commit -m "fix(lidarr): select metadata/quality profiles by name with env overrides"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Playlist core in `musicfetch`
|
||||
|
||||
**Files:**
|
||||
- Modify: `musicfetch` (add `from urllib.parse import urlparse, parse_qs`; add `is_playlist_url`, `_playlist_id`, `expand_playlist`, `download_playlist`, `download_single`; make `yt_download` + `act_youtube` return a success bool; rewrite `handle_url`)
|
||||
- Test: `tests/test_playlist.py`
|
||||
|
||||
- [ ] **Step 1: Write the failing test**
|
||||
|
||||
Create `tests/test_playlist.py`:
|
||||
```python
|
||||
import server.mf # noqa: F401
|
||||
import musicfetch_core as mf
|
||||
|
||||
|
||||
# ---- is_playlist_url ----
|
||||
def test_pure_playlist_url_is_playlist():
|
||||
assert mf.is_playlist_url("https://music.youtube.com/playlist?list=PLabc") is True
|
||||
assert mf.is_playlist_url("https://www.youtube.com/playlist?list=PLabc") is True
|
||||
|
||||
|
||||
def test_watch_with_list_is_not_playlist():
|
||||
assert mf.is_playlist_url("https://www.youtube.com/watch?v=abc&list=PLx") is False
|
||||
|
||||
|
||||
def test_plain_watch_is_not_playlist():
|
||||
assert mf.is_playlist_url("https://www.youtube.com/watch?v=abc") is False
|
||||
|
||||
|
||||
def test_non_url_is_not_playlist():
|
||||
assert mf.is_playlist_url("Daft Punk - Discovery") is False
|
||||
|
||||
|
||||
# ---- expand_playlist (yt-dlp fallback path) ----
|
||||
class _CP:
|
||||
def __init__(self, stdout):
|
||||
self.stdout = stdout
|
||||
self.returncode = 0
|
||||
|
||||
|
||||
def test_expand_playlist_ytdlp_fallback(monkeypatch):
|
||||
import json as _json
|
||||
monkeypatch.setattr(mf, "YTMusic", None) # force yt-dlp path
|
||||
payload = {"title": "My Mix", "entries": [
|
||||
{"id": "v1", "title": "Song One", "uploader": "Artist A"},
|
||||
{"id": "v2", "title": "Song Two", "channel": "Artist B"},
|
||||
{"id": None, "title": "skip"},
|
||||
]}
|
||||
monkeypatch.setattr(mf.subprocess, "run",
|
||||
lambda *a, **k: _CP(_json.dumps(payload)))
|
||||
title, hits = mf.expand_playlist("https://www.youtube.com/playlist?list=PLx")
|
||||
assert title == "My Mix"
|
||||
assert [h.payload["videoId"] for h in hits] == ["v1", "v2"]
|
||||
assert hits[0].artist == "Artist A"
|
||||
|
||||
|
||||
# ---- download_playlist ----
|
||||
def test_download_playlist_counts_ok_and_total(monkeypatch):
|
||||
h1 = mf.Hit(source="youtube", kind="track", title="A", artist="X", payload={"videoId": "1"})
|
||||
h2 = mf.Hit(source="youtube", kind="track", title="B", artist="Y", payload={"videoId": "2"})
|
||||
h3 = mf.Hit(source="youtube", kind="track", title="C", artist="Z", payload={"videoId": "3"})
|
||||
monkeypatch.setattr(mf, "expand_playlist", lambda url: ("PL Title", [h1, h2, h3]))
|
||||
|
||||
def fake_act(hit, root, quality, dry_run):
|
||||
return hit.title != "B" # B "fails"
|
||||
monkeypatch.setattr(mf, "act_youtube", fake_act)
|
||||
ok, total, title = mf.download_playlist("u", "/tmp", "best", False)
|
||||
assert (ok, total, title) == (2, 3, "PL Title")
|
||||
|
||||
|
||||
def test_download_playlist_track_exception_counts_as_failure(monkeypatch):
|
||||
h1 = mf.Hit(source="youtube", kind="track", title="A", artist="X", payload={"videoId": "1"})
|
||||
h2 = mf.Hit(source="youtube", kind="track", title="B", artist="Y", payload={"videoId": "2"})
|
||||
monkeypatch.setattr(mf, "expand_playlist", lambda url: ("T", [h1, h2]))
|
||||
|
||||
def fake_act(hit, root, quality, dry_run):
|
||||
if hit.title == "B":
|
||||
raise RuntimeError("boom")
|
||||
return True
|
||||
monkeypatch.setattr(mf, "act_youtube", fake_act)
|
||||
ok, total, _ = mf.download_playlist("u", "/tmp", "best", False)
|
||||
assert (ok, total) == (1, 2)
|
||||
|
||||
|
||||
# ---- yt_download returns success bool ----
|
||||
def test_yt_download_returns_true_on_zero_exit(monkeypatch):
|
||||
monkeypatch.setattr(mf.os, "makedirs", lambda *a, **k: None)
|
||||
monkeypatch.setattr(mf.subprocess, "run", lambda *a, **k: _CP("")) # returncode 0
|
||||
assert mf.yt_download("u", "/tmp/x", "best", False) is True
|
||||
|
||||
|
||||
def test_yt_download_dry_run_returns_true(monkeypatch):
|
||||
assert mf.yt_download("u", "/tmp/x", "best", True) is True
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `pytest tests/test_playlist.py -v`
|
||||
Expected: FAIL — `AttributeError: ... 'is_playlist_url'`.
|
||||
|
||||
- [ ] **Step 3: Implement**
|
||||
|
||||
Add to the top imports block of `musicfetch`:
|
||||
```python
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
```
|
||||
|
||||
Make `yt_download` return a success bool. Change its tail (the `if dry_run:` block and the final `subprocess.run`) to:
|
||||
```python
|
||||
if dry_run:
|
||||
print(f"[dry-run] mkdir -p {target_folder}")
|
||||
print(f"[dry-run] {' '.join(cmd)}")
|
||||
return True
|
||||
os.makedirs(target_folder, exist_ok=True)
|
||||
print(f"Downloading via yt-dlp -> {target_folder}")
|
||||
return subprocess.run(cmd).returncode == 0
|
||||
```
|
||||
|
||||
Make `act_youtube` return the bool — change its last line `yt_download(url, target, quality, dry_run, hit=hit)` to:
|
||||
```python
|
||||
return yt_download(url, target, quality, dry_run, hit=hit)
|
||||
```
|
||||
|
||||
Add the playlist functions (place them in the URL-path section, after `handle_url`'s helpers / near `handle_url`):
|
||||
```python
|
||||
def _playlist_id(url: str) -> str:
|
||||
return parse_qs(urlparse(url).query).get("list", [""])[0]
|
||||
|
||||
|
||||
def is_playlist_url(url: str) -> bool:
|
||||
"""True for a pure playlist URL (/playlist?list=… or list= without v=).
|
||||
A watch?v=…&list=… URL is treated as a single track, not a batch."""
|
||||
if not is_url(url):
|
||||
return False
|
||||
parsed = urlparse(url)
|
||||
qs = parse_qs(parsed.query)
|
||||
if "/playlist" in parsed.path:
|
||||
return True
|
||||
return "list" in qs and "v" not in qs
|
||||
|
||||
|
||||
def expand_playlist(url: str) -> tuple[str, list[Hit]]:
|
||||
"""Return (playlist_title, [track Hits]). Prefer ytmusicapi; fall back to
|
||||
yt-dlp --flat-playlist. Returns ("", []) on failure."""
|
||||
pid = _playlist_id(url)
|
||||
if YTMusic is not None and pid:
|
||||
try:
|
||||
pl = YTMusic().get_playlist(pid, limit=None)
|
||||
hits = []
|
||||
for t in pl.get("tracks", []):
|
||||
vid = t.get("videoId")
|
||||
if not vid:
|
||||
continue
|
||||
alb = t.get("album")
|
||||
album = alb.get("name", "") if isinstance(alb, dict) else (alb or "")
|
||||
hits.append(Hit(source="youtube", kind="track", title=t.get("title", ""),
|
||||
artist=_ytm_artists(t), album=album,
|
||||
year=str(t.get("year") or ""), payload={"videoId": vid}))
|
||||
if hits:
|
||||
return pl.get("title", ""), hits
|
||||
except Exception as e: # noqa: BLE001
|
||||
dbg(f"ytmusicapi playlist expand failed: {e}")
|
||||
try:
|
||||
result = subprocess.run(["yt-dlp", "--flat-playlist", "-J", url],
|
||||
capture_output=True, text=True, check=True)
|
||||
data = json.loads(result.stdout)
|
||||
except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
|
||||
err(f"yt-dlp playlist expand failed: {e}")
|
||||
return "", []
|
||||
hits = []
|
||||
for entry in data.get("entries", []):
|
||||
vid = entry.get("id")
|
||||
if not vid:
|
||||
continue
|
||||
hits.append(Hit(source="youtube", kind="track", title=entry.get("title", ""),
|
||||
artist=entry.get("uploader") or entry.get("channel") or "",
|
||||
payload={"videoId": vid}))
|
||||
return data.get("title", ""), hits
|
||||
|
||||
|
||||
def download_playlist(url: str, root: str, quality: str, dry_run: bool) -> tuple[int, int, str]:
|
||||
"""Download each playlist track via act_youtube. Returns (ok, total, title)."""
|
||||
title, hits = expand_playlist(url)
|
||||
ok = 0
|
||||
for h in hits:
|
||||
try:
|
||||
if act_youtube(h, root, quality, dry_run):
|
||||
ok += 1
|
||||
except Exception as e: # noqa: BLE001 — one bad track shouldn't abort the batch
|
||||
err(f"track failed ({h.title}): {e}")
|
||||
return ok, len(hits), title
|
||||
|
||||
|
||||
def download_single(url: str, root: str, quality: str, dry_run: bool) -> dict:
|
||||
"""Download a single URL. Returns {title, artist, ok}."""
|
||||
meta = run_yt_dlp_get_metadata(url)
|
||||
artist = get_artist_from_metadata(meta) if meta else "Unknown Artist"
|
||||
title = (meta or {}).get("title", "")
|
||||
target = os.path.join(root, artist, "youtube")
|
||||
ok = yt_download(url, target, quality, dry_run)
|
||||
return {"title": title, "artist": artist, "ok": ok}
|
||||
```
|
||||
|
||||
Rewrite `handle_url` to route playlists:
|
||||
```python
|
||||
def handle_url(url: str, root: str, quality: str, dry_run: bool):
|
||||
if is_playlist_url(url):
|
||||
ok, total, title = download_playlist(url, root, quality, dry_run)
|
||||
label = f" from '{title}'" if title else ""
|
||||
print(f"Downloaded {ok}/{total} tracks{label}")
|
||||
return
|
||||
download_single(url, root, quality, dry_run)
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `pytest tests/test_playlist.py -v`
|
||||
Expected: PASS (9 passed)
|
||||
|
||||
- [ ] **Step 5: Full suite + compile**
|
||||
|
||||
Run: `pytest -q` (prior 43 + 6 profiles + 9 playlist = 58) and `python3 -m py_compile musicfetch`.
|
||||
Expected: all green, clean compile.
|
||||
|
||||
- [ ] **Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add musicfetch tests/test_playlist.py
|
||||
git commit -m "feat(youtube): playlist expansion + per-track download, success bools"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Re-exports + callable job message
|
||||
|
||||
**Files:**
|
||||
- Modify: `server/mf.py` (re-export new URL helpers)
|
||||
- Modify: `server/jobs.py` (`run_job` accepts a callable `done_message`)
|
||||
- Test: `tests/test_jobs.py` (add a callable-message test)
|
||||
|
||||
- [ ] **Step 1: Write the failing test**
|
||||
|
||||
Append to `tests/test_jobs.py`:
|
||||
```python
|
||||
def test_run_job_callable_done_message():
|
||||
job = jobs.create_job(hit={}, message="m")
|
||||
jobs.run_job(job.id, lambda: {"ok": 2, "total": 3},
|
||||
done_message=lambda res: f"{res['ok']}/{res['total']} done")
|
||||
j = _wait(job.id, "done")
|
||||
assert j.message == "2/3 done"
|
||||
```
|
||||
Also add a re-export check — create `tests/test_mf_url_exports.py`:
|
||||
```python
|
||||
import server.mf as smf
|
||||
|
||||
|
||||
def test_url_helpers_reexported():
|
||||
assert callable(smf.is_url)
|
||||
assert callable(smf.is_playlist_url)
|
||||
assert callable(smf.download_playlist)
|
||||
assert callable(smf.download_single)
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `pytest tests/test_jobs.py::test_run_job_callable_done_message tests/test_mf_url_exports.py -v`
|
||||
Expected: FAIL (callable message not supported; `smf.is_playlist_url` missing).
|
||||
|
||||
- [ ] **Step 3: Implement**
|
||||
|
||||
In `server/jobs.py`, inside `run_job`'s `_task`, change the success branch to support a callable:
|
||||
```python
|
||||
result = fn()
|
||||
msg = done_message(result) if callable(done_message) else done_message
|
||||
_touch(job, status="done", result=result, message=msg)
|
||||
```
|
||||
(Update the `run_job` signature/type hint to `done_message` being `str | Callable[[dict], str]`; import `Callable` is already present.)
|
||||
|
||||
In `server/mf.py`, add to the re-export assignments and `__all__`:
|
||||
```python
|
||||
is_url = _mod.is_url
|
||||
is_playlist_url = _mod.is_playlist_url
|
||||
download_playlist = _mod.download_playlist
|
||||
download_single = _mod.download_single
|
||||
```
|
||||
Add those four names to the `__all__` list.
|
||||
|
||||
- [ ] **Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `pytest tests/test_jobs.py tests/test_mf_url_exports.py -v`
|
||||
Expected: PASS.
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add server/jobs.py server/mf.py tests/test_jobs.py tests/test_mf_url_exports.py
|
||||
git commit -m "feat(server): re-export URL helpers; callable job done_message"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: REST API URL/playlist routing
|
||||
|
||||
**Files:**
|
||||
- Modify: `server/actions.py` (add `url_started_message`, `url_done_message`, `playlist_done_message`, `perform_url_fetch`)
|
||||
- Modify: `server/app.py` (route URL `q` to a download job)
|
||||
- Test: `tests/test_api_url.py`
|
||||
|
||||
- [ ] **Step 1: Write the failing test**
|
||||
|
||||
Create `tests/test_api_url.py`:
|
||||
```python
|
||||
import time
|
||||
import pytest
|
||||
from server import jobs as jobs_mod
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_jobs():
|
||||
jobs_mod.JOBS.clear()
|
||||
yield
|
||||
jobs_mod.JOBS.clear()
|
||||
|
||||
|
||||
def _wait_done(client, auth, job_id, timeout=2.0):
|
||||
end = time.time() + timeout
|
||||
while time.time() < end:
|
||||
b = client.get(f"/jobs/{job_id}", headers=auth).json()
|
||||
if b["status"] in ("done", "failed"):
|
||||
return b
|
||||
time.sleep(0.01)
|
||||
raise AssertionError("job never finished")
|
||||
|
||||
|
||||
def test_playlist_url_batch_job(client, auth, monkeypatch):
|
||||
monkeypatch.setattr("server.app.mf.download_playlist",
|
||||
lambda url, root, quality, dry_run: (2, 3, "My Mix"))
|
||||
r = client.post("/fetch", params={"q": "https://music.youtube.com/playlist?list=PLx"}, headers=auth)
|
||||
assert r.status_code == 200
|
||||
body = r.json()
|
||||
assert body["status"] == "queued"
|
||||
assert body["hit"]["kind"] == "playlist"
|
||||
done = _wait_done(client, auth, body["job_id"])
|
||||
assert done["status"] == "done"
|
||||
assert "2/3" in done["message"]
|
||||
assert done["result"]["ok"] == 2
|
||||
|
||||
|
||||
def test_playlist_zero_success_fails(client, auth, monkeypatch):
|
||||
monkeypatch.setattr("server.app.mf.download_playlist",
|
||||
lambda url, root, quality, dry_run: (0, 3, "Dead Mix"))
|
||||
body = client.post("/fetch", params={"q": "https://www.youtube.com/playlist?list=PLy"}, headers=auth).json()
|
||||
done = _wait_done(client, auth, body["job_id"])
|
||||
assert done["status"] == "failed"
|
||||
|
||||
|
||||
def test_single_video_url_download(client, auth, monkeypatch):
|
||||
monkeypatch.setattr("server.app.mf.download_single",
|
||||
lambda url, root, quality, dry_run: {"title": "Song", "artist": "A", "ok": True})
|
||||
body = client.post("/fetch", params={"q": "https://music.youtube.com/watch?v=abc"}, headers=auth).json()
|
||||
assert body["hit"]["kind"] == "track"
|
||||
done = _wait_done(client, auth, body["job_id"])
|
||||
assert done["status"] == "done"
|
||||
assert "Song" in done["message"]
|
||||
|
||||
|
||||
def test_search_query_still_works(client, auth, monkeypatch):
|
||||
from server import mf
|
||||
hit = mf.Hit(source="youtube", kind="track", title="T", artist="A", payload={"videoId": "x"})
|
||||
monkeypatch.setattr("server.app.mf.build_combined_hits",
|
||||
lambda q, limit, yt_first, lidarr_only, yt_only: [hit])
|
||||
monkeypatch.setattr("server.app.mf.pick", lambda hits, q, ni, yf: hits[0])
|
||||
monkeypatch.setattr("server.app.actions.perform_fetch",
|
||||
lambda chosen, hits, quality, root: {"path": "/x", "lidarr_album_id": None})
|
||||
r = client.post("/fetch", params={"q": "Daft Punk - Discovery"}, headers=auth)
|
||||
assert r.status_code == 200
|
||||
assert r.json()["status"] == "queued"
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `pytest tests/test_api_url.py -v`
|
||||
Expected: FAIL (URL `q` currently goes to search → no `download_playlist`/`download_single` calls; kind not "playlist").
|
||||
|
||||
- [ ] **Step 3: Implement**
|
||||
|
||||
In `server/actions.py`, add:
|
||||
```python
|
||||
def url_started_message(kind: str, title: str = "") -> str:
|
||||
if kind == "playlist":
|
||||
return (f"Fetching playlist '{title}'. Downloading tracks now."
|
||||
if title else "Fetching playlist. Downloading tracks now.")
|
||||
return f"Fetching '{title}'. Downloading now." if title else "Fetching track. Downloading now."
|
||||
|
||||
|
||||
def playlist_done_message(result: dict) -> str:
|
||||
ok, total = result.get("ok", 0), result.get("total", 0)
|
||||
failed = total - ok
|
||||
return f"Downloaded {ok}/{total} tracks" + (f" ({failed} failed)." if failed else ".")
|
||||
|
||||
|
||||
def url_done_message(result: dict) -> str:
|
||||
title = result.get("title", "")
|
||||
return f"Downloaded '{title}'." if title else "Download complete."
|
||||
|
||||
|
||||
def perform_url_fetch(url: str, quality: str, root: str) -> dict:
|
||||
"""Download a URL (playlist → batch, else single). Raises if nothing
|
||||
downloaded so the job is marked failed."""
|
||||
if mf.is_playlist_url(url):
|
||||
ok, total, title = mf.download_playlist(url, root, quality, False)
|
||||
if ok == 0:
|
||||
raise RuntimeError(f"No tracks downloaded from playlist '{title}'." if title
|
||||
else "No tracks downloaded from playlist.")
|
||||
return {"kind": "playlist", "title": title, "ok": ok, "total": total,
|
||||
"path": None, "lidarr_album_id": None}
|
||||
info = mf.download_single(url, root, quality, False)
|
||||
if not info.get("ok"):
|
||||
raise RuntimeError("Download failed.")
|
||||
return {"kind": "track", "title": info["title"], "artist": info["artist"],
|
||||
"ok": 1, "total": 1, "path": None, "lidarr_album_id": None}
|
||||
```
|
||||
|
||||
In `server/app.py` `fetch()`, add a URL branch BEFORE the search logic (after the `quality` validation; keep the existing `quality not in mf.QUALITY_CHOICES` 422 check above it). Insert:
|
||||
```python
|
||||
if mf.is_url(q):
|
||||
kind = "playlist" if mf.is_playlist_url(q) else "track"
|
||||
syn = mf.Hit(source="youtube", kind=kind, title="", artist="")
|
||||
job = jobs.create_job(hit=syn, message=actions.url_started_message(kind))
|
||||
response = _job_public(job)
|
||||
done_msg = actions.playlist_done_message if kind == "playlist" else actions.url_done_message
|
||||
jobs.run_job(
|
||||
job.id,
|
||||
lambda: actions.perform_url_fetch(q, quality, ROOT),
|
||||
done_message=done_msg,
|
||||
fail_message="Download failed.",
|
||||
)
|
||||
return response
|
||||
```
|
||||
(The existing `source` validation can stay; it's ignored for URLs. Leave the search path untouched below this branch.)
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `pytest tests/test_api_url.py -v`
|
||||
Expected: PASS (4 passed)
|
||||
|
||||
- [ ] **Step 5: Full suite**
|
||||
|
||||
Run: `pytest -q`
|
||||
Expected: all green (58 + callable/export + 4 api-url ≈ 64).
|
||||
|
||||
- [ ] **Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add server/actions.py server/app.py tests/test_api_url.py
|
||||
git commit -m "feat(server): route URL/playlist /fetch to download jobs"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 5: Live verification
|
||||
|
||||
**Files:** none (controller-run).
|
||||
|
||||
- [ ] **Step 1: Profiles** — read-only confirm name selection against the real Lidarr:
|
||||
```bash
|
||||
cd /home/zhering/Documents/musicfetch
|
||||
env LIDARR_URL=http://10.2.1.16:8686 LIDARR_API_KEY=49cf02acb4c7436b842df2150056d468 \
|
||||
python3 -c "import server.mf, musicfetch_core as mf; print('meta', mf.get_default_metadata_profile_id(), 'qual', mf.get_quality_profile_id())"
|
||||
```
|
||||
Expected: `meta 1 qual 1` (Standard / Any). Then with `LIDARR_METADATA_PROFILE=OST` → `meta 3`.
|
||||
|
||||
- [ ] **Step 2: Playlist (CLI dry-run)** — confirm expansion + per-track routing without downloading. Pick a small real YT Music playlist URL:
|
||||
```bash
|
||||
./musicfetch -d "<small-playlist-url>"
|
||||
```
|
||||
Expected: prints a `[dry-run] yt-dlp …` line per track, each targeting `/media/music/<artist>/youtube`.
|
||||
|
||||
- [ ] **Step 3: Playlist (real, small)** — with user approval, run the API against a 3-5 track playlist:
|
||||
```bash
|
||||
fuser -k 6769/tcp 2>/dev/null; sleep 1
|
||||
env MUSICFETCH_API_KEY=testkey MUSICFETCH_ROOT=/tmp \
|
||||
python3 -m uvicorn server.app:app --host 127.0.0.1 --port 6769 --log-level warning &
|
||||
sleep 4
|
||||
curl -s -X POST 'http://127.0.0.1:6769/fetch?q=<small-playlist-url>' -H 'X-API-Key: testkey'
|
||||
# poll /jobs/{id} → expect "Downloaded N/M tracks", files under /tmp/<artist>/youtube/
|
||||
fuser -k 6769/tcp 2>/dev/null
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Self-Review
|
||||
|
||||
**Spec coverage:**
|
||||
- Profile-by-name + env overrides + add_artist uses quality profile → Task 1. ✅
|
||||
- `is_playlist_url` (watch?v&list → single) → Task 2. ✅
|
||||
- `expand_playlist` (ytmusicapi → yt-dlp fallback) → Task 2. ✅
|
||||
- `download_playlist` per-track via `act_youtube`, ok/total counting, per-track failures caught → Task 2. ✅
|
||||
- `yt_download`/`act_youtube` success bools → Task 2. ✅
|
||||
- CLI `handle_url` playlist routing → Task 2. ✅
|
||||
- Re-exports + callable batch message → Task 3. ✅
|
||||
- API URL routing, playlist batch job, `done` if ok≥1 else `failed`, single-URL job, Siri messages, search path unchanged → Task 4. ✅
|
||||
- Live checks (profiles + playlist) → Task 5. ✅
|
||||
- Out-of-scope (per-track fan-out, resume/dedup) excluded. ✅
|
||||
|
||||
**Placeholder scan:** none — all code/tests complete (the only `<…>` are real user-supplied URLs in the manual Task 5 steps).
|
||||
|
||||
**Type consistency:** `download_playlist -> (int,int,str)` consumed as `(ok,total,title)` in CLI + `perform_url_fetch`. `download_single -> {title,artist,ok}` consumed in `perform_url_fetch`. `yt_download`/`act_youtube` now return bool; `act_youtube`'s only other caller (`actions.perform_fetch._download_youtube` in the existing search path) ignores the return value — unaffected. `run_job(done_message)` accepts str or `Callable[[dict],str]`; existing search-path callers pass str (unchanged). `_profile_id_by_name(path, env_var, default_name)` used by both profile getters. New `mf.py` exports (`is_url`, `is_playlist_url`, `download_playlist`, `download_single`) match the names used in `server/app.py` and `server/actions.py`.
|
||||
@@ -0,0 +1,113 @@
|
||||
# YouTube Playlists + Lidarr Profile Hardening — Design
|
||||
|
||||
**Date:** 2026-06-08
|
||||
**Status:** Approved
|
||||
|
||||
## Context & Goal
|
||||
|
||||
Two additions to `musicfetch` (and its REST API):
|
||||
|
||||
1. **Lidarr profile hardening.** `add_artist` currently hardcodes
|
||||
`qualityProfileId=1` and `get_default_metadata_profile_id()` returns
|
||||
`profiles[0]["id"]` — the *first* profile, arbitrarily. If a user's profile
|
||||
order differs, every added artist could silently get the wrong metadata
|
||||
profile (e.g. **OST** or **None**) or quality profile. Select by name with an
|
||||
env override instead.
|
||||
|
||||
2. **YouTube playlist support.** A playlist URL should download **each track as
|
||||
its own file**, routed into per-track artist folders, via both the CLI and the
|
||||
REST API (one playlist = one job).
|
||||
|
||||
## Decisions (confirmed with user)
|
||||
|
||||
- Playlists work in **CLI + REST API**; one playlist = **one job** (option B).
|
||||
- Tracks land in **per-track artist folders** `<root>/<artist>/youtube/` (option A),
|
||||
reusing the existing single-track path.
|
||||
- Partial failures: job is **`done` if ≥1 track succeeded** (message
|
||||
`"Downloaded 12/14 tracks (2 failed)"`); `failed` only if zero succeed.
|
||||
- Playlist expansion prefers **ytmusicapi** for YT Music playlists, falls back to
|
||||
**`yt-dlp --flat-playlist`**; each track then downloads through the existing
|
||||
`act_youtube` (first-artist folders, tag overrides, `music.youtube` URLs).
|
||||
- A `watch?v=X&list=Y` URL stays a **single track** (no surprise batch); only a
|
||||
pure playlist URL (`/playlist?list=` or `list=` without `v=`) triggers a batch.
|
||||
|
||||
## Architecture
|
||||
|
||||
### Feature 1 — Profile hardening (`musicfetch`)
|
||||
|
||||
- `get_default_metadata_profile_id()` → look up `/api/v1/metadataprofile`, pick the
|
||||
profile whose name matches env `LIDARR_METADATA_PROFILE` (default `"Standard"`,
|
||||
case-insensitive); fall back to the first profile's id, then `1`.
|
||||
- New `get_quality_profile_id()` → `/api/v1/qualityprofile`, match env
|
||||
`LIDARR_QUALITY_PROFILE` (default `"Any"`); fall back to first id, then `1`.
|
||||
- `add_artist` uses `get_quality_profile_id()` instead of literal `1`.
|
||||
|
||||
### Feature 2 — Playlists
|
||||
|
||||
New units in `musicfetch`:
|
||||
|
||||
```
|
||||
is_playlist_url(url) -> bool
|
||||
expand_playlist(url) -> tuple[str, list[Hit]] # (playlist_title, track Hits)
|
||||
download_playlist(url, root, quality, dry_run) -> tuple[int, int] # (ok, total)
|
||||
```
|
||||
|
||||
- **`is_playlist_url`**: true when the URL has `list=` AND no `v=` param, or path
|
||||
contains `/playlist`. (So `watch?v=...&list=...` → False.)
|
||||
- **`expand_playlist`**: if `ytmusicapi` is available and the URL is a YT Music
|
||||
playlist, use `YTMusic().get_playlist(playlist_id)` → tracks with
|
||||
artist/album/year/videoId. Otherwise `yt-dlp --flat-playlist -J <url>` → entries
|
||||
(title, uploader→artist, id). Map each to `Hit(source="youtube", kind="track",
|
||||
…, payload={"videoId": id})`. Returns the playlist title + Hits. Empty/none → `("", [])`.
|
||||
- **`download_playlist`**: for each Hit call `act_youtube(hit, root, quality,
|
||||
dry_run)`, catching per-track exceptions (count ok/total); returns `(ok, total)`.
|
||||
- **`handle_url`** (CLI): if `is_playlist_url` → `download_playlist` and print
|
||||
`"Downloaded N/M tracks"`; else the existing single-URL download.
|
||||
|
||||
### Feature 2 — REST API (`server/`)
|
||||
|
||||
`POST /fetch` currently treats `q` only as a search term. Add URL routing:
|
||||
|
||||
- In `server/app.py` `fetch()`: if `mf.is_url(q)` → create a **download job**
|
||||
(not a search). The job runs in `server/actions.py`:
|
||||
- `is_playlist_url(q)` → `download_playlist(q, ROOT, quality, False)` → result
|
||||
`{"ok": n, "total": m, "path": None, "lidarr_album_id": None}`; message
|
||||
`"Downloaded {n}/{m} tracks"` (+ ` ({m-n} failed)` when failures).
|
||||
- else single URL → reuse `handle_url`-equivalent single download; message
|
||||
`"Downloaded '<title>'"`.
|
||||
- Response `hit` for a playlist: `{"source":"youtube","kind":"playlist",
|
||||
"title": <playlist title>, "artist":"", "album":"", "year":""}`.
|
||||
- Status: `done` if `ok ≥ 1` (or single URL succeeded), else `failed`.
|
||||
- `actions.perform_fetch` (search path) is unchanged; a new
|
||||
`actions.perform_url_fetch(q, quality, root) -> dict` handles the URL branch, and
|
||||
`started_message`/`done_message` get URL/playlist-aware variants.
|
||||
|
||||
## Error Handling
|
||||
|
||||
- Per-track download failures in `download_playlist` are caught and counted; the
|
||||
batch continues. A batch with zero successes → job `failed`.
|
||||
- `expand_playlist` degrades ytmusicapi → yt-dlp → `("", [])`; an empty expansion
|
||||
yields a `failed` job with message `"No tracks found in playlist."`.
|
||||
- Profile lookups already degrade to a sane fallback id on any HTTP error.
|
||||
|
||||
## Testing
|
||||
|
||||
Unit (mock network / `act_youtube`, no real downloads):
|
||||
- `is_playlist_url`: `/playlist?list=…` → True; `watch?v=X&list=Y` → False;
|
||||
`watch?v=X` → False; non-URL → False.
|
||||
- `get_default_metadata_profile_id` / `get_quality_profile_id`: pick by env name;
|
||||
fall back to first when name absent; fall back to `1` on error.
|
||||
- `expand_playlist`: maps ytmusicapi playlist JSON → Hits (title/artist/videoId);
|
||||
yt-dlp fallback path; empty → `("", [])`.
|
||||
- `download_playlist`: counts ok/total with one track's `act_youtube` raising.
|
||||
- API: `POST /fetch` with a playlist URL → job, batch message `"Downloaded n/m…"`,
|
||||
`done` when ok≥1; single URL → single-download job; zero-success → `failed`.
|
||||
|
||||
Live check: a small real YT Music playlist (3-5 tracks) → each track lands in
|
||||
`<root>/<artist>/youtube/` with correct single-artist tags; job message reports
|
||||
`N/M`.
|
||||
|
||||
## Out of Scope (YAGNI)
|
||||
|
||||
Per-track job fan-out (option C), resume/skip-already-downloaded, playlist→Lidarr
|
||||
album matching, dedup across runs, progress streaming during a batch.
|
||||
736
musicfetch
736
musicfetch
@@ -12,12 +12,15 @@ import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from requests.exceptions import ConnectionError as ReqConnectionError
|
||||
from requests.exceptions import RequestException, Timeout
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
# Optional deps — degrade gracefully if missing.
|
||||
try:
|
||||
@@ -126,6 +129,77 @@ def _artist_to_hit(artist: dict) -> Hit:
|
||||
)
|
||||
|
||||
|
||||
MUSICBRAINZ_URL = "https://musicbrainz.org/ws/2"
|
||||
MB_HEADERS = {"User-Agent": "musicfetch/2.0 (https://github.com/; personal music fetcher)"}
|
||||
_mb_last_call = 0.0
|
||||
|
||||
|
||||
def _mb_rate_limit():
|
||||
"""Courtesy ~1 req/sec to MusicBrainz."""
|
||||
global _mb_last_call
|
||||
elapsed = time.time() - _mb_last_call
|
||||
if elapsed < 1.0:
|
||||
time.sleep(1.0 - elapsed)
|
||||
_mb_last_call = time.time()
|
||||
|
||||
|
||||
def _mb_artist_credit(credit) -> str:
|
||||
"""First credited artist name only (ignore featured/secondary)."""
|
||||
if credit and isinstance(credit, list) and isinstance(credit[0], dict):
|
||||
return credit[0].get("name") or (credit[0].get("artist") or {}).get("name", "")
|
||||
return ""
|
||||
|
||||
|
||||
def musicbrainz_best_album(artist: str, track: str, timeout: int = 8) -> Optional[dict]:
|
||||
"""Resolve 'artist - track' to its best studio album via MusicBrainz.
|
||||
Prefers a studio album credited to the track's own artist (not a Various
|
||||
Artists compilation). Returns {album_title, artist, year, rg_mbid} or None.
|
||||
Never raises."""
|
||||
query = f'artist:"{artist}" AND recording:"{track}"'
|
||||
try:
|
||||
_mb_rate_limit()
|
||||
resp = requests.get(
|
||||
f"{MUSICBRAINZ_URL}/recording",
|
||||
params={"query": query, "fmt": "json", "limit": 25},
|
||||
headers=MB_HEADERS, timeout=timeout,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
except Exception as e: # noqa: BLE001 — degrade to fallback on any failure
|
||||
dbg(f"MusicBrainz lookup failed: {e}")
|
||||
return None
|
||||
|
||||
# candidate = (own_studio, is_studio, date_sortkey, title, artist, year, mbid)
|
||||
candidates = []
|
||||
for rec in data.get("recordings", []):
|
||||
rec_artist = _mb_artist_credit(rec.get("artist-credit"))
|
||||
for rel in rec.get("releases", []):
|
||||
rg = rel.get("release-group") or {}
|
||||
title = rg.get("title") or rel.get("title") or ""
|
||||
if not title:
|
||||
continue
|
||||
mbid = rg.get("id") or ""
|
||||
primary = rg.get("primary-type") or ""
|
||||
secondary = rg.get("secondary-types") or []
|
||||
rel_artist = _mb_artist_credit(rel.get("artist-credit"))
|
||||
date = rel.get("date") or rg.get("first-release-date") or ""
|
||||
is_studio = primary == "Album" and not secondary
|
||||
own_studio = is_studio and (
|
||||
not rel_artist or rel_artist.casefold() == rec_artist.casefold()
|
||||
)
|
||||
candidates.append((own_studio, is_studio, date or "9999", title, rec_artist, date[:4], mbid))
|
||||
|
||||
if not candidates:
|
||||
return None
|
||||
pool = ([c for c in candidates if c[0]]
|
||||
or [c for c in candidates if c[1]]
|
||||
or candidates)
|
||||
pool.sort(key=lambda c: c[2]) # earliest date first
|
||||
_, _, _, title, art, year, mbid = pool[0]
|
||||
dbg(f"MusicBrainz resolved '{artist} - {track}' -> '{title}' ({year}) mbid={mbid}")
|
||||
return {"album_title": title, "artist": art or artist, "year": year, "rg_mbid": mbid}
|
||||
|
||||
|
||||
def _split_query(query: str) -> tuple[str, Optional[str]]:
|
||||
"""Split a Shazam-style 'Artist - Track' on the first ' - '.
|
||||
Returns (artist, track) or (term, None) when there is no separator."""
|
||||
@@ -136,38 +210,81 @@ def _split_query(query: str) -> tuple[str, Optional[str]]:
|
||||
|
||||
|
||||
def lidarr_search(query: str, limit: int) -> list[Hit]:
|
||||
"""Universal search via /api/v1/search; fall back to album+artist lookup."""
|
||||
"""Return Lidarr hits, best match first. Resolves 'Artist - Track' to an
|
||||
album's MusicBrainz release-group MBID, then does an exact Lidarr lookup
|
||||
(term=mbid:<id>) — no fuzzy ranking. Falls back so it never raises and
|
||||
returns [] only on total failure / missing key."""
|
||||
if not API_KEY:
|
||||
err("LIDARR_API_KEY not set — skipping Lidarr search.")
|
||||
return []
|
||||
|
||||
artist, right = _split_query(query)
|
||||
|
||||
if right:
|
||||
mb = musicbrainz_best_album(artist, right)
|
||||
if mb and mb["rg_mbid"]:
|
||||
hits = _lidarr_album_candidates(f"mbid:{mb['rg_mbid']}")
|
||||
for h in hits:
|
||||
if not h.year and mb["year"]:
|
||||
h.year = mb["year"]
|
||||
if hits:
|
||||
return hits[:limit]
|
||||
# MusicBrainz miss / no exact album → plain lookup (album-first: a dash
|
||||
# query named an album/track).
|
||||
return _fallback_lookup(query, limit, artist_first=False)
|
||||
|
||||
# Bare term is most often an artist.
|
||||
return _fallback_lookup(query, limit, artist_first=True)
|
||||
|
||||
|
||||
def _log_lidarr_failure(label: str, e: Exception) -> None:
|
||||
"""A connection/timeout error means Lidarr is unreachable — the silent
|
||||
YouTube fallback that follows is easy to mistake for "Lidarr had no match",
|
||||
so surface it loudly. Ordinary HTTP errors stay debug-only."""
|
||||
if isinstance(e, (ReqConnectionError, Timeout)):
|
||||
err(f"Lidarr unreachable ({label} at {LIDARR_URL}): {e}. "
|
||||
f"Falling back to YouTube.")
|
||||
else:
|
||||
dbg(f"{label} failed: {e}")
|
||||
|
||||
|
||||
def _lidarr_album_candidates(term: str) -> list[Hit]:
|
||||
try:
|
||||
return [_album_to_hit(a) for a in lidarr_get("/api/v1/album/lookup", params={"term": term})]
|
||||
except RequestException as e:
|
||||
_log_lidarr_failure("album/lookup", e)
|
||||
return []
|
||||
|
||||
|
||||
def _lidarr_artist_candidates(term: str) -> list[Hit]:
|
||||
try:
|
||||
return [_artist_to_hit(a) for a in lidarr_get("/api/v1/artist/lookup", params={"term": term})]
|
||||
except RequestException as e:
|
||||
_log_lidarr_failure("artist/lookup", e)
|
||||
return []
|
||||
|
||||
|
||||
def _fallback_lookup(query: str, limit: int, artist_first: bool) -> list[Hit]:
|
||||
"""Plain album + artist lookups (no scoring); /search as last resort."""
|
||||
albums = _lidarr_album_candidates(query)
|
||||
artists = _lidarr_artist_candidates(query)
|
||||
hits = (artists + albums) if artist_first else (albums + artists)
|
||||
if hits:
|
||||
return hits[:limit]
|
||||
return _universal_search(query, limit)
|
||||
|
||||
|
||||
def _universal_search(query: str, limit: int) -> list[Hit]:
|
||||
"""Last resort: Lidarr's fuzzy /search (unranked)."""
|
||||
hits: list[Hit] = []
|
||||
try:
|
||||
results = lidarr_get("/api/v1/search", params={"term": query})
|
||||
for item in results:
|
||||
# /search returns objects with 'foreignId' and either 'album' or 'artist'.
|
||||
for item in lidarr_get("/api/v1/search", params={"term": query}):
|
||||
if item.get("album"):
|
||||
hits.append(_album_to_hit(item["album"]))
|
||||
elif item.get("artist"):
|
||||
hits.append(_artist_to_hit(item["artist"]))
|
||||
if hits:
|
||||
return hits[:limit]
|
||||
dbg("/api/v1/search returned nothing useful; trying lookup endpoints.")
|
||||
except Timeout:
|
||||
err("Lidarr universal search timed out.")
|
||||
except RequestException as e:
|
||||
dbg(f"/api/v1/search unavailable ({e}); falling back to lookup endpoints.")
|
||||
|
||||
# Fallback: album lookup then artist lookup.
|
||||
try:
|
||||
for album in lidarr_get("/api/v1/album/lookup", params={"term": query}):
|
||||
hits.append(_album_to_hit(album))
|
||||
except RequestException as e:
|
||||
dbg(f"album/lookup failed: {e}")
|
||||
try:
|
||||
for artist in lidarr_get("/api/v1/artist/lookup", params={"term": query}):
|
||||
hits.append(_artist_to_hit(artist))
|
||||
except RequestException as e:
|
||||
dbg(f"artist/lookup failed: {e}")
|
||||
dbg(f"/api/v1/search failed: {e}")
|
||||
return hits[:limit]
|
||||
|
||||
|
||||
@@ -340,14 +457,30 @@ def get_existing_artist(name: str) -> Optional[dict]:
|
||||
return None
|
||||
|
||||
|
||||
def get_default_metadata_profile_id() -> int:
|
||||
def _profile_id_by_name(path: str, env_var: str, default_name: str) -> int:
|
||||
"""Return the id of the profile whose name matches env_var (default
|
||||
default_name, case-insensitive). Fall back to the first profile, then 1."""
|
||||
name = os.environ.get(env_var, default_name)
|
||||
try:
|
||||
profiles = lidarr_get("/api/v1/metadataprofile", timeout=10)
|
||||
if profiles:
|
||||
return profiles[0]["id"]
|
||||
profiles = lidarr_get(path, timeout=10)
|
||||
except RequestException as e:
|
||||
dbg(f"metadataprofile fetch failed: {e}")
|
||||
return 1
|
||||
dbg(f"{path} fetch failed: {e}")
|
||||
return 1
|
||||
if not profiles:
|
||||
return 1
|
||||
for p in profiles:
|
||||
if p.get("name", "").casefold() == name.casefold():
|
||||
return p["id"]
|
||||
dbg(f"profile '{name}' not found at {path}; using first ('{profiles[0].get('name')}')")
|
||||
return profiles[0]["id"]
|
||||
|
||||
|
||||
def get_default_metadata_profile_id() -> int:
|
||||
return _profile_id_by_name("/api/v1/metadataprofile", "LIDARR_METADATA_PROFILE", "Standard")
|
||||
|
||||
|
||||
def get_quality_profile_id() -> int:
|
||||
return _profile_id_by_name("/api/v1/qualityprofile", "LIDARR_QUALITY_PROFILE", "Any")
|
||||
|
||||
|
||||
def add_artist(meta: dict, root: str, search_all: bool, dry_run: bool) -> Optional[dict]:
|
||||
@@ -359,7 +492,7 @@ def add_artist(meta: dict, root: str, search_all: bool, dry_run: bool) -> Option
|
||||
payload = {
|
||||
"foreignArtistId": foreign_id,
|
||||
"artistName": name,
|
||||
"qualityProfileId": 1,
|
||||
"qualityProfileId": get_quality_profile_id(),
|
||||
"metadataProfileId": get_default_metadata_profile_id(),
|
||||
"rootFolderPath": root,
|
||||
"monitored": True,
|
||||
@@ -472,62 +605,210 @@ def _quality_args(quality: str) -> list[str]:
|
||||
return ["-f", "bestaudio/best", "-x"]
|
||||
|
||||
|
||||
def yt_download(url_or_query: str, target_folder: str, quality: str, dry_run: bool,
|
||||
hit: Optional[Hit] = None):
|
||||
def yt_download(url_or_query: str, target_folder: Optional[str], quality: str, dry_run: bool,
|
||||
hit: Optional[Hit] = None, outtmpl: Optional[str] = None):
|
||||
cmd = ["yt-dlp",
|
||||
*_quality_args(quality),
|
||||
"--embed-metadata",
|
||||
"--embed-thumbnail",
|
||||
"--no-playlist",
|
||||
"-P", target_folder]
|
||||
# Override tags from the chosen hit so they don't rely on scraped titles.
|
||||
"--no-playlist"]
|
||||
# Either a fixed output dir (-P) or a metadata-driven output template (-o).
|
||||
if outtmpl:
|
||||
cmd += ["-o", outtmpl]
|
||||
else:
|
||||
cmd += ["-P", target_folder]
|
||||
# Override embedded tags from the chosen hit. Inject literals via a
|
||||
# seed-then-replace pair: --parse-metadata first copies an always-present
|
||||
# field into meta_<tag> (so the tag exists even when the source lacks it,
|
||||
# e.g. YouTube videos with no album), then --replace-in-metadata overwrites
|
||||
# it with the literal value. This dodges yt-dlp's output-template trap where
|
||||
# a bare-word FROM (e.g. "Cochise") matches field_to_template's r'[a-zA-Z_]+$'
|
||||
# and is read as a *field name* -> "NA". --replace-in-metadata args are
|
||||
# literal, so single-word values and parens survive intact.
|
||||
def _force_tag(field: str, value: str) -> list[str]:
|
||||
repl = value.replace("\\", r"\\") # backslash is special in re.sub repl
|
||||
return ["--parse-metadata", f"%(title,id)s:%(meta_{field})s",
|
||||
"--replace-in-metadata", f"meta_{field}", "^.*$", repl]
|
||||
|
||||
if hit:
|
||||
if hit.artist:
|
||||
# First artist only; anchored ^.*$ replaces the whole field exactly once
|
||||
# (a bare .* matches twice and doubles the value).
|
||||
primary_artist = hit.artist.split(",")[0].strip()
|
||||
cmd += ["--replace-in-metadata", "artist", "^.*$", primary_artist]
|
||||
if hit.album:
|
||||
cmd += ["--parse-metadata", f"{hit.album}:%(album)s"]
|
||||
primary_artist = hit.artist.split(",")[0].strip() if hit.artist else ""
|
||||
if primary_artist:
|
||||
cmd += _force_tag("artist", primary_artist)
|
||||
if hit.title:
|
||||
cmd += ["--parse-metadata", f"{hit.title}:%(title)s"]
|
||||
cmd += _force_tag("title", hit.title)
|
||||
if hit.album:
|
||||
cmd += _force_tag("album", hit.album)
|
||||
if hit.year:
|
||||
cmd += ["--parse-metadata", f"{hit.year}:%(release_year)s"]
|
||||
# When the hit carried no album, still embed one: the resolved/native album
|
||||
# if present, else a placeholder so players (e.g. Plexamp) don't choke on a
|
||||
# blank album. (A hit album is already forced above and must not be clobbered.)
|
||||
if not (hit and hit.album):
|
||||
cmd += ["--parse-metadata", "%(album|Unknown Album)s:%(meta_album)s"]
|
||||
cmd.append(url_or_query)
|
||||
|
||||
dest = outtmpl or target_folder
|
||||
if dry_run:
|
||||
print(f"[dry-run] mkdir -p {target_folder}")
|
||||
if target_folder:
|
||||
print(f"[dry-run] mkdir -p {target_folder}")
|
||||
print(f"[dry-run] {' '.join(cmd)}")
|
||||
return
|
||||
os.makedirs(target_folder, exist_ok=True)
|
||||
print(f"Downloading via yt-dlp -> {target_folder}")
|
||||
subprocess.run(cmd)
|
||||
return True
|
||||
if target_folder:
|
||||
os.makedirs(target_folder, exist_ok=True)
|
||||
print(f"Downloading via yt-dlp -> {dest}")
|
||||
return subprocess.run(cmd).returncode == 0
|
||||
|
||||
|
||||
def _sanitize_source(name: str) -> str:
|
||||
"""Normalize a yt-dlp extractor key to a folder name ('Youtube'->'youtube')."""
|
||||
clean = re.sub(r"[^a-z0-9]+", "", (name or "").lower())
|
||||
return clean or "downloads"
|
||||
|
||||
|
||||
def _track_url(hit: Hit) -> str:
|
||||
"""Resolve the best download URL for a track Hit. YouTube tracks prefer the
|
||||
music.youtube URL (correct album art); other platforms use their own URL."""
|
||||
p = hit.payload
|
||||
extractor = p.get("extractor")
|
||||
vid = p.get("videoId")
|
||||
if vid and extractor in (None, "youtube"):
|
||||
return f"https://music.youtube.com/watch?v={vid}"
|
||||
if p.get("url"):
|
||||
return p["url"]
|
||||
if vid:
|
||||
return f"https://music.youtube.com/watch?v={vid}"
|
||||
return f"ytsearch1:{hit.artist} {hit.title}"
|
||||
|
||||
|
||||
def act_youtube(hit: Hit, root: str, quality: str, dry_run: bool):
|
||||
vid = hit.payload.get("videoId")
|
||||
# Prefer YouTube Music URL for correct album art / topic metadata.
|
||||
url = f"https://music.youtube.com/watch?v={vid}" if vid else f"ytsearch1:{hit.artist} {hit.title}"
|
||||
artist_dir = hit.artist.split(",")[0].strip() or "Unknown Artist"
|
||||
target = os.path.join(root, artist_dir, "youtube")
|
||||
yt_download(url, target, quality, dry_run, hit=hit)
|
||||
url = _track_url(hit)
|
||||
source = hit.payload.get("extractor") or "youtube"
|
||||
artist_dir = hit.artist.split(",")[0].strip()
|
||||
if artist_dir:
|
||||
target = os.path.join(root, artist_dir, source)
|
||||
return yt_download(url, target, quality, dry_run, hit=hit)
|
||||
# Sparse playlist metadata (e.g. SoundCloud sets): let yt-dlp route the file
|
||||
# by the track's own metadata so it lands under the real artist.
|
||||
outtmpl = os.path.join(root, "%(artist,uploader,channel)s", source, "%(title)s [%(id)s].%(ext)s")
|
||||
return yt_download(url, None, quality, dry_run, hit=hit, outtmpl=outtmpl)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# URL path
|
||||
# ---------------------------------------------------------------------------
|
||||
def run_yt_dlp_get_metadata(url: str) -> Optional[dict]:
|
||||
def _playlist_id(url: str) -> str:
|
||||
return parse_qs(urlparse(url).query).get("list", [""])[0]
|
||||
|
||||
|
||||
def _is_youtube_playlist_url(url: str) -> bool:
|
||||
"""True for a YouTube playlist URL (/playlist?list=… or list= without v=).
|
||||
A watch?v=…&list=… URL is treated as a single track, not a batch."""
|
||||
if not is_url(url):
|
||||
return False
|
||||
parsed = urlparse(url)
|
||||
if "youtube" not in parsed.netloc:
|
||||
return False
|
||||
qs = parse_qs(parsed.query)
|
||||
if "/playlist" in parsed.path:
|
||||
return True
|
||||
return "list" in qs and "v" not in qs
|
||||
|
||||
|
||||
def _ytmusic_playlist(pid: str) -> tuple[str, list[Hit]]:
|
||||
"""Expand a YouTube Music playlist via ytmusicapi. Returns ("", []) on failure."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["yt-dlp", "-j", "--no-playlist", url],
|
||||
capture_output=True, text=True, check=True,
|
||||
)
|
||||
pl = YTMusic().get_playlist(pid, limit=None)
|
||||
except Exception as e: # noqa: BLE001
|
||||
dbg(f"ytmusicapi playlist expand failed: {e}")
|
||||
return "", []
|
||||
hits = []
|
||||
for t in pl.get("tracks", []):
|
||||
vid = t.get("videoId")
|
||||
if not vid:
|
||||
continue
|
||||
alb = t.get("album")
|
||||
album = alb.get("name", "") if isinstance(alb, dict) else (alb or "")
|
||||
hits.append(Hit(source="youtube", kind="track", title=t.get("title", ""),
|
||||
artist=_ytm_artists(t), album=album, year=str(t.get("year") or ""),
|
||||
payload={"videoId": vid, "extractor": "youtube"}))
|
||||
return pl.get("title", ""), hits
|
||||
|
||||
|
||||
def _entry_to_hit(entry: dict) -> Hit:
|
||||
"""Map a yt-dlp --flat-playlist entry to a track Hit (any platform)."""
|
||||
source = _sanitize_source(entry.get("ie_key") or entry.get("extractor") or "")
|
||||
vid = entry.get("id")
|
||||
return Hit(source="youtube", kind="track", title=entry.get("title", ""),
|
||||
artist=entry.get("uploader") or entry.get("channel") or "",
|
||||
payload={"url": entry.get("url"),
|
||||
"videoId": vid if source == "youtube" else None,
|
||||
"extractor": source})
|
||||
|
||||
|
||||
def probe_url(url: str) -> tuple[str, str, list[Hit]]:
|
||||
"""Classify a URL via yt-dlp. Returns (kind, title, hits) where kind is
|
||||
'playlist' (hits populated) or 'track' (hits empty; caller downloads the URL).
|
||||
YouTube playlists use ytmusicapi for richer metadata."""
|
||||
if _is_youtube_playlist_url(url) and YTMusic is not None:
|
||||
pid = _playlist_id(url)
|
||||
if pid:
|
||||
title, hits = _ytmusic_playlist(pid)
|
||||
if hits:
|
||||
return "playlist", title, hits
|
||||
try:
|
||||
result = subprocess.run(["yt-dlp", "--flat-playlist", "-J", url],
|
||||
capture_output=True, text=True, check=True)
|
||||
data = json.loads(result.stdout)
|
||||
except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
|
||||
dbg(f"yt-dlp probe failed: {e}")
|
||||
return "track", "", []
|
||||
if data.get("entries") is not None or data.get("_type") == "playlist":
|
||||
hits = [_entry_to_hit(e) for e in data.get("entries", [])
|
||||
if e.get("id") or e.get("url")]
|
||||
return "playlist", data.get("title", ""), hits
|
||||
return "track", data.get("title", ""), []
|
||||
|
||||
|
||||
def download_hits(hits: list[Hit], root: str, quality: str, dry_run: bool) -> tuple[int, int]:
|
||||
"""Download each track Hit via act_youtube. Returns (ok, total)."""
|
||||
ok = 0
|
||||
for h in hits:
|
||||
try:
|
||||
if act_youtube(h, root, quality, dry_run):
|
||||
ok += 1
|
||||
except Exception as e: # noqa: BLE001 — one bad track shouldn't abort the batch
|
||||
err(f"track failed ({h.title}): {e}")
|
||||
return ok, len(hits)
|
||||
|
||||
|
||||
def download_single(url: str, root: str, quality: str, dry_run: bool) -> dict:
|
||||
"""Download a single URL (any yt-dlp site). Returns {title, artist, ok}."""
|
||||
meta = run_yt_dlp_get_metadata(url)
|
||||
artist = get_artist_from_metadata(meta) if meta else "Unknown Artist"
|
||||
title = (meta or {}).get("title", "")
|
||||
source = _sanitize_source((meta or {}).get("extractor", "")) if meta else "downloads"
|
||||
# First artist only for the folder (matches the search/playlist paths).
|
||||
artist_dir = artist.split(",")[0].strip() or "Unknown Artist"
|
||||
target = os.path.join(root, artist_dir, source)
|
||||
ok = yt_download(url, target, quality, dry_run)
|
||||
return {"title": title, "artist": artist, "ok": ok}
|
||||
|
||||
|
||||
def run_yt_dlp_get_metadata(url: str, extra_args=None) -> Optional[dict]:
|
||||
cmd = ["yt-dlp", "-j", "--no-playlist", *(extra_args or []), url]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||||
return json.loads(result.stdout)
|
||||
except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
|
||||
err(f"yt-dlp metadata extraction failed: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# Repair only reads tags — skip YouTube's slow/throttled JS signature step
|
||||
# (we never download here), which keeps metadata but is far faster per file.
|
||||
_REPAIR_META_ARGS = ["--extractor-args", "youtube:player_skip=js"]
|
||||
|
||||
|
||||
def get_artist_from_metadata(meta: dict) -> str:
|
||||
for key in ("artist", "creator", "uploader", "channel"):
|
||||
if meta.get(key):
|
||||
@@ -538,10 +819,319 @@ def get_artist_from_metadata(meta: dict) -> str:
|
||||
|
||||
|
||||
def handle_url(url: str, root: str, quality: str, dry_run: bool):
|
||||
meta = run_yt_dlp_get_metadata(url)
|
||||
artist = get_artist_from_metadata(meta) if meta else "Unknown Artist"
|
||||
target = os.path.join(root, artist, "youtube")
|
||||
yt_download(url, target, quality, dry_run)
|
||||
kind, title, hits = probe_url(url)
|
||||
if kind == "playlist":
|
||||
ok, total = download_hits(hits, root, quality, dry_run)
|
||||
label = f" from '{title}'" if title else ""
|
||||
print(f"Downloaded {ok}/{total} tracks{label}")
|
||||
return
|
||||
download_single(url, root, quality, dry_run)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Repair: re-tag existing downloads from source metadata (CLI only)
|
||||
# ---------------------------------------------------------------------------
|
||||
_AUDIO_EXTS = ("opus", "m4a", "mp3", "flac")
|
||||
_TRACK_FILE_RE = re.compile(
|
||||
r"^(?P<title>.*) \[(?P<id>[^\]]+)\]\.(?P<ext>" + "|".join(_AUDIO_EXTS) + r")$")
|
||||
# m4a uses atom keys; vorbis/easy formats use plain names.
|
||||
_MP4_KEYS = {"artist": "\xa9ART", "title": "\xa9nam", "album": "\xa9alb", "date": "\xa9day"}
|
||||
|
||||
|
||||
def _is_source_dir(name: str) -> bool:
|
||||
"""True for a yt-dlp-style source folder (youtube/soundcloud/…), so we skip
|
||||
Lidarr album folders (which have spaces/capitals)."""
|
||||
return bool(name) and name == _sanitize_source(name)
|
||||
|
||||
|
||||
def _parse_track_file(filename: str):
|
||||
"""Return (title, id) parsed from '<title> [<id>].<ext>', else None."""
|
||||
m = _TRACK_FILE_RE.match(filename)
|
||||
return (m.group("title"), m.group("id")) if m else None
|
||||
|
||||
|
||||
def _repair_probe_url(source: str, vid: str):
|
||||
"""Reconstruct a fetchable URL from (source, id), or None if unsupported."""
|
||||
if source == "youtube":
|
||||
return f"https://music.youtube.com/watch?v={vid}"
|
||||
if source == "soundcloud":
|
||||
return f"https://api.soundcloud.com/tracks/{vid}"
|
||||
return None
|
||||
|
||||
|
||||
def _repair_id_ok(source: str, vid: str) -> bool:
|
||||
"""True if the parsed id matches the source's id format (avoids querying
|
||||
junk ids pulled from bracketed descriptors like '[Official Video]')."""
|
||||
if source == "youtube":
|
||||
return bool(re.fullmatch(r"[A-Za-z0-9_-]{11}", vid))
|
||||
if source == "soundcloud":
|
||||
return vid.isdigit()
|
||||
return False
|
||||
|
||||
|
||||
def _valid_year(meta: dict) -> str:
|
||||
"""A plausible release year from metadata, or '' . Uses release info only —
|
||||
NOT upload_date, which is the upload year, not the song's year."""
|
||||
for v in (meta.get("release_year"), (meta.get("release_date") or "")[:4]):
|
||||
s = str(v or "")
|
||||
if s.isdigit() and 1000 <= int(s) <= 2100:
|
||||
return s
|
||||
return ""
|
||||
|
||||
|
||||
def _open_audio(path: str):
|
||||
"""Return (mutagen_file, key_map) for the path's format, or (None, None)."""
|
||||
import mutagen.flac
|
||||
import mutagen.mp4
|
||||
import mutagen.oggopus
|
||||
from mutagen.easyid3 import EasyID3
|
||||
ext = path.rsplit(".", 1)[-1].lower()
|
||||
if ext == "opus":
|
||||
return mutagen.oggopus.OggOpus(path), None
|
||||
if ext == "m4a":
|
||||
return mutagen.mp4.MP4(path), _MP4_KEYS
|
||||
if ext == "mp3":
|
||||
return EasyID3(path), None
|
||||
if ext == "flac":
|
||||
return mutagen.flac.FLAC(path), None
|
||||
return None, None
|
||||
|
||||
|
||||
def _read_tag(audio, key_map, field: str) -> str:
|
||||
k = key_map[field] if key_map else field
|
||||
val = audio.get(k)
|
||||
if not val:
|
||||
return ""
|
||||
return str(val[0]) if isinstance(val, list) else str(val)
|
||||
|
||||
|
||||
# Placeholder tag values the old tagging bug left behind (yt-dlp's "NA" missing
|
||||
# marker, and the "Unknown *" fallbacks). Treated as empty so repair overwrites
|
||||
# them rather than mistaking them for a real, present tag.
|
||||
_BOGUS_TAGS = {"", "na", "n/a", "unknown", "unknown album", "unknown artist"}
|
||||
|
||||
|
||||
def _is_bogus(value: str) -> bool:
|
||||
return (value or "").strip().casefold() in _BOGUS_TAGS
|
||||
|
||||
|
||||
def _fs_safe(name: str) -> str:
|
||||
"""Filesystem-safe filename stem: mirror yt-dlp's default '/'->'⧸' so the
|
||||
path stays a single segment, and drop NULs."""
|
||||
return name.replace("/", "⧸").replace("\0", "").strip()
|
||||
|
||||
|
||||
def _maybe_rename_bogus(path: str, title: str, dry_run: bool) -> tuple[str, Optional[str]]:
|
||||
"""When the filename stem is a placeholder (e.g. 'NA [<id>]'), rename to
|
||||
'<title> [<id>].<ext>'. Returns (current_path, change_note_or_None)."""
|
||||
fname = os.path.basename(path)
|
||||
parsed = _parse_track_file(fname)
|
||||
if not parsed:
|
||||
return path, None
|
||||
stem_title, vid = parsed
|
||||
if not _is_bogus(stem_title) or _is_bogus(title):
|
||||
return path, None
|
||||
ext = fname.rsplit(".", 1)[-1]
|
||||
new_name = f"{_fs_safe(title)} [{vid}].{ext}"
|
||||
new_path = os.path.join(os.path.dirname(path), new_name)
|
||||
if new_path == path or not new_name:
|
||||
return path, None
|
||||
note = f"renamed -> {new_name}"
|
||||
if dry_run:
|
||||
print(f"[dry-run] would rename {fname} -> {new_name}")
|
||||
return path, note
|
||||
os.rename(path, new_path)
|
||||
print(f"renamed {fname} -> {new_name}")
|
||||
return new_path, note
|
||||
|
||||
|
||||
def repair_file(path: str, source: str, dry_run: bool) -> list[str]:
|
||||
"""Re-tag one file from source metadata. album/year are authoritative
|
||||
(overwrite); artist/title are filled when MISSING *or* a known-bogus
|
||||
placeholder ('NA', 'Unknown …') — the old tagging bug wrote those — but a
|
||||
genuine existing tag is never clobbered with a channel name or decorated
|
||||
music-video title. A bogus 'NA [<id>]' filename is renamed to the recovered
|
||||
title. Returns the list of changed fields."""
|
||||
parsed = _parse_track_file(os.path.basename(path))
|
||||
if not parsed:
|
||||
dbg(f"skip (no id): {path}")
|
||||
return []
|
||||
_, vid = parsed
|
||||
if not _repair_id_ok(source, vid):
|
||||
dbg(f"skip (bad {source} id '{vid}'): {path}")
|
||||
return []
|
||||
url = _repair_probe_url(source, vid)
|
||||
if not url:
|
||||
dbg(f"skip (source '{source}' not re-queryable): {path}")
|
||||
return []
|
||||
meta = run_yt_dlp_get_metadata(url, _REPAIR_META_ARGS)
|
||||
if not meta:
|
||||
dbg(f"skip (no metadata): {path}")
|
||||
return []
|
||||
|
||||
try:
|
||||
audio, key_map = _open_audio(path)
|
||||
except Exception as e: # noqa: BLE001
|
||||
err(f"cannot open {path}: {e}")
|
||||
return []
|
||||
if audio is None:
|
||||
return []
|
||||
|
||||
album = (meta.get("album") or "").strip()
|
||||
year = _valid_year(meta)
|
||||
cur_artist = _read_tag(audio, key_map, "artist")
|
||||
cur_title = _read_tag(audio, key_map, "title")
|
||||
cur_album = _read_tag(audio, key_map, "album")
|
||||
meta_artist = get_artist_from_metadata(meta)
|
||||
meta_title = (meta.get("title") or "").strip()
|
||||
|
||||
updates = {}
|
||||
if album:
|
||||
updates["album"] = album
|
||||
elif cur_album and _is_bogus(cur_album) and cur_album.strip().casefold() != "unknown album":
|
||||
# No source album, but the tag is a literal 'NA' — normalise it so no
|
||||
# file keeps the placeholder (a blank album is left blank, as before).
|
||||
updates["album"] = "Unknown Album"
|
||||
if year:
|
||||
updates["date"] = year
|
||||
# Fill artist/title when missing OR bogus; never overwrite a genuine value.
|
||||
if meta_artist and not _is_bogus(meta_artist) and _is_bogus(cur_artist):
|
||||
updates["artist"] = meta_artist
|
||||
if meta_title and not _is_bogus(meta_title) and _is_bogus(cur_title):
|
||||
updates["title"] = meta_title
|
||||
|
||||
changed = []
|
||||
for field, value in updates.items():
|
||||
if _read_tag(audio, key_map, field) != value:
|
||||
changed.append(f"{field}={value}")
|
||||
if not dry_run:
|
||||
audio[key_map[field] if key_map else field] = [value]
|
||||
if changed and not dry_run:
|
||||
audio.save()
|
||||
if changed:
|
||||
prefix = "[dry-run] would set" if dry_run else "set"
|
||||
print(f"{prefix} [{', '.join(changed)}] on {path}")
|
||||
|
||||
# Repair a placeholder filename using the final (recovered) title.
|
||||
final_title = updates.get("title") or cur_title
|
||||
_, rename_note = _maybe_rename_bogus(path, final_title, dry_run)
|
||||
if rename_note:
|
||||
changed.append(rename_note)
|
||||
return changed
|
||||
|
||||
|
||||
def repair_library(root: str, dry_run: bool, exclude=()) -> tuple[int, int]:
|
||||
"""Walk <root>/<artist>/<source>/ and re-tag audio files. Returns (scanned, changed)."""
|
||||
if not os.path.isdir(root):
|
||||
err(f"Root folder not found: {root}")
|
||||
return 0, 0
|
||||
scanned = changed = 0
|
||||
for path, source, _artist in _iter_source_files(root, exclude):
|
||||
scanned += 1
|
||||
try:
|
||||
if repair_file(path, source, dry_run):
|
||||
changed += 1
|
||||
except Exception as e: # noqa: BLE001 — one bad file shouldn't abort
|
||||
err(f"repair failed ({os.path.basename(path)}): {e}")
|
||||
verb = "Would repair" if dry_run else "Repaired"
|
||||
print(f"{verb} {changed}/{scanned} files")
|
||||
return scanned, changed
|
||||
|
||||
|
||||
def _iter_source_files(root: str, exclude=()):
|
||||
"""Yield (path, source, artist) for audio files under <root>/<artist>/<source>/
|
||||
where source is a yt-dlp source folder (Lidarr album folders are skipped).
|
||||
Skips any artist or source folder whose name is in `exclude` (case-insensitive)."""
|
||||
skip = {e.lower() for e in exclude}
|
||||
for artist in sorted(os.listdir(root)):
|
||||
if artist.lower() in skip:
|
||||
continue
|
||||
adir = os.path.join(root, artist)
|
||||
if not os.path.isdir(adir):
|
||||
continue
|
||||
for source in sorted(os.listdir(adir)):
|
||||
if source.lower() in skip:
|
||||
continue
|
||||
sdir = os.path.join(adir, source)
|
||||
if not os.path.isdir(sdir) or not _is_source_dir(source):
|
||||
continue
|
||||
for fname in sorted(os.listdir(sdir)):
|
||||
if fname.lower().endswith(_AUDIO_EXTS):
|
||||
yield os.path.join(sdir, fname), source, artist
|
||||
|
||||
|
||||
# --- Offline retag-from-path (recover from tags damaged by a prior --repair) ---
|
||||
_DECORATION_RE = re.compile(
|
||||
r"\s*[\(\[][^)\]]*\b(?:official|lyric[s]?|audio|visuali[sz]er|"
|
||||
r"music\s+video|m/?v|hd|hq|4k|explicit|remaster(?:ed)?)\b[^)\]]*[\)\]]",
|
||||
re.IGNORECASE)
|
||||
|
||||
|
||||
def _title_from_filename(filename: str) -> str:
|
||||
"""Filename minus extension and a trailing ' [<id>]'."""
|
||||
stem = re.sub(r"\.(?:" + "|".join(_AUDIO_EXTS) + r")$", "", filename, flags=re.IGNORECASE)
|
||||
return re.sub(r"\s*\[[^\]]+\]$", "", stem).strip()
|
||||
|
||||
|
||||
def _strip_decorations(title: str) -> str:
|
||||
return re.sub(r"\s{2,}", " ", _DECORATION_RE.sub("", title)).strip(" -–—")
|
||||
|
||||
|
||||
def _derive_from_filename(filename: str, folder_artist: str) -> tuple[str, str]:
|
||||
"""Best-effort (artist, title) from the filename. A 'Artist - Title' name wins
|
||||
over the folder (handles music-video downloads filed under a channel name)."""
|
||||
title = _strip_decorations(_title_from_filename(filename))
|
||||
if " - " in title:
|
||||
left, right = title.split(" - ", 1)
|
||||
return left.strip(), right.strip()
|
||||
return folder_artist, title
|
||||
|
||||
|
||||
def retag_file_from_path(path: str, folder_artist: str, dry_run: bool) -> list[str]:
|
||||
"""Overwrite artist/title from the folder + cleaned filename. Leaves album/date."""
|
||||
artist, title = _derive_from_filename(os.path.basename(path), folder_artist)
|
||||
try:
|
||||
audio, key_map = _open_audio(path)
|
||||
except Exception as e: # noqa: BLE001
|
||||
err(f"cannot open {path}: {e}")
|
||||
return []
|
||||
if audio is None:
|
||||
return []
|
||||
updates = {}
|
||||
if artist:
|
||||
updates["artist"] = artist
|
||||
if title:
|
||||
updates["title"] = title
|
||||
changed = []
|
||||
for field, value in updates.items():
|
||||
if _read_tag(audio, key_map, field) != value:
|
||||
changed.append(f"{field}={value}")
|
||||
if not dry_run:
|
||||
audio[key_map[field] if key_map else field] = [value]
|
||||
if changed and not dry_run:
|
||||
audio.save()
|
||||
if changed:
|
||||
prefix = "[dry-run] would set" if dry_run else "set"
|
||||
print(f"{prefix} [{', '.join(changed)}] on {path}")
|
||||
return changed
|
||||
|
||||
|
||||
def retag_library_from_path(root: str, dry_run: bool, exclude=()) -> tuple[int, int]:
|
||||
"""Re-tag artist/title offline from folder+filename for every source file."""
|
||||
if not os.path.isdir(root):
|
||||
err(f"Root folder not found: {root}")
|
||||
return 0, 0
|
||||
scanned = changed = 0
|
||||
for path, _source, artist in _iter_source_files(root, exclude):
|
||||
scanned += 1
|
||||
try:
|
||||
if retag_file_from_path(path, artist, dry_run):
|
||||
changed += 1
|
||||
except Exception as e: # noqa: BLE001
|
||||
err(f"retag failed ({os.path.basename(path)}): {e}")
|
||||
verb = "Would retag" if dry_run else "Retagged"
|
||||
print(f"{verb} {changed}/{scanned} files")
|
||||
return scanned, changed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -564,7 +1154,7 @@ def parse_args():
|
||||
p = argparse.ArgumentParser(
|
||||
prog="musicfetch",
|
||||
description="Fetch music via Lidarr (preferred) or YouTube Music.")
|
||||
p.add_argument("query", nargs="+", help="Free-form query or a URL.")
|
||||
p.add_argument("query", nargs="*", help="Free-form query or a URL.")
|
||||
p.add_argument("-n", "--noninteractive", action="store_true",
|
||||
help="Auto-pick the top hit, no prompt.")
|
||||
p.add_argument("-s", "--ytsearch", action="store_true",
|
||||
@@ -579,6 +1169,14 @@ def parse_args():
|
||||
p.add_argument("-o", "--root", default=DEFAULT_ROOT, help=f"Output root (default {DEFAULT_ROOT}).")
|
||||
p.add_argument("--search-all", action="store_true",
|
||||
help="Search all albums when adding an artist to Lidarr.")
|
||||
p.add_argument("--repair", action="store_true",
|
||||
help="Re-tag existing downloads under --root from source metadata.")
|
||||
p.add_argument("--retag-from-path", action="store_true",
|
||||
help="Offline: re-tag artist/title from folder + filename "
|
||||
"(fixes tags damaged by a prior --repair).")
|
||||
p.add_argument("-x", "--exclude", action="append", default=[], metavar="NAME",
|
||||
help="Folder name under --root to skip during --repair/--retag-from-path "
|
||||
"(repeatable, e.g. -x Unsorted -x playlists).")
|
||||
p.add_argument("--debug", action="store_true", help="Verbose output.")
|
||||
return p.parse_args()
|
||||
|
||||
@@ -589,6 +1187,18 @@ def main():
|
||||
DEBUG = args.debug
|
||||
query = " ".join(args.query).strip()
|
||||
|
||||
if args.retag_from_path:
|
||||
retag_library_from_path(args.root, args.dry_run, args.exclude)
|
||||
return
|
||||
|
||||
if args.repair:
|
||||
repair_library(args.root, args.dry_run, args.exclude)
|
||||
return
|
||||
|
||||
if not query:
|
||||
err("Provide a query/URL, or use --repair. See --help.")
|
||||
sys.exit(1)
|
||||
|
||||
if args.lidarr_only and args.yt_only:
|
||||
err("--lidarr-only and --yt-only are mutually exclusive.")
|
||||
sys.exit(1)
|
||||
|
||||
@@ -1,7 +1,20 @@
|
||||
FROM python:3.12-slim
|
||||
|
||||
# ffmpeg for audio extraction/embedding; deno is the JS runtime yt-dlp needs
|
||||
# for YouTube (without it: "No supported JavaScript runtime" -> missing formats
|
||||
# / HTTP 403). yt-dlp auto-detects deno on PATH.
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends ffmpeg \
|
||||
&& apt-get install -y --no-install-recommends ffmpeg ca-certificates curl unzip \
|
||||
&& arch="$(uname -m)" \
|
||||
&& case "$arch" in \
|
||||
x86_64) deno_arch=x86_64-unknown-linux-gnu ;; \
|
||||
aarch64) deno_arch=aarch64-unknown-linux-gnu ;; \
|
||||
*) echo "unsupported arch: $arch" >&2; exit 1 ;; \
|
||||
esac \
|
||||
&& curl -fsSL "https://github.com/denoland/deno/releases/latest/download/deno-${deno_arch}.zip" -o /tmp/deno.zip \
|
||||
&& unzip /tmp/deno.zip -d /usr/local/bin \
|
||||
&& rm /tmp/deno.zip \
|
||||
&& apt-get purge -y --auto-remove curl unzip \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
@@ -61,3 +61,38 @@ def perform_fetch(chosen, hits: list, quality: str, root: str) -> dict:
|
||||
if not ok:
|
||||
raise RuntimeError("Failed to add artist to Lidarr.")
|
||||
return {"path": None, "lidarr_album_id": None}
|
||||
|
||||
|
||||
def url_started_message(kind: str, title: str = "") -> str:
|
||||
if kind == "playlist":
|
||||
return (f"Fetching playlist '{title}'. Downloading tracks now."
|
||||
if title else "Fetching playlist. Downloading tracks now.")
|
||||
return f"Fetching '{title}'. Downloading now." if title else "Fetching track. Downloading now."
|
||||
|
||||
|
||||
def playlist_done_message(result: dict) -> str:
|
||||
ok, total = result.get("ok", 0), result.get("total", 0)
|
||||
failed = total - ok
|
||||
return f"Downloaded {ok}/{total} tracks" + (f" ({failed} failed)." if failed else ".")
|
||||
|
||||
|
||||
def url_done_message(result: dict) -> str:
|
||||
title = result.get("title", "")
|
||||
return f"Downloaded '{title}'." if title else "Download complete."
|
||||
|
||||
|
||||
def perform_url_fetch(url: str, kind: str, title: str, hits: list, quality: str, root: str) -> dict:
|
||||
"""Download a probed URL (playlist -> batch over pre-probed hits, else single).
|
||||
Raises if nothing downloaded so the job is marked failed."""
|
||||
if kind == "playlist":
|
||||
ok, total = mf.download_hits(hits, root, quality, False)
|
||||
if ok == 0:
|
||||
raise RuntimeError(f"No tracks downloaded from playlist '{title}'." if title
|
||||
else "No tracks downloaded from playlist.")
|
||||
return {"kind": "playlist", "title": title, "ok": ok, "total": total,
|
||||
"path": None, "lidarr_album_id": None}
|
||||
info = mf.download_single(url, root, quality, False)
|
||||
if not info.get("ok"):
|
||||
raise RuntimeError("Download failed.")
|
||||
return {"kind": "track", "title": info.get("title", ""), "artist": info.get("artist", ""),
|
||||
"ok": 1, "total": 1, "path": None, "lidarr_album_id": None}
|
||||
|
||||
@@ -51,6 +51,21 @@ def fetch(q: str = Query(..., min_length=1),
|
||||
source: str = Query("auto")):
|
||||
if quality not in mf.QUALITY_CHOICES:
|
||||
raise HTTPException(status_code=422, detail=f"Invalid quality '{quality}'.")
|
||||
|
||||
if mf.is_url(q):
|
||||
kind, title, hits = mf.probe_url(q)
|
||||
syn = mf.Hit(source="youtube", kind=kind, title=title, artist="")
|
||||
job = jobs.create_job(hit=syn, message=actions.url_started_message(kind, title))
|
||||
response = _job_public(job)
|
||||
done_msg = actions.playlist_done_message if kind == "playlist" else actions.url_done_message
|
||||
jobs.run_job(
|
||||
job.id,
|
||||
lambda: actions.perform_url_fetch(q, kind, title, hits, quality, ROOT),
|
||||
done_message=done_msg,
|
||||
fail_message="Download failed.",
|
||||
)
|
||||
return response
|
||||
|
||||
if source not in ("auto", "lidarr", "youtube"):
|
||||
raise HTTPException(status_code=422, detail=f"Invalid source '{source}'.")
|
||||
|
||||
|
||||
@@ -15,11 +15,3 @@ services:
|
||||
MUSICFETCH_PORT: "6769"
|
||||
volumes:
|
||||
- /media/music:/media/music
|
||||
networks:
|
||||
- lidarr_net
|
||||
|
||||
networks:
|
||||
lidarr_net:
|
||||
external: true
|
||||
# Set to the actual network name of your existing Lidarr stack, e.g.:
|
||||
# name: media_default
|
||||
|
||||
@@ -48,7 +48,8 @@ def get_job(job_id: str) -> Optional["Job"]:
|
||||
return JOBS.get(job_id)
|
||||
|
||||
|
||||
def run_job(job_id: str, fn: Callable[[], dict], done_message: str,
|
||||
def run_job(job_id: str, fn: Callable[[], dict],
|
||||
done_message: "str | Callable[[dict], str]",
|
||||
fail_message: str = "Something went wrong while fetching.") -> None:
|
||||
def _task():
|
||||
job = JOBS.get(job_id)
|
||||
@@ -57,7 +58,8 @@ def run_job(job_id: str, fn: Callable[[], dict], done_message: str,
|
||||
_touch(job, status="running")
|
||||
try:
|
||||
result = fn()
|
||||
_touch(job, status="done", result=result, message=done_message)
|
||||
msg = done_message(result) if callable(done_message) else done_message
|
||||
_touch(job, status="done", result=result, message=msg)
|
||||
except Exception as e: # noqa: BLE001 — record any failure on the job
|
||||
_touch(job, status="failed", error=f"{type(e).__name__}: {e}",
|
||||
message=fail_message)
|
||||
|
||||
@@ -24,6 +24,11 @@ act_youtube = _mod.act_youtube
|
||||
act_lidarr_album = _mod.act_lidarr_album
|
||||
act_lidarr_artist = _mod.act_lidarr_artist
|
||||
QUALITY_CHOICES = _mod.QUALITY_CHOICES
|
||||
is_url = _mod.is_url
|
||||
probe_url = _mod.probe_url
|
||||
download_hits = _mod.download_hits
|
||||
download_single = _mod.download_single
|
||||
|
||||
__all__ = ["Hit", "build_combined_hits", "pick", "act_youtube",
|
||||
"act_lidarr_album", "act_lidarr_artist", "QUALITY_CHOICES"]
|
||||
"act_lidarr_album", "act_lidarr_artist", "QUALITY_CHOICES",
|
||||
"is_url", "probe_url", "download_hits", "download_single"]
|
||||
|
||||
@@ -4,3 +4,4 @@ requests
|
||||
ytmusicapi
|
||||
rich
|
||||
yt-dlp
|
||||
mutagen
|
||||
|
||||
75
tests/test_api_url.py
Normal file
75
tests/test_api_url.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import time
|
||||
import pytest
|
||||
from server import jobs as jobs_mod
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_jobs():
|
||||
jobs_mod.JOBS.clear()
|
||||
yield
|
||||
jobs_mod.JOBS.clear()
|
||||
|
||||
|
||||
def _wait_done(client, auth, job_id, timeout=2.0):
|
||||
end = time.time() + timeout
|
||||
while time.time() < end:
|
||||
b = client.get(f"/jobs/{job_id}", headers=auth).json()
|
||||
if b["status"] in ("done", "failed"):
|
||||
return b
|
||||
time.sleep(0.01)
|
||||
raise AssertionError("job never finished")
|
||||
|
||||
|
||||
def _mk_hit():
|
||||
from server import mf
|
||||
return mf.Hit(source="youtube", kind="track", title="t", artist="a", payload={"videoId": "1"})
|
||||
|
||||
|
||||
def test_playlist_url_batch_job(client, auth, monkeypatch):
|
||||
monkeypatch.setattr("server.app.mf.probe_url",
|
||||
lambda url: ("playlist", "My Mix", [_mk_hit(), _mk_hit(), _mk_hit()]))
|
||||
monkeypatch.setattr("server.app.mf.download_hits",
|
||||
lambda hits, root, quality, dry_run: (2, 3))
|
||||
r = client.post("/fetch", params={"q": "https://soundcloud.com/dj/sets/mix"}, headers=auth)
|
||||
assert r.status_code == 200
|
||||
body = r.json()
|
||||
assert body["status"] == "queued"
|
||||
assert body["hit"]["kind"] == "playlist"
|
||||
done = _wait_done(client, auth, body["job_id"])
|
||||
assert done["status"] == "done"
|
||||
assert "2/3" in done["message"]
|
||||
assert done["result"]["ok"] == 2
|
||||
|
||||
|
||||
def test_playlist_zero_success_fails(client, auth, monkeypatch):
|
||||
monkeypatch.setattr("server.app.mf.probe_url",
|
||||
lambda url: ("playlist", "Dead Mix", [_mk_hit()]))
|
||||
monkeypatch.setattr("server.app.mf.download_hits",
|
||||
lambda hits, root, quality, dry_run: (0, 3))
|
||||
body = client.post("/fetch", params={"q": "https://www.youtube.com/playlist?list=PLy"}, headers=auth).json()
|
||||
done = _wait_done(client, auth, body["job_id"])
|
||||
assert done["status"] == "failed"
|
||||
|
||||
|
||||
def test_single_video_url_download(client, auth, monkeypatch):
|
||||
monkeypatch.setattr("server.app.mf.probe_url", lambda url: ("track", "Song", []))
|
||||
monkeypatch.setattr("server.app.mf.download_single",
|
||||
lambda url, root, quality, dry_run: {"title": "Song", "artist": "A", "ok": True})
|
||||
body = client.post("/fetch", params={"q": "https://soundcloud.com/a/song"}, headers=auth).json()
|
||||
assert body["hit"]["kind"] == "track"
|
||||
done = _wait_done(client, auth, body["job_id"])
|
||||
assert done["status"] == "done"
|
||||
assert "Song" in done["message"]
|
||||
|
||||
|
||||
def test_search_query_still_works(client, auth, monkeypatch):
|
||||
from server import mf
|
||||
hit = mf.Hit(source="youtube", kind="track", title="T", artist="A", payload={"videoId": "x"})
|
||||
monkeypatch.setattr("server.app.mf.build_combined_hits",
|
||||
lambda q, limit, yt_first, lidarr_only, yt_only: [hit])
|
||||
monkeypatch.setattr("server.app.mf.pick", lambda hits, q, ni, yf: hits[0])
|
||||
monkeypatch.setattr("server.app.actions.perform_fetch",
|
||||
lambda chosen, hits, quality, root: {"path": "/x", "lidarr_album_id": None})
|
||||
r = client.post("/fetch", params={"q": "Daft Punk - Discovery"}, headers=auth)
|
||||
assert r.status_code == 200
|
||||
assert r.json()["status"] == "queued"
|
||||
@@ -51,5 +51,13 @@ def test_eviction_keeps_within_cap():
|
||||
jobs.JOBS.clear()
|
||||
|
||||
|
||||
def test_run_job_callable_done_message():
|
||||
job = jobs.create_job(hit={}, message="m")
|
||||
jobs.run_job(job.id, lambda: {"ok": 2, "total": 3},
|
||||
done_message=lambda res: f"{res['ok']}/{res['total']} done")
|
||||
j = _wait(job.id, "done")
|
||||
assert j.message == "2/3 done"
|
||||
|
||||
|
||||
def teardown_module():
|
||||
jobs.JOBS.clear()
|
||||
|
||||
110
tests/test_lidarr_search.py
Normal file
110
tests/test_lidarr_search.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import server.mf # noqa: F401
|
||||
import musicfetch_core as mf
|
||||
|
||||
DISCOVERY_MBID = "48117b90-a16e-34ca-a514-19c702df1158"
|
||||
|
||||
DISCOVERY_ALBUM = {"title": "Discovery", "artist": {"artistName": "Daft Punk"},
|
||||
"releaseDate": "2001-01-01", "foreignAlbumId": DISCOVERY_MBID}
|
||||
|
||||
|
||||
def test_artist_track_uses_mbid_exact_lookup(monkeypatch):
|
||||
monkeypatch.setattr(mf, "API_KEY", "testkey")
|
||||
monkeypatch.setattr(mf, "musicbrainz_best_album",
|
||||
lambda artist, track: {"album_title": "Discovery", "artist": "Daft Punk",
|
||||
"year": "2001", "rg_mbid": DISCOVERY_MBID})
|
||||
seen = {}
|
||||
|
||||
def fake_get(path, params=None, timeout=15):
|
||||
seen["term"] = (params or {}).get("term")
|
||||
if path == "/api/v1/album/lookup" and seen["term"] == f"mbid:{DISCOVERY_MBID}":
|
||||
return [DISCOVERY_ALBUM]
|
||||
return []
|
||||
monkeypatch.setattr(mf, "lidarr_get", fake_get)
|
||||
|
||||
hits = mf.lidarr_search("Daft Punk - Harder Better Faster Stronger", 10)
|
||||
assert seen["term"] == f"mbid:{DISCOVERY_MBID}"
|
||||
assert hits[0].album == "Discovery"
|
||||
assert hits[0].artist == "Daft Punk"
|
||||
assert hits[0].payload["album"]["foreignAlbumId"] == DISCOVERY_MBID
|
||||
|
||||
|
||||
def test_year_enriched_from_musicbrainz(monkeypatch):
|
||||
monkeypatch.setattr(mf, "API_KEY", "testkey")
|
||||
monkeypatch.setattr(mf, "musicbrainz_best_album",
|
||||
lambda artist, track: {"album_title": "Discovery", "artist": "Daft Punk",
|
||||
"year": "2001", "rg_mbid": DISCOVERY_MBID})
|
||||
no_year = [{"title": "Discovery", "artist": {"artistName": "Daft Punk"},
|
||||
"releaseDate": "", "foreignAlbumId": DISCOVERY_MBID}]
|
||||
monkeypatch.setattr(mf, "lidarr_get",
|
||||
lambda path, params=None, timeout=15: no_year if path == "/api/v1/album/lookup" else [])
|
||||
hits = mf.lidarr_search("Daft Punk - Discovery", 10)
|
||||
assert hits[0].year == "2001"
|
||||
|
||||
|
||||
def test_no_api_key_returns_empty(monkeypatch):
|
||||
monkeypatch.setattr(mf, "API_KEY", "")
|
||||
assert mf.lidarr_search("Daft Punk - Discovery", 10) == []
|
||||
|
||||
|
||||
def test_mb_miss_falls_back_to_lookup(monkeypatch):
|
||||
monkeypatch.setattr(mf, "API_KEY", "testkey")
|
||||
monkeypatch.setattr(mf, "musicbrainz_best_album", lambda artist, track: None)
|
||||
monkeypatch.setattr(mf, "lidarr_get",
|
||||
lambda path, params=None, timeout=15: [DISCOVERY_ALBUM] if path == "/api/v1/album/lookup" else [])
|
||||
hits = mf.lidarr_search("Daft Punk - Discovery", 10)
|
||||
assert hits[0].album == "Discovery"
|
||||
|
||||
|
||||
def test_single_term_is_artist_first(monkeypatch):
|
||||
monkeypatch.setattr(mf, "API_KEY", "testkey")
|
||||
|
||||
def fake_get(path, params=None, timeout=15):
|
||||
if path == "/api/v1/artist/lookup":
|
||||
return [{"artistName": "Daft Punk"}]
|
||||
if path == "/api/v1/album/lookup":
|
||||
return [DISCOVERY_ALBUM]
|
||||
return []
|
||||
monkeypatch.setattr(mf, "lidarr_get", fake_get)
|
||||
hits = mf.lidarr_search("Daft Punk", 10)
|
||||
assert hits[0].kind == "artist"
|
||||
assert hits[0].artist == "Daft Punk"
|
||||
|
||||
|
||||
def test_last_resort_universal_search(monkeypatch):
|
||||
monkeypatch.setattr(mf, "API_KEY", "testkey")
|
||||
monkeypatch.setattr(mf, "musicbrainz_best_album", lambda artist, track: None)
|
||||
|
||||
def fake_get(path, params=None, timeout=15):
|
||||
if path == "/api/v1/search":
|
||||
return [{"album": DISCOVERY_ALBUM}]
|
||||
return []
|
||||
monkeypatch.setattr(mf, "lidarr_get", fake_get)
|
||||
hits = mf.lidarr_search("Daft Punk - Discovery", 10)
|
||||
assert hits and hits[0].album == "Discovery"
|
||||
|
||||
|
||||
def test_unreachable_lidarr_warns_loudly(monkeypatch, capsys):
|
||||
# A connection error must surface on stderr (not silent dbg) so the
|
||||
# YouTube fallback isn't mistaken for "Lidarr had no match".
|
||||
monkeypatch.setattr(mf, "API_KEY", "testkey")
|
||||
monkeypatch.setattr(mf, "DEBUG", False)
|
||||
|
||||
def boom(path, params=None, timeout=15):
|
||||
raise mf.ReqConnectionError("Name or service not known")
|
||||
monkeypatch.setattr(mf, "lidarr_get", boom)
|
||||
|
||||
hits = mf._lidarr_album_candidates("anything")
|
||||
assert hits == []
|
||||
assert "Lidarr unreachable" in capsys.readouterr().err
|
||||
|
||||
|
||||
def test_http_error_stays_quiet(monkeypatch, capsys):
|
||||
monkeypatch.setattr(mf, "API_KEY", "testkey")
|
||||
monkeypatch.setattr(mf, "DEBUG", False)
|
||||
|
||||
def boom(path, params=None, timeout=15):
|
||||
raise mf.RequestException("500 Server Error")
|
||||
monkeypatch.setattr(mf, "lidarr_get", boom)
|
||||
|
||||
assert mf._lidarr_album_candidates("anything") == []
|
||||
assert "Lidarr unreachable" not in capsys.readouterr().err
|
||||
8
tests/test_mf_url_exports.py
Normal file
8
tests/test_mf_url_exports.py
Normal file
@@ -0,0 +1,8 @@
|
||||
import server.mf as smf
|
||||
|
||||
|
||||
def test_url_helpers_reexported():
|
||||
assert callable(smf.is_url)
|
||||
assert callable(smf.probe_url)
|
||||
assert callable(smf.download_hits)
|
||||
assert callable(smf.download_single)
|
||||
93
tests/test_multiplatform.py
Normal file
93
tests/test_multiplatform.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import server.mf # noqa: F401 — loads musicfetch, registers musicfetch_core
|
||||
import musicfetch_core as mf
|
||||
|
||||
|
||||
# ---- _sanitize_source ----
|
||||
def test_sanitize_source():
|
||||
assert mf._sanitize_source("Youtube") == "youtube"
|
||||
assert mf._sanitize_source("Soundcloud") == "soundcloud"
|
||||
assert mf._sanitize_source("") == "downloads"
|
||||
|
||||
|
||||
# ---- _entry_to_hit ----
|
||||
def test_entry_to_hit_soundcloud_keeps_url_no_videoid():
|
||||
h = mf._entry_to_hit({"id": "t1", "title": "Track", "uploader": "DJ",
|
||||
"ie_key": "Soundcloud", "url": "https://soundcloud.com/dj/track"})
|
||||
assert h.payload["extractor"] == "soundcloud"
|
||||
assert h.payload["url"] == "https://soundcloud.com/dj/track"
|
||||
assert h.payload["videoId"] is None
|
||||
assert h.artist == "DJ"
|
||||
|
||||
|
||||
def test_entry_to_hit_youtube_keeps_videoid():
|
||||
h = mf._entry_to_hit({"id": "vid123", "title": "Song", "channel": "Chan",
|
||||
"ie_key": "Youtube", "url": "https://youtube.com/watch?v=vid123"})
|
||||
assert h.payload["extractor"] == "youtube"
|
||||
assert h.payload["videoId"] == "vid123"
|
||||
|
||||
|
||||
# ---- _track_url ----
|
||||
def test_track_url_youtube_prefers_music_youtube():
|
||||
h = mf.Hit(source="youtube", kind="track", title="T", artist="A",
|
||||
payload={"videoId": "vid", "extractor": "youtube", "url": "https://youtube.com/watch?v=vid"})
|
||||
assert mf._track_url(h) == "https://music.youtube.com/watch?v=vid"
|
||||
|
||||
|
||||
def test_track_url_soundcloud_uses_native_url():
|
||||
h = mf.Hit(source="youtube", kind="track", title="T", artist="A",
|
||||
payload={"videoId": None, "extractor": "soundcloud", "url": "https://soundcloud.com/a/t"})
|
||||
assert mf._track_url(h) == "https://soundcloud.com/a/t"
|
||||
|
||||
|
||||
def test_track_url_ytmusic_search_hit_default_youtube():
|
||||
# ytmusicapi search hits carry only videoId (no extractor) -> music.youtube.
|
||||
h = mf.Hit(source="youtube", kind="track", title="T", artist="A", payload={"videoId": "vid"})
|
||||
assert mf._track_url(h) == "https://music.youtube.com/watch?v=vid"
|
||||
|
||||
|
||||
# ---- act_youtube routes to per-source folder ----
|
||||
def test_act_youtube_soundcloud_folder(monkeypatch):
|
||||
captured = {}
|
||||
monkeypatch.setattr(mf, "yt_download",
|
||||
lambda url, target, quality, dry_run, hit=None: captured.update(url=url, target=target) or True)
|
||||
h = mf.Hit(source="youtube", kind="track", title="T", artist="DJ, Other",
|
||||
payload={"videoId": None, "extractor": "soundcloud", "url": "https://soundcloud.com/dj/t"})
|
||||
mf.act_youtube(h, "/media/music", "best", False)
|
||||
assert captured["target"] == "/media/music/DJ/soundcloud" # first artist only
|
||||
assert captured["url"] == "https://soundcloud.com/dj/t"
|
||||
|
||||
|
||||
def test_act_youtube_youtube_folder(monkeypatch):
|
||||
captured = {}
|
||||
monkeypatch.setattr(mf, "yt_download",
|
||||
lambda url, target, quality, dry_run, hit=None, outtmpl=None:
|
||||
captured.update(target=target) or True)
|
||||
h = mf.Hit(source="youtube", kind="track", title="T", artist="A",
|
||||
payload={"videoId": "vid", "extractor": "youtube"})
|
||||
mf.act_youtube(h, "/media/music", "best", False)
|
||||
assert captured["target"] == "/media/music/A/youtube"
|
||||
|
||||
|
||||
def test_act_youtube_unknown_artist_uses_metadata_template(monkeypatch):
|
||||
captured = {}
|
||||
monkeypatch.setattr(mf, "yt_download",
|
||||
lambda url, target, quality, dry_run, hit=None, outtmpl=None:
|
||||
captured.update(target=target, outtmpl=outtmpl) or True)
|
||||
h = mf.Hit(source="youtube", kind="track", title="", artist="",
|
||||
payload={"videoId": None, "extractor": "soundcloud", "url": "https://soundcloud.com/a/t"})
|
||||
mf.act_youtube(h, "/media/music", "best", False)
|
||||
assert captured["target"] is None
|
||||
assert "%(artist,uploader,channel)s" in captured["outtmpl"]
|
||||
assert captured["outtmpl"].endswith("/soundcloud/%(title)s [%(id)s].%(ext)s")
|
||||
|
||||
|
||||
# ---- download_single per-source folder ----
|
||||
def test_download_single_bandcamp_folder(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url: {"title": "Song", "artist": "Band", "extractor": "Bandcamp"})
|
||||
captured = {}
|
||||
monkeypatch.setattr(mf, "yt_download",
|
||||
lambda url, target, quality, dry_run, hit=None: captured.update(target=target) or True)
|
||||
info = mf.download_single("https://band.bandcamp.com/track/song", "/media/music", "best", False)
|
||||
assert captured["target"] == "/media/music/Band/bandcamp"
|
||||
assert info == {"title": "Song", "artist": "Band", "ok": True}
|
||||
96
tests/test_musicbrainz.py
Normal file
96
tests/test_musicbrainz.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import server.mf # noqa: F401
|
||||
import musicfetch_core as mf
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
def __init__(self, payload):
|
||||
self._payload = payload
|
||||
def raise_for_status(self):
|
||||
pass
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
MB_PAYLOAD = {
|
||||
"recordings": [
|
||||
{
|
||||
"artist-credit": [{"name": "Daft Punk"}],
|
||||
"releases": [
|
||||
{"date": "2001",
|
||||
"release-group": {"id": "single-mbid", "title": "Harder, Better, Faster, Stronger",
|
||||
"primary-type": "Single", "secondary-types": []}},
|
||||
{"date": "2002",
|
||||
"release-group": {"id": "comp-mbid", "title": "Musique, Vol. 1",
|
||||
"primary-type": "Album", "secondary-types": ["Compilation"]}},
|
||||
{"date": "2001",
|
||||
"release-group": {"id": "48117b90-a16e-34ca-a514-19c702df1158",
|
||||
"title": "Discovery", "primary-type": "Album",
|
||||
"secondary-types": []}},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def test_picks_studio_album_over_single_and_comp(monkeypatch):
|
||||
monkeypatch.setattr(mf.requests, "get", lambda *a, **k: _FakeResp(MB_PAYLOAD))
|
||||
monkeypatch.setattr(mf.time, "sleep", lambda *_: None)
|
||||
out = mf.musicbrainz_best_album("Daft Punk", "Harder Better Faster Stronger")
|
||||
assert out["album_title"] == "Discovery"
|
||||
assert out["artist"] == "Daft Punk"
|
||||
assert out["year"] == "2001"
|
||||
assert out["rg_mbid"] == "48117b90-a16e-34ca-a514-19c702df1158"
|
||||
|
||||
|
||||
def test_returns_none_on_empty(monkeypatch):
|
||||
monkeypatch.setattr(mf.requests, "get", lambda *a, **k: _FakeResp({"recordings": []}))
|
||||
monkeypatch.setattr(mf.time, "sleep", lambda *_: None)
|
||||
assert mf.musicbrainz_best_album("Nobody", "Nothing") is None
|
||||
|
||||
|
||||
def test_returns_none_on_exception(monkeypatch):
|
||||
def boom(*a, **k):
|
||||
raise mf.requests.exceptions.RequestException("network down")
|
||||
monkeypatch.setattr(mf.requests, "get", boom)
|
||||
monkeypatch.setattr(mf.time, "sleep", lambda *_: None)
|
||||
assert mf.musicbrainz_best_album("Daft Punk", "Discovery") is None
|
||||
|
||||
|
||||
def test_falls_back_to_any_releasegroup_when_no_studio(monkeypatch):
|
||||
payload = {"recordings": [{"artist-credit": [{"name": "X"}], "releases": [
|
||||
{"date": "2010", "release-group": {"id": "live1", "title": "Live Thing",
|
||||
"primary-type": "Album", "secondary-types": ["Live"]}},
|
||||
]}]}
|
||||
monkeypatch.setattr(mf.requests, "get", lambda *a, **k: _FakeResp(payload))
|
||||
monkeypatch.setattr(mf.time, "sleep", lambda *_: None)
|
||||
out = mf.musicbrainz_best_album("X", "Y")
|
||||
assert out["album_title"] == "Live Thing"
|
||||
|
||||
|
||||
def test_first_artist_credit_only(monkeypatch):
|
||||
payload = {"recordings": [{"artist-credit": [{"name": "SLVMLORD"}, {"name": "Travis Bradley"}],
|
||||
"releases": [{"date": "2025",
|
||||
"release-group": {"id": "x", "title": "Album X",
|
||||
"primary-type": "Album",
|
||||
"secondary-types": []}}]}]}
|
||||
monkeypatch.setattr(mf.requests, "get", lambda *a, **k: _FakeResp(payload))
|
||||
monkeypatch.setattr(mf.time, "sleep", lambda *_: None)
|
||||
out = mf.musicbrainz_best_album("SLVMLORD", "Under My Skin")
|
||||
assert out["artist"] == "SLVMLORD"
|
||||
|
||||
|
||||
def test_prefers_own_artist_studio_over_various_artists(monkeypatch):
|
||||
# A studio-looking VA compilation dated earlier must NOT beat the artist's own album.
|
||||
payload = {"recordings": [{"artist-credit": [{"name": "Daft Punk"}], "releases": [
|
||||
{"date": "2001-10-26", "artist-credit": [{"name": "Various Artists"}],
|
||||
"release-group": {"id": "va-mbid", "title": "All The Hits Now",
|
||||
"primary-type": "Album", "secondary-types": []}},
|
||||
{"date": "2002", "artist-credit": [{"name": "Daft Punk"}],
|
||||
"release-group": {"id": "48117b90-a16e-34ca-a514-19c702df1158", "title": "Discovery",
|
||||
"primary-type": "Album", "secondary-types": []}},
|
||||
]}]}
|
||||
monkeypatch.setattr(mf.requests, "get", lambda *a, **k: _FakeResp(payload))
|
||||
monkeypatch.setattr(mf.time, "sleep", lambda *_: None)
|
||||
out = mf.musicbrainz_best_album("Daft Punk", "Harder Better Faster Stronger")
|
||||
assert out["album_title"] == "Discovery"
|
||||
assert out["rg_mbid"] == "48117b90-a16e-34ca-a514-19c702df1158"
|
||||
134
tests/test_playlist.py
Normal file
134
tests/test_playlist.py
Normal file
@@ -0,0 +1,134 @@
|
||||
import json as _json
|
||||
|
||||
import server.mf # noqa: F401 — loads musicfetch, registers musicfetch_core
|
||||
import musicfetch_core as mf
|
||||
|
||||
|
||||
class _CP:
|
||||
def __init__(self, stdout):
|
||||
self.stdout = stdout
|
||||
self.returncode = 0
|
||||
|
||||
|
||||
# ---- _is_youtube_playlist_url ----
|
||||
def test_youtube_playlist_url_true():
|
||||
assert mf._is_youtube_playlist_url("https://music.youtube.com/playlist?list=PLabc") is True
|
||||
assert mf._is_youtube_playlist_url("https://www.youtube.com/playlist?list=PLabc") is True
|
||||
|
||||
|
||||
def test_youtube_watch_with_list_is_not_playlist():
|
||||
assert mf._is_youtube_playlist_url("https://www.youtube.com/watch?v=abc&list=PLx") is False
|
||||
|
||||
|
||||
def test_non_youtube_url_not_youtube_playlist():
|
||||
# SoundCloud sets are not matched here — probe_url handles them via yt-dlp.
|
||||
assert mf._is_youtube_playlist_url("https://soundcloud.com/user/sets/mix") is False
|
||||
|
||||
|
||||
# ---- probe_url ----
|
||||
def test_probe_url_youtube_playlist_uses_ytmusic(monkeypatch):
|
||||
h = mf.Hit(source="youtube", kind="track", title="A", artist="X",
|
||||
payload={"videoId": "1", "extractor": "youtube"})
|
||||
monkeypatch.setattr(mf, "_ytmusic_playlist", lambda pid: ("My YT Mix", [h]))
|
||||
monkeypatch.setattr(mf, "YTMusic", object()) # non-None to enter ytmusic branch
|
||||
kind, title, hits = mf.probe_url("https://music.youtube.com/playlist?list=PLx")
|
||||
assert kind == "playlist"
|
||||
assert title == "My YT Mix"
|
||||
assert hits == [h]
|
||||
|
||||
|
||||
def test_probe_url_generic_playlist_via_ytdlp(monkeypatch):
|
||||
monkeypatch.setattr(mf, "YTMusic", None)
|
||||
payload = {"title": "SC Set", "_type": "playlist", "entries": [
|
||||
{"id": "t1", "title": "One", "uploader": "DJ", "ie_key": "Soundcloud",
|
||||
"url": "https://soundcloud.com/dj/one"},
|
||||
{"id": None, "url": None, "title": "skip"},
|
||||
]}
|
||||
monkeypatch.setattr(mf.subprocess, "run", lambda *a, **k: _CP(_json.dumps(payload)))
|
||||
kind, title, hits = mf.probe_url("https://soundcloud.com/dj/sets/sc-set")
|
||||
assert kind == "playlist"
|
||||
assert title == "SC Set"
|
||||
assert len(hits) == 1
|
||||
assert hits[0].payload["extractor"] == "soundcloud"
|
||||
assert hits[0].payload["url"] == "https://soundcloud.com/dj/one"
|
||||
|
||||
|
||||
def test_probe_url_single_track(monkeypatch):
|
||||
monkeypatch.setattr(mf, "YTMusic", None)
|
||||
payload = {"title": "A Song", "extractor": "soundcloud"} # no entries -> single
|
||||
monkeypatch.setattr(mf.subprocess, "run", lambda *a, **k: _CP(_json.dumps(payload)))
|
||||
kind, title, hits = mf.probe_url("https://soundcloud.com/dj/one")
|
||||
assert kind == "track"
|
||||
assert title == "A Song"
|
||||
assert hits == []
|
||||
|
||||
|
||||
def test_probe_url_failure_returns_track(monkeypatch):
|
||||
monkeypatch.setattr(mf, "YTMusic", None)
|
||||
|
||||
def boom(*a, **k):
|
||||
raise mf.subprocess.CalledProcessError(1, "yt-dlp")
|
||||
monkeypatch.setattr(mf.subprocess, "run", boom)
|
||||
assert mf.probe_url("https://example.com/x") == ("track", "", [])
|
||||
|
||||
|
||||
# ---- download_hits ----
|
||||
def test_download_hits_counts(monkeypatch):
|
||||
h1 = mf.Hit(source="youtube", kind="track", title="A", artist="X", payload={"videoId": "1"})
|
||||
h2 = mf.Hit(source="youtube", kind="track", title="B", artist="Y", payload={"videoId": "2"})
|
||||
h3 = mf.Hit(source="youtube", kind="track", title="C", artist="Z", payload={"videoId": "3"})
|
||||
monkeypatch.setattr(mf, "act_youtube", lambda hit, root, quality, dry_run: hit.title != "B")
|
||||
assert mf.download_hits([h1, h2, h3], "/tmp", "best", False) == (2, 3)
|
||||
|
||||
|
||||
def test_download_hits_track_exception_is_failure(monkeypatch):
|
||||
h1 = mf.Hit(source="youtube", kind="track", title="A", artist="X", payload={"videoId": "1"})
|
||||
h2 = mf.Hit(source="youtube", kind="track", title="B", artist="Y", payload={"videoId": "2"})
|
||||
|
||||
def fake_act(hit, root, quality, dry_run):
|
||||
if hit.title == "B":
|
||||
raise RuntimeError("boom")
|
||||
return True
|
||||
monkeypatch.setattr(mf, "act_youtube", fake_act)
|
||||
assert mf.download_hits([h1, h2], "/tmp", "best", False) == (1, 2)
|
||||
|
||||
|
||||
# ---- yt_download bool ----
|
||||
def test_yt_download_returns_true_on_zero_exit(monkeypatch):
|
||||
monkeypatch.setattr(mf.os, "makedirs", lambda *a, **k: None)
|
||||
monkeypatch.setattr(mf.subprocess, "run", lambda *a, **k: _CP(""))
|
||||
assert mf.yt_download("u", "/tmp/x", "best", False) is True
|
||||
|
||||
|
||||
def test_yt_download_dry_run_returns_true():
|
||||
assert mf.yt_download("u", "/tmp/x", "best", True) is True
|
||||
|
||||
|
||||
def test_yt_download_always_sets_album_default(monkeypatch):
|
||||
captured = {}
|
||||
monkeypatch.setattr(mf.os, "makedirs", lambda *a, **k: None)
|
||||
monkeypatch.setattr(mf.subprocess, "run", lambda cmd, **k: captured.update(cmd=cmd) or _CP(""))
|
||||
mf.yt_download("u", "/tmp/x", "best", False)
|
||||
assert "%(album|Unknown Album)s:%(meta_album)s" in captured["cmd"]
|
||||
|
||||
|
||||
def test_yt_download_single_word_tags_injected_literally(monkeypatch):
|
||||
# Regression: `--parse-metadata "Cochise:%(title)s"` makes yt-dlp treat the
|
||||
# bare word 'Cochise' as a FIELD name (field_to_template's r'[a-zA-Z_]+$'),
|
||||
# producing 'NA'. Single-word album/title must reach yt-dlp as literals.
|
||||
captured = {}
|
||||
monkeypatch.setattr(mf.os, "makedirs", lambda *a, **k: None)
|
||||
monkeypatch.setattr(mf.subprocess, "run", lambda cmd, **k: captured.update(cmd=cmd) or _CP(""))
|
||||
hit = mf.Hit(source="youtube", kind="track", title="Cochise",
|
||||
artist="Audioslave", album="Solid", payload={"videoId": "x"})
|
||||
mf.yt_download("u", "/tmp/x", "best", False, hit=hit)
|
||||
cmd = captured["cmd"]
|
||||
joined = " ".join(cmd)
|
||||
# The buggy bare-word parse-metadata FROM must be gone.
|
||||
assert "Solid:%(album)s" not in joined
|
||||
assert "Cochise:%(title)s" not in joined
|
||||
# Literal values must be passed as literal args (immune to template parsing).
|
||||
assert "Solid" in cmd
|
||||
assert "Cochise" in cmd
|
||||
# A hit album must not be clobbered by the Unknown-Album default.
|
||||
assert "%(album|Unknown Album)s:%(meta_album)s" not in cmd
|
||||
43
tests/test_profiles.py
Normal file
43
tests/test_profiles.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import server.mf # noqa: F401
|
||||
import musicfetch_core as mf
|
||||
|
||||
META = [{"id": 1, "name": "Standard"}, {"id": 2, "name": "None"}, {"id": 3, "name": "OST"}]
|
||||
QUAL = [{"id": 1, "name": "Any"}, {"id": 2, "name": "Lossless"}]
|
||||
|
||||
|
||||
def test_metadata_profile_default_standard_by_name(monkeypatch):
|
||||
monkeypatch.delenv("LIDARR_METADATA_PROFILE", raising=False)
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: META)
|
||||
assert mf.get_default_metadata_profile_id() == 1
|
||||
|
||||
|
||||
def test_metadata_profile_env_override(monkeypatch):
|
||||
monkeypatch.setenv("LIDARR_METADATA_PROFILE", "OST")
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: META)
|
||||
assert mf.get_default_metadata_profile_id() == 3
|
||||
|
||||
|
||||
def test_metadata_profile_unknown_name_falls_back_to_first(monkeypatch):
|
||||
monkeypatch.setenv("LIDARR_METADATA_PROFILE", "Nonexistent")
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: META)
|
||||
assert mf.get_default_metadata_profile_id() == 1
|
||||
|
||||
|
||||
def test_quality_profile_default_any_by_name(monkeypatch):
|
||||
monkeypatch.delenv("LIDARR_QUALITY_PROFILE", raising=False)
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: QUAL)
|
||||
assert mf.get_quality_profile_id() == 1
|
||||
|
||||
|
||||
def test_quality_profile_env_override(monkeypatch):
|
||||
monkeypatch.setenv("LIDARR_QUALITY_PROFILE", "Lossless")
|
||||
monkeypatch.setattr(mf, "lidarr_get", lambda path, timeout=10: QUAL)
|
||||
assert mf.get_quality_profile_id() == 2
|
||||
|
||||
|
||||
def test_profile_fetch_error_returns_one(monkeypatch):
|
||||
def boom(path, timeout=10):
|
||||
raise mf.RequestException("down")
|
||||
monkeypatch.setattr(mf, "lidarr_get", boom)
|
||||
assert mf.get_default_metadata_profile_id() == 1
|
||||
assert mf.get_quality_profile_id() == 1
|
||||
338
tests/test_repair.py
Normal file
338
tests/test_repair.py
Normal file
@@ -0,0 +1,338 @@
|
||||
import server.mf # noqa: F401 — loads musicfetch, registers musicfetch_core
|
||||
import musicfetch_core as mf
|
||||
|
||||
YT_ID = "dQw4w9WgXcQ" # valid 11-char YouTube id
|
||||
|
||||
|
||||
# ---- _is_source_dir ----
|
||||
def test_is_source_dir():
|
||||
assert mf._is_source_dir("youtube") is True
|
||||
assert mf._is_source_dir("soundcloud") is True
|
||||
assert mf._is_source_dir("downloads") is True
|
||||
assert mf._is_source_dir("Discovery") is False # Lidarr album folder
|
||||
assert mf._is_source_dir("Random Access Memories") is False
|
||||
assert mf._is_source_dir("") is False
|
||||
|
||||
|
||||
# ---- _parse_track_file ----
|
||||
def test_parse_track_file():
|
||||
assert mf._parse_track_file("Under My Skin [nGSNF2l44Zc].opus") == ("Under My Skin", "nGSNF2l44Zc")
|
||||
assert mf._parse_track_file("Ignomon [2202690443].m4a") == ("Ignomon", "2202690443")
|
||||
# greedy title: real id is the LAST bracket
|
||||
assert mf._parse_track_file("WHO GON' SLIDE [Official Music Video] [AxjP9s6J3uY].opus") \
|
||||
== ("WHO GON' SLIDE [Official Music Video]", "AxjP9s6J3uY")
|
||||
assert mf._parse_track_file("no-id-here.opus") is None
|
||||
assert mf._parse_track_file("cover.jpg") is None
|
||||
|
||||
|
||||
# ---- _repair_id_ok ----
|
||||
def test_repair_id_ok():
|
||||
assert mf._repair_id_ok("youtube", YT_ID) is True
|
||||
assert mf._repair_id_ok("youtube", "Official Video") is False # space, wrong length
|
||||
assert mf._repair_id_ok("youtube", "Cover") is False
|
||||
assert mf._repair_id_ok("soundcloud", "2202690443") is True
|
||||
assert mf._repair_id_ok("soundcloud", "abc") is False
|
||||
assert mf._repair_id_ok("bandcamp", "x") is False
|
||||
|
||||
|
||||
# ---- _valid_year ----
|
||||
def test_valid_year():
|
||||
assert mf._valid_year({"release_year": 2001}) == "2001"
|
||||
assert mf._valid_year({"release_date": "1976-09-10"}) == "1976"
|
||||
assert mf._valid_year({"upload_date": "20110101"}) == "" # upload date ignored
|
||||
assert mf._valid_year({"release_year": 6577}) == "" # out of range
|
||||
assert mf._valid_year({}) == ""
|
||||
|
||||
|
||||
# ---- _repair_probe_url ----
|
||||
def test_repair_probe_url():
|
||||
assert mf._repair_probe_url("youtube", YT_ID) == f"https://music.youtube.com/watch?v={YT_ID}"
|
||||
assert mf._repair_probe_url("soundcloud", "123") == "https://api.soundcloud.com/tracks/123"
|
||||
assert mf._repair_probe_url("bandcamp", "x") is None
|
||||
|
||||
|
||||
# ---- repair_file (fake audio + mocked metadata) ----
|
||||
class _FakeAudio(dict):
|
||||
def __init__(self, initial):
|
||||
super().__init__(initial)
|
||||
self.saved = False
|
||||
|
||||
def save(self):
|
||||
self.saved = True
|
||||
|
||||
|
||||
def test_repair_file_fixes_album_and_year(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "Daft Punk", "title": "Aerodynamic",
|
||||
"album": "Discovery", "release_year": 2001})
|
||||
audio = _FakeAudio({"artist": ["Daft Punk"], "title": ["Aerodynamic"]}) # album/date missing
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/Aerodynamic [{YT_ID}].opus", "youtube", dry_run=False)
|
||||
assert set(changed) == {"album=Discovery", "date=2001"}
|
||||
assert audio["album"] == ["Discovery"]
|
||||
assert audio["date"] == ["2001"]
|
||||
assert audio.saved is True
|
||||
|
||||
|
||||
def test_repair_file_dry_run_writes_nothing(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "A", "title": "T", "album": "Alb", "release_year": 2020})
|
||||
audio = _FakeAudio({})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/T [{YT_ID}].opus", "youtube", dry_run=True)
|
||||
assert changed
|
||||
assert audio == {}
|
||||
assert audio.saved is False
|
||||
|
||||
|
||||
def test_repair_file_skips_music_video(monkeypatch):
|
||||
# No album AND no valid release year -> treat as a video, leave tags alone.
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"title": "Artist - Song (Official Music Video)",
|
||||
"uploader": "SomeVEVO", "upload_date": "20110101"})
|
||||
audio = _FakeAudio({"artist": ["Real Artist"], "title": ["Song"]})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/Song [{YT_ID}].opus", "youtube", dry_run=False)
|
||||
assert changed == []
|
||||
assert audio == {"artist": ["Real Artist"], "title": ["Song"]} # untouched
|
||||
|
||||
|
||||
def test_repair_file_fills_missing_but_never_clobbers(monkeypatch):
|
||||
# Source artist is a channel name; existing artist must be kept.
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "SomeChannelVEVO", "title": "Channel Decorated Title",
|
||||
"album": "Real Album", "release_year": 2019})
|
||||
audio = _FakeAudio({"artist": ["Correct Artist"], "title": ["Clean Title"]})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/x [{YT_ID}].opus", "youtube", dry_run=False)
|
||||
assert set(changed) == {"album=Real Album", "date=2019"}
|
||||
assert audio["artist"] == ["Correct Artist"] # NOT overwritten with channel
|
||||
assert audio["title"] == ["Clean Title"] # NOT overwritten with decorated title
|
||||
|
||||
|
||||
def test_repair_file_fills_missing_artist_when_absent(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "Real Artist", "title": "T",
|
||||
"album": "Alb", "release_year": 2020})
|
||||
audio = _FakeAudio({}) # nothing present -> fill artist + title too
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/T [{YT_ID}].opus", "youtube", dry_run=False)
|
||||
assert set(changed) == {"album=Alb", "date=2020", "artist=Real Artist", "title=T"}
|
||||
|
||||
|
||||
def test_repair_file_skips_bad_id(monkeypatch):
|
||||
called = {"meta": False}
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: called.update(meta=True) or {})
|
||||
# last bracket is a descriptor, not a real id
|
||||
assert mf.repair_file("X/youtube/Song [Official Video].opus", "youtube", dry_run=False) == []
|
||||
assert called["meta"] is False # never hit the network
|
||||
|
||||
|
||||
def test_repair_file_skips_unparseable(monkeypatch):
|
||||
called = {"meta": False}
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: called.update(meta=True) or {})
|
||||
assert mf.repair_file("X/youtube/no-id.opus", "youtube", dry_run=False) == []
|
||||
assert called["meta"] is False
|
||||
|
||||
|
||||
def test_run_yt_dlp_get_metadata_passes_extra_args(monkeypatch):
|
||||
captured = {}
|
||||
|
||||
class _R:
|
||||
stdout = '{"title": "x"}'
|
||||
monkeypatch.setattr(mf.subprocess, "run", lambda cmd, **k: captured.update(cmd=cmd) or _R())
|
||||
mf.run_yt_dlp_get_metadata("http://u", ["--extractor-args", "youtube:player_skip=js"])
|
||||
assert "youtube:player_skip=js" in captured["cmd"]
|
||||
|
||||
|
||||
def test_repair_uses_player_skip_fast_args(monkeypatch):
|
||||
captured = {}
|
||||
|
||||
def fake_meta(url, extra_args=None):
|
||||
captured["extra"] = extra_args
|
||||
return {"album": "A", "release_year": 2020, "artist": "X", "title": "T"}
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata", fake_meta)
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda p: (_FakeAudio({}), None))
|
||||
mf.repair_file(f"X/youtube/T [{YT_ID}].opus", "youtube", dry_run=True)
|
||||
assert captured["extra"] == mf._REPAIR_META_ARGS
|
||||
|
||||
|
||||
# ---- repair_library (real temp tree, repair_file mocked) ----
|
||||
def test_repair_library_scans_only_source_dirs(tmp_path, monkeypatch):
|
||||
root = tmp_path
|
||||
(root / "Daft Punk" / "youtube").mkdir(parents=True)
|
||||
(root / "Daft Punk" / "youtube" / f"Aerodynamic [{YT_ID}].opus").write_text("x")
|
||||
(root / "Daft Punk" / "Discovery").mkdir(parents=True) # Lidarr album -> skip
|
||||
(root / "Daft Punk" / "Discovery" / "Aerodynamic.flac").write_text("x")
|
||||
(root / "Ephixa" / "soundcloud").mkdir(parents=True)
|
||||
(root / "Ephixa" / "soundcloud" / "Ignomon [123].m4a").write_text("x")
|
||||
|
||||
visited = []
|
||||
monkeypatch.setattr(mf, "repair_file",
|
||||
lambda path, source, dry_run: visited.append((source, path)) or ["album=X"])
|
||||
scanned, changed = mf.repair_library(str(root), dry_run=False)
|
||||
assert scanned == 2 and changed == 2
|
||||
assert sorted(s for s, _ in visited) == ["soundcloud", "youtube"] # album folder skipped
|
||||
|
||||
|
||||
def test_repair_library_missing_root():
|
||||
assert mf.repair_library("/no/such/dir", dry_run=False) == (0, 0)
|
||||
|
||||
|
||||
def test_repair_library_exclude_skips_folders(tmp_path, monkeypatch):
|
||||
root = tmp_path
|
||||
(root / "Daft Punk" / "youtube").mkdir(parents=True)
|
||||
(root / "Daft Punk" / "youtube" / f"A [{YT_ID}].opus").write_text("x")
|
||||
(root / "Unsorted" / "youtube").mkdir(parents=True) # excluded artist folder
|
||||
(root / "Unsorted" / "youtube" / f"B [{YT_ID}].opus").write_text("x")
|
||||
(root / "Ephixa" / "playlists").mkdir(parents=True) # excluded source folder
|
||||
(root / "Ephixa" / "playlists" / f"C [{YT_ID}].opus").write_text("x")
|
||||
|
||||
visited = []
|
||||
monkeypatch.setattr(mf, "repair_file",
|
||||
lambda path, source, dry_run: visited.append(path) or ["x"])
|
||||
scanned, _ = mf.repair_library(str(root), dry_run=False, exclude=["unsorted", "playlists"])
|
||||
assert scanned == 1
|
||||
assert visited and "Daft Punk" in visited[0]
|
||||
|
||||
|
||||
# ---- offline retag-from-path ----
|
||||
def test_title_from_filename():
|
||||
assert mf._title_from_filename(f"Song [{YT_ID}].opus") == "Song"
|
||||
assert mf._title_from_filename("STARDUST (Official Music Video) [3nsYNXtALhA].opus") \
|
||||
== "STARDUST (Official Music Video)"
|
||||
assert mf._title_from_filename("no brackets.mp3") == "no brackets"
|
||||
|
||||
|
||||
def test_strip_decorations():
|
||||
assert mf._strip_decorations("STARDUST (Official Music Video)") == "STARDUST"
|
||||
assert mf._strip_decorations("Away From You (Lyrics)") == "Away From You"
|
||||
assert mf._strip_decorations("More Than a Feeling (Official HD Video)") == "More Than a Feeling"
|
||||
# real info like a feature credit is kept
|
||||
assert mf._strip_decorations("WHO GON' SLIDE (Feat. Shakewell) [Official Music Video]") \
|
||||
== "WHO GON' SLIDE (Feat. Shakewell)"
|
||||
|
||||
|
||||
def test_derive_from_filename():
|
||||
# plain title -> folder is the artist
|
||||
assert mf._derive_from_filename(f"Aerodynamic [{YT_ID}].opus", "Daft Punk") == ("Daft Punk", "Aerodynamic")
|
||||
# decorated music video filed under the artist
|
||||
assert mf._derive_from_filename("STARDUST (Official Music Video) [3nsYNXtALhA].opus", "1nonly") \
|
||||
== ("1nonly", "STARDUST")
|
||||
# 'Artist - Title' name wins over a channel folder
|
||||
assert mf._derive_from_filename("BLCKLGHT - Away From You (Lyrics) [QapF4b1jYw8].opus", "7clouds Techno") \
|
||||
== ("BLCKLGHT", "Away From You")
|
||||
|
||||
|
||||
def test_retag_file_from_path_fixes_clobbered_tags(monkeypatch):
|
||||
audio = _FakeAudio({"artist": ["7clouds Techno"], "title": ["BLCKLGHT - Away From You (Lyrics)"]})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.retag_file_from_path(
|
||||
"X/7clouds Techno/youtube/BLCKLGHT - Away From You (Lyrics) [QapF4b1jYw8].opus",
|
||||
"7clouds Techno", dry_run=False)
|
||||
assert set(changed) == {"artist=BLCKLGHT", "title=Away From You"}
|
||||
assert audio["artist"] == ["BLCKLGHT"]
|
||||
assert audio["title"] == ["Away From You"]
|
||||
assert audio.saved is True
|
||||
|
||||
|
||||
def test_retag_file_from_path_dry_run(monkeypatch):
|
||||
audio = _FakeAudio({"artist": ["wrong"], "title": ["wrong"]})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.retag_file_from_path(f"X/Daft Punk/youtube/Aerodynamic [{YT_ID}].opus",
|
||||
"Daft Punk", dry_run=True)
|
||||
assert changed
|
||||
assert audio == {"artist": ["wrong"], "title": ["wrong"]}
|
||||
assert audio.saved is False
|
||||
|
||||
|
||||
def test_retag_library_walks_source_files(tmp_path, monkeypatch):
|
||||
root = tmp_path
|
||||
(root / "Daft Punk" / "youtube").mkdir(parents=True)
|
||||
(root / "Daft Punk" / "youtube" / f"Aerodynamic [{YT_ID}].opus").write_text("x")
|
||||
(root / "Daft Punk" / "Discovery").mkdir(parents=True) # album folder -> skip
|
||||
(root / "Daft Punk" / "Discovery" / "x.flac").write_text("x")
|
||||
visited = []
|
||||
monkeypatch.setattr(mf, "retag_file_from_path",
|
||||
lambda path, artist, dry_run: visited.append(artist) or ["artist=x"])
|
||||
scanned, changed = mf.retag_library_from_path(str(root), dry_run=False)
|
||||
assert (scanned, changed) == (1, 1)
|
||||
assert visited == ["Daft Punk"]
|
||||
|
||||
|
||||
# ---- bogus-tag recovery (old-code NA / Unknown breakage) ----
|
||||
def test_is_bogus():
|
||||
for v in ("", "NA", "na", "N/A", "Unknown", "Unknown Album", "unknown artist", " NA "):
|
||||
assert mf._is_bogus(v) is True, v
|
||||
for v in ("Cochise", "Solid", "Brother Stoon", "Discovery"):
|
||||
assert mf._is_bogus(v) is False, v
|
||||
|
||||
|
||||
def test_repair_file_overwrites_bogus_title(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "Audioslave", "title": "Cochise",
|
||||
"album": "Audioslave", "release_year": 2002})
|
||||
audio = _FakeAudio({"artist": ["Audioslave"], "title": ["NA"]}) # bogus title
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/Brother Stoon [{YT_ID}].opus", "youtube", dry_run=False)
|
||||
assert "title=Cochise" in changed
|
||||
assert audio["title"] == ["Cochise"]
|
||||
|
||||
|
||||
def test_repair_file_overwrites_bogus_artist(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "Real Artist", "title": "Real Title",
|
||||
"album": "Alb", "release_year": 2020})
|
||||
audio = _FakeAudio({"artist": ["NA"], "title": ["Good Title"]}) # bogus artist, good title
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/Good Title [{YT_ID}].opus", "youtube", dry_run=False)
|
||||
assert "artist=Real Artist" in changed
|
||||
assert audio["artist"] == ["Real Artist"]
|
||||
assert audio["title"] == ["Good Title"] # good title untouched
|
||||
|
||||
|
||||
def test_repair_file_normalizes_na_album_when_source_has_none(monkeypatch):
|
||||
# Music video: no source album/year, but album tag is the literal 'NA' -> Unknown Album.
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"title": "Some Live Thing", "uploader": "Chan"})
|
||||
audio = _FakeAudio({"artist": ["X"], "title": ["Y"], "album": ["NA"]})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(f"X/youtube/Y [{YT_ID}].opus", "youtube", dry_run=False)
|
||||
assert "album=Unknown Album" in changed
|
||||
assert audio["album"] == ["Unknown Album"]
|
||||
|
||||
|
||||
def test_repair_file_renames_bogus_filename(tmp_path, monkeypatch):
|
||||
d = tmp_path / "Audioslave" / "youtube"
|
||||
d.mkdir(parents=True)
|
||||
f = d / f"NA [{YT_ID}].opus"
|
||||
f.write_text("x")
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "Audioslave", "title": "Cochise",
|
||||
"album": "Audioslave", "release_year": 2002})
|
||||
audio = _FakeAudio({"artist": ["Audioslave"], "title": ["NA"]})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file(str(f), "youtube", dry_run=False)
|
||||
assert (d / f"Cochise [{YT_ID}].opus").exists()
|
||||
assert not f.exists()
|
||||
assert any("rename" in c.lower() or c.startswith("title=") for c in changed)
|
||||
|
||||
|
||||
def test_repair_file_dry_run_does_not_rename(tmp_path, monkeypatch):
|
||||
d = tmp_path / "Audioslave" / "youtube"
|
||||
d.mkdir(parents=True)
|
||||
f = d / f"NA [{YT_ID}].opus"
|
||||
f.write_text("x")
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url, *a: {"artist": "Audioslave", "title": "Cochise",
|
||||
"album": "Audioslave", "release_year": 2002})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (_FakeAudio({"title": ["NA"]}), None))
|
||||
mf.repair_file(str(f), "youtube", dry_run=True)
|
||||
assert f.exists() # untouched in dry-run
|
||||
assert not (d / f"Cochise [{YT_ID}].opus").exists()
|
||||
|
||||
|
||||
def test_fs_safe_replaces_slash():
|
||||
assert "/" not in mf._fs_safe("AC/DC Live")
|
||||
Reference in New Issue
Block a user