#!/usr/bin/env python3
"""
Crawl a music library directory, match expressions against tags, and print filenames for M3U playlists.
"""
import json
import re
import sys
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from pathlib import Path
from typing import Iterator, Optional, Dict, List

from mutagen import File, FileType, MutagenError
class Match(ABC):
    """
    Abstract predicate over the tag dictionary of a single file.

    Concrete matchers implement ``__call__`` and return whether the given tags
    satisfy them. The class derives from ``ABC`` so that ``@abstractmethod`` is
    actually enforced: the base class itself cannot be instantiated.
    """

    @abstractmethod
    def __call__(self, tags: Dict[str, List[str]]) -> bool:
        """
        Evaluate this matcher.

        :param tags: mapping of tag name to the list of values for that tag
        :return: True if the tags satisfy this matcher
        """
        raise NotImplementedError
class NotMatch(Match):
    """Matcher that inverts the verdict of the matcher it wraps."""

    def __init__(self, m: Match) -> None:
        # The wrapped matcher whose result gets negated.
        self._match: Match = m

    def __call__(self, tags: Dict[str, List[str]]) -> bool:
        """Return True exactly when the wrapped matcher rejects ``tags``."""
        inner_verdict = self._match(tags)
        return not inner_verdict
class AndMatch(Match):
    """Matcher that accepts only if every child matcher accepts."""

    def __init__(self, matches: List[Match]) -> None:
        # Child matchers, evaluated in list order.
        self._matches: List[Match] = matches

    def __call__(self, tags: Dict[str, List[str]]) -> bool:
        """
        Return True if there is at least one child and all children accept ``tags``.

        An empty child list yields False. Evaluation stops at the first child
        that rejects.
        """
        if len(self._matches) == 0:
            return False
        for child in self._matches:
            if not child(tags):
                return False
        return True
class OrMatch(Match):
    """Matcher that accepts as soon as any child matcher accepts."""

    def __init__(self, matches: List[Match]) -> None:
        # Child matchers, evaluated in list order.
        self._matches: List[Match] = matches

    def __call__(self, tags: Dict[str, List[str]]) -> bool:
        """
        Return True if there are no children or at least one child accepts ``tags``.

        An empty child list yields True. Evaluation stops at the first child
        that accepts.
        """
        if len(self._matches) == 0:
            return True
        for child in self._matches:
            if child(tags):
                return True
        return False
class TagMatch(Match):
    """Matcher that searches the values of one tag with a case-insensitive regex."""

    def __init__(self, tag: str, value: str) -> None:
        """
        :param tag: tag name to inspect; stored lower-cased
        :param value: regular expression, compiled with ``re.IGNORECASE``
        :raises ValueError: if ``value`` is not a valid regular expression
        """
        self._tag: str = tag.lower()
        try:
            compiled = re.compile(value, re.IGNORECASE)
        except re.error as e:
            raise ValueError(f"Cannot parse regular expression '{value}': '{str(e)}'") from None
        self._pattern: re.Pattern[str] = compiled

    def _get_values(self, tags: Dict[str, List[str]]) -> Iterator[str]:
        """Yield the values stored for the tag, or one empty string if there are none."""
        candidates = tags[self._tag] if self._tag in tags else []
        if len(candidates) == 0:
            # A missing or empty tag is matched against "" so patterns such as
            # an empty expression can still succeed.
            candidates = [""]
        yield from candidates

    def __call__(self, tags: Dict[str, List[str]]) -> bool:
        """Return True if the pattern is found in at least one value of the tag."""
        for candidate in self._get_values(tags):
            if self._pattern.search(candidate) is not None:
                return True
        return False
