Review finding: bare endswith routed look-alike hosts to the direct yt-dlp path. Match on a domain-label boundary and drop the redundant _DIRECT_HOSTS. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1390 lines
54 KiB
Python
Executable File
1390 lines
54 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""MusicFetch v2 — fetch music via Lidarr (preferred) or YouTube Music (yt-dlp).
|
||
|
||
Accepts a free-form query ("artist", "title", "album", or combos like
|
||
"artist - title" / "artist - album") or a URL. Searches Lidarr and YouTube
|
||
Music concurrently, shows the top hits in an interactive picker, and acts on
|
||
the chosen hit. See README.md for full docs.
|
||
"""
|
||
import argparse
|
||
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from dataclasses import dataclass, field
|
||
from typing import Optional
|
||
|
||
import requests
|
||
from requests.exceptions import ConnectionError as ReqConnectionError
|
||
from requests.exceptions import RequestException, Timeout
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
# Optional deps — degrade gracefully if missing.
# ytmusicapi: when absent, YTMusic is None and youtube_search() goes straight
# to the yt-dlp scrape.
try:
    from ytmusicapi import YTMusic
except ImportError:
    YTMusic = None

# rich: when absent, _console is None and render_picker() prints plain rows.
# Table and Text stay undefined in that case; they are only referenced on the
# code paths guarded by `_console is not None`.
try:
    from rich.console import Console
    from rich.table import Table
    from rich.text import Text
    _console = Console()
except ImportError:
    Console = None
    _console = None
|
||
|
||
# === CONFIGURATION ===
|
||
# Lidarr base URL; a trailing slash is stripped so API paths can be appended.
LIDARR_URL = os.environ.get("LIDARR_URL", "http://localhost:8686").rstrip("/")
# Empty when LIDARR_API_KEY is unset; lidarr_search() skips Lidarr in that case.
API_KEY = os.environ.get("LIDARR_API_KEY", "")
DEFAULT_ROOT = os.environ.get("MUSICFETCH_ROOT", "/media/music")

# Sent with every lidarr_get()/lidarr_post() request.
HEADERS = {"X-Api-Key": API_KEY, "Content-Type": "application/json"}

# Runtime flags, populated in main().
DEBUG = False

# yt-dlp cookies — authenticated requests bypass YouTube's bot-check ("Sign in
# to confirm you're not a bot") and lift rate limits, which is essential for
# bulk --repair. Set via CLI (--cookies / --cookies-from-browser) or env so the
# REST API container can supply them too.
COOKIES_FILE = os.environ.get("YTDLP_COOKIES", "")
COOKIES_FROM_BROWSER = os.environ.get("YTDLP_COOKIES_FROM_BROWSER", "")
|
||
|
||
|
||
def _cookie_args() -> list:
    """Build the yt-dlp cookie flags from the module-level cookie settings.

    A cookies file takes precedence over a browser profile. Returns an empty
    list when neither COOKIES_FILE nor COOKIES_FROM_BROWSER is set.
    """
    if COOKIES_FILE:
        flags = ["--cookies", COOKIES_FILE]
    elif COOKIES_FROM_BROWSER:
        flags = ["--cookies-from-browser", COOKIES_FROM_BROWSER]
    else:
        flags = []
    return flags
|
||
|
||
# Quality choices for --quality. _quality_args() maps each value to yt-dlp
# flags; any other value gets plain best-audio extraction.
QUALITY_CHOICES = ["best", "320", "m4a", "opus", "flac"]
|
||
|
||
|
||
def dbg(*a):
    """Print the arguments with a "[DEBUG]" prefix, but only when DEBUG is on."""
    if not DEBUG:
        return
    print("[DEBUG]", *a)
|
||
|
||
|
||
def err(*a):
    """Print the arguments to stderr with an "[ERROR]" prefix."""
    stream = sys.stderr
    print("[ERROR]", *a, file=stream)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Hit model
|
||
# ---------------------------------------------------------------------------
|
||
@dataclass
class Hit:
    """One search result from either backend, plus the raw data needed to act on it."""

    source: str  # "lidarr" | "youtube"
    kind: str  # "artist" | "album" | "track"
    title: str = ""  # track/album title (display)
    artist: str = ""
    album: str = ""
    year: str = ""
    thumbnail: str = ""
    payload: dict = field(default_factory=dict)  # raw data needed to act

    @property
    def display_title(self) -> str:
        """Return the first non-empty of title and album, else the artist."""
        for candidate in (self.title, self.album):
            if candidate:
                return candidate
        return self.artist
|
||
|
||
|
||
@dataclass
class Resolved:
    """Metadata resolved for a streaming link; every field defaults to ""."""

    title: str = field(default="")
    artist: str = field(default="")
    thumb: str = field(default="")
    youtube_url: str = field(default="")
|
||
|
||
|
||
class OdesliError(Exception):
    """Signals that an Odesli link could not be resolved to usable metadata."""
|
||
|
||
|
||
# Odesli (song.link) links endpoint queried by odesli_resolve().
ODESLI_URL = "https://api.song.link/v1-alpha.1/links"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
def is_url(s: str) -> bool:
    """Return True when *s* begins with a lowercase http:// or https:// scheme."""
    return re.match(r"https?://", s) is not None
