task_logger.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package tasks
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sort"
  9. "sync"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  13. )
  14. // TaskLogger provides file-based logging for individual tasks
  15. type TaskLogger interface {
  16. // Log methods
  17. Info(message string, args ...interface{})
  18. Warning(message string, args ...interface{})
  19. Error(message string, args ...interface{})
  20. Debug(message string, args ...interface{})
  21. // Progress and status logging
  22. LogProgress(progress float64, message string)
  23. LogStatus(status string, message string)
  24. // Structured logging
  25. LogWithFields(level string, message string, fields map[string]interface{})
  26. // Lifecycle
  27. Close() error
  28. GetLogDir() string
  29. }
  30. // LoggerProvider interface for tasks that support logging
  31. type LoggerProvider interface {
  32. InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error
  33. GetTaskLogger() TaskLogger
  34. }
  35. // TaskLoggerConfig holds configuration for task logging
  36. type TaskLoggerConfig struct {
  37. BaseLogDir string
  38. MaxTasks int // Maximum number of task logs to keep
  39. MaxLogSizeMB int // Maximum log file size in MB
  40. EnableConsole bool // Also log to console
  41. }
  42. // FileTaskLogger implements TaskLogger using file-based logging
  43. type FileTaskLogger struct {
  44. taskID string
  45. taskType types.TaskType
  46. workerID string
  47. logDir string
  48. logFile *os.File
  49. mutex sync.Mutex
  50. config TaskLoggerConfig
  51. metadata *TaskLogMetadata
  52. closed bool
  53. }
  54. // TaskLogMetadata contains metadata about the task execution
  55. type TaskLogMetadata struct {
  56. TaskID string `json:"task_id"`
  57. TaskType string `json:"task_type"`
  58. WorkerID string `json:"worker_id"`
  59. StartTime time.Time `json:"start_time"`
  60. EndTime *time.Time `json:"end_time,omitempty"`
  61. Duration *time.Duration `json:"duration,omitempty"`
  62. Status string `json:"status"`
  63. Progress float64 `json:"progress"`
  64. VolumeID uint32 `json:"volume_id,omitempty"`
  65. Server string `json:"server,omitempty"`
  66. Collection string `json:"collection,omitempty"`
  67. CustomData map[string]interface{} `json:"custom_data,omitempty"`
  68. LogFilePath string `json:"log_file_path"`
  69. CreatedAt time.Time `json:"created_at"`
  70. }
  71. // TaskLogEntry represents a single log entry
  72. type TaskLogEntry struct {
  73. Timestamp time.Time `json:"timestamp"`
  74. Level string `json:"level"`
  75. Message string `json:"message"`
  76. Fields map[string]interface{} `json:"fields,omitempty"`
  77. Progress *float64 `json:"progress,omitempty"`
  78. Status *string `json:"status,omitempty"`
  79. }
  80. // DefaultTaskLoggerConfig returns default configuration
  81. func DefaultTaskLoggerConfig() TaskLoggerConfig {
  82. return TaskLoggerConfig{
  83. BaseLogDir: "/data/task_logs", // Use persistent data directory
  84. MaxTasks: 100, // Keep last 100 task logs
  85. MaxLogSizeMB: 10,
  86. EnableConsole: true,
  87. }
  88. }
  89. // NewTaskLogger creates a new file-based task logger
  90. func NewTaskLogger(taskID string, taskType types.TaskType, workerID string, params types.TaskParams, config TaskLoggerConfig) (TaskLogger, error) {
  91. // Create unique directory name with timestamp
  92. timestamp := time.Now().Format("20060102_150405")
  93. dirName := fmt.Sprintf("%s_%s_%s_%s", taskID, taskType, workerID, timestamp)
  94. logDir := filepath.Join(config.BaseLogDir, dirName)
  95. // Create log directory
  96. if err := os.MkdirAll(logDir, 0755); err != nil {
  97. return nil, fmt.Errorf("failed to create log directory %s: %w", logDir, err)
  98. }
  99. // Create log file
  100. logFilePath := filepath.Join(logDir, "task.log")
  101. logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
  102. if err != nil {
  103. return nil, fmt.Errorf("failed to create log file %s: %w", logFilePath, err)
  104. }
  105. // Create metadata
  106. metadata := &TaskLogMetadata{
  107. TaskID: taskID,
  108. TaskType: string(taskType),
  109. WorkerID: workerID,
  110. StartTime: time.Now(),
  111. Status: "started",
  112. Progress: 0.0,
  113. VolumeID: params.VolumeID,
  114. Server: getServerFromSources(params.TypedParams.Sources),
  115. Collection: params.Collection,
  116. CustomData: make(map[string]interface{}),
  117. LogFilePath: logFilePath,
  118. CreatedAt: time.Now(),
  119. }
  120. logger := &FileTaskLogger{
  121. taskID: taskID,
  122. taskType: taskType,
  123. workerID: workerID,
  124. logDir: logDir,
  125. logFile: logFile,
  126. config: config,
  127. metadata: metadata,
  128. closed: false,
  129. }
  130. // Write initial log entry
  131. logger.Info("Task logger initialized for %s (type: %s, worker: %s)", taskID, taskType, workerID)
  132. logger.LogWithFields("INFO", "Task parameters", map[string]interface{}{
  133. "volume_id": params.VolumeID,
  134. "server": getServerFromSources(params.TypedParams.Sources),
  135. "collection": params.Collection,
  136. })
  137. // Save initial metadata
  138. if err := logger.saveMetadata(); err != nil {
  139. glog.Warningf("Failed to save initial task metadata: %v", err)
  140. }
  141. // Clean up old task logs
  142. go logger.cleanupOldLogs()
  143. return logger, nil
  144. }
  145. // Info logs an info message
  146. func (l *FileTaskLogger) Info(message string, args ...interface{}) {
  147. l.log("INFO", message, args...)
  148. }
  149. // Warning logs a warning message
  150. func (l *FileTaskLogger) Warning(message string, args ...interface{}) {
  151. l.log("WARNING", message, args...)
  152. }
  153. // Error logs an error message
  154. func (l *FileTaskLogger) Error(message string, args ...interface{}) {
  155. l.log("ERROR", message, args...)
  156. }
  157. // Debug logs a debug message
  158. func (l *FileTaskLogger) Debug(message string, args ...interface{}) {
  159. l.log("DEBUG", message, args...)
  160. }
  161. // LogProgress logs task progress
  162. func (l *FileTaskLogger) LogProgress(progress float64, message string) {
  163. l.mutex.Lock()
  164. l.metadata.Progress = progress
  165. l.mutex.Unlock()
  166. entry := TaskLogEntry{
  167. Timestamp: time.Now(),
  168. Level: "INFO",
  169. Message: message,
  170. Progress: &progress,
  171. }
  172. l.writeLogEntry(entry)
  173. l.saveMetadata() // Update metadata with new progress
  174. }
  175. // LogStatus logs task status change
  176. func (l *FileTaskLogger) LogStatus(status string, message string) {
  177. l.mutex.Lock()
  178. l.metadata.Status = status
  179. l.mutex.Unlock()
  180. entry := TaskLogEntry{
  181. Timestamp: time.Now(),
  182. Level: "INFO",
  183. Message: message,
  184. Status: &status,
  185. }
  186. l.writeLogEntry(entry)
  187. l.saveMetadata() // Update metadata with new status
  188. }
  189. // LogWithFields logs a message with structured fields
  190. func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[string]interface{}) {
  191. entry := TaskLogEntry{
  192. Timestamp: time.Now(),
  193. Level: level,
  194. Message: message,
  195. Fields: fields,
  196. }
  197. l.writeLogEntry(entry)
  198. }
  199. // Close closes the logger and finalizes metadata
  200. func (l *FileTaskLogger) Close() error {
  201. l.mutex.Lock()
  202. defer l.mutex.Unlock()
  203. if l.closed {
  204. return nil
  205. }
  206. // Finalize metadata
  207. endTime := time.Now()
  208. duration := endTime.Sub(l.metadata.StartTime)
  209. l.metadata.EndTime = &endTime
  210. l.metadata.Duration = &duration
  211. if l.metadata.Status == "started" {
  212. l.metadata.Status = "completed"
  213. }
  214. // Save final metadata
  215. l.saveMetadata()
  216. // Close log file
  217. if l.logFile != nil {
  218. if err := l.logFile.Close(); err != nil {
  219. return fmt.Errorf("failed to close log file: %w", err)
  220. }
  221. }
  222. l.closed = true
  223. l.Info("Task logger closed for %s", l.taskID)
  224. return nil
  225. }
  226. // GetLogDir returns the log directory path
  227. func (l *FileTaskLogger) GetLogDir() string {
  228. return l.logDir
  229. }
  230. // log is the internal logging method
  231. func (l *FileTaskLogger) log(level string, message string, args ...interface{}) {
  232. formattedMessage := fmt.Sprintf(message, args...)
  233. entry := TaskLogEntry{
  234. Timestamp: time.Now(),
  235. Level: level,
  236. Message: formattedMessage,
  237. }
  238. l.writeLogEntry(entry)
  239. }
  240. // writeLogEntry writes a log entry to the file
  241. func (l *FileTaskLogger) writeLogEntry(entry TaskLogEntry) {
  242. l.mutex.Lock()
  243. defer l.mutex.Unlock()
  244. if l.closed || l.logFile == nil {
  245. return
  246. }
  247. // Format as JSON line
  248. jsonData, err := json.Marshal(entry)
  249. if err != nil {
  250. glog.Errorf("Failed to marshal log entry: %v", err)
  251. return
  252. }
  253. // Write to file
  254. if _, err := l.logFile.WriteString(string(jsonData) + "\n"); err != nil {
  255. glog.Errorf("Failed to write log entry: %v", err)
  256. return
  257. }
  258. // Flush to disk
  259. if err := l.logFile.Sync(); err != nil {
  260. glog.Errorf("Failed to sync log file: %v", err)
  261. }
  262. // Also log to console and stderr if enabled
  263. if l.config.EnableConsole {
  264. // Log to glog with proper call depth to show actual source location
  265. // We need depth 3 to skip: writeLogEntry -> log -> Info/Warning/Error calls to reach the original caller
  266. formattedMsg := fmt.Sprintf("[TASK-%s] %s: %s", l.taskID, entry.Level, entry.Message)
  267. switch entry.Level {
  268. case "ERROR":
  269. glog.ErrorDepth(3, formattedMsg)
  270. case "WARNING":
  271. glog.WarningDepth(3, formattedMsg)
  272. default: // INFO, DEBUG, etc.
  273. glog.InfoDepth(3, formattedMsg)
  274. }
  275. // Also log to stderr for immediate visibility
  276. fmt.Fprintf(os.Stderr, "[TASK-%s] %s: %s\n", l.taskID, entry.Level, entry.Message)
  277. }
  278. }
  279. // saveMetadata saves task metadata to file
  280. func (l *FileTaskLogger) saveMetadata() error {
  281. metadataPath := filepath.Join(l.logDir, "metadata.json")
  282. data, err := json.MarshalIndent(l.metadata, "", " ")
  283. if err != nil {
  284. return fmt.Errorf("failed to marshal metadata: %w", err)
  285. }
  286. return os.WriteFile(metadataPath, data, 0644)
  287. }
  288. // cleanupOldLogs removes old task log directories to maintain the limit
  289. func (l *FileTaskLogger) cleanupOldLogs() {
  290. baseDir := l.config.BaseLogDir
  291. entries, err := os.ReadDir(baseDir)
  292. if err != nil {
  293. glog.Warningf("Failed to read log directory %s: %v", baseDir, err)
  294. return
  295. }
  296. // Filter for directories only
  297. var dirs []os.DirEntry
  298. for _, entry := range entries {
  299. if entry.IsDir() {
  300. dirs = append(dirs, entry)
  301. }
  302. }
  303. // If we're under the limit, nothing to clean
  304. if len(dirs) <= l.config.MaxTasks {
  305. return
  306. }
  307. // Sort by modification time (oldest first)
  308. sort.Slice(dirs, func(i, j int) bool {
  309. infoI, errI := dirs[i].Info()
  310. infoJ, errJ := dirs[j].Info()
  311. if errI != nil || errJ != nil {
  312. return false
  313. }
  314. return infoI.ModTime().Before(infoJ.ModTime())
  315. })
  316. // Remove oldest directories
  317. numToRemove := len(dirs) - l.config.MaxTasks
  318. for i := 0; i < numToRemove; i++ {
  319. dirPath := filepath.Join(baseDir, dirs[i].Name())
  320. if err := os.RemoveAll(dirPath); err != nil {
  321. glog.Warningf("Failed to remove old log directory %s: %v", dirPath, err)
  322. } else {
  323. glog.V(1).Infof("Cleaned up old task log directory: %s", dirPath)
  324. }
  325. }
  326. glog.V(1).Infof("Task log cleanup completed: removed %d old directories", numToRemove)
  327. }
  328. // GetTaskLogMetadata reads metadata from a task log directory
  329. func GetTaskLogMetadata(logDir string) (*TaskLogMetadata, error) {
  330. metadataPath := filepath.Join(logDir, "metadata.json")
  331. data, err := os.ReadFile(metadataPath)
  332. if err != nil {
  333. return nil, fmt.Errorf("failed to read metadata file: %w", err)
  334. }
  335. var metadata TaskLogMetadata
  336. if err := json.Unmarshal(data, &metadata); err != nil {
  337. return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
  338. }
  339. return &metadata, nil
  340. }
  341. // ReadTaskLogs reads all log entries from a task log file
  342. func ReadTaskLogs(logDir string) ([]TaskLogEntry, error) {
  343. logPath := filepath.Join(logDir, "task.log")
  344. file, err := os.Open(logPath)
  345. if err != nil {
  346. return nil, fmt.Errorf("failed to open log file: %w", err)
  347. }
  348. defer file.Close()
  349. var entries []TaskLogEntry
  350. decoder := json.NewDecoder(file)
  351. for {
  352. var entry TaskLogEntry
  353. if err := decoder.Decode(&entry); err != nil {
  354. if err == io.EOF {
  355. break
  356. }
  357. return nil, fmt.Errorf("failed to decode log entry: %w", err)
  358. }
  359. entries = append(entries, entry)
  360. }
  361. return entries, nil
  362. }