| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- /**
- * @fileoverview Download Manager for parallel video downloads
- * Handles concurrent download queue with optimal CPU utilization
- * @author GrabZilla Development Team
- * @version 2.1.0
- */
- const os = require('os')
- const EventEmitter = require('events')
/**
 * Priority levels for the download queue.
 * Requests with a higher number are started before requests with a lower one.
 * The object is frozen so a consumer cannot accidentally change the levels
 * for every other user of the module.
 */
const PRIORITY = Object.freeze({
  HIGH: 3,
  NORMAL: 2,
  LOW: 1
})
/**
 * Download Manager
 *
 * Runs caller-supplied download functions with a bounded number of concurrent
 * downloads. Requests wait in a priority queue, failed attempts whose error
 * looks transient are retried with exponential backoff, and active downloads
 * can be cancelled by signalling the child process registered via `onProcess`.
 *
 * Events: `queueUpdated` (stats), `downloadStarted`, `downloadProgress`,
 * `downloadCompleted`, `downloadFailed`, `downloadCancelled`.
 */
class DownloadManager extends EventEmitter {
  /**
   * @param {Object} [options]
   * @param {number} [options.maxConcurrent] - Maximum simultaneous downloads.
   *   Falsy values (including 0) fall back to a value derived from the CPU count.
   * @param {number} [options.maxRetries=3] - Retries per download. 0 disables retries.
   */
  constructor(options = {}) {
    super()

    const cpuCount = os.cpus().length
    const platform = os.platform()
    const arch = os.arch()
    const isAppleSilicon = platform === 'darwin' && arch === 'arm64'

    // Apple Silicon: use 50% of the cores (performance + efficiency core mix).
    // Other systems: use 75% of the cores. Never go below 2.
    const coreShare = isAppleSilicon ? 0.5 : 0.75
    const optimalConcurrency = Math.max(2, Math.floor(cpuCount * coreShare))

    this.maxConcurrent = options.maxConcurrent || optimalConcurrency
    // Only fall back when the option is absent: an explicit 0 means "never retry".
    this.maxRetries = options.maxRetries != null ? options.maxRetries : 3

    this.activeDownloads = new Map() // videoId -> download info
    this.activeProcesses = new Map() // videoId -> child process
    this.activeRequests = new Map() // videoId -> request (holds resolve/reject)
    this.pendingRetries = new Map() // videoId -> { request, timer } during backoff
    this.queuedDownloads = [] // Array of pending download requests
    this.downloadHistory = new Map() // Finished downloads (completed, failed, cancelled)

    console.log(`📦 DownloadManager initialized:`)
    console.log(` Platform: ${platform} ${arch}`)
    console.log(` CPU Cores: ${cpuCount}`)
    console.log(` Max Concurrent: ${this.maxConcurrent}`)
    console.log(` Max Retries: ${this.maxRetries}`)
    console.log(` Apple Silicon: ${isAppleSilicon}`)
  }

  /**
   * Get current queue statistics.
   * `queued` counts requests waiting in the queue plus requests waiting for a
   * retry backoff to elapse. `completed` is the size of the history map, which
   * also holds failed and cancelled downloads.
   * @returns {{active: number, queued: number, maxConcurrent: number, completed: number, canAcceptMore: boolean}}
   */
  getStats() {
    return {
      active: this.activeDownloads.size,
      queued: this.queuedDownloads.length + this.pendingRetries.size,
      maxConcurrent: this.maxConcurrent,
      completed: this.downloadHistory.size,
      canAcceptMore: this.activeDownloads.size < this.maxConcurrent
    }
  }

