|
|
@@ -0,0 +1,313 @@
|
|
|
+import logging
|
|
|
+import os
|
|
|
+import re
|
|
|
+import shutil
|
|
|
+
|
|
|
+from mutagen import mp3
|
|
|
+from mutagen import id3
|
|
|
+import requests
|
|
|
+import slugify
|
|
|
+
|
|
|
+from bandcamp_dl import __version__
|
|
|
+from bandcamp_dl.config import CASE_LOWER, CASE_UPPER, CASE_CAMEL, CASE_NONE
|
|
|
+
|
|
|
+
|
|
|
+def print_clean(msg):
|
|
|
+ terminal_size = shutil.get_terminal_size()
|
|
|
+ print(f'{msg}{" " * (int(terminal_size[0]) - len(msg))}', end='')
|
|
|
+
|
|
|
+
|
|
|
+class BandcampDownloader:
|
|
|
+ def __init__(self, config, urls=None):
|
|
|
+ """Initialize variables we will need throughout the Class
|
|
|
+
|
|
|
+ :param config: user config/args
|
|
|
+ :param urls: list of urls
|
|
|
+ """
|
|
|
+ self.headers = {'User-Agent': f'bandcamp-dl/{__version__} '
|
|
|
+ f'(https://github.com/evolution0/bandcamp-dl)'}
|
|
|
+ self.session = requests.Session()
|
|
|
+ self.logger = logging.getLogger("bandcamp-dl").getChild("Downloader")
|
|
|
+
|
|
|
+ if type(urls) is str:
|
|
|
+ self.urls = [urls]
|
|
|
+
|
|
|
+ self.config = config
|
|
|
+ self.urls = urls
|
|
|
+
|
|
|
+ def start(self, album: dict):
|
|
|
+ """Start album download process
|
|
|
+
|
|
|
+ :param album: album dict
|
|
|
+ """
|
|
|
+
|
|
|
+ if not album['full'] and not self.config.no_confirm:
|
|
|
+ choice = input("Track list incomplete, some tracks may be private, download anyway? "
|
|
|
+ "(yes/no): ").lower()
|
|
|
+ if choice == "yes" or choice == "y":
|
|
|
+ print("Starting download process.")
|
|
|
+ self.download_album(album)
|
|
|
+ else:
|
|
|
+ print("Cancelling download process.")
|
|
|
+ return None
|
|
|
+ else:
|
|
|
+ self.download_album(album)
|
|
|
+
|
|
|
+ def template_to_path(self, track: dict, ascii_only, ok_chars, space_char, keep_space,
|
|
|
+ case_mode) -> str:
|
|
|
+ """Create valid filepath based on template
|
|
|
+
|
|
|
+ :param track: track metadata
|
|
|
+ :param ok_chars: optional chars to allow
|
|
|
+ :param ascii_only: allow only ascii chars in filename
|
|
|
+ :param keep_space: retain whitespace in filename
|
|
|
+ :param case_mode: char case conversion logic (or none / retain)
|
|
|
+ :param space_char: char to use in place of spaces
|
|
|
+ :return: filepath
|
|
|
+ """
|
|
|
+ self.logger.debug(" Generating filepath/trackname..")
|
|
|
+ path = self.config.template
|
|
|
+ self.logger.debug(f"\n\tTemplate: {path}")
|
|
|
+
|
|
|
+ def slugify_preset(content):
|
|
|
+ retain_case = case_mode != CASE_LOWER
|
|
|
+ if case_mode == CASE_UPPER:
|
|
|
+ content = content.upper()
|
|
|
+ if case_mode == CASE_CAMEL:
|
|
|
+ content = re.sub(r'(((?<=\s)|^|-)[a-z])', lambda x: x.group().upper(), content.lower())
|
|
|
+ slugged = slugify.slugify(content, ok=ok_chars, only_ascii=ascii_only,
|
|
|
+ spaces=keep_space, lower=not retain_case,
|
|
|
+ space_replacement=space_char)
|
|
|
+ return slugged
|
|
|
+
|
|
|
+ template_tokens = ['trackartist', 'artist', 'album', 'title', 'date', 'label', 'track', 'album_id', 'track_id']
|
|
|
+ for token in template_tokens:
|
|
|
+ key = token
|
|
|
+ if token == 'trackartist':
|
|
|
+ key = 'artist'
|
|
|
+ elif token == 'artist':
|
|
|
+ key = 'albumartist'
|
|
|
+
|
|
|
+ if key == 'artist' and track.get('artist') is None:
|
|
|
+ self.logger.debug('Track artist is None, replacing with album artist')
|
|
|
+ track['artist'] = track.get('albumartist')
|
|
|
+
|
|
|
+ if self.config.untitled_path_from_slug and token == 'album' and track['album'].lower() == 'untitled':
|
|
|
+ track['album'] = track['url'].split("/")[-1].replace("-"," ")
|
|
|
+
|
|
|
+ if token == 'track' and track['track'] == 'None':
|
|
|
+ track['track'] = "Single"
|
|
|
+ else:
|
|
|
+ track['track'] = str(track['track']).zfill(2)
|
|
|
+
|
|
|
+ if self.config.no_slugify:
|
|
|
+ replacement = str(track.get(key, ""))
|
|
|
+ else:
|
|
|
+ replacement = slugify_preset(track.get(key, ""))
|
|
|
+
|
|
|
+ path = path.replace(f'%{{{token}}}', replacement)
|
|
|
+
|
|
|
+ if self.config.base_dir is not None:
|
|
|
+ path = f"{self.config.base_dir}/{path}.mp3"
|
|
|
+ else:
|
|
|
+ path = f"{path}.mp3"
|
|
|
+
|
|
|
+ self.logger.debug(" filepath/trackname generated..")
|
|
|
+ self.logger.debug(f"\n\tPath: {path}")
|
|
|
+ return path
|
|
|
+
|
|
|
+
|
|
|
+ def create_directory(self, filename: str) -> str:
|
|
|
+ """Create directory based on filename if it doesn't exist
|
|
|
+
|
|
|
+ :param filename: full filename
|
|
|
+ :return: directory path
|
|
|
+ """
|
|
|
+ directory = os.path.dirname(filename)
|
|
|
+ self.logger.debug(f" Directory:\n\t{directory}")
|
|
|
+ self.logger.debug(" Directory doesn't exist, creating..")
|
|
|
+ if not os.path.exists(directory):
|
|
|
+ os.makedirs(directory)
|
|
|
+
|
|
|
+ return directory
|
|
|
+
|
|
|
+ def download_album(self, album: dict) -> bool:
|
|
|
+ """Download all MP3 files in the album
|
|
|
+
|
|
|
+ :param album: album dict
|
|
|
+ :return: True if successful
|
|
|
+ """
|
|
|
+ for track_index, track in enumerate(album['tracks']):
|
|
|
+ track_meta = {"artist": track['artist'],
|
|
|
+ "albumartist": album['artist'],
|
|
|
+ "label": album['label'],
|
|
|
+ "album": album['title'],
|
|
|
+ "title": track['title'].replace(f"{track['artist']} - ", "", 1),
|
|
|
+ "track": track['track'],
|
|
|
+ "track_id": track['track_id'],
|
|
|
+ "album_id": album['album_id'],
|
|
|
+ # TODO: Find out why the 'lyrics' key seems to vanish.
|
|
|
+ "lyrics": track.get('lyrics', ""),
|
|
|
+ "date": album['date'],
|
|
|
+ "url": album['url'],
|
|
|
+ "genres": album['genres']}
|
|
|
+
|
|
|
+ path_meta = track_meta.copy()
|
|
|
+
|
|
|
+ if self.config.truncate_album > 0 and len(path_meta['album']) > self.config.truncate_album:
|
|
|
+ path_meta['album'] = path_meta['album'][:self.config.truncate_album]
|
|
|
+
|
|
|
+ if self.config.truncate_track > 0 and len(path_meta['title']) > self.config.truncate_track:
|
|
|
+ path_meta['title'] = path_meta['title'][:self.config.truncate_track]
|
|
|
+
|
|
|
+ self.num_tracks = len(album['tracks'])
|
|
|
+ self.track_num = track_index + 1
|
|
|
+
|
|
|
+ filepath = self.template_to_path(path_meta, self.config.ascii_only,
|
|
|
+ self.config.ok_chars, self.config.space_char,
|
|
|
+ self.config.keep_spaces, self.config.case_mode)
|
|
|
+ filepath = filepath + ".tmp"
|
|
|
+ filename = filepath.rsplit('/', 1)[1]
|
|
|
+ dirname = self.create_directory(filepath)
|
|
|
+
|
|
|
+ self.logger.debug(" Current file:\n\t%s", filepath)
|
|
|
+
|
|
|
+ if album['art'] and not os.path.exists(dirname + "/cover.jpg"):
|
|
|
+ try:
|
|
|
+ with open(dirname + "/cover.jpg", "wb") as f:
|
|
|
+ r = self.session.get(album['art'], headers=self.headers)
|
|
|
+ f.write(r.content)
|
|
|
+ self.album_art = dirname + "/cover.jpg"
|
|
|
+ except Exception as e:
|
|
|
+ print(e)
|
|
|
+ print("Couldn't download album art.")
|
|
|
+
|
|
|
+ attempts = 0
|
|
|
+ skip = False
|
|
|
+
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ r = self.session.get(track['url'], headers=self.headers, stream=True)
|
|
|
+ file_length = int(r.headers.get('content-length', 0))
|
|
|
+ total = int(file_length / 100)
|
|
|
+ # If file exists and is still a tmp file skip downloading and encode
|
|
|
+ if os.path.exists(filepath):
|
|
|
+ self.write_id3_tags(filepath, track_meta)
|
|
|
+ # Set skip to True so that we don't try encoding again
|
|
|
+ skip = True
|
|
|
+ # break out of the try/except and move on to the next file
|
|
|
+ break
|
|
|
+ elif os.path.exists(filepath[:-4]) and self.config.overwrite is not True:
|
|
|
+ print(f"File: {filename[:-4]} already exists and is complete, skipping..")
|
|
|
+ skip = True
|
|
|
+ break
|
|
|
+ with open(filepath, "wb") as f:
|
|
|
+ if file_length is None:
|
|
|
+ f.write(r.content)
|
|
|
+ else:
|
|
|
+ dl = 0
|
|
|
+ for data in r.iter_content(chunk_size=total):
|
|
|
+ dl += len(data)
|
|
|
+ f.write(data)
|
|
|
+ if not self.config.debug:
|
|
|
+ done = int(50 * dl / file_length)
|
|
|
+ print_clean(f'\r({self.track_num}/{self.num_tracks}) '
|
|
|
+ f'[{"=" * done}{" " * (50 - done)}] :: '
|
|
|
+ f'Downloading: {filename[:-8]}')
|
|
|
+ local_size = os.path.getsize(filepath)
|
|
|
+ # if the local filesize before encoding doesn't match the remote filesize
|
|
|
+ # redownload
|
|
|
+ if local_size != file_length and attempts != 3:
|
|
|
+ print(f"{filename} is incomplete, retrying..")
|
|
|
+ continue
|
|
|
+ # if the maximum number of retry attempts is reached give up and move on
|
|
|
+ elif attempts == 3:
|
|
|
+ print("Maximum retries reached.. skipping.")
|
|
|
+ # Clean up incomplete file
|
|
|
+ os.remove(filepath)
|
|
|
+ break
|
|
|
+ # if all is well continue the download process for the rest of the tracks
|
|
|
+ else:
|
|
|
+ break
|
|
|
+ except Exception as e:
|
|
|
+ print(e)
|
|
|
+ print("Downloading failed..")
|
|
|
+ return False
|
|
|
+ if skip is False:
|
|
|
+ self.write_id3_tags(filepath, track_meta)
|
|
|
+
|
|
|
+ if os.path.isfile(f"{self.config.base_dir}/{__version__}.not.finished"):
|
|
|
+ os.remove(f"{self.config.base_dir}/{__version__}.not.finished")
|
|
|
+
|
|
|
+ # Remove album art image as it is embedded
|
|
|
+ if self.config.embed_art and hasattr(self, "album_art"):
|
|
|
+ os.remove(self.album_art)
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ def write_id3_tags(self, filepath: str, meta: dict):
|
|
|
+ """Write metadata to the MP3 file
|
|
|
+
|
|
|
+ :param filepath: name of mp3 file
|
|
|
+ :param meta: dict of track metadata
|
|
|
+ """
|
|
|
+ self.logger.debug(" Encoding process starting..")
|
|
|
+
|
|
|
+ filename = filepath.rsplit('/', 1)[1][:-8]
|
|
|
+
|
|
|
+ if not self.config.debug:
|
|
|
+ print_clean(f'\r({self.track_num}/{self.num_tracks}) [{"=" * 50}] '
|
|
|
+ f':: Encoding: {filename}')
|
|
|
+
|
|
|
+ audio = mp3.MP3(filepath)
|
|
|
+ audio.delete()
|
|
|
+ audio["TIT2"] = id3._frames.TIT2(encoding=3, text=["title"])
|
|
|
+ audio["WOAF"] = id3._frames.WOAF(url=meta["url"])
|
|
|
+ audio.save(filename=None, v1=2)
|
|
|
+
|
|
|
+ audio = mp3.MP3(filepath)
|
|
|
+ if self.config.group and 'label' in meta:
|
|
|
+ audio["TIT1"] = id3._frames.TIT1(encoding=3, text=meta["label"])
|
|
|
+
|
|
|
+ if self.config.embed_lyrics:
|
|
|
+ audio["USLT"] = id3._frames.USLT(encoding=3, lang='eng', desc='', text=meta['lyrics'])
|
|
|
+
|
|
|
+ if self.config.embed_art:
|
|
|
+ with open(self.album_art, 'rb') as cover_img:
|
|
|
+ cover_bytes = cover_img.read()
|
|
|
+ audio["APIC"] = id3._frames.APIC(encoding=3, mime='image/jpeg', type=3,
|
|
|
+ desc='Cover', data=cover_bytes)
|
|
|
+ if self.config.embed_genres:
|
|
|
+ audio["TCON"] = id3._frames.TCON(encoding=3, text=meta['genres'])
|
|
|
+ audio.save()
|
|
|
+
|
|
|
+ audio = mp3.EasyMP3(filepath)
|
|
|
+
|
|
|
+ if meta['track'].isdigit():
|
|
|
+ audio["tracknumber"] = meta['track']
|
|
|
+ else:
|
|
|
+ audio["tracknumber"] = '1'
|
|
|
+
|
|
|
+ if meta['artist'] is not None:
|
|
|
+ audio["artist"] = meta['artist']
|
|
|
+ else:
|
|
|
+ audio["artist"] = meta['albumartist']
|
|
|
+ audio["title"] = meta["title"]
|
|
|
+ audio["albumartist"] = meta['albumartist']
|
|
|
+ audio["album"] = meta['album']
|
|
|
+ audio["date"] = meta["date"]
|
|
|
+ audio.save()
|
|
|
+
|
|
|
+ self.logger.debug(" Encoding process finished..")
|
|
|
+ self.logger.debug(" Renaming:\n\t%s -to-> %s", filepath, filepath[:-4])
|
|
|
+
|
|
|
+ try:
|
|
|
+ os.rename(filepath, filepath[:-4])
|
|
|
+ except WindowsError:
|
|
|
+ os.remove(filepath[:-4])
|
|
|
+ os.rename(filepath, filepath[:-4])
|
|
|
+
|
|
|
+ if self.config.debug:
|
|
|
+ return
|
|
|
+
|
|
|
+ print_clean(f'\r({self.track_num}/{self.num_tracks}) [{"=" * 50}] :: Finished: {filename}')
|