diff --git a/musicfetch b/musicfetch index f02ebab..05ef3a8 100755 --- a/musicfetch +++ b/musicfetch @@ -19,6 +19,7 @@ from typing import Optional import requests from requests.exceptions import RequestException +from urllib.parse import urlparse, parse_qs # Optional deps — degrade gracefully if missing. try: @@ -618,10 +619,10 @@ def yt_download(url_or_query: str, target_folder: str, quality: str, dry_run: bo if dry_run: print(f"[dry-run] mkdir -p {target_folder}") print(f"[dry-run] {' '.join(cmd)}") - return + return True os.makedirs(target_folder, exist_ok=True) print(f"Downloading via yt-dlp -> {target_folder}") - subprocess.run(cmd) + return subprocess.run(cmd).returncode == 0 def act_youtube(hit: Hit, root: str, quality: str, dry_run: bool): @@ -630,12 +631,90 @@ def act_youtube(hit: Hit, root: str, quality: str, dry_run: bool): url = f"https://music.youtube.com/watch?v={vid}" if vid else f"ytsearch1:{hit.artist} {hit.title}" artist_dir = hit.artist.split(",")[0].strip() or "Unknown Artist" target = os.path.join(root, artist_dir, "youtube") - yt_download(url, target, quality, dry_run, hit=hit) + return yt_download(url, target, quality, dry_run, hit=hit) # --------------------------------------------------------------------------- # URL path # --------------------------------------------------------------------------- +def _playlist_id(url: str) -> str: + return parse_qs(urlparse(url).query).get("list", [""])[0] + + +def is_playlist_url(url: str) -> bool: + """True for a pure playlist URL (/playlist?list=… or list= without v=). + A watch?v=…&list=… URL is treated as a single track, not a batch.""" + if not is_url(url): + return False + parsed = urlparse(url) + qs = parse_qs(parsed.query) + if "/playlist" in parsed.path: + return True + return "list" in qs and "v" not in qs + + +def expand_playlist(url: str) -> tuple[str, list[Hit]]: + """Return (playlist_title, [track Hits]). Prefer ytmusicapi; fall back to + yt-dlp --flat-playlist. Returns ("", []) on failure.""" + pid = _playlist_id(url) + if YTMusic is not None and pid: + try: + pl = YTMusic().get_playlist(pid, limit=None) + hits = [] + for t in pl.get("tracks", []): + vid = t.get("videoId") + if not vid: + continue + alb = t.get("album") + album = alb.get("name", "") if isinstance(alb, dict) else (alb or "") + hits.append(Hit(source="youtube", kind="track", title=t.get("title", ""), + artist=_ytm_artists(t), album=album, + year=str(t.get("year") or ""), payload={"videoId": vid})) + if hits: + return pl.get("title", ""), hits + except Exception as e: # noqa: BLE001 + dbg(f"ytmusicapi playlist expand failed: {e}") + try: + result = subprocess.run(["yt-dlp", "--flat-playlist", "-J", url], + capture_output=True, text=True, check=True) + data = json.loads(result.stdout) + except (subprocess.CalledProcessError, json.JSONDecodeError) as e: + err(f"yt-dlp playlist expand failed: {e}") + return "", [] + hits = [] + for entry in data.get("entries", []): + vid = entry.get("id") + if not vid: + continue + hits.append(Hit(source="youtube", kind="track", title=entry.get("title", ""), + artist=entry.get("uploader") or entry.get("channel") or "", + payload={"videoId": vid})) + return data.get("title", ""), hits + + +def download_playlist(url: str, root: str, quality: str, dry_run: bool) -> tuple[int, int, str]: + """Download each playlist track via act_youtube. Returns (ok, total, title).""" + title, hits = expand_playlist(url) + ok = 0 + for h in hits: + try: + if act_youtube(h, root, quality, dry_run): + ok += 1 + except Exception as e: # noqa: BLE001 — one bad track shouldn't abort the batch + err(f"track failed ({h.title}): {e}") + return ok, len(hits), title + + +def download_single(url: str, root: str, quality: str, dry_run: bool) -> dict: + """Download a single URL. Returns {title, artist, ok}.""" + meta = run_yt_dlp_get_metadata(url) + artist = get_artist_from_metadata(meta) if meta else "Unknown Artist" + title = (meta or {}).get("title", "") + target = os.path.join(root, artist, "youtube") + ok = yt_download(url, target, quality, dry_run) + return {"title": title, "artist": artist, "ok": ok} + + def run_yt_dlp_get_metadata(url: str) -> Optional[dict]: try: result = subprocess.run( @@ -658,10 +737,12 @@ def get_artist_from_metadata(meta: dict) -> str: def handle_url(url: str, root: str, quality: str, dry_run: bool): - meta = run_yt_dlp_get_metadata(url) - artist = get_artist_from_metadata(meta) if meta else "Unknown Artist" - target = os.path.join(root, artist, "youtube") - yt_download(url, target, quality, dry_run) + if is_playlist_url(url): + ok, total, title = download_playlist(url, root, quality, dry_run) + label = f" from '{title}'" if title else "" + print(f"Downloaded {ok}/{total} tracks{label}") + return + download_single(url, root, quality, dry_run) # --------------------------------------------------------------------------- diff --git a/tests/test_playlist.py b/tests/test_playlist.py new file mode 100644 index 0000000..c552316 --- /dev/null +++ b/tests/test_playlist.py @@ -0,0 +1,74 @@ +import server.mf # noqa: F401 +import musicfetch_core as mf + + +def test_pure_playlist_url_is_playlist(): + assert mf.is_playlist_url("https://music.youtube.com/playlist?list=PLabc") is True + assert mf.is_playlist_url("https://www.youtube.com/playlist?list=PLabc") is True + + +def test_watch_with_list_is_not_playlist(): + assert mf.is_playlist_url("https://www.youtube.com/watch?v=abc&list=PLx") is False + + +def test_plain_watch_is_not_playlist(): + assert mf.is_playlist_url("https://www.youtube.com/watch?v=abc") is False + + +def test_non_url_is_not_playlist(): + assert mf.is_playlist_url("Daft Punk - Discovery") is False + + +class _CP: + def __init__(self, stdout): + self.stdout = stdout + self.returncode = 0 + + +def test_expand_playlist_ytdlp_fallback(monkeypatch): + import json as _json + monkeypatch.setattr(mf, "YTMusic", None) + payload = {"title": "My Mix", "entries": [ + {"id": "v1", "title": "Song One", "uploader": "Artist A"}, + {"id": "v2", "title": "Song Two", "channel": "Artist B"}, + {"id": None, "title": "skip"}, + ]} + monkeypatch.setattr(mf.subprocess, "run", lambda *a, **k: _CP(_json.dumps(payload))) + title, hits = mf.expand_playlist("https://www.youtube.com/playlist?list=PLx") + assert title == "My Mix" + assert [h.payload["videoId"] for h in hits] == ["v1", "v2"] + assert hits[0].artist == "Artist A" + + +def test_download_playlist_counts_ok_and_total(monkeypatch): + h1 = mf.Hit(source="youtube", kind="track", title="A", artist="X", payload={"videoId": "1"}) + h2 = mf.Hit(source="youtube", kind="track", title="B", artist="Y", payload={"videoId": "2"}) + h3 = mf.Hit(source="youtube", kind="track", title="C", artist="Z", payload={"videoId": "3"}) + monkeypatch.setattr(mf, "expand_playlist", lambda url: ("PL Title", [h1, h2, h3])) + monkeypatch.setattr(mf, "act_youtube", lambda hit, root, quality, dry_run: hit.title != "B") + ok, total, title = mf.download_playlist("u", "/tmp", "best", False) + assert (ok, total, title) == (2, 3, "PL Title") + + +def test_download_playlist_track_exception_counts_as_failure(monkeypatch): + h1 = mf.Hit(source="youtube", kind="track", title="A", artist="X", payload={"videoId": "1"}) + h2 = mf.Hit(source="youtube", kind="track", title="B", artist="Y", payload={"videoId": "2"}) + monkeypatch.setattr(mf, "expand_playlist", lambda url: ("T", [h1, h2])) + + def fake_act(hit, root, quality, dry_run): + if hit.title == "B": + raise RuntimeError("boom") + return True + monkeypatch.setattr(mf, "act_youtube", fake_act) + ok, total, _ = mf.download_playlist("u", "/tmp", "best", False) + assert (ok, total) == (1, 2) + + +def test_yt_download_returns_true_on_zero_exit(monkeypatch): + monkeypatch.setattr(mf.os, "makedirs", lambda *a, **k: None) + monkeypatch.setattr(mf.subprocess, "run", lambda *a, **k: _CP("")) + assert mf.yt_download("u", "/tmp/x", "best", False) is True + + +def test_yt_download_dry_run_returns_true(monkeypatch): + assert mf.yt_download("u", "/tmp/x", "best", True) is True