summaryrefslogtreecommitdiff
path: root/splitter.py
diff options
context:
space:
mode:
Diffstat (limited to 'splitter.py')
-rw-r--r--splitter.py403
1 files changed, 0 insertions, 403 deletions
diff --git a/splitter.py b/splitter.py
deleted file mode 100644
index fbbafff..0000000
--- a/splitter.py
+++ /dev/null
@@ -1,403 +0,0 @@
-from coding import to_unicode, to_bytes
-from tools import *
-
-import formats
-
-from tempfile import mkdtemp
-from itertools import chain
-
-import collections
-import subprocess
-import shutil
-import sys
-import os
-
-ILLEGAL_CHARACTERS_MAP = {
- u"\\": u"-",
- u":": u"-",
- u"*": u"+",
- u"?": u"_",
- u'"': u"'",
- u"<": u"(",
- u">": u")",
- u"|": u"-"
-}
-
-def filterdir(dir, prefix):
- return sorted(filter(lambda f: f.startswith(prefix), os.listdir(dir)))
-
-def mkdir(path):
- if not os.path.exists(path):
- try:
- os.makedirs(path)
- except OSError as err:
- printerr("make dir %s failed: %s", quote(path), err)
- sys.exit(1)
-
-def convert_characters(path):
- return "".join([ILLEGAL_CHARACTERS_MAP.get(ch, ch) for ch in path])
-
-class TempLink:
- def __init__(self, path, name):
- self.tmpdir = mkdtemp(prefix = "temp-")
- self.linkpath = "%s/%s" % (self.tmpdir, name)
-
- try:
- os.symlink(path, self.tmpdir + "/" + name)
- except Exception as err:
- os.rmdir(self.tmpdir)
- raise err
-
- def remove(self):
- os.unlink(self.linkpath)
- os.rmdir(self.tmpdir)
-
- def __repr__(self):
- return "TempLink('%s')" % self.linkpath
-
- def __str__(self):
- return self.linkpath
-
- def __enter__(self):
- return self
-
- def __exit__(self, *args):
- self.remove()
-
-class StreamInfo:
- __mapping = {
- b"Channels:": "channels",
- b"Bits/sample:": "bits_per_sample",
- b"Samples/sec:": "sample_rate"
- }
-
- @staticmethod
- def get(name):
- info = StreamInfo()
- proc = subprocess.Popen(["shninfo", name], stdout = subprocess.PIPE)
- for line in proc.stdout.readlines():
- data = line.split()
- attr = StreamInfo.__mapping.get(data[0])
- if attr:
- setattr(info, attr, int(data[1]))
- elif line.startswith(b"Handled by:"):
- info.type = to_unicode(data[2])
-
- if proc.wait():
- return None
-
- return info
-
-class Splitter:
- EXT = ["ape", "flac", "wv"]
-
- class File:
- def __init__(self, fileobj, name, info):
- self.fileobj = fileobj
- self.name = name
- self.info = info
-
- def __getattr__(self, attr):
- return getattr(self.fileobj, attr)
-
- class TrackInfo:
- def __init__(self, name, tags):
- self.name = name
- self.tags = tags
-
- @staticmethod
- def format_by_tags(fmt, tags, replace=False):
- if replace:
- def conv(var):
- if isinstance(var, str):
- return var.replace("/", "-")
- return var
-
- tags = {k: conv(v) for k, v in tags.items()}
-
- try:
- return fmt.format(year=tags["date"], **tags)
- except KeyError as err:
- printerr("invalid format key: %s", err)
- sys.exit(1)
- except ValueError as err:
- printerr("invalid format option: %s", err)
- sys.exit(1)
-
- def __init__(self, cue, opt):
- self.cue = cue
- self.opt = opt
- self.tracktotal = len(list(self.all_tracks()))
-
- self.enctype = formats.handler(opt.type, logger=printf)
- self.tag_supported = self.enctype.is_tag_supported()
-
- self.tags = {
- "album": self.opt.album or self.cue.get("title"),
- "date": self.opt.date or self.cue.get("date"),
- "genre": self.opt.genre or self.cue.get("genre"),
- "comment": self.opt.comment or self.cue.get("comment"),
- "composer": self.opt.composer
- or self.cue.get("songwriter"),
- "artist": self.opt.albumartist or self.opt.artist
- or self.cue.get("performer"),
- "albumartist": self.opt.albumartist
- }
-
- tmp = self.format_by_tags(os.path.dirname(opt.fmt), self.tags, True)
-
- if opt.convert_chars:
- tmp = convert_characters(tmp)
-
- self.dest = os.path.join(opt.dir, tmp)
- track_fmt = os.path.basename(opt.fmt)
-
- tracknumber = 0
- self.track_info = {}
- for track in self.all_tracks():
- tracknumber += 1
- self.track_info[track] = self.get_track_info(
- track, tracknumber, track_fmt
- )
-
- def get_track_info(self, track, tracknumber, fmt):
- tags = dict(self.tags)
- tags.update({
- "tracknumber": tracknumber,
- "tracktotal": self.tracktotal,
-
- "title": track.get("title") or "track",
- "artist": self.opt.artist or track.get("performer")
- or self.cue.get("performer"),
- "composer": self.opt.composer or track.get("songwriter")
- or self.cue.get("songwriter")
- })
-
- name = self.format_by_tags(fmt, tags).replace("/", "-")
-
- if self.opt.convert_chars:
- name = convert_characters(name)
-
- return self.TrackInfo(name + "." + self.enctype.ext, tags)
-
- def find_realfile(self, name):
- if not name.endswith(".wav"):
- return None
-
- orig = name.rpartition(".")[0]
- for file in filterdir(self.cue.dir or ".", orig):
- head, _, ext = file.rpartition(".")
- if head == orig and ext in self.EXT:
- return file
-
- return None
-
- def open_files(self):
- lst = []
-
- for file in self.cue.files():
- if not file.has_audio_tracks():
- debug("skip file %s: no tracks", quote(file.name))
- continue
-
- name = self.cue.dir + file.name
- if not os.path.exists(name):
- real = self.find_realfile(file.name)
- if not real:
- printerr("no such file %s", quote(file.name))
- sys.exit(1)
- name = self.cue.dir + real
-
- info = StreamInfo.get(name)
- if info is None:
- printerr("%s: unknown type", quote(file.name))
- sys.exit(1)
-
- lst.append(self.File(file, name, info))
-
- return lst
-
- def shntool_args(self, tool, info):
- encode = self.enctype.encode(self.opt, info)
- return [tool, "-w", "-d", self.dest, "-o", encode]
-
- def track_name(self, track):
- return self.track_info[track].name
-
- def track_tags(self, track):
- return self.track_info[track].tags
-
- def tag(self, track, path):
- if not self.tag_supported:
- return
-
- printf("Tag [%s] : ", path)
- if not self.enctype.tag(path, self.track_tags(track)):
- printf("FAILED\n")
- sys.exit(1)
-
- printf("OK\n")
-
- def copy_file(self, file):
- noteq = lambda a, b: a and a != b
-
- if file.info.type != self.encode.name:
- return False
- if noteq(self.opt.sample_rate, file.info.sample_rate):
- return False
- if noteq(self.opt.bits_per_sample, file.info.bits_per_sample):
- return False
- if noteq(self.opt.channels, file.info.channels):
- return False
-
- track = list(file.tracks())[0]
- trackname = self.track_name(track)
- path = os.path.join(self.dest, trackname)
-
- if self.opt.dry_run:
- printf("Copy [%s] --> [%s]\n", file.name, path)
- return True
-
- printf("Copy [%s] --> [%s] : ", file.name, path)
-
- try:
- shutil.copyfile(file.name, path)
- except Exception as err:
- printf("FAILED: %s\n", err)
- sys.exit(1)
- else:
- printf("OK\n")
-
- self.tag(track, path)
-
- return True
-
- def convert_file(self, file):
- track = list(file.tracks())[0]
- trackname = self.track_name(track)
-
- args = self.shntool_args("shnconv", file.info)
-
- if self.opt.dry_run:
- name = "link to " + quote(file.name, "'")
- debug("run %s", " ".join(map(quote, args + [name])))
- return
-
- try:
- link = TempLink(os.path.abspath(file.name), trackname)
- except OSError as err:
- printerr("create symlink %s failed: %s", quote(trackname), err)
- sys.exit(1)
-
- ret = subprocess.call(args + [str(link)])
- link.remove()
-
- if ret:
- printerr("shnconv failed: exit code %d", ret);
- sys.exit(1)
-
- self.tag(track, os.path.join(self.dest, trackname))
-
- def split_file(self, file, points):
- args = self.shntool_args("shnsplit", file.info) + [file.name]
-
- if self.opt.dry_run:
- debug("run %s", " ".join(map(quote, args)))
- return
-
- proc = subprocess.Popen(args, stdin = subprocess.PIPE)
- proc.stdin.write(to_bytes("\n".join(map(str, points))))
- proc.stdin.close()
-
- if proc.wait():
- printerr("shnsplit failed: exit code %d", proc.returncode)
- sys.exit(1)
-
- splitted = filterdir(self.dest, "split-track")
- for track, filename in zip(file.tracks(), splitted):
- trackname = self.track_name(track)
- path = os.path.join(self.dest, trackname)
-
- printf("Rename [%s] --> [%s] : ", filename, trackname)
- try:
- os.rename(os.path.join(self.dest, filename), path)
- except OSError as err:
- printf("FAILED: %s\n", err)
- sys.exit(1)
- else:
- printf("OK\n")
-
- self.tag(track, path)
-
- def check_duplicates(self):
- names = [x.name for x in self.track_info.values()]
- dup = [k for k, v in collections.Counter(names).items() if v > 1]
- if dup:
- printerr("track names are duplicated: %s", " ".join(dup))
- sys.exit(1)
-
- def transfer_files(self, source, dest):
- for file in sorted(os.listdir(source)):
- path = os.path.join(source, file)
- if not os.path.isfile(path):
- debug("skip non file %s", quote(file))
- continue
-
- printf("Copy [%s] into [%s] : ", file, dest)
-
- try:
- shutil.copy(path, dest)
- except Exception as err:
- printf("FAILED: %s\n", err)
- sys.exit(1)
- else:
- printf("OK\n")
-
- def split(self):
- self.check_duplicates()
-
- files = self.open_files()
-
- self.realpath = None
- if not self.opt.dry_run:
- mkdir(self.dest)
- if self.opt.use_tempdir:
- self.realpath = self.dest
- tempdir = mkdtemp(prefix="cutter-")
- self.dest = to_unicode(tempdir)
-
- for file in files:
- points = list(file.split_points(file.info))
- if not points:
- if not self.copy_file(file):
- self.convert_file(file)
- else:
- self.split_file(file, points)
-
- if self.realpath:
- self.transfer_files(self.dest, self.realpath)
- try:
- shutil.rmtree(self.dest)
- except Exception as err:
- printerr("rm %s failed: %s\n", self.dest, err)
- sys.exit(1)
-
- def all_tracks(self):
- return chain(*[f.tracks() for f in self.cue.files()])
-
- def dump_tags(self):
- add_line = False
- for track in self.all_tracks():
- if add_line:
- printf("\n")
- add_line = True
-
- tags = self.track_tags(track)
- for k, v in sorted(tags.items()):
- if v is not "":
- printf("%s=%s\n", k.upper(), v)
-
- def dump_tracks(self):
- for track in self.all_tracks():
- trackname = self.track_name(track)
- printf("%s\n", os.path.join(self.dest, trackname))