def _crawl_library(library: Path, relative_to: Optional[Path]) -> Iterator[Dict[str, List[str]]]:
    """
    Walk ``library`` recursively and yield one tag dictionary per readable audio file.

    Entries whose suffix is a known non-audio type (compared case-insensitively),
    whose name starts with a dot, or which are not regular files are skipped
    silently. Files that mutagen raises on, or for which ``File`` returns None,
    are reported on stderr and skipped.

    :param library: directory to crawl
    :param relative_to: if given, base path used for the ``relaname`` entry;
        the absolute file path is passed to ``Path.relative_to`` with it
    :return: iterator of dictionaries mapping lower-cased tag names to lists of
        non-empty string values, plus the synthetic keys ``filename`` (absolute
        path), ``relaname`` (relative or absolute path) and ``basename``
    """
    skipped_suffixes = (".jpg", ".png", ".m3u", ".xml")
    for file_name in library.glob("**/*"):
        # Lower-case the suffix so that e.g. "COVER.JPG" is skipped as well
        # instead of being reported as an unparsable audio file.
        if file_name.suffix.lower() in skipped_suffixes or file_name.name.startswith("."):
            continue
        if not file_name.is_file():
            continue
        try:
            file_info: Optional[FileType] = File(file_name, easy=True)
        except (MutagenError, OSError) as e:
            print(f"{file_name}: {str(e)}", file=sys.stderr)
            continue
        if file_info is None:
            print(f"{file_name}: can't parse audio file", file=sys.stderr)
            continue
        file_name = file_name.absolute()
        # Normalize the file's own tags: lower-case keys, stringified values,
        # empty strings dropped.
        tags: Dict[str, List[str]] = {
            str(k).lower(): list(filter(None, [str(_) for _ in v] if isinstance(v, list) else [str(v)]))
            for k, v in file_info.items()
        }
        # Set the synthetic path entries last so that a tag of the same name
        # stored in the file cannot overwrite (or empty) the paths that callers
        # print.
        tags.update({
            "filename": [str(file_name)],
            "relaname": [str(file_name.relative_to(relative_to) if relative_to is not None else file_name)],
            "basename": [file_name.name],
        })
        yield tags
def _tag_sort(sort_by: Optional[str], tags: Iterator[Dict[str, List[str]]]) -> Iterator[Dict[str, List[str]]]:
    """
    Yield the tag dictionaries, optionally ordered by the given tag names.

    :param sort_by: comma-separated tag names, or None to keep the incoming order;
        each name is stripped and lower-cased
    :param tags: tag dictionaries to pass through or sort
    :return: the same dictionaries; when sorting, ordered ascending by the
        lower-cased first value of each key, with a missing or empty tag
        sorting as an empty string
    """
    if sort_by is None:
        yield from tags
        return

    key_names: List[str] = []
    for part in sort_by.split(","):
        key_names.append(part.strip().lower())
    if not key_names:
        yield from tags
        return

    def ordering(entry: Dict[str, List[str]]) -> tuple:
        fields = []
        for name in key_names:
            if name in entry and len(entry[name]):
                fields.append(entry[name][0].lower())
            else:
                fields.append("")
        return tuple(fields)

    # Sorting needs the complete input, so the iterator is consumed here.
    yield from sorted(tags, key=ordering)
def _parse_match(element: ET.Element) -> Match:
    """
    Convert an XML element and its children into a matcher tree.

    ``and``, ``or`` and ``not`` accept child elements but no attributes; the
    children of ``not`` are combined with an ``OrMatch`` before negation.
    ``match`` accepts no children, needs a ``tag`` attribute and uses its text
    (or an empty string) as the expression.

    :param element: XML element to convert
    :return: the matcher built from the element
    :raises ValueError: on an unknown element name, unsupported attributes or
        children, or a missing ``tag`` attribute
    """
    kind = element.tag

    if kind in ("and", "or", "not"):
        if len(element.attrib) != 0:
            raise ValueError(f"{element.tag} does not support attributes")
        children = [_parse_match(child) for child in element]
        if kind == "and":
            return AndMatch(children)
        if kind == "or":
            return OrMatch(children)
        return NotMatch(OrMatch(children))

    if kind == "match":
        if len(element) != 0:
            raise ValueError(f"{element.tag} does not support children")
        if "tag" not in element.attrib:
            raise ValueError(f"{element.tag} requires a 'tag' attribute")
        expression = element.text or ""
        return TagMatch(tag=element.attrib["tag"], value=expression)

    raise ValueError(f"Unknown tag '{element.tag}', must be one of 'and', 'or', 'not', 'match'")
