feat: --repair flag to re-tag existing downloads from source metadata
Walks <root>/<artist>/<source>/ (known yt-dlp source folders only; skips Lidarr album dirs), re-queries each file's source by the [id] in its filename, and fixes tags (album/year/artist/title) via mutagen. Honors --dry-run for preview. CLI-only (not the REST API). Fixes downloads that landed with missing album / wrong year. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
27
README.md
27
README.md
@@ -94,6 +94,7 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
||||
| `--yt-only` | Skip Lidarr. |
|
||||
| `-o`, `--root PATH` | Output root folder (default `/media/music`). |
|
||||
| `--search-all` | Search all albums when adding an artist to Lidarr. |
|
||||
| `--repair` | Re-tag existing downloads under `--root` from source metadata (see below). |
|
||||
| `--debug` | Verbose output. |
|
||||
|
||||
### Examples
|
||||
@@ -115,8 +116,26 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
||||
# YouTube only, lossless preferred
|
||||
./musicfetch --yt-only -q flac "Bonobo - Kerala"
|
||||
|
||||
# Download by URL (YouTube Music URL preferred for correct art)
|
||||
# Download by URL (single track or playlist/set/album, any yt-dlp site)
|
||||
./musicfetch "https://music.youtube.com/watch?v=xxxxxxxxxxx"
|
||||
./musicfetch "https://soundcloud.com/artist/sets/my-mix"
|
||||
```
|
||||
|
||||
### 🔧 Repair existing tags
|
||||
|
||||
`--repair` walks `<root>/<artist>/<source>/` (the `youtube`/`soundcloud`/… download
|
||||
folders — Lidarr album folders are skipped), re-fetches authoritative metadata for each
|
||||
file using the `[id]` in its filename, and fixes tags (album, year, artist, title). Useful
|
||||
when downloads landed with missing album or wrong year. It re-queries the source over the
|
||||
network, so run it occasionally, not constantly. Requires `mutagen` (a yt-dlp dependency,
|
||||
usually already present). CLI-only — not exposed via the REST API.
|
||||
|
||||
```bash
|
||||
# Preview what would change (writes nothing)
|
||||
./musicfetch --repair -d
|
||||
|
||||
# Apply fixes under a specific root
|
||||
./musicfetch --repair -o /media/music
|
||||
```
|
||||
|
||||
### 📁 Output Structure
|
||||
@@ -124,8 +143,10 @@ export LIDARR_API_KEY="your-lidarr-api-key"
|
||||
```text
|
||||
<root>/
|
||||
├── Artist Name/
|
||||
│ ├── Album Name/ (managed by Lidarr)
|
||||
│ └── youtube/ (yt-dlp downloads / fallbacks)
|
||||
│ ├── Album Name/ (managed by Lidarr)
|
||||
│ ├── youtube/ (YouTube / YouTube Music downloads)
|
||||
│ ├── soundcloud/ (SoundCloud downloads)
|
||||
│ └── <source>/ (one folder per yt-dlp source)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
148
musicfetch
148
musicfetch
@@ -796,6 +796,142 @@ def handle_url(url: str, root: str, quality: str, dry_run: bool):
|
||||
download_single(url, root, quality, dry_run)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Repair: re-tag existing downloads from source metadata (CLI only)
|
||||
# ---------------------------------------------------------------------------
|
||||
_AUDIO_EXTS = ("opus", "m4a", "mp3", "flac")
|
||||
_TRACK_FILE_RE = re.compile(
|
||||
r"^(?P<title>.*) \[(?P<id>[^\]]+)\]\.(?P<ext>" + "|".join(_AUDIO_EXTS) + r")$")
|
||||
# m4a uses atom keys; vorbis/easy formats use plain names.
|
||||
_MP4_KEYS = {"artist": "\xa9ART", "title": "\xa9nam", "album": "\xa9alb", "date": "\xa9day"}
|
||||
|
||||
|
||||
def _is_source_dir(name: str) -> bool:
|
||||
"""True for a yt-dlp-style source folder (youtube/soundcloud/…), so we skip
|
||||
Lidarr album folders (which have spaces/capitals)."""
|
||||
return bool(name) and name == _sanitize_source(name)
|
||||
|
||||
|
||||
def _parse_track_file(filename: str):
|
||||
"""Return (title, id) parsed from '<title> [<id>].<ext>', else None."""
|
||||
m = _TRACK_FILE_RE.match(filename)
|
||||
return (m.group("title"), m.group("id")) if m else None
|
||||
|
||||
|
||||
def _repair_probe_url(source: str, vid: str):
|
||||
"""Reconstruct a fetchable URL from (source, id), or None if unsupported."""
|
||||
if source == "youtube":
|
||||
return f"https://music.youtube.com/watch?v={vid}"
|
||||
if source == "soundcloud":
|
||||
return f"https://api.soundcloud.com/tracks/{vid}"
|
||||
return None
|
||||
|
||||
|
||||
def _desired_tags(meta: dict) -> dict:
|
||||
"""Authoritative tags from yt-dlp metadata (omit empties)."""
|
||||
year = (str(meta.get("release_year") or "")
|
||||
or (meta.get("release_date") or "")[:4]
|
||||
or (meta.get("upload_date") or "")[:4])
|
||||
fields = {
|
||||
"artist": get_artist_from_metadata(meta),
|
||||
"title": meta.get("title", ""),
|
||||
"album": meta.get("album", ""),
|
||||
"date": year,
|
||||
}
|
||||
return {k: v for k, v in fields.items() if v and v != "Unknown Artist"}
|
||||
|
||||
|
||||
def _open_audio(path: str):
|
||||
"""Return (mutagen_file, key_map) for the path's format, or (None, None)."""
|
||||
import mutagen.flac
|
||||
import mutagen.mp4
|
||||
import mutagen.oggopus
|
||||
from mutagen.easyid3 import EasyID3
|
||||
ext = path.rsplit(".", 1)[-1].lower()
|
||||
if ext == "opus":
|
||||
return mutagen.oggopus.OggOpus(path), None
|
||||
if ext == "m4a":
|
||||
return mutagen.mp4.MP4(path), _MP4_KEYS
|
||||
if ext == "mp3":
|
||||
return EasyID3(path), None
|
||||
if ext == "flac":
|
||||
return mutagen.flac.FLAC(path), None
|
||||
return None, None
|
||||
|
||||
|
||||
def _read_tag(audio, key_map, field: str) -> str:
|
||||
k = key_map[field] if key_map else field
|
||||
val = audio.get(k)
|
||||
if not val:
|
||||
return ""
|
||||
return str(val[0]) if isinstance(val, list) else str(val)
|
||||
|
||||
|
||||
def repair_file(path: str, source: str, dry_run: bool) -> list[str]:
|
||||
"""Re-tag one file from source metadata. Returns the list of changed fields."""
|
||||
parsed = _parse_track_file(os.path.basename(path))
|
||||
if not parsed:
|
||||
dbg(f"skip (no id): {path}")
|
||||
return []
|
||||
_, vid = parsed
|
||||
url = _repair_probe_url(source, vid)
|
||||
if not url:
|
||||
dbg(f"skip (source '{source}' not re-queryable): {path}")
|
||||
return []
|
||||
meta = run_yt_dlp_get_metadata(url)
|
||||
if not meta:
|
||||
dbg(f"skip (no metadata): {path}")
|
||||
return []
|
||||
desired = _desired_tags(meta)
|
||||
try:
|
||||
audio, key_map = _open_audio(path)
|
||||
except Exception as e: # noqa: BLE001
|
||||
err(f"cannot open {path}: {e}")
|
||||
return []
|
||||
if audio is None:
|
||||
return []
|
||||
changed = []
|
||||
for field, value in desired.items():
|
||||
if _read_tag(audio, key_map, field) != value:
|
||||
changed.append(f"{field}={value}")
|
||||
if not dry_run:
|
||||
audio[key_map[field] if key_map else field] = [value]
|
||||
if changed and not dry_run:
|
||||
audio.save()
|
||||
if changed:
|
||||
prefix = "[dry-run] would set" if dry_run else "set"
|
||||
print(f"{prefix} [{', '.join(changed)}] on {path}")
|
||||
return changed
|
||||
|
||||
|
||||
def repair_library(root: str, dry_run: bool) -> tuple[int, int]:
|
||||
"""Walk <root>/<artist>/<source>/ and re-tag audio files. Returns (scanned, changed)."""
|
||||
if not os.path.isdir(root):
|
||||
err(f"Root folder not found: {root}")
|
||||
return 0, 0
|
||||
scanned = changed = 0
|
||||
for artist in sorted(os.listdir(root)):
|
||||
adir = os.path.join(root, artist)
|
||||
if not os.path.isdir(adir):
|
||||
continue
|
||||
for source in sorted(os.listdir(adir)):
|
||||
sdir = os.path.join(adir, source)
|
||||
if not os.path.isdir(sdir) or not _is_source_dir(source):
|
||||
continue
|
||||
for fname in sorted(os.listdir(sdir)):
|
||||
if not fname.lower().endswith(_AUDIO_EXTS):
|
||||
continue
|
||||
scanned += 1
|
||||
try:
|
||||
if repair_file(os.path.join(sdir, fname), source, dry_run):
|
||||
changed += 1
|
||||
except Exception as e: # noqa: BLE001 — one bad file shouldn't abort
|
||||
err(f"repair failed ({fname}): {e}")
|
||||
verb = "Would repair" if dry_run else "Repaired"
|
||||
print(f"{verb} {changed}/{scanned} files")
|
||||
return scanned, changed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -816,7 +952,7 @@ def parse_args():
|
||||
p = argparse.ArgumentParser(
|
||||
prog="musicfetch",
|
||||
description="Fetch music via Lidarr (preferred) or YouTube Music.")
|
||||
p.add_argument("query", nargs="+", help="Free-form query or a URL.")
|
||||
p.add_argument("query", nargs="*", help="Free-form query or a URL.")
|
||||
p.add_argument("-n", "--noninteractive", action="store_true",
|
||||
help="Auto-pick the top hit, no prompt.")
|
||||
p.add_argument("-s", "--ytsearch", action="store_true",
|
||||
@@ -831,6 +967,8 @@ def parse_args():
|
||||
p.add_argument("-o", "--root", default=DEFAULT_ROOT, help=f"Output root (default {DEFAULT_ROOT}).")
|
||||
p.add_argument("--search-all", action="store_true",
|
||||
help="Search all albums when adding an artist to Lidarr.")
|
||||
p.add_argument("--repair", action="store_true",
|
||||
help="Re-tag existing downloads under --root from source metadata.")
|
||||
p.add_argument("--debug", action="store_true", help="Verbose output.")
|
||||
return p.parse_args()
|
||||
|
||||
@@ -841,6 +979,14 @@ def main():
|
||||
DEBUG = args.debug
|
||||
query = " ".join(args.query).strip()
|
||||
|
||||
if args.repair:
|
||||
repair_library(args.root, args.dry_run)
|
||||
return
|
||||
|
||||
if not query:
|
||||
err("Provide a query/URL, or use --repair. See --help.")
|
||||
sys.exit(1)
|
||||
|
||||
if args.lidarr_only and args.yt_only:
|
||||
err("--lidarr-only and --yt-only are mutually exclusive.")
|
||||
sys.exit(1)
|
||||
|
||||
118
tests/test_repair.py
Normal file
118
tests/test_repair.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import server.mf # noqa: F401 — loads musicfetch, registers musicfetch_core
|
||||
import musicfetch_core as mf
|
||||
|
||||
|
||||
# ---- _is_source_dir ----
|
||||
def test_is_source_dir():
|
||||
assert mf._is_source_dir("youtube") is True
|
||||
assert mf._is_source_dir("soundcloud") is True
|
||||
assert mf._is_source_dir("downloads") is True
|
||||
assert mf._is_source_dir("Discovery") is False # Lidarr album folder
|
||||
assert mf._is_source_dir("Random Access Memories") is False
|
||||
assert mf._is_source_dir("") is False
|
||||
|
||||
|
||||
# ---- _parse_track_file ----
|
||||
def test_parse_track_file():
|
||||
assert mf._parse_track_file("Under My Skin [nGSNF2l44Zc].opus") == ("Under My Skin", "nGSNF2l44Zc")
|
||||
assert mf._parse_track_file("Ignomon [2202690443].m4a") == ("Ignomon", "2202690443")
|
||||
assert mf._parse_track_file("no-id-here.opus") is None
|
||||
assert mf._parse_track_file("cover.jpg") is None
|
||||
|
||||
|
||||
# ---- _repair_probe_url ----
|
||||
def test_repair_probe_url():
|
||||
assert mf._repair_probe_url("youtube", "vid") == "https://music.youtube.com/watch?v=vid"
|
||||
assert mf._repair_probe_url("soundcloud", "123") == "https://api.soundcloud.com/tracks/123"
|
||||
assert mf._repair_probe_url("bandcamp", "x") is None
|
||||
|
||||
|
||||
# ---- _desired_tags ----
|
||||
def test_desired_tags_full():
|
||||
meta = {"artist": "Daft Punk", "title": "Aerodynamic", "album": "Discovery", "release_year": 2001}
|
||||
assert mf._desired_tags(meta) == {"artist": "Daft Punk", "title": "Aerodynamic",
|
||||
"album": "Discovery", "date": "2001"}
|
||||
|
||||
|
||||
def test_desired_tags_year_fallbacks_and_omits_empty():
|
||||
meta = {"title": "T", "uploader": "Chan", "upload_date": "20230102"} # no album, no release_year
|
||||
out = mf._desired_tags(meta)
|
||||
assert out["date"] == "2023"
|
||||
assert out["title"] == "T"
|
||||
assert out["artist"] == "Chan"
|
||||
assert "album" not in out # omitted when empty
|
||||
|
||||
|
||||
def test_desired_tags_drops_unknown_artist():
|
||||
meta = {"title": "T"} # get_artist_from_metadata -> "Unknown Artist"
|
||||
assert "artist" not in mf._desired_tags(meta)
|
||||
|
||||
|
||||
# ---- repair_file (fake audio + mocked metadata) ----
|
||||
class _FakeAudio(dict):
|
||||
def __init__(self, initial):
|
||||
super().__init__(initial)
|
||||
self.saved = False
|
||||
|
||||
def save(self):
|
||||
self.saved = True
|
||||
|
||||
|
||||
def test_repair_file_writes_changed_fields(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url: {"artist": "Daft Punk", "title": "Aerodynamic",
|
||||
"album": "Discovery", "release_year": 2001})
|
||||
audio = _FakeAudio({"artist": ["Daft Punk"], "title": ["Aerodynamic"]}) # album/date missing
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file("X/youtube/Aerodynamic [vid].opus", "youtube", dry_run=False)
|
||||
assert set(changed) == {"album=Discovery", "date=2001"}
|
||||
assert audio["album"] == ["Discovery"]
|
||||
assert audio["date"] == ["2001"]
|
||||
assert audio.saved is True
|
||||
|
||||
|
||||
def test_repair_file_dry_run_writes_nothing(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url: {"artist": "A", "title": "T", "album": "Alb", "release_year": 2020})
|
||||
audio = _FakeAudio({})
|
||||
monkeypatch.setattr(mf, "_open_audio", lambda path: (audio, None))
|
||||
changed = mf.repair_file("X/youtube/T [vid].opus", "youtube", dry_run=True)
|
||||
assert changed # reports would-change
|
||||
assert audio == {} # nothing written
|
||||
assert audio.saved is False
|
||||
|
||||
|
||||
def test_repair_file_skips_unparseable(monkeypatch):
|
||||
called = {"meta": False}
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata",
|
||||
lambda url: called.update(meta=True) or {})
|
||||
assert mf.repair_file("X/youtube/no-id.opus", "youtube", dry_run=False) == []
|
||||
assert called["meta"] is False # never hit the network
|
||||
|
||||
|
||||
def test_repair_file_skips_unqueryable_source(monkeypatch):
|
||||
monkeypatch.setattr(mf, "run_yt_dlp_get_metadata", lambda url: {"title": "x"})
|
||||
assert mf.repair_file("X/bandcamp/T [id].m4a", "bandcamp", dry_run=False) == []
|
||||
|
||||
|
||||
# ---- repair_library (real temp tree, repair_file mocked) ----
|
||||
def test_repair_library_scans_only_source_dirs(tmp_path, monkeypatch):
|
||||
root = tmp_path
|
||||
(root / "Daft Punk" / "youtube").mkdir(parents=True)
|
||||
(root / "Daft Punk" / "youtube" / "Aerodynamic [vid].opus").write_text("x")
|
||||
(root / "Daft Punk" / "Discovery").mkdir(parents=True) # Lidarr album -> skip
|
||||
(root / "Daft Punk" / "Discovery" / "Aerodynamic.flac").write_text("x")
|
||||
(root / "Ephixa" / "soundcloud").mkdir(parents=True)
|
||||
(root / "Ephixa" / "soundcloud" / "Ignomon [123].m4a").write_text("x")
|
||||
|
||||
visited = []
|
||||
monkeypatch.setattr(mf, "repair_file",
|
||||
lambda path, source, dry_run: visited.append((source, path)) or ["album=X"])
|
||||
scanned, changed = mf.repair_library(str(root), dry_run=False)
|
||||
assert scanned == 2 and changed == 2
|
||||
sources = sorted(s for s, _ in visited)
|
||||
assert sources == ["soundcloud", "youtube"] # Discovery album folder skipped
|
||||
|
||||
|
||||
def test_repair_library_missing_root(monkeypatch):
|
||||
assert mf.repair_library("/no/such/dir", dry_run=False) == (0, 0)
|
||||
Reference in New Issue
Block a user