  /**
   * Add download to queue
   * @param {Object} downloadRequest - Download request object. `downloadFn` is
   *   called with `{ url, quality, format, savePath, cookieFile, onProcess, onProgress }`
   *   and its returned promise decides the outcome of the download.
   * @param {number} priority - Priority level (PRIORITY.HIGH/NORMAL/LOW)
   * @returns {Promise} Resolves with the result of `downloadFn`; rejects when the
   *   download fails, is cancelled, or the video is already active/queued.
   */
  async addDownload(downloadRequest, priority = PRIORITY.NORMAL) {
    const { videoId, url, quality, format, savePath, cookieFile, downloadFn } = downloadRequest

    if (this.activeDownloads.has(videoId)) {
      throw new Error(`Video ${videoId} is already being downloaded`)
    }
    // A request waiting for its retry backoff is still pending work for this video.
    const isWaiting = this.pendingRetries.has(videoId) ||
      this.queuedDownloads.some(req => req.videoId === videoId)
    if (isWaiting) {
      throw new Error(`Video ${videoId} is already in queue`)
    }

    return new Promise((resolve, reject) => {
      const request = {
        videoId,
        url,
        quality,
        format,
        savePath,
        cookieFile,
        downloadFn,
        resolve,
        reject,
        priority,
        addedAt: Date.now(),
        retryCount: 0
      }
      this.queuedDownloads.push(request)
      this.sortQueue()
      this.emit('queueUpdated', this.getStats())
      // Try to process queue immediately
      this.processQueue()
    })
  }

  /**
   * Sort queue by priority (higher first) and then by addedAt (older first)
   * @private
   */
  sortQueue() {
    this.queuedDownloads.sort((a, b) => {
      if (b.priority !== a.priority) {
        return b.priority - a.priority
      }
      return a.addedAt - b.addedAt
    })
  }

  /**
   * Set priority for a queued download.
   * Only requests currently sitting in the queue can be re-prioritised.
   * @param {string} videoId - Video ID
   * @param {number} priority - New priority level
   * @returns {boolean} Success status
   */
  setPriority(videoId, priority) {
    const request = this.queuedDownloads.find(r => r.videoId === videoId)
    if (!request) {
      return false
    }
    request.priority = priority
    this.sortQueue()
    this.emit('queueUpdated', this.getStats())
    return true
  }

  /**
   * Process download queue
   * Starts downloads up to maxConcurrent limit
   */
  async processQueue() {
    while (this.activeDownloads.size < this.maxConcurrent && this.queuedDownloads.length > 0) {
      const request = this.queuedDownloads.shift()
      // Deliberately not awaited: downloads run in parallel and startDownload
      // settles the request's promise itself. The catch is a last-resort guard
      // (e.g. an event listener threw) so the slot and the caller are released.
      this.startDownload(request).catch(error => {
        console.error(`Unexpected error while running download ${request.videoId}:`, error)
        if (this.activeRequests.get(request.videoId) === request) {
          this.releaseSlot(request.videoId)
        }
        request.reject(error)
      })
    }
  }

  /**
   * Start a single download attempt for a request taken from the queue.
   * @param {Object} request - Request object created by addDownload
   */
  async startDownload(request) {
    const { videoId, url, quality, format, savePath, cookieFile, downloadFn, resolve, reject } = request
    const retryCount = request.retryCount || 0

    // Mark as active
    const downloadInfo = {
      videoId,
      url,
      startedAt: Date.now(),
      progress: 0,
      status: 'downloading',
      retryCount
    }
    this.activeDownloads.set(videoId, downloadInfo)
    this.activeRequests.set(videoId, request)
    this.emit('downloadStarted', { videoId, ...downloadInfo })
    this.emit('queueUpdated', this.getStats())

    // False once this attempt was cancelled (or replaced by a newer download of
    // the same video). A stale attempt must not touch the manager's state.
    const isCurrent = () => this.activeDownloads.get(videoId) === downloadInfo

    let result
    try {
      console.log(`🚀 Starting download ${this.activeDownloads.size}/${this.maxConcurrent}: ${videoId}${retryCount ? ` (retry ${retryCount}/${this.maxRetries})` : ''}`)
      result = await downloadFn({
        url,
        quality,
        format,
        savePath,
        cookieFile,
        onProcess: (childProcess) => {
          if (isCurrent()) {
            // Store process reference for cancellation
            this.activeProcesses.set(videoId, childProcess)
          } else {
            // Cancelled before the process was reported: stop it right away
            // instead of tracking a process nobody can cancel any more.
            try {
              this.terminateProcess(childProcess)
            } catch (killError) {
              console.error(`Error cancelling download ${videoId}:`, killError)
            }
          }
        },
        onProgress: (progressData) => {
          if (!isCurrent()) {
            return
          }
          downloadInfo.progress = progressData.progress || 0
          downloadInfo.speed = progressData.speed
          downloadInfo.eta = progressData.eta
          this.emit('downloadProgress', { videoId, ...progressData })
        }
      })
    } catch (error) {
      if (!isCurrent()) {
        // Already cancelled; the promise was rejected at cancellation time,
        // so this reject is only a safety net.
        reject(new Error('Download cancelled by user'))
        return
      }
      if (retryCount < this.maxRetries && this.isRetryableError(error)) {
        this.scheduleRetry(request, error)
      } else {
        // Max retries exceeded or non-retryable error
        this.handleDownloadError(videoId, error, reject)
      }
      return
    }

    if (!isCurrent()) {
      reject(new Error('Download cancelled by user'))
      return
    }
    this.handleDownloadComplete(videoId, result, resolve)
  }

