Merge feat/repair: re-tag existing downloads via --repair
This commit is contained in:
27
README.md
27
README.md
@@ -94,6 +94,7 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
|||||||
| `--yt-only` | Skip Lidarr. |
|
| `--yt-only` | Skip Lidarr. |
|
||||||
| `-o`, `--root PATH` | Output root folder (default `/media/music`). |
|
| `-o`, `--root PATH` | Output root folder (default `/media/music`). |
|
||||||
| `--search-all` | Search all albums when adding an artist to Lidarr. |
|
| `--search-all` | Search all albums when adding an artist to Lidarr. |
|
||||||
|
| `--repair` | Re-tag existing downloads under `--root` from source metadata (see below). |
|
||||||
| `--debug` | Verbose output. |
|
| `--debug` | Verbose output. |
|
||||||
|
|
||||||
### Examples
|
### Examples
|
||||||
@@ -115,8 +116,26 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
|||||||
# YouTube only, lossless preferred
|
# YouTube only, lossless preferred
|
||||||
./musicfetch --yt-only -q flac "Bonobo - Kerala"
|
./musicfetch --yt-only -q flac "Bonobo - Kerala"
|
||||||
|
|
||||||
# Download by URL (YouTube Music URL preferred for correct art)
|
# Download by URL (single track or playlist/set/album, any yt-dlp site)
|
||||||
./musicfetch "https://music.youtube.com/watch?v=xxxxxxxxxxx"
|
./musicfetch "https://music.youtube.com/watch?v=xxxxxxxxxxx"
|
||||||
|
./musicfetch "https://soundcloud.com/artist/sets/my-mix"
|
||||||
|
```
|
||||||
|
|
||||||
|
### 🔧 Repair existing tags
|
||||||
|
|
||||||
|
`--repair` walks `<root>/<artist>/<source>/` (the `youtube`/`soundcloud`/… download
|
||||||
|
folders — Lidarr album folders are skipped), re-fetches authoritative metadata for each
|
||||||
|
file using the `[id]` in its filename, and fixes tags (album, year, artist, title). Useful
|
||||||
|
when downloads landed with missing album or wrong year. It re-queries the source over the
|
||||||
|
network, so run it occasionally, not constantly. Requires `mutagen` (an optional yt-dlp dependency,
|
||||||
|
usually already present). CLI-only — not exposed via the REST API.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Preview what would change (writes nothing)
|
||||||
|
./musicfetch --repair -d
|
||||||
|
|
||||||
|
# Apply fixes under a specific root
|
||||||
|
./musicfetch --repair -o /media/music
|
||||||
```
|
```
|
||||||
|
|
||||||
### 📁 Output Structure
|
### 📁 Output Structure
|
||||||
@@ -124,8 +143,10 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
|||||||
```text
|
```text
|
||||||
<root>/
|
<root>/
|
||||||
├── Artist Name/
|
├── Artist Name/
|
||||||
│ ├── Album Name/ (managed by Lidarr)
|
│ ├── Album Name/ (managed by Lidarr)
|
||||||
│ └── youtube/ (yt-dlp downloads / fallbacks)
|
│ ├── youtube/ (YouTube / YouTube Music downloads)
|
||||||
|
│ ├── soundcloud/ (SoundCloud downloads)
|
||||||
|
│ └── <source>/ (one folder per yt-dlp source)
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
148
musicfetch
148
musicfetch
@@ -796,6 +796,142 @@ def handle_url(url: str, root: str, quality: str, dry_run: bool):
|
|||||||
download_single(url, root, quality, dry_run)
|
download_single(url, root, quality, dry_run)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Repair: re-tag existing downloads from source metadata (CLI only)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
_AUDIO_EXTS = ("opus", "m4a", "mp3", "flac")
|
||||||
|
_TRACK_FILE_RE = re.compile(
|
||||||
|
r"^(?P<title>.*) \[(?P<id>[^\]]+)\]\.(?P<ext>" + "|".join(_AUDIO_EXTS) + r")$")
|
||||||
|
# m4a uses atom keys; vorbis/easy formats use plain names.
|
||||||
|
_MP4_KEYS = {"artist": "\xa9ART", "title": "\xa9nam", "album": "\xa9alb", "date": "\xa9day"}
|
||||||
|
|
||||||
|
|
||||||
|
def _is_source_dir(name: str) -> bool:
    """Tell whether a folder name is a yt-dlp-style source folder.

    A name qualifies when it is non-empty and `_sanitize_source` leaves it
    unchanged (youtube/soundcloud/…). Lidarr album folders, which have
    spaces/capitals, are thereby skipped.
    """
    if not name:
        return False
    return name == _sanitize_source(name)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_track_file(filename: str):
    """Split '<title> [<id>].<ext>' into (title, id); None when it does not fit."""
    hit = _TRACK_FILE_RE.match(filename)
    if hit is None:
        return None
    return hit.group("title"), hit.group("id")
|
||||||
|
|
||||||
|
|
||||||
|
def _repair_probe_url(source: str, vid: str):
|
||||||
|
"""Reconstruct a fetchable URL from (source, id), or None if unsupported."""
|
||||||
|
if source == "youtube":
|
||||||
|
return f"https://music.youtube.com/watch?v={vid}"
|
||||||
|
if source == "soundcloud":
|
||||||
|
return f"https://api.soundcloud.com/tracks/{vid}"
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _desired_tags(meta: dict) -> dict:
    """Derive the tags to write from yt-dlp metadata.

    The year is taken from the first non-empty of release_year,
    release_date[:4] and upload_date[:4]. Empty values and the
    "Unknown Artist" placeholder are left out of the result.
    """
    year = str(meta.get("release_year") or "")
    if not year:
        year = (meta.get("release_date") or "")[:4]
    if not year:
        year = (meta.get("upload_date") or "")[:4]

    candidates = (
        ("artist", get_artist_from_metadata(meta)),
        ("title", meta.get("title", "")),
        ("album", meta.get("album", "")),
        ("date", year),
    )
    tags = {}
    for tag_name, tag_value in candidates:
        if tag_value and tag_value != "Unknown Artist":
            tags[tag_name] = tag_value
    return tags
|
||||||
|
|
||||||
|
|
||||||
|
def _open_audio(path: str):
    """Open `path` with the mutagen class that matches its extension.

    Returns (mutagen_file, key_map). key_map is `_MP4_KEYS` for m4a and None
    for opus/mp3/flac; an unrecognised extension gives (None, None).
    """
    import mutagen.flac
    import mutagen.mp4
    import mutagen.oggopus
    from mutagen.easyid3 import EasyID3

    # extension -> (opener, tag-key translation table)
    openers = {
        "opus": (mutagen.oggopus.OggOpus, None),
        "m4a": (mutagen.mp4.MP4, _MP4_KEYS),
        "mp3": (EasyID3, None),
        "flac": (mutagen.flac.FLAC, None),
    }
    suffix = path.rpartition(".")[2].lower()
    entry = openers.get(suffix)
    if entry is None:
        return None, None
    opener, key_map = entry
    return opener(path), key_map
|
||||||
|
|
||||||
|
|
||||||
|
def _read_tag(audio, key_map, field: str) -> str:
|
||||||
|
k = key_map[field] if key_map else field
|
||||||
|
val = audio.get(k)
|
||||||
|
if not val:
|
||||||
|
return ""
|
||||||
|
return str(val[0]) if isinstance(val, list) else str(val)
|
||||||
|
|
||||||
|
|
||||||
|
def repair_file(path: str, source: str, dry_run: bool) -> list[str]:
    """Re-tag one file from source metadata.

    The file is opened locally *before* the source is queried, so a file that
    cannot be opened or has an unsupported format never costs a network
    request.

    Args:
        path: Audio file whose name follows '<title> [<id>].<ext>'.
        source: Name of the source folder the file lives in (e.g. 'youtube').
        dry_run: When true, report what would change but write nothing.

    Returns:
        The changed fields as 'field=value' strings. Empty when the file was
        skipped or its tags already match.
    """
    parsed = _parse_track_file(os.path.basename(path))
    if not parsed:
        dbg(f"skip (no id): {path}")
        return []
    _, vid = parsed

    url = _repair_probe_url(source, vid)
    if not url:
        dbg(f"skip (source '{source}' not re-queryable): {path}")
        return []

    # Cheap local checks first: only query the source for files we can edit.
    try:
        audio, key_map = _open_audio(path)
    except Exception as e:  # noqa: BLE001
        err(f"cannot open {path}: {e}")
        return []
    if audio is None:
        dbg(f"skip (unsupported format): {path}")
        return []

    meta = run_yt_dlp_get_metadata(url)
    if not meta:
        dbg(f"skip (no metadata): {path}")
        return []

    changed = []
    for field, value in _desired_tags(meta).items():
        if _read_tag(audio, key_map, field) == value:
            continue
        changed.append(f"{field}={value}")
        if not dry_run:
            # Tag values are written as single-element lists.
            audio[key_map[field] if key_map else field] = [value]

    if changed:
        if not dry_run:
            audio.save()
        prefix = "[dry-run] would set" if dry_run else "set"
        print(f"{prefix} [{', '.join(changed)}] on {path}")
    return changed
|
||||||
|
|
||||||
|
|
||||||
|
def repair_library(root: str, dry_run: bool) -> tuple[int, int]:
    """Walk <root>/<artist>/<source>/ and re-tag audio files.

    Only folders accepted by `_is_source_dir` are entered. A failure on one
    file is reported and the walk continues.

    Args:
        root: Library root containing one folder per artist.
        dry_run: Passed through to `repair_file`; when true nothing is written.

    Returns:
        (scanned, changed): audio files visited, and how many of them
        `repair_file` reported changes for. (0, 0) when `root` is missing.
    """
    if not os.path.isdir(root):
        err(f"Root folder not found: {root}")
        return 0, 0

    # Match on the dotted suffix so a name that merely ends in the same
    # letters (e.g. 'octopus') is not counted as an audio file.
    audio_suffixes = tuple(f".{ext}" for ext in _AUDIO_EXTS)

    scanned = changed = 0
    for artist in sorted(os.listdir(root)):
        adir = os.path.join(root, artist)
        if not os.path.isdir(adir):
            continue
        for source in sorted(os.listdir(adir)):
            sdir = os.path.join(adir, source)
            if not os.path.isdir(sdir) or not _is_source_dir(source):
                continue
            for fname in sorted(os.listdir(sdir)):
                if not fname.lower().endswith(audio_suffixes):
                    continue
                scanned += 1
                try:
                    if repair_file(os.path.join(sdir, fname), source, dry_run):
                        changed += 1
                except Exception as e:  # noqa: BLE001 — one bad file shouldn't abort
                    err(f"repair failed ({fname}): {e}")

    verb = "Would repair" if dry_run else "Repaired"
    print(f"{verb} {changed}/{scanned} files")
    return scanned, changed
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Main
|
# Main
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -816,7 +952,7 @@ def parse_args():
|
|||||||
p = argparse.ArgumentParser(
|
p = argparse.ArgumentParser(
|
||||||
prog="musicfetch",
|
prog="musicfetch",
|
||||||
description="Fetch music via Lidarr (preferred) or YouTube Music.")
|
description="Fetch music via Lidarr (preferred) or YouTube Music.")
|
||||||
p.add_argument("query", nargs="+", help="Free-form query or a URL.")
|
p.add_argument("query", nargs="*", help="Free-form query or a URL.")
|
||||||
p.add_argument("-n", "--noninteractive", action="store_true",
|
p.add_argument("-n", "--noninteractive", action="store_true",
|
||||||
help="Auto-pick the top hit, no prompt.")
|
help="Auto-pick the top hit, no prompt.")
|
||||||
p.add_argument("-s", "--ytsearch", action="store_true",
|
p.add_argument("-s", "--ytsearch", action="store_true",
|
||||||
@@ -831,6 +967,8 @@ def parse_args():
|
|||||||
p.add_argument("-o", "--root", default=DEFAULT_ROOT, help=f"Output root (default {DEFAULT_ROOT}).")
|
p.add_argument("-o", "--root", default=DEFAULT_ROOT, help=f"Output root (default {DEFAULT_ROOT}).")
|
||||||
p.add_argument("--search-all", action="store_true",
|
p.add_argument("--search-all", action="store_true",
|
||||||
help="Search all albums when adding an artist to Lidarr.")
|
help="Search all albums when adding an artist to Lidarr.")
|
||||||
|
p.add_argument("--repair", action="store_true",
|
||||||
|
help="Re-tag existing downloads under --root from source metadata.")
|
||||||
p.add_argument("--debug", action="store_true", help="Verbose output.")
|
p.add_argument("--debug", action="store_true", help="Verbose output.")
|
||||||
return p.parse_args()
|
return p.parse_args()
|
||||||
|
|
||||||
@@ -841,6 +979,14 @@ def main():
|
|||||||
DEBUG = args.debug
|
DEBUG = args.debug
|
||||||
query = " ".join(args.query).strip()
|
query = " ".join(args.query).strip()
|
||||||
|
|
||||||
|
if args.repair:
|
||||||
|
repair_library(args.root, args.dry_run)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not query:
|
||||||
|
err("Provide a query/URL, or use --repair. See --help.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
if args.lidarr_only and args.yt_only:
|
if args.lidarr_only and args.yt_only:
|
||||||
err("--lidarr-only and --yt-only are mutually exclusive.")
|
err("--lidarr-only and --yt-only are mutually exclusive.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|||||||
118
tests/test_repair.py
Normal file
118
tests/test_repair.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
import server.mf # noqa: F401 — loads musicfetch, registers musicfetch_core
|
||||||
|
import musicfetch_core as mf
|
||||||
|
|
||||||
|
|
||||||
|
# ---- _is_source_dir ----
|
||||||
|
def test_is_source_dir():
    """Check `_is_source_dir` against source-style and album-style names."""
    expectations = (
        ("youtube", True),
        ("soundcloud", True),
        ("downloads", True),
        ("Discovery", False),  # Lidarr album folder
        ("Random Access Memories", False),
        ("", False),
    )
    for folder, expected in expectations:
        assert mf._is_source_dir(folder) is expected
|
||||||
|
|
||||||
|
|
||||||
|
# ---- _parse_track_file ----
|
||||||
|
def test_parse_track_file():
    """Names with a trailing '[id]' parse to (title, id); others give None."""
    parse = mf._parse_track_file
    parsed_cases = {
        "Under My Skin [nGSNF2l44Zc].opus": ("Under My Skin", "nGSNF2l44Zc"),
        "Ignomon [2202690443].m4a": ("Ignomon", "2202690443"),
    }
    for filename, expected in parsed_cases.items():
        assert parse(filename) == expected
    for filename in ("no-id-here.opus", "cover.jpg"):
        assert parse(filename) is None
|
||||||
|
|
||||||
|
|
||||||
|
# ---- _repair_probe_url ----
|
||||||
|
def test_repair_probe_url():
    """Known sources map to a URL; an unknown source maps to None."""
    probe = mf._repair_probe_url
    youtube_url = probe("youtube", "vid")
    soundcloud_url = probe("soundcloud", "123")
    assert youtube_url == "https://music.youtube.com/watch?v=vid"
    assert soundcloud_url == "https://api.soundcloud.com/tracks/123"
    assert probe("bandcamp", "x") is None
|
||||||
|
|
||||||
|
|
||||||
|
# ---- _desired_tags ----
|
||||||
|
def test_desired_tags_full():
    """Complete metadata yields all four tags, with the year as a string."""
    meta = {
        "artist": "Daft Punk",
        "title": "Aerodynamic",
        "album": "Discovery",
        "release_year": 2001,
    }
    expected = {
        "artist": "Daft Punk",
        "title": "Aerodynamic",
        "album": "Discovery",
        "date": "2001",
    }
    assert mf._desired_tags(meta) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_desired_tags_year_fallbacks_and_omits_empty():
    """Year falls back to upload_date; a missing album is left out."""
    # no album, no release_year
    out = mf._desired_tags({"title": "T", "uploader": "Chan", "upload_date": "20230102"})
    assert out["date"] == "2023"
    assert out["title"] == "T"
    assert out["artist"] == "Chan"
    assert "album" not in out.keys()  # omitted when empty
|
||||||
|
|
||||||
|
|
||||||
|
def test_desired_tags_drops_unknown_artist():
    """Metadata without any artist info must not produce an artist tag."""
    # get_artist_from_metadata -> "Unknown Artist"
    tags = mf._desired_tags({"title": "T"})
    assert "artist" not in tags
|
||||||
|
|
||||||
|
|
||||||
|
# ---- repair_file (fake audio + mocked metadata) ----
|
||||||
|
class _FakeAudio(dict):
|
||||||
|
def __init__(self, initial):
|
||||||
|
super().__init__(initial)
|
||||||
|
self.saved = False
|
||||||
|
|
||||||
|
def save(self):
|
||||||
|
self.saved = True
|
||||||
|
|
||||||
|
|
||||||
|
def test_repair_file_writes_changed_fields(monkeypatch):
    """Only the missing tags (album/date) are written, and the file is saved."""
    source_meta = {"artist": "Daft Punk", "title": "Aerodynamic",
                   "album": "Discovery", "release_year": 2001}

    def fake_metadata(url):
        return source_meta

    # album/date missing
    audio = _FakeAudio({"artist": ["Daft Punk"], "title": ["Aerodynamic"]})

    def fake_open(path):
        return audio, None

    monkeypatch.setattr(mf, "run_yt_dlp_get_metadata", fake_metadata)
    monkeypatch.setattr(mf, "_open_audio", fake_open)

    changed = mf.repair_file("X/youtube/Aerodynamic [vid].opus", "youtube", dry_run=False)

    assert set(changed) == {"album=Discovery", "date=2001"}
    assert audio["album"] == ["Discovery"]
    assert audio["date"] == ["2001"]
    assert audio.saved is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_repair_file_dry_run_writes_nothing(monkeypatch):
    """A dry run reports pending changes but leaves the audio object untouched."""
    def fake_metadata(url):
        return {"artist": "A", "title": "T", "album": "Alb", "release_year": 2020}

    audio = _FakeAudio({})

    def fake_open(path):
        return audio, None

    monkeypatch.setattr(mf, "run_yt_dlp_get_metadata", fake_metadata)
    monkeypatch.setattr(mf, "_open_audio", fake_open)

    changed = mf.repair_file("X/youtube/T [vid].opus", "youtube", dry_run=True)

    assert changed  # reports would-change
    assert audio == {}  # nothing written
    assert audio.saved is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_repair_file_skips_unparseable(monkeypatch):
    """A file name without an '[id]' is skipped before any metadata lookup."""
    called = {"meta": False}

    def fake_metadata(url):
        called["meta"] = True
        return {}

    monkeypatch.setattr(mf, "run_yt_dlp_get_metadata", fake_metadata)

    result = mf.repair_file("X/youtube/no-id.opus", "youtube", dry_run=False)

    assert result == []
    assert called["meta"] is False  # never hit the network
|
||||||
|
|
||||||
|
|
||||||
|
def test_repair_file_skips_unqueryable_source(monkeypatch):
    """A source with no probe URL (bandcamp) yields no changes."""
    def fake_metadata(url):
        return {"title": "x"}

    monkeypatch.setattr(mf, "run_yt_dlp_get_metadata", fake_metadata)

    result = mf.repair_file("X/bandcamp/T [id].m4a", "bandcamp", dry_run=False)
    assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---- repair_library (real temp tree, repair_file mocked) ----
|
||||||
|
def test_repair_library_scans_only_source_dirs(tmp_path, monkeypatch):
    """Files in source folders are visited; a Lidarr album folder is not."""
    root = tmp_path
    tree = (
        ("Daft Punk", "youtube", "Aerodynamic [vid].opus"),
        ("Daft Punk", "Discovery", "Aerodynamic.flac"),  # Lidarr album -> skip
        ("Ephixa", "soundcloud", "Ignomon [123].m4a"),
    )
    for artist, folder, filename in tree:
        target_dir = root / artist / folder
        target_dir.mkdir(parents=True)
        (target_dir / filename).write_text("x")

    visited = []

    def fake_repair_file(path, source, dry_run):
        visited.append((source, path))
        return ["album=X"]

    monkeypatch.setattr(mf, "repair_file", fake_repair_file)

    scanned, changed = mf.repair_library(str(root), dry_run=False)

    assert scanned == 2 and changed == 2
    sources = sorted(src for src, _ in visited)
    assert sources == ["soundcloud", "youtube"]  # Discovery album folder skipped
|
||||||
|
|
||||||
|
|
||||||
|
def test_repair_library_missing_root(monkeypatch):
    """A root that does not exist reports zero scanned and zero changed."""
    result = mf.repair_library("/no/such/dir", dry_run=False)
    assert result == (0, 0)
|
||||||
Reference in New Issue
Block a user