def _parse_ruleset(ruleset: Optional[Path]) -> Optional[Match]:
    """
    Load the matcher tree from an XML ruleset file.

    :param ruleset: path of the XML file, or None for no filtering
    :return: an empty ``OrMatch`` when no file is given, the parsed matcher on
        success, or None after reporting a read or parse failure on stderr
    """
    if ruleset is None:
        return OrMatch([])
    try:
        root = ET.parse(ruleset).getroot()
        parsed = _parse_match(root)
    except (ET.ParseError, OSError, ValueError) as e:
        print(f"{ruleset}: can't parse XML: {str(e)}", file=sys.stderr)
        return None
    else:
        return parsed
def _run(ruleset_filename: Optional[Path], library_path: Path, relative_to: Optional[Path],
         sort_by: Optional[str], dump_json: bool) -> int:
    """
    Crawl the library, apply the ruleset and print every matching file.

    :param ruleset_filename: XML ruleset to evaluate, or None
    :param library_path: directory to crawl; made absolute first
    :param relative_to: optional base for printed names; made absolute and
        required to equal ``library_path`` or be one of its parents
    :param sort_by: comma-separated sort keys handed to ``_tag_sort``, or None
    :param dump_json: print each match as a JSON object instead of its
        ``relaname`` followed by CRLF
    :return: 0 on success, 1 if the paths are inconsistent or the ruleset
        could not be loaded
    """
    library_path = library_path.absolute()
    if relative_to is not None:
        relative_to = relative_to.absolute()
        is_base = relative_to == library_path or relative_to in library_path.parents
        if not is_base:
            print(f"'{library_path}' is not in the subpath of '{relative_to}'", file=sys.stderr)
            return 1

    ruleset: Optional[Match] = _parse_ruleset(ruleset_filename)
    if ruleset is None:
        return 1

    total: int = 0
    matches: int = 0
    crawled = _crawl_library(library_path, relative_to)
    for total, tags in enumerate(_tag_sort(sort_by, crawled), start=1):
        if not ruleset(tags):
            continue
        matches += 1
        if dump_json:
            print(json.dumps(tags, ensure_ascii=False, sort_keys=True), file=sys.stdout)
        else:
            print(tags["relaname"][0], end="\r\n", file=sys.stdout)

    # The summary goes to stderr so it does not end up in the playlist output.
    if ruleset_filename is None:
        print(f"Found {total} valid files in {library_path}", file=sys.stderr)
    else:
        print(f"{ruleset_filename} matched {matches} of {total} valid files in {library_path}", file=sys.stderr)
    return 0
def main() -> int:
    """
    Parse the command line and run the crawler.

    :return: the exit status produced by ``_run``
    """
    parser: ArgumentParser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter, description=__doc__)

    # Options that take a value: (flag, metavar, type, default, help text).
    valued_options = (
        ("--library-path", "PATH", Path, Path("."), "music library to crawl"),
        ("--relative-to", "PATH", Path, None, "produce relative filenames"),
        ("--sort-by", "KEY", str, None, "comma-separated list of sort keys (ascending, insensitive)"),
    )
    for flag, placeholder, converter, fallback, description in valued_options:
        parser.add_argument(flag, metavar=placeholder, type=converter,
                            default=fallback, help=description)

    parser.add_argument("--json", action="store_const", const=True,
                        default=False, help="dump JSON instead of M3U filenames")
    parser.add_argument("XML", nargs="?", type=Path,
                        default=None, help="filter matches in XML format to evaluate")

    options = parser.parse_args()
    return _run(
        ruleset_filename=options.XML,
        library_path=options.library_path,
        relative_to=options.relative_to,
        sort_by=options.sort_by,
        dump_json=options.json,
    )
# Script entry point: exit the interpreter with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())