/**
 * @fileoverview Download Manager for parallel video downloads
 * Handles concurrent download queue with optimal CPU utilization
 * @author GrabZilla Development Team
 * @version 2.1.0
 */
  7. const os = require('os')
  8. const EventEmitter = require('events')
  9. // Priority levels for download queue
  10. const PRIORITY = {
  11. HIGH: 3,
  12. NORMAL: 2,
  13. LOW: 1
  14. }
  15. /**
  16. * Download Manager
  17. * Manages concurrent video downloads with worker pool
  18. */
  19. class DownloadManager extends EventEmitter {
  20. constructor(options = {}) {
  21. super()
  22. // Detect CPU cores and set optimal concurrency
  23. const cpuCount = os.cpus().length
  24. const platform = os.platform()
  25. const arch = os.arch()
  26. // Apple Silicon optimization
  27. const isAppleSilicon = platform === 'darwin' && arch === 'arm64'
  28. // Calculate optimal concurrency
  29. // For Apple Silicon: Use 50% of cores (M-series have performance+efficiency cores)
  30. // For other systems: Use 75% of cores
  31. const optimalConcurrency = isAppleSilicon
  32. ? Math.max(2, Math.floor(cpuCount * 0.5))
  33. : Math.max(2, Math.floor(cpuCount * 0.75))
  34. this.maxConcurrent = options.maxConcurrent || optimalConcurrency
  35. this.maxRetries = options.maxRetries || 3
  36. this.activeDownloads = new Map() // videoId -> download info
  37. this.activeProcesses = new Map() // videoId -> child process
  38. this.queuedDownloads = [] // Array of pending download requests
  39. this.downloadHistory = new Map() // Track completed downloads
  40. console.log(`📦 DownloadManager initialized:`)
  41. console.log(` Platform: ${platform} ${arch}`)
  42. console.log(` CPU Cores: ${cpuCount}`)
  43. console.log(` Max Concurrent: ${this.maxConcurrent}`)
  44. console.log(` Max Retries: ${this.maxRetries}`)
  45. console.log(` Apple Silicon: ${isAppleSilicon}`)
  46. }
  47. /**
  48. * Get current queue statistics
  49. */
  50. getStats() {
  51. return {
  52. active: this.activeDownloads.size,
  53. queued: this.queuedDownloads.length,
  54. maxConcurrent: this.maxConcurrent,
  55. completed: this.downloadHistory.size,
  56. canAcceptMore: this.activeDownloads.size < this.maxConcurrent
  57. }
  58. }
  59. /**
  60. * Add download to queue
  61. * @param {Object} downloadRequest - Download request object
  62. * @param {number} priority - Priority level (PRIORITY.HIGH/NORMAL/LOW)
  63. * @returns {Promise} Resolves when download completes
  64. */
  65. async addDownload(downloadRequest, priority = PRIORITY.NORMAL) {
  66. const { videoId, url, quality, format, savePath, cookieFile, downloadFn } = downloadRequest
  67. // Check if already downloading or queued
  68. if (this.activeDownloads.has(videoId)) {
  69. throw new Error(`Video ${videoId} is already being downloaded`)
  70. }
  71. if (this.queuedDownloads.find(req => req.videoId === videoId)) {
  72. throw new Error(`Video ${videoId} is already in queue`)
  73. }
  74. return new Promise((resolve, reject) => {
  75. const request = {
  76. videoId,
  77. url,
  78. quality,
  79. format,
  80. savePath,
  81. cookieFile,
  82. downloadFn,
  83. resolve,
  84. reject,
  85. priority,
  86. addedAt: Date.now(),
  87. retryCount: 0
  88. }
  89. this.queuedDownloads.push(request)
  90. this.sortQueue()
  91. this.emit('queueUpdated', this.getStats())
  92. // Try to process queue immediately
  93. this.processQueue()
  94. })
  95. }
  96. /**
  97. * Sort queue by priority and then by addedAt
  98. * @private
  99. */
  100. sortQueue() {
  101. this.queuedDownloads.sort((a, b) => {
  102. // Sort by priority first (higher priority first)
  103. if (b.priority !== a.priority) {
  104. return b.priority - a.priority
  105. }
  106. // Then by addedAt (older first)
  107. return a.addedAt - b.addedAt
  108. })
  109. }
  110. /**
  111. * Set priority for a queued download
  112. * @param {string} videoId - Video ID
  113. * @param {number} priority - New priority level
  114. * @returns {boolean} Success status
  115. */
  116. setPriority(videoId, priority) {
  117. const request = this.queuedDownloads.find(r => r.videoId === videoId)
  118. if (request) {
  119. request.priority = priority
  120. this.sortQueue()
  121. this.emit('queueUpdated', this.getStats())
  122. return true
  123. }
  124. return false
  125. }
  126. /**
  127. * Process download queue
  128. * Starts downloads up to maxConcurrent limit
  129. */
  130. async processQueue() {
  131. // Check if we can start more downloads
  132. while (this.activeDownloads.size < this.maxConcurrent && this.queuedDownloads.length > 0) {
  133. const request = this.queuedDownloads.shift()
  134. this.startDownload(request)
  135. }
  136. }
  137. /**
  138. * Start a single download
  139. */
  140. async startDownload(request) {
  141. const { videoId, url, quality, format, savePath, cookieFile, downloadFn, resolve, reject, retryCount } = request
  142. // Mark as active
  143. const downloadInfo = {
  144. videoId,
  145. url,
  146. startedAt: Date.now(),
  147. progress: 0,
  148. status: 'downloading',
  149. retryCount: retryCount || 0
  150. }
  151. this.activeDownloads.set(videoId, downloadInfo)
  152. this.emit('downloadStarted', { videoId, ...downloadInfo })
  153. this.emit('queueUpdated', this.getStats())
  154. try {
  155. console.log(`🚀 Starting download ${this.activeDownloads.size}/${this.maxConcurrent}: ${videoId}${retryCount ? ` (retry ${retryCount}/${this.maxRetries})` : ''}`)
  156. // Call the actual download function with callbacks
  157. const result = await downloadFn({
  158. url,
  159. quality,
  160. format,
  161. savePath,
  162. cookieFile,
  163. onProcess: (process) => {
  164. // Store process reference for cancellation
  165. this.activeProcesses.set(videoId, process)
  166. },
  167. onProgress: (progressData) => {
  168. // Update download info and emit progress
  169. if (downloadInfo) {
  170. downloadInfo.progress = progressData.progress || 0
  171. downloadInfo.speed = progressData.speed
  172. downloadInfo.eta = progressData.eta
  173. this.emit('downloadProgress', { videoId, ...progressData })
  174. }
  175. }
  176. })
  177. // Download completed successfully
  178. this.handleDownloadComplete(videoId, result, resolve)
  179. } catch (error) {
  180. // Check if error is retryable and we haven't exceeded max retries
  181. if (retryCount < this.maxRetries && this.isRetryableError(error)) {
  182. console.log(`🔄 Retrying download (${retryCount + 1}/${this.maxRetries}): ${videoId}`)
  183. // Remove from active
  184. this.activeDownloads.delete(videoId)
  185. this.activeProcesses.delete(videoId)
  186. // Update retry count and re-queue with exponential backoff
  187. request.retryCount = retryCount + 1
  188. request.lastError = error.message
  189. setTimeout(() => {
  190. // Add to front of queue with same priority
  191. this.queuedDownloads.unshift(request)
  192. this.emit('queueUpdated', this.getStats())
  193. this.processQueue()
  194. }, Math.pow(2, retryCount) * 1000) // 1s, 2s, 4s backoff
  195. } else {
  196. // Max retries exceeded or non-retryable error
  197. this.handleDownloadError(videoId, error, reject)
  198. }
  199. }
  200. }
  201. /**
  202. * Check if error is retryable
  203. * @param {Error} error - Error object
  204. * @returns {boolean} True if error is retryable
  205. * @private
  206. */
  207. isRetryableError(error) {
  208. const retryablePatterns = [
  209. /network/i,
  210. /timeout/i,
  211. /ECONNRESET/i,
  212. /ETIMEDOUT/i,
  213. /ENOTFOUND/i,
  214. /ECONNREFUSED/i,
  215. /socket hang up/i,
  216. /503/i,
  217. /502/i,
  218. /504/i
  219. ]
  220. return retryablePatterns.some(pattern => pattern.test(error.message))
  221. }
  222. /**
  223. * Handle download completion
  224. */
  225. handleDownloadComplete(videoId, result, resolve) {
  226. const downloadInfo = this.activeDownloads.get(videoId)
  227. if (downloadInfo) {
  228. downloadInfo.status = 'completed'
  229. downloadInfo.completedAt = Date.now()
  230. downloadInfo.duration = downloadInfo.completedAt - downloadInfo.startedAt
  231. downloadInfo.result = result
  232. // Move to history
  233. this.downloadHistory.set(videoId, downloadInfo)
  234. this.activeDownloads.delete(videoId)
  235. // Clean up process reference
  236. this.activeProcesses.delete(videoId)
  237. console.log(`✅ Download completed: ${videoId} (${(downloadInfo.duration / 1000).toFixed(1)}s)`)
  238. this.emit('downloadCompleted', { videoId, result, duration: downloadInfo.duration })
  239. this.emit('queueUpdated', this.getStats())
  240. resolve(result)
  241. }
  242. // Process next in queue
  243. this.processQueue()
  244. }
  245. /**
  246. * Handle download error
  247. */
  248. handleDownloadError(videoId, error, reject) {
  249. const downloadInfo = this.activeDownloads.get(videoId)
  250. if (downloadInfo) {
  251. downloadInfo.status = 'error'
  252. downloadInfo.error = error.message
  253. downloadInfo.completedAt = Date.now()
  254. downloadInfo.duration = downloadInfo.completedAt - downloadInfo.startedAt
  255. // Move to history
  256. this.downloadHistory.set(videoId, downloadInfo)
  257. this.activeDownloads.delete(videoId)
  258. // Clean up process reference
  259. this.activeProcesses.delete(videoId)
  260. console.error(`❌ Download failed: ${videoId} - ${error.message}`)
  261. this.emit('downloadFailed', { videoId, error: error.message })
  262. this.emit('queueUpdated', this.getStats())
  263. reject(error)
  264. }
  265. // Process next in queue
  266. this.processQueue()
  267. }
  268. /**
  269. * Cancel a specific download
  270. * @param {string} videoId - Video ID to cancel
  271. * @returns {boolean} Success status
  272. */
  273. cancelDownload(videoId) {
  274. // Try to cancel active download first
  275. if (this.activeDownloads.has(videoId)) {
  276. const process = this.activeProcesses.get(videoId)
  277. if (process && !process.killed) {
  278. try {
  279. // Try graceful termination first
  280. process.kill('SIGTERM')
  281. // Force kill after 5 seconds if still running
  282. setTimeout(() => {
  283. if (!process.killed) {
  284. process.kill('SIGKILL')
  285. }
  286. }, 5000)
  287. console.log(`🛑 Cancelled active download: ${videoId}`)
  288. // Clean up
  289. const downloadInfo = this.activeDownloads.get(videoId)
  290. if (downloadInfo) {
  291. downloadInfo.status = 'cancelled'
  292. downloadInfo.error = 'Cancelled by user'
  293. this.downloadHistory.set(videoId, downloadInfo)
  294. }
  295. this.activeDownloads.delete(videoId)
  296. this.activeProcesses.delete(videoId)
  297. this.emit('downloadCancelled', { videoId })
  298. this.emit('queueUpdated', this.getStats())
  299. // Process next in queue
  300. this.processQueue()
  301. return true
  302. } catch (error) {
  303. console.error(`Error cancelling download ${videoId}:`, error)
  304. return false
  305. }
  306. }
  307. }
  308. // Remove from queue if present
  309. const queueIndex = this.queuedDownloads.findIndex(req => req.videoId === videoId)
  310. if (queueIndex !== -1) {
  311. const request = this.queuedDownloads.splice(queueIndex, 1)[0]
  312. request.reject(new Error('Download cancelled by user'))
  313. console.log(`🛑 Removed from queue: ${videoId}`)
  314. this.emit('queueUpdated', this.getStats())
  315. return true
  316. }
  317. return false
  318. }
  319. /**
  320. * Cancel all downloads (both active and queued)
  321. * @returns {Object} Cancellation results
  322. */
  323. cancelAll() {
  324. let cancelledActive = 0
  325. let cancelledQueued = 0
  326. // Cancel all active downloads
  327. for (const [videoId, process] of this.activeProcesses.entries()) {
  328. if (process && !process.killed) {
  329. try {
  330. process.kill('SIGTERM')
  331. setTimeout(() => {
  332. if (!process.killed) process.kill('SIGKILL')
  333. }, 5000)
  334. const downloadInfo = this.activeDownloads.get(videoId)
  335. if (downloadInfo) {
  336. downloadInfo.status = 'cancelled'
  337. downloadInfo.error = 'Cancelled by user'
  338. this.downloadHistory.set(videoId, downloadInfo)
  339. }
  340. cancelledActive++
  341. } catch (error) {
  342. console.error(`Error cancelling ${videoId}:`, error)
  343. }
  344. }
  345. }
  346. // Clear active downloads and processes
  347. this.activeDownloads.clear()
  348. this.activeProcesses.clear()
  349. // Cancel all queued downloads
  350. cancelledQueued = this.queuedDownloads.length
  351. this.queuedDownloads.forEach(request => {
  352. request.reject(new Error('Download cancelled by user'))
  353. })
  354. this.queuedDownloads = []
  355. this.emit('queueUpdated', this.getStats())
  356. console.log(`🛑 Cancelled ${cancelledActive} active and ${cancelledQueued} queued downloads`)
  357. return {
  358. cancelledActive,
  359. cancelledQueued,
  360. total: cancelledActive + cancelledQueued
  361. }
  362. }
  363. /**
  364. * Clear download history
  365. */
  366. clearHistory() {
  367. const count = this.downloadHistory.size
  368. this.downloadHistory.clear()
  369. console.log(`🗑️ Cleared ${count} download history entries`)
  370. }
  371. /**
  372. * Get download info
  373. */
  374. getDownloadInfo(videoId) {
  375. return this.activeDownloads.get(videoId) ||
  376. this.downloadHistory.get(videoId) ||
  377. this.queuedDownloads.find(req => req.videoId === videoId)
  378. }
  379. /**
  380. * Check if video is downloading or queued
  381. */
  382. isDownloading(videoId) {
  383. return this.activeDownloads.has(videoId) ||
  384. this.queuedDownloads.some(req => req.videoId === videoId)
  385. }
  386. }
  387. module.exports = DownloadManager
  388. module.exports.PRIORITY = PRIORITY