  /**
   * Release the active slot of a failed attempt and re-queue the request
   * after an exponential backoff (1s, 2s, 4s, ...).
   * @param {Object} request - Request whose attempt just failed
   * @param {*} error - Error that caused the failure
   * @private
   */
  scheduleRetry(request, error) {
    const { videoId } = request
    const attempt = request.retryCount || 0
    console.log(`🔄 Retrying download (${attempt + 1}/${this.maxRetries}): ${videoId}`)

    this.releaseSlot(videoId)
    request.retryCount = attempt + 1
    request.lastError = this.describeError(error)

    const timer = setTimeout(() => {
      this.pendingRetries.delete(videoId)
      // Add to front of queue so the retry runs before newer requests
      this.queuedDownloads.unshift(request)
      this.emit('queueUpdated', this.getStats())
      this.processQueue()
    }, 2 ** attempt * 1000)

    // Track the waiting request so it stays visible and cancellable.
    this.pendingRetries.set(videoId, { request, timer })
    this.emit('queueUpdated', this.getStats())
    // The slot is free during the backoff; let other queued downloads use it.
    this.processQueue()
  }

  /**
   * Remove every "active" bookkeeping entry for a video.
   * @param {string} videoId - Video ID
   * @private
   */
  releaseSlot(videoId) {
    this.activeDownloads.delete(videoId)
    this.activeProcesses.delete(videoId)
    this.activeRequests.delete(videoId)
  }

  /**
   * Ask a child process to stop: SIGTERM now, SIGKILL after 5 seconds if it
   * has not exited by then.
   * @param {Object} childProcess - Object with `kill(signal)`, normally a ChildProcess
   * @throws Whatever the initial `kill('SIGTERM')` throws
   * @private
   */
  terminateProcess(childProcess) {
    childProcess.kill('SIGTERM')

    const forceKillTimer = setTimeout(() => {
      // `killed` only says a signal was sent, not that the process is gone.
      // exitCode and signalCode stay unset until the process has really exited.
      if (childProcess.exitCode == null && childProcess.signalCode == null) {
        try {
          childProcess.kill('SIGKILL')
        } catch (killError) {
          console.error('Error force-killing download process:', killError)
        }
      }
    }, 5000)

    // No need to keep the timer around once the process has exited.
    if (typeof childProcess.once === 'function') {
      childProcess.once('exit', () => clearTimeout(forceKillTimer))
    }
  }

  /**
   * Extract a printable message from anything that can be thrown.
   * @param {*} error - Error object or arbitrary rejection value
   * @returns {string} Message text
   * @private
   */
  describeError(error) {
    return error && typeof error.message === 'string' ? error.message : String(error)
  }

