| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- import logging
- import os
- import re
- import shutil
- from mutagen import mp3
- from mutagen import id3
- import requests
- import slugify
- from bandcamp_dl import __version__
- from bandcamp_dl.config import CASE_LOWER, CASE_UPPER, CASE_CAMEL, CASE_NONE
def print_clean(msg):
    """Print *msg* padded with spaces up to the terminal width, without a newline.

    The padding overwrites whatever a previous, longer status line left
    behind on the same terminal row. Messages wider than the terminal are
    printed unchanged.

    :param msg: text to print
    """
    columns = int(shutil.get_terminal_size().columns)
    print(msg.ljust(columns), end='')
class BandcampDownloader:
    """Download the tracks of an album as MP3 files and write their ID3 tags.

    Tracks are first written to ``<name>.mp3.tmp``; once tagged they are
    renamed to ``<name>.mp3``.
    """

    # Suffix carried by a track file until it has been downloaded and tagged.
    _TMP_SUFFIX = ".tmp"
    # Number of times an incomplete download is retried before giving up.
    _MAX_RETRIES = 3

    def __init__(self, config, urls=None):
        """Initialize variables we will need throughout the Class

        :param config: user config/args
        :param urls: list of urls (a single url string is wrapped in a list)
        """
        self.headers = {'User-Agent': f'bandcamp-dl/{__version__} '
                                      f'(https://github.com/evolution0/bandcamp-dl)'}
        self.session = requests.Session()
        self.logger = logging.getLogger("bandcamp-dl").getChild("Downloader")
        self.config = config
        # A bare string must be wrapped, otherwise iterating over `urls`
        # would walk it character by character.
        self.urls = [urls] if isinstance(urls, str) else urls
        # Path of the cover image of the album currently being processed.
        self.album_art = None
        # Progress counters used by the status line.
        self.num_tracks = 0
        self.track_num = 0

    def start(self, album: dict):
        """Start album download process

        Asks for confirmation first when the track list is incomplete and
        confirmation has not been disabled in the config.

        :param album: album dict
        """
        if not album['full'] and not self.config.no_confirm:
            choice = input("Track list incomplete, some tracks may be private, download anyway? "
                           "(yes/no): ").lower()
            if choice not in ("yes", "y"):
                print("Cancelling download process.")
                return None
            print("Starting download process.")
        self.download_album(album)

    def template_to_path(self, track: dict, ascii_only, ok_chars, space_char, keep_space,
                         case_mode) -> str:
        """Create valid filepath based on template

        Note that *track* is updated in place (artist fallback, album name
        derived from the url, normalised track number).

        :param track: track metadata
        :param ok_chars: optional chars to allow
        :param ascii_only: allow only ascii chars in filename
        :param keep_space: retain whitespace in filename
        :param case_mode: char case conversion logic (or none / retain)
        :param space_char: char to use in place of spaces
        :return: filepath
        """
        self.logger.debug(" Generating filepath/trackname..")
        path = self.config.template
        self.logger.debug("\n\tTemplate: %s", path)

        def slugify_preset(content):
            # Ids may be ints and optional fields may be None; the case
            # conversions below need a string.
            content = str(content)
            retain_case = case_mode != CASE_LOWER
            if case_mode == CASE_UPPER:
                content = content.upper()
            if case_mode == CASE_CAMEL:
                content = re.sub(r'(((?<=\s)|^|-)[a-z])', lambda x: x.group().upper(),
                                 content.lower())
            return slugify.slugify(content, ok=ok_chars, only_ascii=ascii_only,
                                   spaces=keep_space, lower=not retain_case,
                                   space_replacement=space_char)

        # The following normalisations do not depend on the token being
        # substituted, so they are done once up front.
        if track.get('artist') is None:
            self.logger.debug('Track artist is None, replacing with album artist')
            track['artist'] = track.get('albumartist')

        if self.config.untitled_path_from_slug and track['album'].lower() == 'untitled':
            track['album'] = track['url'].split("/")[-1].replace("-", " ")

        if track['track'] in (None, 'None'):
            track['track'] = "Single"
        else:
            track['track'] = str(track['track']).zfill(2)

        # Template token -> metadata key. %{artist} means the album artist,
        # %{trackartist} the artist of the individual track.
        token_keys = {'trackartist': 'artist', 'artist': 'albumartist'}
        template_tokens = ['trackartist', 'artist', 'album', 'title', 'date', 'label', 'track',
                           'album_id', 'track_id']
        for token in template_tokens:
            value = track.get(token_keys.get(token, token), "")
            if self.config.no_slugify:
                replacement = str(value)
            else:
                replacement = slugify_preset(value)
            path = path.replace(f'%{{{token}}}', replacement)

        if self.config.base_dir is not None:
            path = f"{self.config.base_dir}/{path}.mp3"
        else:
            path = f"{path}.mp3"

        self.logger.debug(" filepath/trackname generated..")
        self.logger.debug("\n\tPath: %s", path)
        return path

    def create_directory(self, filename: str) -> str:
        """Create directory based on filename if it doesn't exist

        :param filename: full filename
        :return: directory path (empty string if *filename* has no directory part)
        """
        directory = os.path.dirname(filename)
        self.logger.debug(" Directory:\n\t%s", directory)
        # An empty directory part means "current directory": nothing to create.
        if directory and not os.path.isdir(directory):
            self.logger.debug(" Directory doesn't exist, creating..")
            os.makedirs(directory, exist_ok=True)
        return directory

    def download_album(self, album: dict) -> bool:
        """Download all MP3 files in the album

        :param album: album dict
        :return: True if successful, False if a download raised an error
        """
        tracks = album['tracks']
        self.num_tracks = len(tracks)
        # Forget the cover of any previously processed album.
        self.album_art = None

        for track_index, track in enumerate(tracks):
            self.track_num = track_index + 1
            track_meta = {"artist": track['artist'],
                          "albumartist": album['artist'],
                          "label": album['label'],
                          "album": album['title'],
                          "title": track['title'].replace(f"{track['artist']} - ", "", 1),
                          "track": track['track'],
                          "track_id": track['track_id'],
                          "album_id": album['album_id'],
                          # TODO: Find out why the 'lyrics' key seems to vanish.
                          "lyrics": track.get('lyrics', ""),
                          "date": album['date'],
                          "url": album['url'],
                          "genres": album['genres']}

            # Truncation only applies to the path, the tags keep the full text.
            path_meta = track_meta.copy()
            if 0 < self.config.truncate_album < len(path_meta['album']):
                path_meta['album'] = path_meta['album'][:self.config.truncate_album]
            if 0 < self.config.truncate_track < len(path_meta['title']):
                path_meta['title'] = path_meta['title'][:self.config.truncate_track]

            filepath = self.template_to_path(path_meta, self.config.ascii_only,
                                             self.config.ok_chars, self.config.space_char,
                                             self.config.keep_spaces, self.config.case_mode)
            filepath = filepath + self._TMP_SUFFIX
            final_path = filepath[:-len(self._TMP_SUFFIX)]
            # [-1] also works when the path contains no '/' at all.
            filename = filepath.rsplit('/', 1)[-1]
            dirname = self.create_directory(filepath)

            self.logger.debug(" Current file:\n\t%s", filepath)

            if album['art']:
                self._fetch_cover(album['art'], dirname)

            try:
                # A leftover tmp file is tagged and finalised as is.
                if os.path.exists(filepath):
                    self.write_id3_tags(filepath, track_meta)
                    continue
                if os.path.exists(final_path) and self.config.overwrite is not True:
                    print(f"File: {filename[:-4]} already exists and is complete, skipping..")
                    continue
                downloaded = self._fetch_track(track['url'], filepath, filename)
            except Exception as e:
                print(e)
                print("Downloading failed..")
                return False

            if downloaded:
                self.write_id3_tags(filepath, track_meta)

        marker = f"{self.config.base_dir}/{__version__}.not.finished"
        if os.path.isfile(marker):
            os.remove(marker)

        # Remove album art image as it is embedded
        if self.config.embed_art and self.album_art and os.path.isfile(self.album_art):
            os.remove(self.album_art)

        return True

    def _fetch_cover(self, art_url: str, dirname: str) -> None:
        """Make sure ``cover.jpg`` exists in *dirname* and remember its path.

        Failures are reported but never abort the album download.

        :param art_url: url of the cover image
        :param dirname: directory the cover is stored in
        """
        cover_path = os.path.join(dirname, "cover.jpg")
        if os.path.exists(cover_path):
            self.album_art = cover_path
            return
        try:
            # Request first, so that a failed request does not leave an
            # empty cover.jpg behind.
            response = self.session.get(art_url, headers=self.headers)
            response.raise_for_status()
            with open(cover_path, "wb") as f:
                f.write(response.content)
            self.album_art = cover_path
        except Exception as e:
            print(e)
            print("Couldn't download album art.")

    def _fetch_track(self, url: str, filepath: str, filename: str) -> bool:
        """Stream one track to *filepath*, retrying while the file is incomplete.

        :param url: download url of the track
        :param filepath: path of the temporary file to write
        :param filename: basename of *filepath*, used for status output
        :return: True if the file was written, False if all retries were used up
        """
        attempts = 0
        while True:
            with self.session.get(url, headers=self.headers, stream=True) as response:
                response.raise_for_status()
                # 0 means the server did not announce a size.
                file_length = int(response.headers.get('content-length', 0))
                with open(filepath, "wb") as f:
                    if not file_length:
                        f.write(response.content)
                    else:
                        # Roughly one progress update per percent; never 0.
                        chunk_size = max(file_length // 100, 1)
                        dl = 0
                        for data in response.iter_content(chunk_size=chunk_size):
                            dl += len(data)
                            f.write(data)
                            if not self.config.debug:
                                done = min(int(50 * dl / file_length), 50)
                                print_clean(f'\r({self.track_num}/{self.num_tracks}) '
                                            f'[{"=" * done}{" " * (50 - done)}] :: '
                                            f'Downloading: {filename[:-8]}')

            # Without an announced size there is nothing to verify against.
            if not file_length or os.path.getsize(filepath) == file_length:
                return True

            if attempts == self._MAX_RETRIES:
                print("Maximum retries reached.. skipping.")
                # Clean up incomplete file
                os.remove(filepath)
                return False

            attempts += 1
            print(f"{filename} is incomplete, retrying..")

    def write_id3_tags(self, filepath: str, meta: dict):
        """Write metadata to the MP3 file

        Afterwards the temporary file is renamed to its final name (the
        path without the trailing ``.tmp``), replacing an existing file.

        :param filepath: name of mp3 file
        :param meta: dict of track metadata
        """
        self.logger.debug(" Encoding process starting..")

        final_path = filepath[:-len(self._TMP_SUFFIX)]
        # Basename without the '.mp3.tmp' ending, for status output.
        filename = filepath.rsplit('/', 1)[-1][:-8]

        if not self.config.debug:
            print_clean(f'\r({self.track_num}/{self.num_tracks}) [{"=" * 50}] '
                        f':: Encoding: {filename}')

        # First pass: drop existing tags and save a minimal tag. The title
        # written here is only a placeholder, the real one is set below.
        audio = mp3.MP3(filepath)
        audio.delete()
        audio["TIT2"] = id3.TIT2(encoding=3, text=["title"])
        audio["WOAF"] = id3.WOAF(url=meta["url"])
        audio.save(filename=None, v1=2)

        # Second pass: optional frames.
        audio = mp3.MP3(filepath)
        if self.config.group and meta.get('label'):
            audio["TIT1"] = id3.TIT1(encoding=3, text=meta["label"])

        if self.config.embed_lyrics:
            audio["USLT"] = id3.USLT(encoding=3, lang='eng', desc='',
                                     text=meta.get('lyrics') or "")

        if self.config.embed_art and self.album_art and os.path.isfile(self.album_art):
            with open(self.album_art, 'rb') as cover_img:
                cover_bytes = cover_img.read()
            audio["APIC"] = id3.APIC(encoding=3, mime='image/jpeg', type=3,
                                     desc='Cover', data=cover_bytes)
        if self.config.embed_genres:
            audio["TCON"] = id3.TCON(encoding=3, text=meta['genres'])
        audio.save()

        # Third pass: the common text fields.
        audio = mp3.EasyMP3(filepath)

        track_number = str(meta['track'])
        audio["tracknumber"] = track_number if track_number.isdigit() else '1'

        if meta['artist'] is not None:
            audio["artist"] = meta['artist']
        else:
            audio["artist"] = meta['albumartist']
        audio["title"] = meta["title"]
        audio["albumartist"] = meta['albumartist']
        audio["album"] = meta['album']
        audio["date"] = meta["date"]
        audio.save()

        self.logger.debug(" Encoding process finished..")
        self.logger.debug(" Renaming:\n\t%s -to-> %s", filepath, final_path)

        # os.replace overwrites an existing destination on every platform.
        os.replace(filepath, final_path)

        if self.config.debug:
            return

        print_clean(f'\r({self.track_num}/{self.num_tracks}) [{"=" * 50}] :: Finished: {filename}')
|