# bandcampdownloader.py
  1. import logging
  2. import os
  3. import re
  4. import shutil
  5. from mutagen import mp3
  6. from mutagen import id3
  7. import requests
  8. import slugify
  9. from bandcamp_dl import __version__
  10. from bandcamp_dl.config import CASE_LOWER, CASE_UPPER, CASE_CAMEL, CASE_NONE
  11. def print_clean(msg):
  12. terminal_size = shutil.get_terminal_size()
  13. print(f'{msg}{" " * (int(terminal_size[0]) - len(msg))}', end='')
  14. class BandcampDownloader:
  15. def __init__(self, config, urls=None):
  16. """Initialize variables we will need throughout the Class
  17. :param config: user config/args
  18. :param urls: list of urls
  19. """
  20. self.headers = {'User-Agent': f'bandcamp-dl/{__version__} '
  21. f'(https://github.com/evolution0/bandcamp-dl)'}
  22. self.session = requests.Session()
  23. self.logger = logging.getLogger("bandcamp-dl").getChild("Downloader")
  24. if type(urls) is str:
  25. self.urls = [urls]
  26. self.config = config
  27. self.urls = urls
  28. def start(self, album: dict):
  29. """Start album download process
  30. :param album: album dict
  31. """
  32. if not album['full'] and not self.config.no_confirm:
  33. choice = input("Track list incomplete, some tracks may be private, download anyway? "
  34. "(yes/no): ").lower()
  35. if choice == "yes" or choice == "y":
  36. print("Starting download process.")
  37. self.download_album(album)
  38. else:
  39. print("Cancelling download process.")
  40. return None
  41. else:
  42. self.download_album(album)
  43. def template_to_path(self, track: dict, ascii_only, ok_chars, space_char, keep_space,
  44. case_mode) -> str:
  45. """Create valid filepath based on template
  46. :param track: track metadata
  47. :param ok_chars: optional chars to allow
  48. :param ascii_only: allow only ascii chars in filename
  49. :param keep_space: retain whitespace in filename
  50. :param case_mode: char case conversion logic (or none / retain)
  51. :param space_char: char to use in place of spaces
  52. :return: filepath
  53. """
  54. self.logger.debug(" Generating filepath/trackname..")
  55. path = self.config.template
  56. self.logger.debug(f"\n\tTemplate: {path}")
  57. def slugify_preset(content):
  58. retain_case = case_mode != CASE_LOWER
  59. if case_mode == CASE_UPPER:
  60. content = content.upper()
  61. if case_mode == CASE_CAMEL:
  62. content = re.sub(r'(((?<=\s)|^|-)[a-z])', lambda x: x.group().upper(), content.lower())
  63. slugged = slugify.slugify(content, ok=ok_chars, only_ascii=ascii_only,
  64. spaces=keep_space, lower=not retain_case,
  65. space_replacement=space_char)
  66. return slugged
  67. template_tokens = ['trackartist', 'artist', 'album', 'title', 'date', 'label', 'track', 'album_id', 'track_id']
  68. for token in template_tokens:
  69. key = token
  70. if token == 'trackartist':
  71. key = 'artist'
  72. elif token == 'artist':
  73. key = 'albumartist'
  74. if key == 'artist' and track.get('artist') is None:
  75. self.logger.debug('Track artist is None, replacing with album artist')
  76. track['artist'] = track.get('albumartist')
  77. if self.config.untitled_path_from_slug and token == 'album' and track['album'].lower() == 'untitled':
  78. track['album'] = track['url'].split("/")[-1].replace("-"," ")
  79. if token == 'track' and track['track'] == 'None':
  80. track['track'] = "Single"
  81. else:
  82. track['track'] = str(track['track']).zfill(2)
  83. if self.config.no_slugify:
  84. replacement = str(track.get(key, ""))
  85. else:
  86. replacement = slugify_preset(track.get(key, ""))
  87. path = path.replace(f'%{{{token}}}', replacement)
  88. if self.config.base_dir is not None:
  89. path = f"{self.config.base_dir}/{path}.mp3"
  90. else:
  91. path = f"{path}.mp3"
  92. self.logger.debug(" filepath/trackname generated..")
  93. self.logger.debug(f"\n\tPath: {path}")
  94. return path
  95. def create_directory(self, filename: str) -> str:
  96. """Create directory based on filename if it doesn't exist
  97. :param filename: full filename
  98. :return: directory path
  99. """
  100. directory = os.path.dirname(filename)
  101. self.logger.debug(f" Directory:\n\t{directory}")
  102. self.logger.debug(" Directory doesn't exist, creating..")
  103. if not os.path.exists(directory):
  104. os.makedirs(directory)
  105. return directory
  106. def download_album(self, album: dict) -> bool:
  107. """Download all MP3 files in the album
  108. :param album: album dict
  109. :return: True if successful
  110. """
  111. for track_index, track in enumerate(album['tracks']):
  112. track_meta = {"artist": track['artist'],
  113. "albumartist": album['artist'],
  114. "label": album['label'],
  115. "album": album['title'],
  116. "title": track['title'].replace(f"{track['artist']} - ", "", 1),
  117. "track": track['track'],
  118. "track_id": track['track_id'],
  119. "album_id": album['album_id'],
  120. # TODO: Find out why the 'lyrics' key seems to vanish.
  121. "lyrics": track.get('lyrics', ""),
  122. "date": album['date'],
  123. "url": album['url'],
  124. "genres": album['genres']}
  125. path_meta = track_meta.copy()
  126. if self.config.truncate_album > 0 and len(path_meta['album']) > self.config.truncate_album:
  127. path_meta['album'] = path_meta['album'][:self.config.truncate_album]
  128. if self.config.truncate_track > 0 and len(path_meta['title']) > self.config.truncate_track:
  129. path_meta['title'] = path_meta['title'][:self.config.truncate_track]
  130. self.num_tracks = len(album['tracks'])
  131. self.track_num = track_index + 1
  132. filepath = self.template_to_path(path_meta, self.config.ascii_only,
  133. self.config.ok_chars, self.config.space_char,
  134. self.config.keep_spaces, self.config.case_mode)
  135. filepath = filepath + ".tmp"
  136. filename = filepath.rsplit('/', 1)[1]
  137. dirname = self.create_directory(filepath)
  138. self.logger.debug(" Current file:\n\t%s", filepath)
  139. if album['art'] and not os.path.exists(dirname + "/cover.jpg"):
  140. try:
  141. with open(dirname + "/cover.jpg", "wb") as f:
  142. r = self.session.get(album['art'], headers=self.headers)
  143. f.write(r.content)
  144. self.album_art = dirname + "/cover.jpg"
  145. except Exception as e:
  146. print(e)
  147. print("Couldn't download album art.")
  148. attempts = 0
  149. skip = False
  150. while True:
  151. try:
  152. r = self.session.get(track['url'], headers=self.headers, stream=True)
  153. file_length = int(r.headers.get('content-length', 0))
  154. total = int(file_length / 100)
  155. # If file exists and is still a tmp file skip downloading and encode
  156. if os.path.exists(filepath):
  157. self.write_id3_tags(filepath, track_meta)
  158. # Set skip to True so that we don't try encoding again
  159. skip = True
  160. # break out of the try/except and move on to the next file
  161. break
  162. elif os.path.exists(filepath[:-4]) and self.config.overwrite is not True:
  163. print(f"File: {filename[:-4]} already exists and is complete, skipping..")
  164. skip = True
  165. break
  166. with open(filepath, "wb") as f:
  167. if file_length is None:
  168. f.write(r.content)
  169. else:
  170. dl = 0
  171. for data in r.iter_content(chunk_size=total):
  172. dl += len(data)
  173. f.write(data)
  174. if not self.config.debug:
  175. done = int(50 * dl / file_length)
  176. print_clean(f'\r({self.track_num}/{self.num_tracks}) '
  177. f'[{"=" * done}{" " * (50 - done)}] :: '
  178. f'Downloading: {filename[:-8]}')
  179. local_size = os.path.getsize(filepath)
  180. # if the local filesize before encoding doesn't match the remote filesize
  181. # redownload
  182. if local_size != file_length and attempts != 3:
  183. print(f"{filename} is incomplete, retrying..")
  184. continue
  185. # if the maximum number of retry attempts is reached give up and move on
  186. elif attempts == 3:
  187. print("Maximum retries reached.. skipping.")
  188. # Clean up incomplete file
  189. os.remove(filepath)
  190. break
  191. # if all is well continue the download process for the rest of the tracks
  192. else:
  193. break
  194. except Exception as e:
  195. print(e)
  196. print("Downloading failed..")
  197. return False
  198. if skip is False:
  199. self.write_id3_tags(filepath, track_meta)
  200. if os.path.isfile(f"{self.config.base_dir}/{__version__}.not.finished"):
  201. os.remove(f"{self.config.base_dir}/{__version__}.not.finished")
  202. # Remove album art image as it is embedded
  203. if self.config.embed_art and hasattr(self, "album_art"):
  204. os.remove(self.album_art)
  205. return True
  206. def write_id3_tags(self, filepath: str, meta: dict):
  207. """Write metadata to the MP3 file
  208. :param filepath: name of mp3 file
  209. :param meta: dict of track metadata
  210. """
  211. self.logger.debug(" Encoding process starting..")
  212. filename = filepath.rsplit('/', 1)[1][:-8]
  213. if not self.config.debug:
  214. print_clean(f'\r({self.track_num}/{self.num_tracks}) [{"=" * 50}] '
  215. f':: Encoding: {filename}')
  216. audio = mp3.MP3(filepath)
  217. audio.delete()
  218. audio["TIT2"] = id3._frames.TIT2(encoding=3, text=["title"])
  219. audio["WOAF"] = id3._frames.WOAF(url=meta["url"])
  220. audio.save(filename=None, v1=2)
  221. audio = mp3.MP3(filepath)
  222. if self.config.group and 'label' in meta:
  223. audio["TIT1"] = id3._frames.TIT1(encoding=3, text=meta["label"])
  224. if self.config.embed_lyrics:
  225. audio["USLT"] = id3._frames.USLT(encoding=3, lang='eng', desc='', text=meta['lyrics'])
  226. if self.config.embed_art:
  227. with open(self.album_art, 'rb') as cover_img:
  228. cover_bytes = cover_img.read()
  229. audio["APIC"] = id3._frames.APIC(encoding=3, mime='image/jpeg', type=3,
  230. desc='Cover', data=cover_bytes)
  231. if self.config.embed_genres:
  232. audio["TCON"] = id3._frames.TCON(encoding=3, text=meta['genres'])
  233. audio.save()
  234. audio = mp3.EasyMP3(filepath)
  235. if meta['track'].isdigit():
  236. audio["tracknumber"] = meta['track']
  237. else:
  238. audio["tracknumber"] = '1'
  239. if meta['artist'] is not None:
  240. audio["artist"] = meta['artist']
  241. else:
  242. audio["artist"] = meta['albumartist']
  243. audio["title"] = meta["title"]
  244. audio["albumartist"] = meta['albumartist']
  245. audio["album"] = meta['album']
  246. audio["date"] = meta["date"]
  247. audio.save()
  248. self.logger.debug(" Encoding process finished..")
  249. self.logger.debug(" Renaming:\n\t%s -to-> %s", filepath, filepath[:-4])
  250. try:
  251. os.rename(filepath, filepath[:-4])
  252. except WindowsError:
  253. os.remove(filepath[:-4])
  254. os.rename(filepath, filepath[:-4])
  255. if self.config.debug:
  256. return
  257. print_clean(f'\r({self.track_num}/{self.num_tracks}) [{"=" * 50}] :: Finished: {filename}')