  /**
   * Check if error is retryable
   * @param {Error} error - Error object
   * @returns {boolean} True if the error message matches a transient-failure pattern
   * @private
   */
  isRetryableError(error) {
    const retryablePatterns = [
      /network/i,
      /timeout/i,
      /ECONNRESET/i,
      /ETIMEDOUT/i,
      /ENOTFOUND/i,
      /ECONNREFUSED/i,
      /socket hang up/i,
      /503/i,
      /502/i,
      /504/i
    ]
    const message = this.describeError(error)
    return retryablePatterns.some(pattern => pattern.test(message))
  }

  /**
   * Handle download completion: move the download to history, emit events,
   * resolve the caller's promise and start the next queued download.
   * @param {string} videoId - Video ID
   * @param {*} result - Value returned by the download function
   * @param {Function} resolve - Resolver of the promise returned by addDownload
   */
  handleDownloadComplete(videoId, result, resolve) {
    const downloadInfo = this.activeDownloads.get(videoId)
    if (downloadInfo) {
      downloadInfo.status = 'completed'
      downloadInfo.completedAt = Date.now()
      downloadInfo.duration = downloadInfo.completedAt - downloadInfo.startedAt
      downloadInfo.result = result
      // Move to history
      this.downloadHistory.set(videoId, downloadInfo)
      this.releaseSlot(videoId)
      console.log(`✅ Download completed: ${videoId} (${(downloadInfo.duration / 1000).toFixed(1)}s)`)
      this.emit('downloadCompleted', { videoId, result, duration: downloadInfo.duration })
      this.emit('queueUpdated', this.getStats())
    }
    // Always settle, so the caller never waits on a promise that cannot finish.
    resolve(result)
    // Process next in queue
    this.processQueue()
  }

  /**
   * Handle download error: move the download to history, emit events,
   * reject the caller's promise and start the next queued download.
   * @param {string} videoId - Video ID
   * @param {*} error - Error that ended the download
   * @param {Function} reject - Rejecter of the promise returned by addDownload
   */
  handleDownloadError(videoId, error, reject) {
    const message = this.describeError(error)
    const downloadInfo = this.activeDownloads.get(videoId)
    if (downloadInfo) {
      downloadInfo.status = 'error'
      downloadInfo.error = message
      downloadInfo.completedAt = Date.now()
      downloadInfo.duration = downloadInfo.completedAt - downloadInfo.startedAt
      // Move to history
      this.downloadHistory.set(videoId, downloadInfo)
      this.releaseSlot(videoId)
      console.error(`❌ Download failed: ${videoId} - ${message}`)
      this.emit('downloadFailed', { videoId, error: message })
      this.emit('queueUpdated', this.getStats())
    }
    // Always settle, so the caller never waits on a promise that cannot finish.
    reject(error)
    // Process next in queue
    this.processQueue()
  }

  /**
   * Cancel a specific download.
   * An active download is cancelled only when a live process has been
   * registered for it; requests waiting in the queue or for a retry are
   * removed. In every successful case the promise returned by addDownload
   * rejects with "Download cancelled by user".
   * @param {string} videoId - Video ID to cancel
   * @returns {boolean} Success status
   */
  cancelDownload(videoId) {
    // Try to cancel active download first
    const downloadInfo = this.activeDownloads.get(videoId)
    const child = this.activeProcesses.get(videoId)
    if (downloadInfo && child && !child.killed) {
      try {
        this.terminateProcess(child)
        console.log(`🛑 Cancelled active download: ${videoId}`)

        downloadInfo.status = 'cancelled'
        downloadInfo.error = 'Cancelled by user'
        this.downloadHistory.set(videoId, downloadInfo)

        const request = this.activeRequests.get(videoId)
        this.releaseSlot(videoId)
        if (request) {
          // Settle now; the killed download function may never settle itself.
          request.reject(new Error('Download cancelled by user'))
        }

        this.emit('downloadCancelled', { videoId })
        this.emit('queueUpdated', this.getStats())
        // Process next in queue
        this.processQueue()
        return true
      } catch (error) {
        console.error(`Error cancelling download ${videoId}:`, error)
        return false
      }
    }

    // Remove a request that is waiting for its retry backoff
    const pending = this.pendingRetries.get(videoId)
    if (pending) {
      clearTimeout(pending.timer)
      this.pendingRetries.delete(videoId)
      pending.request.reject(new Error('Download cancelled by user'))
      console.log(`🛑 Removed from queue: ${videoId}`)
      this.emit('queueUpdated', this.getStats())
      return true
    }

    // Remove from queue if present
    const queueIndex = this.queuedDownloads.findIndex(req => req.videoId === videoId)
    if (queueIndex !== -1) {
      const request = this.queuedDownloads.splice(queueIndex, 1)[0]
      request.reject(new Error('Download cancelled by user'))
      console.log(`🛑 Removed from queue: ${videoId}`)
      this.emit('queueUpdated', this.getStats())
      return true
    }
    return false
  }