|
||
|
||
|
||
def lidarr_get(path, params=None, timeout=15):
    """GET a Lidarr API path and return the decoded JSON body.

    The path is appended to LIDARR_URL and HEADERS is sent with the request.
    A non-success status is raised via ``raise_for_status``.
    """
    endpoint = f"{LIDARR_URL}{path}"
    response = requests.get(endpoint, headers=HEADERS, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()
|
||
|
||
|
||
def lidarr_post(path, payload, timeout=15):
    """POST a JSON payload to a Lidarr API path.

    Returns the decoded JSON body, or {} when the response has no content.
    A non-success status is raised via ``raise_for_status``.
    """
    endpoint = f"{LIDARR_URL}{path}"
    response = requests.post(endpoint, headers=HEADERS, json=payload, timeout=timeout)
    response.raise_for_status()
    if response.content:
        return response.json()
    return {}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Lidarr search
|
||
# ---------------------------------------------------------------------------
|
||
def _year_from_album(album: dict) -> str:
|
||
rd = album.get("releaseDate") or album.get("firstReleaseDate") or ""
|
||
return rd[:4] if rd else ""
|
||
|
||
|
||
def _album_to_hit(album: dict) -> Hit:
    """Wrap a Lidarr album dict as an album Hit.

    The artist name comes from the nested ``artist`` object when present,
    otherwise from a top-level ``artistName``. The raw dict is kept in the
    payload under "album".
    """
    nested_artist = album.get("artist") or {}
    artist_name = nested_artist.get("artistName") or album.get("artistName") or ""
    album_title = album.get("title", "")
    return Hit(
        source="lidarr",
        kind="album",
        title=album_title,
        artist=artist_name,
        album=album_title,
        year=_year_from_album(album),
        payload={"album": album},
    )
|
||
|
||
|
||
def _artist_to_hit(artist: dict) -> Hit:
    """Wrap a Lidarr artist dict as an artist Hit.

    The display name is ``artistName``, falling back to ``title``; it fills
    both the title and artist fields. The raw dict is kept in the payload
    under "artist".
    """
    display_name = artist.get("artistName") or artist.get("title", "")
    return Hit(
        source="lidarr",
        kind="artist",
        title=display_name,
        artist=display_name,
        payload={"artist": artist},
    )
|
||
|
||
|
||
# MusicBrainz web-service base URL and the User-Agent sent by
# musicbrainz_best_album().
MUSICBRAINZ_URL = "https://musicbrainz.org/ws/2"
MB_HEADERS = {"User-Agent": "musicfetch/2.0 (https://github.com/; personal music fetcher)"}
# time.time() of the last MusicBrainz call; read and updated by _mb_rate_limit().
_mb_last_call = 0.0
|
||
|
||
|
||
def _mb_rate_limit():
    """Sleep as needed so MusicBrainz calls are spaced about one second apart.

    Reads and updates the module-level ``_mb_last_call`` timestamp.
    """
    global _mb_last_call
    remaining = 1.0 - (time.time() - _mb_last_call)
    if remaining > 0:
        time.sleep(remaining)
    _mb_last_call = time.time()
|
||
|
||
|
||
def _mb_artist_credit(credit) -> str:
|
||
"""First credited artist name only (ignore featured/secondary)."""
|
||
if credit and isinstance(credit, list) and isinstance(credit[0], dict):
|
||
return credit[0].get("name") or (credit[0].get("artist") or {}).get("name", "")
|
||
return ""
|
||
|
||
|
||
def musicbrainz_best_album(artist: str, track: str, timeout: int = 8) -> Optional[dict]:
    """Resolve 'artist - track' to its best studio album via MusicBrainz.

    Prefers a studio album credited to the track's own artist (not a Various
    Artists compilation), then any studio album, then any release; within the
    chosen pool the earliest-dated release wins.

    Returns {album_title, artist, year, rg_mbid} or None. Never raises: a
    failed request and an unexpectedly shaped body both yield None.
    """

    def _phrase(value: str) -> str:
        # The value is embedded in a double-quoted search phrase, so a literal
        # backslash or double quote (e.g. a title like 7" Mix) must be escaped
        # or the query is malformed and the lookup fails.
        return value.replace("\\", "\\\\").replace('"', '\\"')

    query = f'artist:"{_phrase(artist)}" AND recording:"{_phrase(track)}"'
    try:
        _mb_rate_limit()
        resp = requests.get(
            f"{MUSICBRAINZ_URL}/recording",
            params={"query": query, "fmt": "json", "limit": 25},
            headers=MB_HEADERS, timeout=timeout,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:  # noqa: BLE001 — degrade to fallback on any failure
        dbg(f"MusicBrainz lookup failed: {e}")
        return None

    if not isinstance(data, dict):
        return None

    # candidate = (own_studio, is_studio, date_sortkey, title, artist, year, mbid)
    candidates = []
    # `or []` covers keys that are present but null.
    for rec in data.get("recordings") or []:
        rec_artist = _mb_artist_credit(rec.get("artist-credit"))
        for rel in rec.get("releases") or []:
            rg = rel.get("release-group") or {}
            title = rg.get("title") or rel.get("title") or ""
            if not title:
                continue
            mbid = rg.get("id") or ""
            primary = rg.get("primary-type") or ""
            secondary = rg.get("secondary-types") or []
            rel_artist = _mb_artist_credit(rel.get("artist-credit"))
            date = rel.get("date") or rg.get("first-release-date") or ""
            # "Studio" here means a plain Album with no secondary types.
            is_studio = primary == "Album" and not secondary
            own_studio = is_studio and (
                not rel_artist or rel_artist.casefold() == rec_artist.casefold()
            )
            # Undated releases sort last via the "9999" key.
            candidates.append((own_studio, is_studio, date or "9999", title, rec_artist, date[:4], mbid))

    if not candidates:
        return None
    pool = ([c for c in candidates if c[0]]
            or [c for c in candidates if c[1]]
            or candidates)
    pool.sort(key=lambda c: c[2])  # earliest date first
    _, _, _, title, art, year, mbid = pool[0]
    dbg(f"MusicBrainz resolved '{artist} - {track}' -> '{title}' ({year}) mbid={mbid}")
    return {"album_title": title, "artist": art or artist, "year": year, "rg_mbid": mbid}
|
||
|
||
|
||
def _split_query(query: str) -> tuple[str, Optional[str]]:
|
||
"""Split a Shazam-style 'Artist - Track' on the first ' - '.
|
||
Returns (artist, track) or (term, None) when there is no separator."""
|
||
if " - " in query:
|
||
left, right = query.split(" - ", 1)
|
||
return left.strip(), right.strip()
|
||
return query.strip(), None
|
||
|
||
|
||
def lidarr_search(query: str, limit: int) -> list[Hit]:
    """Return up to *limit* Lidarr hits, best match first.

    An 'Artist - Track' query is resolved to a MusicBrainz release-group MBID
    and looked up exactly in Lidarr (term=mbid:<id>), with no fuzzy ranking.
    Everything else goes through the plain fallback lookup. Returns []
    without searching when LIDARR_API_KEY is not set.
    """
    if not API_KEY:
        err("LIDARR_API_KEY not set — skipping Lidarr search.")
        return []

    artist, right = _split_query(query)
    if not right:
        # Bare term is most often an artist.
        return _fallback_lookup(query, limit, artist_first=True)

    mb = musicbrainz_best_album(artist, right)
    if mb and mb["rg_mbid"]:
        hits = _lidarr_album_candidates(f"mbid:{mb['rg_mbid']}")
        if mb["year"]:
            # Backfill the year from MusicBrainz where Lidarr gave none.
            for h in hits:
                if not h.year:
                    h.year = mb["year"]
        if hits:
            return hits[:limit]

    # MusicBrainz miss / no exact album → plain lookup (album-first: a dash
    # query named an album/track).
    return _fallback_lookup(query, limit, artist_first=False)
|
||
|
||
|
||
def _log_lidarr_failure(label: str, e: Exception) -> None:
    """Report a failed Lidarr request at the right volume.

    Connection errors and timeouts mean Lidarr is unreachable. The YouTube
    fallback that follows would otherwise look like "Lidarr had no match",
    so those go to stderr. Any other error is logged at debug level only.
    """
    unreachable = isinstance(e, (ReqConnectionError, Timeout))
    if not unreachable:
        dbg(f"{label} failed: {e}")
        return
    err(f"Lidarr unreachable ({label} at {LIDARR_URL}): {e}. "
        f"Falling back to YouTube.")
|
||
|
||
|
||
def _lidarr_album_candidates(term: str) -> list[Hit]:
    """Run Lidarr's album lookup for *term*; returns [] on a request failure."""
    try:
        found = lidarr_get("/api/v1/album/lookup", params={"term": term})
        return list(map(_album_to_hit, found))
    except RequestException as e:
        _log_lidarr_failure("album/lookup", e)
        return []
|
||
|
||
|
||
def _lidarr_artist_candidates(term: str) -> list[Hit]:
    """Run Lidarr's artist lookup for *term*; returns [] on a request failure."""
    try:
        found = lidarr_get("/api/v1/artist/lookup", params={"term": term})
        return list(map(_artist_to_hit, found))
    except RequestException as e:
        _log_lidarr_failure("artist/lookup", e)
        return []
|
||
|
||
|
||
def _fallback_lookup(query: str, limit: int, artist_first: bool) -> list[Hit]:
    """Plain album and artist lookups with no scoring.

    *artist_first* decides which group is listed first. When both lookups
    come back empty, Lidarr's /search is tried as a last resort.
    """
    albums = _lidarr_album_candidates(query)
    artists = _lidarr_artist_candidates(query)
    if artist_first:
        combined = artists + albums
    else:
        combined = albums + artists
    if not combined:
        return _universal_search(query, limit)
    return combined[:limit]
|
||
|
||
|
||
def _universal_search(query: str, limit: int) -> list[Hit]:
    """Last resort: Lidarr's fuzzy /search, returned in the order received.

    Each result becomes an album Hit when it carries an album, otherwise an
    artist Hit when it carries an artist. A request failure is logged at
    debug level and whatever was collected so far is returned.
    """
    collected: list[Hit] = []
    try:
        results = lidarr_get("/api/v1/search", params={"term": query})
        for item in results:
            album = item.get("album")
            artist = item.get("artist")
            if album:
                collected.append(_album_to_hit(album))
            elif artist:
                collected.append(_artist_to_hit(artist))
    except RequestException as e:
        dbg(f"/api/v1/search failed: {e}")
    return collected[:limit]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# YouTube search (ytmusicapi preferred, yt-dlp scrape fallback)
|
||
# ---------------------------------------------------------------------------
|
||
def _ytm_thumb(item: dict) -> str:
|
||
thumbs = item.get("thumbnails") or []
|
||
return thumbs[-1]["url"] if thumbs else ""
|
||
|
||
|
||
def _ytm_artists(item: dict) -> str:
|
||
arts = item.get("artists") or []
|
||
return ", ".join(a.get("name", "") for a in arts if a.get("name"))
|
||
|
||
|
||
def youtube_search(query: str, limit: int) -> list[Hit]:
    """Search YouTube via ytmusicapi, falling back to a yt-dlp scrape.

    The yt-dlp path is used when ytmusicapi is not installed or when its
    search raises.
    """
    if YTMusic is None:
        return _ytdlp_search(query, limit)
    try:
        return _ytmusic_search(query, limit)
    except Exception as e:  # ytmusicapi raises broadly
        dbg(f"ytmusicapi search failed ({e}); falling back to yt-dlp scrape.")
    return _ytdlp_search(query, limit)
|
||
|
||
|
||
def _ytmusic_search(query: str, limit: int) -> list[Hit]:
    """Search YouTube Music songs and return at most *limit* track Hits.

    Results without a ``videoId`` are skipped. Exceptions from ytmusicapi
    propagate to the caller.
    """
    client = YTMusic()
    found: list[Hit] = []
    # Songs give us videoId + album + artist; that's the best download target.
    for item in client.search(query, filter="songs", limit=limit):
        vid = item.get("videoId")
        if not vid:
            continue
        raw_album = item.get("album")
        # The album may arrive as a dict with a "name" or as a plain value.
        if isinstance(raw_album, dict):
            album = raw_album.get("name", "")
        else:
            album = raw_album or ""
        found.append(Hit(
            source="youtube",
            kind="track",
            title=item.get("title", ""),
            artist=_ytm_artists(item),
            album=album,
            year=str(item.get("year") or ""),
            thumbnail=_ytm_thumb(item),
            payload={"videoId": vid},
        ))
        if len(found) >= limit:
            break
    return found
|
||
|
||
|
||
def _ytdlp_search(query: str, limit: int) -> list[Hit]:
    """Search YouTube by scraping ``yt-dlp --flat-playlist -J ytsearchN:...``.

    Returns at most *limit* track Hits. Returns [] after logging an error when
    yt-dlp cannot be started, exits non-zero, or prints invalid JSON.
    """
    cmd = ["yt-dlp", *_cookie_args(), "--flat-playlist", "-J", f"ytsearch{limit}:{query}"]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
    except (OSError, subprocess.CalledProcessError, json.JSONDecodeError) as e:
        # OSError covers a missing or non-executable yt-dlp binary.
        err(f"yt-dlp search failed: {e}")
        return []

    entries = data.get("entries") if isinstance(data, dict) else None
    hits: list[Hit] = []
    # `or []` and the falsy-entry check tolerate a null list or null items.
    for entry in entries or []:
        if not entry:
            continue
        vid = entry.get("id")
        if not vid:
            continue
        hits.append(Hit(
            source="youtube",
            kind="track",
            title=entry.get("title", ""),
            artist=entry.get("uploader") or entry.get("channel") or "",
            year="",
            thumbnail="",
            payload={"videoId": vid},
        ))
    return hits[:limit]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Picker / rendering
|
||
# ---------------------------------------------------------------------------
|
||
def _keywords(query: str) -> list[str]:
|
||
return [w for w in re.split(r"[\s\-]+", query.lower()) if len(w) > 1]
|
||
|
||
|
||
def _ansi_bold_match(s: str, keywords: list[str]) -> str:
|
||
if not s:
|
||
return ""
|
||
out = s
|
||
for kw in keywords:
|
||
out = re.sub(f"({re.escape(kw)})", "\033[1m\\1\033[0m", out, flags=re.IGNORECASE)
|
||
return out
|
||
|
||
|
||
def _rich_match(s: str, keywords: list[str]):
    """Return a rich ``Text`` of *s* with every keyword occurrence styled bold.

    Matching is done against the lowercased string, so keywords are expected
    to be lowercase already. Empty keywords are skipped.
    """
    text = Text(s or "")
    low = (s or "").lower()
    for kw in keywords:
        if not kw:
            # str.find("") succeeds at `start` without advancing, which would
            # spin forever.
            continue
        start = 0
        while True:
            idx = low.find(kw, start)
            if idx == -1:
                break
            text.stylize("bold", idx, idx + len(kw))
            start = idx + len(kw)
    return text
|
||
|
||
|
||
def render_picker(hits: list[Hit], query: str, yt_first: bool) -> None:
    """Print the numbered list of hits with query keywords highlighted.

    Uses a rich table when rich is available, otherwise fixed-width plain
    rows with ANSI bold. *yt_first* is accepted but not used for rendering.
    """
    keywords = _keywords(query)

    if _console is not None:
        table = Table(show_lines=False, expand=False)
        table.add_column("#", justify="right", style="cyan")
        table.add_column("Src")
        table.add_column("Artist")
        table.add_column("Album / Title")
        table.add_column("Year")
        table.add_column("Type")
        for i, h in enumerate(hits, 1):
            src = "[green]LID[/]" if h.source == "lidarr" else "[red]YT[/]"
            at = h.album if h.kind == "album" else h.display_title
            table.add_row(
                str(i), src,
                _rich_match(h.artist, keywords),
                _rich_match(at, keywords),
                h.year, h.kind,
            )
        _console.print(table)
    else:
        for i, h in enumerate(hits, 1):
            src = "LID" if h.source == "lidarr" else "YT "
            at = h.album if h.kind == "album" else h.display_title
            # Pad first, then highlight. Padding a string that already holds
            # escape codes counts the invisible bytes and misaligns columns.
            # Keywords never contain whitespace, so the padding cannot match.
            artist_col = _ansi_bold_match(f"{h.artist:<30}", keywords)
            title_col = _ansi_bold_match(f"{at:<40}", keywords)
            print(f"{i:>3} {src} {artist_col} {title_col} {h.year:<6} {h.kind}")
|
||
|
||
|
||
def pick(hits: list[Hit], query: str, noninteractive: bool, yt_first: bool) -> Optional[Hit]:
    """Choose one hit, either automatically or by prompting the user.

    Non-interactive: returns the first hit from the preferred source (YouTube
    when *yt_first*, else Lidarr), or the first hit overall.
    Interactive: renders the picker and loops until a valid number is typed.
    Returns None when *hits* is empty or the user quits (q, quit, empty line,
    EOF, Ctrl-C).
    """
    if not hits:
        return None
    if noninteractive:
        primary = "youtube" if yt_first else "lidarr"
        preferred = next((h for h in hits if h.source == primary), None)
        return preferred if preferred is not None else hits[0]

    render_picker(hits, query, yt_first)
    while True:
        try:
            raw = input("Pick a number (q to quit): ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            return None
        if raw.lower() in ("q", "quit", ""):
            return None
        # isdecimal(), not isdigit(): isdigit() accepts characters such as
        # "²" that int() rejects with ValueError.
        if raw.isdecimal() and 1 <= int(raw) <= len(hits):
            return hits[int(raw) - 1]
        print("Invalid choice.")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Lidarr actions
|
||
# ---------------------------------------------------------------------------
|
||
def get_existing_artist(name: str) -> Optional[dict]:
    """Return the library artist whose name equals *name* ignoring case.

    Returns None when there is no match or the artist list cannot be fetched.
    """
    try:
        library = lidarr_get("/api/v1/artist", timeout=10)
        return next(
            (entry for entry in library
             if entry.get("artistName", "").lower() == name.lower()),
            None,
        )
    except RequestException as e:
        dbg(f"existing artist check failed: {e}")
        return None
|
||
|
||
|
||
def _profile_id_by_name(path: str, env_var: str, default_name: str) -> int:
    """Look up a Lidarr profile id by name.

    The wanted name comes from the *env_var* environment variable, defaulting
    to *default_name*, and is compared case-insensitively. Falls back to the
    first profile when nothing matches, and to 1 when the list is empty or
    cannot be fetched.
    """
    name = os.environ.get(env_var, default_name)
    try:
        profiles = lidarr_get(path, timeout=10)
    except RequestException as e:
        dbg(f"{path} fetch failed: {e}")
        return 1
    if not profiles:
        return 1

    wanted = name.casefold()
    match = next((p for p in profiles if p.get("name", "").casefold() == wanted), None)
    if match is not None:
        return match["id"]

    first = profiles[0]
    dbg(f"profile '{name}' not found at {path}; using first ('{first.get('name')}')")
    return first["id"]
|
||
|
||
|
||
def get_default_metadata_profile_id() -> int:
    """Id of the metadata profile named by LIDARR_METADATA_PROFILE (default "Standard")."""
    return _profile_id_by_name(
        path="/api/v1/metadataprofile",
        env_var="LIDARR_METADATA_PROFILE",
        default_name="Standard",
    )
|
||
|
||
|
||
def get_quality_profile_id() -> int:
    """Id of the quality profile named by LIDARR_QUALITY_PROFILE (default "Any")."""
    return _profile_id_by_name(
        path="/api/v1/qualityprofile",
        env_var="LIDARR_QUALITY_PROFILE",
        default_name="Any",
    )
|
||
|
||
|
||
def add_artist(meta: dict, root: str, search_all: bool, dry_run: bool) -> Optional[dict]:
    """Add an artist to Lidarr from lookup metadata.

    Returns Lidarr's response, or a stub with id -1 in dry-run mode. Returns
    None when the metadata lacks an id or name, or when the POST fails.
    Profile ids are still fetched from Lidarr in dry-run mode.
    """
    foreign_id = meta.get("foreignArtistId") or meta.get("id")
    name = meta.get("artistName") or meta.get("title")
    if not (foreign_id and name):
        err("Missing foreignArtistId/artistName; cannot add artist.")
        return None

    quality_id = get_quality_profile_id()
    metadata_id = get_default_metadata_profile_id()
    payload = {
        "foreignArtistId": foreign_id,
        "artistName": name,
        "qualityProfileId": quality_id,
        "metadataProfileId": metadata_id,
        "rootFolderPath": root,
        "monitored": True,
        "addOptions": {"searchForMissingAlbums": search_all, "monitor": "all"},
    }

    if dry_run:
        print(f"[dry-run] POST /api/v1/artist {json.dumps(payload)}")
        return {"id": -1, "artistName": name, **payload}

    try:
        return lidarr_post("/api/v1/artist", payload)
    except RequestException as e:
        err(f"add_artist failed: {e}")
        return None
|
||
|
||
|
||
def ensure_album_in_library(album: dict, root: str, search_all: bool, dry_run: bool) -> Optional[dict]:
    """Return a library album dict (with numeric id), adding the artist if needed.

    An album that already looks like a library record is returned unchanged.
    Otherwise the artist is found or added, and the album is located in that
    artist's album list: by ``foreignAlbumId`` first, then by case-insensitive
    title, then the first listed album. Returns None when the artist cannot be
    added or the album list is empty or cannot be fetched. In dry-run mode a
    stub is returned and the album list is not fetched.
    """
    # Already in library?
    # Heuristic: lookup results carry a 0/None id; library albums carry real ids.
    album_id = album.get("id")
    foreign_album_id = album.get("foreignAlbumId") or ""
    looks_like_library = (
        isinstance(album_id, int) and album_id > 0
        and not foreign_album_id.startswith("lookup")
    )
    if looks_like_library and album.get("artistId"):
        return album

    artist_obj = album.get("artist") or {}
    artist_name = artist_obj.get("artistName") or album.get("artistName") or ""
    existing = get_existing_artist(artist_name) if artist_name else None
    if not existing:
        print(f"Adding artist '{artist_name}' to Lidarr...")
        existing = add_artist(artist_obj or {"artistName": artist_name,
                                             "foreignArtistId": artist_obj.get("foreignArtistId")},
                              root, search_all, dry_run)
        if not existing:
            return None

    if dry_run:
        print(f"[dry-run] would resolve album '{album.get('title')}' under artist id {existing.get('id')}")
        return {**album, "id": album.get("id") or -1, "artistId": existing.get("id")}

    try:
        albums = lidarr_get("/api/v1/album", params={"artistId": existing["id"]}, timeout=15)
    except RequestException as e:
        dbg(f"album list fetch failed: {e}")
        return None

    # An id match is exact; titles can collide (reissues, self-titled albums).
    if foreign_album_id:
        for a in albums:
            if a.get("foreignAlbumId") == foreign_album_id:
                return a
    wanted_title = (album.get("title") or "").lower()
    for a in albums:
        if (a.get("title") or "").lower() == wanted_title:
            return a
    # NOTE(review): with no match the first listed album is used, which may
    # not be the requested one — kept as in the original behavior.
    if albums:
        return albums[0]
    return None
|
||
|
||
|
||
def release_available(album_id: int) -> bool:
    """Interactive search: report whether Lidarr lists any release for the album.

    Returns False when the request fails. The request uses a 90-second timeout.
    """
    try:
        releases = lidarr_get("/api/v1/release", params={"albumId": album_id}, timeout=90)
    except RequestException as e:
        dbg(f"release search failed: {e}")
        return False
    dbg(f"interactive search returned {len(releases)} releases for album {album_id}")
    return len(releases) > 0
|
||
|
||
|
||
def trigger_album_search(album_id: int, dry_run: bool):
    """Post an AlbumSearch command for one album, or just print it in dry-run mode.

    Request errors from the POST propagate to the caller.
    """
    if not dry_run:
        command = {"name": "AlbumSearch", "albumIds": [album_id]}
        lidarr_post("/api/v1/command", command)
        return
    print(f"[dry-run] POST /api/v1/command AlbumSearch albumIds=[{album_id}]")
|
||
|
||
|
||
def act_lidarr_album(hit: Hit, root: str, search_all: bool, dry_run: bool) -> bool:
    """Try to have Lidarr grab the chosen album.

    Returns True if Lidarr handled it; False to fall through to YouTube.
    False covers: the album could not be resolved in the library, no indexer
    release was found, or the search command itself failed.
    """
    album = hit.payload["album"]
    lib_album = ensure_album_in_library(album, root, search_all, dry_run)
    if not lib_album:
        err("Could not resolve album in Lidarr.")
        return False
    album_id = lib_album.get("id")

    if dry_run:
        print(f"[dry-run] would interactive-search album id {album_id}; "
              f"if no release found, fall through to YouTube.")
        trigger_album_search(album_id, dry_run)
        return True

    if isinstance(album_id, int) and album_id > 0 and release_available(album_id):
        print(f"Indexer release available — triggering Lidarr grab for '{hit.album}'.")
        try:
            trigger_album_search(album_id, dry_run)
        except RequestException as e:
            # A failed command must not abort the run; honour the contract
            # and let the caller fall through to YouTube.
            err(f"Lidarr AlbumSearch command failed: {e}. Falling through to YouTube.")
            return False
        return True

    print("No indexer release found in Lidarr — falling through to YouTube.")
    return False
|
||
|
||
|
||
def act_lidarr_artist(hit: Hit, root: str, search_all: bool, dry_run: bool) -> bool:
    """Add the chosen artist to Lidarr; True when add_artist returned a result."""
    print(f"Adding artist '{hit.artist}' to Lidarr...")
    added = add_artist(hit.payload["artist"], root, search_all, dry_run)
    if added is None:
        return False
    return True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# YouTube download
|
||
# ---------------------------------------------------------------------------
|
||
def _quality_args(quality: str) -> list[str]:
|
||
if quality == "best":
|
||
# bestaudio, prefer mp3 320 only if extraction needs a container.
|
||
return ["-f", "bestaudio/best", "-x", "--audio-quality", "0"]
|
||
if quality == "320":
|
||
return ["-f", "bestaudio/best", "-x", "--audio-format", "mp3", "--audio-quality", "0"]
|
||
if quality in ("m4a", "opus", "flac"):
|
||
return ["-f", "bestaudio/best", "-x", "--audio-format", quality, "--audio-quality", "0"]
|
||
return ["-f", "bestaudio/best", "-x"]
|
||
|
||
|
||
def yt_download(url_or_query: str, target_folder: Optional[str], quality: str, dry_run: bool,
                hit: Optional[Hit] = None, outtmpl: Optional[str] = None):
    """Download one URL or search term with yt-dlp and embed metadata.

    The destination is either *outtmpl* (an output template, -o) or
    *target_folder* (a fixed directory, -P); *outtmpl* wins when both are
    given. When *hit* is supplied, its artist (first comma-separated name),
    title, album and year override the embedded tags.

    Returns True in dry-run mode (the command is only printed), otherwise
    whether yt-dlp exited with status 0.
    """

    def _force_tag(field: str, value: str) -> list[str]:
        # Literals are injected as a seed-then-replace pair. --parse-metadata
        # first copies an always-present field into meta_<tag>, so the tag
        # exists even when the source lacks it. --replace-in-metadata then
        # overwrites it with the literal value. Passing the literal as a
        # --parse-metadata FROM would be read as a field name when it is a
        # bare word, yielding "NA"; replace arguments are taken literally.
        escaped = value.replace("\\", r"\\")  # backslash is special in re.sub repl
        seed = ["--parse-metadata", f"%(title,id)s:%(meta_{field})s"]
        overwrite = ["--replace-in-metadata", f"meta_{field}", "^.*$", escaped]
        return seed + overwrite

    # Either a fixed output dir (-P) or a metadata-driven output template (-o).
    location = ["-o", outtmpl] if outtmpl else ["-P", target_folder]
    cmd = [
        "yt-dlp",
        *_cookie_args(),
        *_quality_args(quality),
        "--embed-metadata",
        "--embed-thumbnail",
        "--no-playlist",
        *location,
    ]

    if hit:
        primary_artist = hit.artist.split(",")[0].strip() if hit.artist else ""
        overrides = (("artist", primary_artist), ("title", hit.title), ("album", hit.album))
        for tag, value in overrides:
            if value:
                cmd.extend(_force_tag(tag, value))
        if hit.year:
            cmd.extend(["--parse-metadata", f"{hit.year}:%(release_year)s"])

    # When the hit carried no album, still embed one: the source's own album
    # if present, else a placeholder so players don't choke on a blank album.
    # A hit album was already forced above and must not be clobbered.
    if not (hit and hit.album):
        cmd.extend(["--parse-metadata", "%(album|Unknown Album)s:%(meta_album)s"])
    cmd.append(url_or_query)

    if dry_run:
        if target_folder:
            print(f"[dry-run] mkdir -p {target_folder}")
        print(f"[dry-run] {' '.join(cmd)}")
        return True

    if target_folder:
        os.makedirs(target_folder, exist_ok=True)
    dest = outtmpl or target_folder
    print(f"Downloading via yt-dlp -> {dest}")
    completed = subprocess.run(cmd)
    return completed.returncode == 0
|
||
|
||
|
||
def _sanitize_source(name: str) -> str:
|
||
"""Normalize a yt-dlp extractor key to a folder name ('Youtube'->'youtube')."""
|
||
clean = re.sub(r"[^a-z0-9]+", "", (name or "").lower())
|
||
return clean or "downloads"
|
||
|
||
|
||
def _track_url(hit: Hit) -> str:
    """Pick the download URL for a track Hit.

    YouTube tracks (extractor missing or "youtube") with a video id use the
    music.youtube.com URL, which gives the correct album art. Other platforms
    use the payload's own URL. With neither, a one-result yt-dlp search for
    "artist title" is returned.
    """
    payload = hit.payload
    vid = payload.get("videoId")
    music_url = f"https://music.youtube.com/watch?v={vid}"

    if vid and payload.get("extractor") in (None, "youtube"):
        return music_url
    direct = payload.get("url")
    if direct:
        return direct
    if vid:
        return music_url
    return f"ytsearch1:{hit.artist} {hit.title}"
|
||
|
||
|
||
def act_youtube(hit: Hit, root: str, quality: str, dry_run: bool):
    """Download a track Hit into <root>/<first artist>/<source>.

    The source folder is the payload's extractor, defaulting to "youtube".
    When the hit has no artist, yt-dlp routes the file via an output template
    built from the track's own artist/uploader/channel.
    Returns whatever yt_download returns.
    """
    url = _track_url(hit)
    source = hit.payload.get("extractor") or "youtube"
    artist_dir = hit.artist.split(",")[0].strip()

    if not artist_dir:
        # Sparse playlist metadata (e.g. SoundCloud sets): let yt-dlp route the
        # file by the track's own metadata so it lands under the real artist.
        outtmpl = os.path.join(root, "%(artist,uploader,channel)s", source,
                               "%(title)s [%(id)s].%(ext)s")
        return yt_download(url, None, quality, dry_run, hit=hit, outtmpl=outtmpl)

    target = os.path.join(root, artist_dir, source)
    return yt_download(url, target, quality, dry_run, hit=hit)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# URL path
|
||
# ---------------------------------------------------------------------------
|
||
def _playlist_id(url: str) -> str:
|
||
return parse_qs(urlparse(url).query).get("list", [""])[0]
|
||
|
||
|
||
def _is_youtube_playlist_url(url: str) -> bool:
    """True for a YouTube playlist URL (/playlist?list=… or list= without v=).

    A watch?v=…&list=… URL is treated as a single track, not a batch.
    The host must be youtube.com or one of its subdomains, matched on a label
    boundary. Look-alike hosts (notyoutube.com, youtube.evil.example) and
    "youtube" appearing only in the userinfo part are rejected.
    """
    if not is_url(url):
        return False
    parsed = urlparse(url)
    # .hostname excludes userinfo and port, unlike .netloc.
    host = (parsed.hostname or "").lower()
    if host != "youtube.com" and not host.endswith(".youtube.com"):
        return False
    if "/playlist" in parsed.path:
        return True
    qs = parse_qs(parsed.query)
    return "list" in qs and "v" not in qs
|
||
|
||
|
||
_DIRECT_DOMAINS = ("youtube.com", "youtu.be", "soundcloud.com")
|
||
|
||
|
||
def _is_direct_url(url: str) -> bool:
    """True for links yt-dlp downloads well directly (YouTube, SoundCloud).

    These skip Odesli resolution and use the existing handle_url path. The
    host must equal a listed domain or end with "." plus that domain, so
    look-alikes such as notyoutube.com do not match.
    """
    if not is_url(url):
        return False
    host = (urlparse(url).hostname or "").lower()
    for domain in _DIRECT_DOMAINS:
        if host == domain or host.endswith("." + domain):
            return True
    return False
|
||
|
||
|
||
def _ytmusic_playlist(pid: str) -> tuple[str, list[Hit]]:
    """Expand a YouTube Music playlist via ytmusicapi.

    Returns (playlist title, track Hits), skipping tracks without a videoId.
    Returns ("", []) when the playlist cannot be fetched.
    """
    try:
        pl = YTMusic().get_playlist(pid, limit=None)
    except Exception as e:  # noqa: BLE001
        dbg(f"ytmusicapi playlist expand failed: {e}")
        return "", []

    def _to_hit(track: dict) -> Hit:
        raw_album = track.get("album")
        # The album may arrive as a dict with a "name" or as a plain value.
        if isinstance(raw_album, dict):
            album_name = raw_album.get("name", "")
        else:
            album_name = raw_album or ""
        return Hit(source="youtube", kind="track", title=track.get("title", ""),
                   artist=_ytm_artists(track), album=album_name,
                   year=str(track.get("year") or ""),
                   payload={"videoId": track.get("videoId"), "extractor": "youtube"})

    tracks = [_to_hit(t) for t in pl.get("tracks", []) if t.get("videoId")]
    return pl.get("title", ""), tracks
|
||
|
||
|
||
def _entry_to_hit(entry: dict) -> Hit:
    """Map a yt-dlp --flat-playlist entry to a track Hit (any platform).

    The extractor name comes from ``ie_key`` or ``extractor`` and is
    normalized. ``videoId`` is set only for YouTube entries; the entry's own
    ``url`` is always carried in the payload.
    """
    extractor = _sanitize_source(entry.get("ie_key") or entry.get("extractor") or "")
    entry_id = entry.get("id")
    payload = {
        "url": entry.get("url"),
        "videoId": entry_id if extractor == "youtube" else None,
        "extractor": extractor,
    }
    uploader = entry.get("uploader") or entry.get("channel") or ""
    return Hit(source="youtube", kind="track", title=entry.get("title", ""),
               artist=uploader, payload=payload)
|
||
|
||
|
||
def odesli_resolve(url: str) -> Optional[Resolved]:
    """Resolve a streaming link via the Odesli (song.link) public API.

    Returns a Resolved with title, artist, thumbnail and a YouTube URL. The
    YouTube Music link is preferred over plain YouTube, and the URL is ""
    when neither is listed. Returns None on any failure (network error,
    non-200 status, malformed body, title and artist both missing) so callers
    can fall back loudly.
    """
    try:
        resp = requests.get(ODESLI_URL,
                            params={"url": url, "userCountry": "US"},
                            timeout=8)
        if resp.status_code != 200:
            dbg(f"odesli {resp.status_code} for {url}")
            return None
        data = resp.json()
        entity = data["entitiesByUniqueId"][data["entityUniqueId"]]
        title = entity.get("title", "")
        artist = entity.get("artistName", "")
        if not title and not artist:
            return None
        platforms = data.get("linksByPlatform", {})
        yt = (platforms.get("youtubeMusic") or platforms.get("youtube") or {}).get("url", "")
        return Resolved(title=title, artist=artist,
                        thumb=entity.get("thumbnailUrl", ""), youtube_url=yt)
    except (RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        # AttributeError: a body whose entity or link map is not a dict is as
        # malformed as a missing key and must also yield None.
        dbg(f"odesli resolve failed for {url}: {e}")
        return None
|
||
|
||
|
||
def probe_url(url: str) -> tuple[str, str, list[Hit]]:
    """Classify a URL via yt-dlp.

    Returns (kind, title, hits) where kind is 'playlist' (hits populated) or
    'track' (hits empty; caller downloads the URL). YouTube playlists use
    ytmusicapi for richer metadata when it is installed and returns tracks.
    Returns ('track', '', []) when yt-dlp cannot be started, exits non-zero,
    or prints something other than a JSON object.
    """
    if _is_youtube_playlist_url(url) and YTMusic is not None:
        pid = _playlist_id(url)
        if pid:
            title, hits = _ytmusic_playlist(pid)
            if hits:
                return "playlist", title, hits

    try:
        result = subprocess.run(["yt-dlp", *_cookie_args(), "--flat-playlist", "-J", url],
                                capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
    except (OSError, subprocess.CalledProcessError, json.JSONDecodeError) as e:
        # OSError covers a missing or non-executable yt-dlp binary.
        dbg(f"yt-dlp probe failed: {e}")
        return "track", "", []
    if not isinstance(data, dict):
        return "track", "", []

    if data.get("entries") is not None or data.get("_type") == "playlist":
        # A playlist may carry `entries: null` or null items; skip both
        # instead of raising.
        hits = [_entry_to_hit(e) for e in data.get("entries") or []
                if e and (e.get("id") or e.get("url"))]
        return "playlist", data.get("title", ""), hits
    return "track", data.get("title", ""), []
|
||
|
||
|
||
def download_hits(hits: list[Hit], root: str, quality: str, dry_run: bool) -> tuple[int, int]:
    """Run act_youtube on every Hit and count the successes.

    A track that raises is reported via err() and skipped so the rest of the
    batch still runs. Returns (succeeded, total).
    """
    succeeded = 0
    for hit in hits:
        try:
            done = act_youtube(hit, root, quality, dry_run)
        except Exception as exc:  # noqa: BLE001 — one bad track shouldn't abort the batch
            err(f"track failed ({hit.title}): {exc}")
            continue
        if done:
            succeeded += 1
    return succeeded, len(hits)
|
||
|
||
|
||
def download_single(url: str, root: str, quality: str, dry_run: bool) -> dict:
    """Fetch metadata for one URL, then download it into <root>/<artist>/<source>.

    When no metadata is available the artist falls back to "Unknown Artist"
    and the source folder to "downloads". Returns {title, artist, ok}.
    """
    meta = run_yt_dlp_get_metadata(url)
    if meta:
        artist = get_artist_from_metadata(meta)
        title = meta.get("title", "")
        source = _sanitize_source(meta.get("extractor", ""))
    else:
        artist, title, source = "Unknown Artist", "", "downloads"

    # Only the first listed artist names the folder (same as the search/playlist paths).
    first_artist = artist.split(",")[0].strip()
    destination = os.path.join(root, first_artist or "Unknown Artist", source)
    succeeded = yt_download(url, destination, quality, dry_run)
    return {"title": title, "artist": artist, "ok": succeeded}
|
||
|
||
|
||
def run_yt_dlp_get_metadata(url: str, extra_args=None) -> Optional[dict]:
    """Return yt-dlp's JSON metadata for a single URL, or None on failure.

    Runs ``yt-dlp -j --no-playlist`` with the configured cookie arguments plus
    any ``extra_args``. Any failure (yt-dlp missing or not executable, non-zero
    exit, unparsable output) is reported through err() and turned into None so
    callers can decide how to degrade.
    """
    cmd = ["yt-dlp", *_cookie_args(), "-j", "--no-playlist", *(extra_args or []), url]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except (OSError, subprocess.CalledProcessError, json.JSONDecodeError) as e:
        # Surface yt-dlp's own last stderr line (e.g. 429 / "not a bot") instead
        # of a bare exit code — the actual reason is what you need to act on.
        stderr = getattr(e, "stderr", "") or ""
        lines = [ln for ln in stderr.strip().splitlines() if ln.strip()]
        if lines:
            detail = f" — {lines[-1]}"
        elif isinstance(e, OSError):
            # No process ran, so there is no stderr; the OS error is the reason.
            detail = f" — {e}"
        else:
            detail = ""
        err(f"yt-dlp metadata extraction failed for {url}{detail}")
        return None
|
||
|
||
|
||
# Extra yt-dlp arguments that repair_file passes to run_yt_dlp_get_metadata.
# Repair only reads tags — skip YouTube's slow/throttled JS signature step
# (we never download here), which keeps metadata but is far faster per file.
_REPAIR_META_ARGS = ["--extractor-args", "youtube:player_skip=js"]
|
||
|
||
|
||
def get_artist_from_metadata(meta: dict) -> str:
    """Pick the best available artist name from a yt-dlp metadata dict.

    Tries the 'artist', 'creator', 'uploader' and 'channel' fields in that
    order and returns the first non-empty one. Failing that, the part before
    the first ' - ' in the title is used. Returns "Unknown Artist" when
    nothing usable is found, including a null title or an empty left-hand
    side such as ' - Song'.
    """
    for key in ("artist", "creator", "uploader", "channel"):
        if meta.get(key):
            return meta[key]
    title = meta.get("title")
    if isinstance(title, str) and " - " in title:
        candidate = title.split(" - ", 1)[0].strip()
        if candidate:
            return candidate
    return "Unknown Artist"
|
||
|
||
|
||
def resolve_link_hits(url: str, limit: int) -> tuple[str, list[Hit]]:
    """Turn a shared link into (query, hits) using Odesli.

    The query is "Artist - Title" built from the resolved entity. Hits are the
    Lidarr search results for that query, followed — when Odesli supplied a
    YouTube URL — by one YouTube track Hit pointing at exactly that URL (no
    fuzzy re-search). Raises OdesliError when the link cannot be resolved.
    """
    resolved = odesli_resolve(url)
    if resolved is None:
        raise OdesliError(url)

    query = f"{resolved.artist} - {resolved.title}".strip(" -")
    hits = lidarr_search(query, limit)
    if not resolved.youtube_url:
        return query, hits

    exact_track = Hit(source="youtube", kind="track", title=resolved.title,
                      artist=resolved.artist, thumbnail=resolved.thumb,
                      payload={"url": resolved.youtube_url})
    return query, hits + [exact_track]
|
||
|
||
|
||
def handle_link(url: str, root: str, quality: str, dry_run: bool,
                noninteractive: bool, yt_first: bool, limit: int) -> None:
    """CLI flow for a link that is not downloaded directly.

    Resolves the link through resolve_link_hits, lets pick() choose among the
    resulting hits, and hands the choice to _dispatch_chosen. Each dead end
    (unresolvable link, no hits, nothing picked) prints a message and returns.
    """
    try:
        query, candidates = resolve_link_hits(url, limit)
    except OdesliError:
        err(f"Couldn't resolve {url}. Try the direct YouTube/SoundCloud link.")
        return

    if not candidates:
        err(f"No Lidarr or YouTube source found for '{query}'.")
        return

    selection = pick(candidates, query, noninteractive, yt_first)
    if not selection:
        print("Nothing selected.")
        return

    # lidarr_only and search_all are both off on this path.
    _dispatch_chosen(selection, candidates, root, quality, dry_run, False, False)
|
||
|
||
|
||
def handle_url(url: str, root: str, quality: str, dry_run: bool):
    """Download a direct URL: every track of a playlist, or the single item.

    For playlists a "Downloaded ok/total" summary line is printed.
    """
    kind, title, hits = probe_url(url)
    if kind != "playlist":
        download_single(url, root, quality, dry_run)
        return

    ok, total = download_hits(hits, root, quality, dry_run)
    label = f" from '{title}'" if title else ""
    print(f"Downloaded {ok}/{total} tracks{label}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Repair: re-tag existing downloads from source metadata (CLI only)
|
||
# ---------------------------------------------------------------------------
|
||
_AUDIO_EXTS = ("opus", "m4a", "mp3", "flac")
|
||
_TRACK_FILE_RE = re.compile(
|
||
r"^(?P<title>.*) \[(?P<id>[^\]]+)\]\.(?P<ext>" + "|".join(_AUDIO_EXTS) + r")$")
|
||
# m4a uses atom keys; vorbis/easy formats use plain names.
|
||
_MP4_KEYS = {"artist": "\xa9ART", "title": "\xa9nam", "album": "\xa9alb", "date": "\xa9day"}
|
||
|
||
|
||
def _is_source_dir(name: str) -> bool:
    """Whether a folder name is already in _sanitize_source's output form.

    Names that survive _sanitize_source unchanged are treated as yt-dlp-style
    source folders (youtube/soundcloud/…); Lidarr album folders (which have
    spaces/capitals) do not, and an empty name never qualifies.
    """
    if not name:
        return False
    return name == _sanitize_source(name)
|
||
|
||
|
||
def _parse_track_file(filename: str):
    """Split '<title> [<id>].<ext>' into (title, id); None if the name doesn't fit."""
    match = _TRACK_FILE_RE.match(filename)
    if match is None:
        return None
    return match.group("title", "id")
|
||
|
||
|
||
def _repair_probe_url(source: str, vid: str):
|
||
"""Reconstruct a fetchable URL from (source, id), or None if unsupported."""
|
||
if source == "youtube":
|
||
return f"https://music.youtube.com/watch?v={vid}"
|
||
if source == "soundcloud":
|
||
return f"https://api.soundcloud.com/tracks/{vid}"
|
||
return None
|
||
|
||
|
||
def _repair_id_ok(source: str, vid: str) -> bool:
|
||
"""True if the parsed id matches the source's id format (avoids querying
|
||
junk ids pulled from bracketed descriptors like '[Official Video]')."""
|
||
if source == "youtube":
|
||
return bool(re.fullmatch(r"[A-Za-z0-9_-]{11}", vid))
|
||
if source == "soundcloud":
|
||
return vid.isdigit()
|
||
return False
|
||
|
||
|
||
def _valid_year(meta: dict) -> str:
|
||
"""A plausible release year from metadata, or '' . Uses release info only —
|
||
NOT upload_date, which is the upload year, not the song's year."""
|
||
for v in (meta.get("release_year"), (meta.get("release_date") or "")[:4]):
|
||
s = str(v or "")
|
||
if s.isdigit() and 1000 <= int(s) <= 2100:
|
||
return s
|
||
return ""
|
||
|
||
|
||
def _open_audio(path: str):
    """Open `path` with the mutagen class matching its (lower-cased) extension.

    Returns (mutagen_file, key_map). key_map is _MP4_KEYS for m4a and None for
    opus/mp3/flac, whose tag keys are used as-is. Unsupported extensions give
    (None, None). mutagen is imported here, on first use.
    """
    import mutagen.flac
    import mutagen.mp4
    import mutagen.oggopus
    from mutagen.easyid3 import EasyID3

    suffix = path.rsplit(".", 1)[-1].lower()
    if suffix == "m4a":
        return mutagen.mp4.MP4(path), _MP4_KEYS

    plain_key_openers = {
        "opus": mutagen.oggopus.OggOpus,
        "mp3": EasyID3,
        "flac": mutagen.flac.FLAC,
    }
    opener = plain_key_openers.get(suffix)
    if opener is None:
        return None, None
    return opener(path), None
|
||
|
||
|
||
def _read_tag(audio, key_map, field: str) -> str:
|
||
k = key_map[field] if key_map else field
|
||
val = audio.get(k)
|
||
if not val:
|
||
return ""
|
||
return str(val[0]) if isinstance(val, list) else str(val)
|
||
|
||
|
||
# Placeholder tag values the old tagging bug left behind (yt-dlp's "NA" missing
|
||
# marker, and the "Unknown *" fallbacks). Treated as empty so repair overwrites
|
||
# them rather than mistaking them for a real, present tag.
|
||
_BOGUS_TAGS = {"", "na", "n/a", "unknown", "unknown album", "unknown artist"}
|
||
|
||
|
||
def _is_bogus(value: str) -> bool:
|
||
return (value or "").strip().casefold() in _BOGUS_TAGS
|
||
|
||
|
||
def _fs_safe(name: str) -> str:
|
||
"""Filesystem-safe filename stem: mirror yt-dlp's default '/'->'⧸' so the
|
||
path stays a single segment, and drop NULs."""
|
||
return name.replace("/", "⧸").replace("\0", "").strip()
|
||
|
||
|
||
def _maybe_rename_bogus(path: str, title: str, dry_run: bool) -> tuple[str, Optional[str]]:
    """Rename a placeholder-named file ('NA [<id>].<ext>') to '<title> [<id>].<ext>'.

    Nothing happens — and (path, None) is returned — when the filename does
    not parse, its stem is not a placeholder, `title` is itself a placeholder
    or sanitises to an empty string, or the target name is already taken.

    Returns (current_path, change_note_or_None). In dry-run mode the rename is
    only announced and the original path is returned alongside the note.
    """
    fname = os.path.basename(path)
    parsed = _parse_track_file(fname)
    if not parsed:
        return path, None
    stem_title, vid = parsed
    if not _is_bogus(stem_title) or _is_bogus(title):
        return path, None

    safe_title = _fs_safe(title)
    if not safe_title:
        # e.g. a title made only of NULs: would produce ' [<id>].<ext>'.
        return path, None
    ext = fname.rsplit(".", 1)[-1]
    new_name = f"{safe_title} [{vid}].{ext}"
    new_path = os.path.join(os.path.dirname(path), new_name)
    if new_path == path:
        return path, None
    if os.path.lexists(new_path):
        # os.rename silently replaces an existing file on POSIX; a properly
        # named copy of the same track must not be clobbered by the bogus one.
        err(f"not renaming {fname}: {new_name} already exists")
        return path, None

    note = f"renamed -> {new_name}"
    if dry_run:
        print(f"[dry-run] would rename {fname} -> {new_name}")
        return path, note
    os.rename(path, new_path)
    print(f"renamed {fname} -> {new_name}")
    return new_path, note
|
||
|
||
|
||
def repair_file(path: str, source: str, dry_run: bool) -> list[str]:
    """Re-tag a single download from freshly fetched source metadata.

    Policy: album and year from the source are authoritative and overwrite the
    file's tags. Artist and title are written only when the current tag is
    missing or a known placeholder ('NA', 'Unknown …') and the source value is
    not itself a placeholder, so a genuine tag is never replaced by a channel
    name or a decorated music-video title. A literal placeholder album with no
    source album is normalised to "Unknown Album". Finally a placeholder
    filename ('NA [<id>]') is renamed using the resulting title.

    Returns the list of changes ("field=value" entries plus any rename note);
    an empty list means the file was skipped or already correct.
    """
    parsed = _parse_track_file(os.path.basename(path))
    if not parsed:
        dbg(f"skip (no id): {path}")
        return []
    _stem, track_id = parsed
    if not _repair_id_ok(source, track_id):
        dbg(f"skip (bad {source} id '{track_id}'): {path}")
        return []
    probe = _repair_probe_url(source, track_id)
    if not probe:
        dbg(f"skip (source '{source}' not re-queryable): {path}")
        return []
    meta = run_yt_dlp_get_metadata(probe, _REPAIR_META_ARGS)
    if not meta:
        dbg(f"skip (no metadata): {path}")
        return []

    try:
        audio, key_map = _open_audio(path)
    except Exception as exc:  # noqa: BLE001
        err(f"cannot open {path}: {exc}")
        return []
    if audio is None:
        return []

    src_album = (meta.get("album") or "").strip()
    src_year = _valid_year(meta)
    cur_artist = _read_tag(audio, key_map, "artist")
    cur_title = _read_tag(audio, key_map, "title")
    cur_album = _read_tag(audio, key_map, "album")
    src_artist = get_artist_from_metadata(meta)
    src_title = (meta.get("title") or "").strip()

    # Insertion order (album, date, artist, title) is also the report order.
    updates = {}
    if src_album:
        updates["album"] = src_album
    else:
        # No source album, but the tag is a literal 'NA' — normalise it so no
        # file keeps the placeholder (a blank album is left blank, as before).
        placeholder_album = bool(cur_album) and _is_bogus(cur_album)
        if placeholder_album and cur_album.strip().casefold() != "unknown album":
            updates["album"] = "Unknown Album"
    if src_year:
        updates["date"] = src_year
    # Fill artist/title when missing OR bogus; never overwrite a genuine value.
    if src_artist and not _is_bogus(src_artist) and _is_bogus(cur_artist):
        updates["artist"] = src_artist
    if src_title and not _is_bogus(src_title) and _is_bogus(cur_title):
        updates["title"] = src_title

    changed = []
    for tag, wanted in updates.items():
        if _read_tag(audio, key_map, tag) == wanted:
            continue
        changed.append(f"{tag}={wanted}")
        if dry_run:
            continue
        audio[key_map[tag] if key_map else tag] = [wanted]
    if changed:
        if not dry_run:
            audio.save()
        prefix = "[dry-run] would set" if dry_run else "set"
        print(f"{prefix} [{', '.join(changed)}] on {path}")

    # Repair a placeholder filename using the final (recovered) title.
    final_title = updates.get("title") or cur_title
    _path, rename_note = _maybe_rename_bogus(path, final_title, dry_run)
    if rename_note:
        changed.append(rename_note)
    return changed
|
||
|
||
|
||
def repair_library(root: str, dry_run: bool, exclude=(), workers: int = 8) -> tuple[int, int]:
    """Re-tag every audio file under <root>/<artist>/<source>/.

    Each file is handed to repair_file on a thread pool of at most `workers`
    threads (minimum 1); the work is one yt-dlp round-trip per file, so it is
    network-bound. Results are consumed and counted on the calling thread. A
    progress line is printed every 100 files and a summary at the end. Lower
    `workers` if YouTube starts rate-limiting (HTTP 429/403).

    Returns (scanned, changed); (0, 0) when `root` is not a directory.
    """
    if not os.path.isdir(root):
        err(f"Root folder not found: {root}")
        return 0, 0

    def _repair_job(job):
        file_path, source = job
        try:
            return bool(repair_file(file_path, source, dry_run))
        except Exception as exc:  # noqa: BLE001 — one bad file shouldn't abort
            err(f"repair failed ({os.path.basename(file_path)}): {exc}")
            return False

    scanned = 0
    changed = 0
    jobs = ((file_path, source)
            for file_path, source, _artist in _iter_source_files(root, exclude))
    with ThreadPoolExecutor(max_workers=max(1, workers)) as pool:
        for scanned, was_changed in enumerate(pool.map(_repair_job, jobs), start=1):
            if was_changed:
                changed += 1
            if scanned % 100 == 0:
                print(f"… {scanned} scanned, {changed} changed", flush=True)

    verb = "Would repair" if dry_run else "Repaired"
    print(f"{verb} {changed}/{scanned} files")
    return scanned, changed
|
||
|
||
|
||
def _iter_source_files(root: str, exclude=()):
    """Yield (path, source, artist) for audio files under <root>/<artist>/<source>/.

    Only source folders accepted by _is_source_dir are visited, so Lidarr
    album folders are skipped. Any artist or source folder whose name is in
    `exclude` (case-insensitive) is skipped as well. Folders and files are
    visited in sorted order.

    A file counts as audio when its lower-cased name ends in '.<ext>' for an
    extension in _AUDIO_EXTS. The dot is part of the match, so a name that
    merely ends in the same letters (e.g. 'magnumopus') is not yielded.
    """
    skip = {e.lower() for e in exclude}
    audio_suffixes = tuple("." + ext for ext in _AUDIO_EXTS)
    for artist in sorted(os.listdir(root)):
        if artist.lower() in skip:
            continue
        adir = os.path.join(root, artist)
        if not os.path.isdir(adir):
            continue
        for source in sorted(os.listdir(adir)):
            if source.lower() in skip:
                continue
            sdir = os.path.join(adir, source)
            if not os.path.isdir(sdir) or not _is_source_dir(source):
                continue
            for fname in sorted(os.listdir(sdir)):
                if fname.lower().endswith(audio_suffixes):
                    yield os.path.join(sdir, fname), source, artist
|
||
|
||
|
||
# --- Offline retag-from-path (recover from tags damaged by a prior --repair) ---
|
||
# Matches one parenthesised or bracketed group — together with the whitespace
# before it — whose contents include, as a whole word, one of the listed
# keywords (official, lyric/lyrics, audio, visualiser/visualizer, music video,
# mv / m/v, hd, hq, 4k, explicit, remaster/remastered), in any letter case.
_DECORATION_RE = re.compile(
    r"\s*[\(\[][^)\]]*\b(?:official|lyric[s]?|audio|visuali[sz]er|"
    r"music\s+video|m/?v|hd|hq|4k|explicit|remaster(?:ed)?)\b[^)\]]*[\)\]]",
    re.IGNORECASE)
|
||
|
||
|
||
def _title_from_filename(filename: str) -> str:
    """Strip the audio extension (any case) and a trailing ' [<id>]' from a filename."""
    extension_pattern = r"\.(?:" + "|".join(_AUDIO_EXTS) + r")$"
    without_ext = re.sub(extension_pattern, "", filename, flags=re.IGNORECASE)
    without_id = re.sub(r"\s*\[[^\]]+\]$", "", without_ext)
    return without_id.strip()
|
||
|
||
|
||
def _strip_decorations(title: str) -> str:
    """Remove every _DECORATION_RE match from a title, then tidy the remainder.

    Runs of two or more whitespace characters collapse to a single space, and
    leading/trailing spaces, hyphens and en/em dashes are trimmed.
    """
    undecorated = _DECORATION_RE.sub("", title)
    collapsed = re.sub(r"\s{2,}", " ", undecorated)
    return collapsed.strip(" -–—")
|
||
|
||
|
||
def _derive_from_filename(filename: str, folder_artist: str) -> tuple[str, str]:
    """Best-effort (artist, title) for a file.

    The filename is reduced to a clean title first. If that still contains
    ' - ', it is read as 'Artist - Title' and wins over the folder name (this
    handles music-video downloads filed under a channel name). Otherwise the
    folder supplies the artist.
    """
    cleaned = _strip_decorations(_title_from_filename(filename))
    left, separator, right = cleaned.partition(" - ")
    if not separator:
        return folder_artist, cleaned
    return left.strip(), right.strip()
|
||
|
||
|
||
def retag_file_from_path(path: str, folder_artist: str, dry_run: bool) -> list[str]:
    """Set artist/title from the folder name and cleaned filename; album/date are untouched.

    Empty derived values are not written. Returns the "field=value" entries
    that differ from the current tags (written unless `dry_run`), or [] when
    the file cannot be opened or has an unsupported format.
    """
    artist, title = _derive_from_filename(os.path.basename(path), folder_artist)
    try:
        audio, key_map = _open_audio(path)
    except Exception as exc:  # noqa: BLE001
        err(f"cannot open {path}: {exc}")
        return []
    if audio is None:
        return []

    updates = {tag: wanted
               for tag, wanted in (("artist", artist), ("title", title))
               if wanted}

    changed = []
    for tag, wanted in updates.items():
        if _read_tag(audio, key_map, tag) == wanted:
            continue
        changed.append(f"{tag}={wanted}")
        if dry_run:
            continue
        audio[key_map[tag] if key_map else tag] = [wanted]
    if changed:
        if not dry_run:
            audio.save()
        prefix = "[dry-run] would set" if dry_run else "set"
        print(f"{prefix} [{', '.join(changed)}] on {path}")
    return changed
|
||
|
||
|
||
def retag_library_from_path(root: str, dry_run: bool, exclude=()) -> tuple[int, int]:
    """Offline pass: re-tag artist/title from folder + filename for every source file.

    A file that raises is reported via err() and still counted as scanned.
    Prints a summary and returns (scanned, changed); (0, 0) when `root` is not
    a directory.
    """
    if not os.path.isdir(root):
        err(f"Root folder not found: {root}")
        return 0, 0

    scanned = 0
    changed = 0
    for scanned, (path, _source, artist) in enumerate(
            _iter_source_files(root, exclude), start=1):
        try:
            edits = retag_file_from_path(path, artist, dry_run)
        except Exception as exc:  # noqa: BLE001
            err(f"retag failed ({os.path.basename(path)}): {exc}")
            continue
        if edits:
            changed += 1

    verb = "Would retag" if dry_run else "Retagged"
    print(f"{verb} {changed}/{scanned} files")
    return scanned, changed
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
def build_combined_hits(query, limit, yt_first, lidarr_only, yt_only) -> list[Hit]:
    """Search Lidarr and YouTube concurrently and concatenate the results.

    `yt_only` suppresses the Lidarr search and `lidarr_only` the YouTube one.
    `yt_first` puts the YouTube hits ahead of the Lidarr hits; otherwise
    Lidarr comes first.
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        lidarr_future = pool.submit(lidarr_search, query, limit) if not yt_only else None
        yt_future = pool.submit(youtube_search, query, limit) if not lidarr_only else None
        lidarr_hits: list[Hit] = lidarr_future.result() if lidarr_future else []
        yt_hits: list[Hit] = yt_future.result() if yt_future else []

    if yt_first:
        return yt_hits + lidarr_hits
    return lidarr_hits + yt_hits
|
||
|
||
|
||
def parse_args():
    """Define the musicfetch command-line interface and parse sys.argv."""
    parser = argparse.ArgumentParser(
        prog="musicfetch",
        description="Fetch music via Lidarr (preferred) or YouTube Music.")
    add = parser.add_argument

    # What to fetch and how to choose it.
    add("query", nargs="*", help="Free-form query or a URL.")
    add("-n", "--noninteractive", action="store_true",
        help="Auto-pick the top hit, no prompt.")
    add("-s", "--ytsearch", action="store_true",
        help="YouTube first instead of Lidarr first.")
    add("-d", "--dry-run", action="store_true",
        help="Show actions without executing them.")
    add("-q", "--quality", choices=QUALITY_CHOICES, default="best",
        help="Audio quality/format (default: best).")
    add("--limit", type=int, default=10, help="Hits per source (default 10).")
    add("--lidarr-only", action="store_true", help="Skip YouTube.")
    add("--yt-only", action="store_true", help="Skip Lidarr.")
    add("-o", "--root", default=DEFAULT_ROOT, help=f"Output root (default {DEFAULT_ROOT}).")
    add("--search-all", action="store_true",
        help="Search all albums when adding an artist to Lidarr.")

    # Library maintenance modes and their tuning knobs.
    add("--repair", action="store_true",
        help="Re-tag existing downloads under --root from source metadata.")
    add("--workers", type=int, default=4,
        help="Parallel yt-dlp metadata fetches during --repair (default 4; "
             "raise with cookies, lower if YouTube rate-limits).")
    add("--cookies", metavar="FILE",
        help="Path to a yt-dlp cookies.txt (authenticated requests avoid "
             "YouTube's bot-check / rate limits). Overrides $YTDLP_COOKIES.")
    add("--cookies-from-browser", metavar="BROWSER",
        help="Load YouTube cookies from a local browser, e.g. firefox or "
             "chrome. Overrides $YTDLP_COOKIES_FROM_BROWSER.")
    add("--retag-from-path", action="store_true",
        help="Offline: re-tag artist/title from folder + filename "
             "(fixes tags damaged by a prior --repair).")
    add("-x", "--exclude", action="append", default=[], metavar="NAME",
        help="Folder name under --root to skip during --repair/--retag-from-path "
             "(repeatable, e.g. -x Unsorted -x playlists).")
    add("--debug", action="store_true", help="Verbose output.")
    return parser.parse_args()
|
||
|
||
|
||
def _dispatch_chosen(chosen: Hit, hits: list[Hit], root: str, quality: str,
                     dry_run: bool, lidarr_only: bool, search_all: bool) -> None:
    """Carry out the action for the picked Hit. Shared by main() and handle_link.

    * Non-Lidarr hit: download it with act_youtube.
    * Lidarr artist (any Lidarr kind other than 'album'): act_lidarr_artist.
    * Lidarr album: act_lidarr_album; if that reports it was not handled and
      `lidarr_only` is off, the first YouTube hit in `hits` is downloaded
      instead (or a message is printed when there is none).
    """
    if chosen.source != "lidarr":
        act_youtube(chosen, root, quality, dry_run)
        return
    if chosen.kind != "album":
        act_lidarr_artist(chosen, root, search_all, dry_run)
        return

    handled = act_lidarr_album(chosen, root, search_all, dry_run)
    if handled or lidarr_only:
        return

    yt_fallback = next((hit for hit in hits if hit.source == "youtube"), None)
    if yt_fallback:
        print("Using top YouTube hit as fallback.")
        act_youtube(yt_fallback, root, quality, dry_run)
    else:
        print("No YouTube fallback available.")
|
||
|
||
|
||
def main():
    """CLI entry point.

    Applies the global flags (debug, cookie sources), then runs exactly one
    mode: --retag-from-path, --repair, a URL (direct download or Odesli-resolved
    link), or a free-form search followed by pick-and-dispatch. Exits with
    status 1 on a missing query, conflicting source flags, or no hits.
    """
    global DEBUG, COOKIES_FILE, COOKIES_FROM_BROWSER
    args = parse_args()
    DEBUG = args.debug
    # Command-line cookie options take precedence over the module defaults.
    if args.cookies:
        COOKIES_FILE = args.cookies
    if args.cookies_from_browser:
        COOKIES_FROM_BROWSER = args.cookies_from_browser
    query = " ".join(args.query).strip()

    # Maintenance modes need no query and short-circuit everything else.
    if args.retag_from_path:
        retag_library_from_path(args.root, args.dry_run, args.exclude)
        return
    if args.repair:
        repair_library(args.root, args.dry_run, args.exclude, args.workers)
        return

    if not query:
        err("Provide a query/URL, or use --repair. See --help.")
        sys.exit(1)
    if args.lidarr_only and args.yt_only:
        err("--lidarr-only and --yt-only are mutually exclusive.")
        sys.exit(1)

    if is_url(query):
        if _is_direct_url(query):
            handle_url(query, args.root, args.quality, args.dry_run)
        else:
            handle_link(query, args.root, args.quality, args.dry_run,
                        args.noninteractive, args.ytsearch, args.limit)
        return

    hits = build_combined_hits(query, args.limit, args.ytsearch,
                               args.lidarr_only, args.yt_only)
    if not hits:
        print("No hits found from any source.")
        sys.exit(1)

    selection = pick(hits, query, args.noninteractive, args.ytsearch)
    if not selection:
        print("Nothing selected.")
        return

    _dispatch_chosen(selection, hits, args.root, args.quality, args.dry_run,
                     args.lidarr_only, args.search_all)


if __name__ == "__main__":
    main()
|