  /**
   * Cancel all downloads (active, queued and waiting for a retry).
   * Every affected promise rejects with "Download cancelled by user".
   * @returns {Object} Cancellation results: `cancelledActive` counts downloads
   *   whose process was signalled, `cancelledQueued` counts queued requests
   *   plus requests waiting for a retry, `total` is their sum.
   */
  cancelAll() {
    let cancelledActive = 0

    // Signal every live process
    for (const [videoId, child] of this.activeProcesses) {
      if (!child || child.killed) {
        continue
      }
      try {
        this.terminateProcess(child)
        const downloadInfo = this.activeDownloads.get(videoId)
        if (downloadInfo) {
          downloadInfo.status = 'cancelled'
          downloadInfo.error = 'Cancelled by user'
          this.downloadHistory.set(videoId, downloadInfo)
        }
        cancelledActive++
      } catch (error) {
        console.error(`Error cancelling ${videoId}:`, error)
      }
    }

    // Collect everything that is waiting, stopping retry timers so cancelled
    // downloads cannot come back to life after the backoff.
    const waiting = [...this.queuedDownloads]
    for (const { request, timer } of this.pendingRetries.values()) {
      clearTimeout(timer)
      waiting.push(request)
    }
    const running = [...this.activeRequests.values()]

    // Clear all bookkeeping before settling promises
    this.activeDownloads.clear()
    this.activeProcesses.clear()
    this.activeRequests.clear()
    this.pendingRetries.clear()
    this.queuedDownloads = []

    for (const request of running.concat(waiting)) {
      request.reject(new Error('Download cancelled by user'))
    }

    const cancelledQueued = waiting.length
    this.emit('queueUpdated', this.getStats())
    console.log(`🛑 Cancelled ${cancelledActive} active and ${cancelledQueued} queued downloads`)
    return {
      cancelledActive,
      cancelledQueued,
      total: cancelledActive + cancelledQueued
    }
  }

  /**
   * Clear download history
   */
  clearHistory() {
    const count = this.downloadHistory.size
    this.downloadHistory.clear()
    console.log(`🗑️ Cleared ${count} download history entries`)
  }

  /**
   * Get download info.
   * Looks at active downloads first, then history, then the queue, then
   * requests waiting for a retry.
   * @param {string} videoId - Video ID
   * @returns {Object|undefined} Download info or request object, if known
   */
  getDownloadInfo(videoId) {
    const pending = this.pendingRetries.get(videoId)
    return this.activeDownloads.get(videoId) ||
      this.downloadHistory.get(videoId) ||
      this.queuedDownloads.find(req => req.videoId === videoId) ||
      (pending ? pending.request : undefined)
  }

  /**
   * Check if video is downloading, queued, or waiting for a retry
   * @param {string} videoId - Video ID
   * @returns {boolean} True if there is unfinished work for the video
   */
  isDownloading(videoId) {
    return this.activeDownloads.has(videoId) ||
      this.pendingRetries.has(videoId) ||
      this.queuedDownloads.some(req => req.videoId === videoId)
  }
}
// Export the class itself as the module and attach the priority levels to it,
// so they are reachable as a property of the exported class.
module.exports = Object.assign(DownloadManager, { PRIORITY })
|