| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- package tasks
- import (
- "encoding/json"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "sort"
- "sync"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
- )
// TaskLogger is a per-task log sink that persists entries to files.
//
// Implementations record free-form messages at several levels, progress and
// status updates, and structured entries, and expose the directory that holds
// their output.
type TaskLogger interface {
	// GetLogDir reports the directory this logger writes into.
	GetLogDir() string
	// Close flushes final state and releases the logger's resources.
	Close() error

	// Debug, Info, Warning and Error record a message at the named level;
	// message is a printf-style format applied to args.
	Debug(message string, args ...interface{})
	Info(message string, args ...interface{})
	Warning(message string, args ...interface{})
	Error(message string, args ...interface{})

	// LogWithFields records message at level together with structured fields.
	LogWithFields(level string, message string, fields map[string]interface{})

	// LogStatus records a status transition accompanied by message.
	LogStatus(status string, message string)
	// LogProgress records a progress value accompanied by message.
	LogProgress(progress float64, message string)
}
- // LoggerProvider interface for tasks that support logging
- type LoggerProvider interface {
- InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error
- GetTaskLogger() TaskLogger
- }
// TaskLoggerConfig holds configuration for task logging: where per-task log
// directories are created, how many are retained, and whether entries are
// mirrored to the console.
type TaskLoggerConfig struct {
	// BaseLogDir is the parent directory; each task gets its own subdirectory here.
	BaseLogDir string
	MaxTasks int // Maximum number of directories to keep under BaseLogDir; the oldest (by mod time) beyond this are pruned
	MaxLogSizeMB int // Maximum log file size in MB. NOTE(review): not enforced anywhere in this file — confirm intent
	EnableConsole bool // Also mirror each entry to glog and stderr
}
// FileTaskLogger implements TaskLogger by appending JSON-lines entries to
// <logDir>/task.log and keeping a summary in <logDir>/metadata.json.
type FileTaskLogger struct {
	taskID string
	taskType types.TaskType
	workerID string
	logDir string // per-task directory under config.BaseLogDir
	logFile *os.File // task.log, opened for append
	// mutex serializes writes to logFile and changes to metadata and closed.
	// It is a plain sync.Mutex and therefore not reentrant: code holding it
	// must not call writeLogEntry (or Info/Warning/... which reach it).
	mutex sync.Mutex
	config TaskLoggerConfig
	metadata *TaskLogMetadata // in-memory copy of what is written to metadata.json
	closed bool // set by Close; writeLogEntry drops entries once true
}
// TaskLogMetadata contains metadata about the task execution. It is
// serialized as indented JSON to metadata.json in the task's log directory.
//
// Field order is significant: encoding/json emits fields in declaration
// order, so reordering changes the on-disk layout of metadata.json.
type TaskLogMetadata struct {
	TaskID string `json:"task_id"`
	TaskType string `json:"task_type"`
	WorkerID string `json:"worker_id"`
	// StartTime is set when the logger is created. EndTime and Duration are
	// filled in by Close and are omitted from the JSON until then.
	StartTime time.Time `json:"start_time"`
	EndTime *time.Time `json:"end_time,omitempty"`
	Duration *time.Duration `json:"duration,omitempty"` // time.Duration encodes as integer nanoseconds
	// Status starts as "started"; LogStatus overwrites it, and Close turns a
	// remaining "started" into "completed".
	Status string `json:"status"`
	Progress float64 `json:"progress"` // last value passed to LogProgress (0 initially)
	VolumeID uint32 `json:"volume_id,omitempty"`
	Server string `json:"server,omitempty"`
	Collection string `json:"collection,omitempty"`
	CustomData map[string]interface{} `json:"custom_data,omitempty"` // NOTE(review): created empty, never populated in this file
	LogFilePath string `json:"log_file_path"` // absolute or BaseLogDir-relative path of task.log
	CreatedAt time.Time `json:"created_at"`
}
// TaskLogEntry represents a single log entry, stored as one JSON line in
// task.log. Field order determines the key order of each line.
type TaskLogEntry struct {
	Timestamp time.Time `json:"timestamp"`
	Level string `json:"level"` // e.g. "INFO", "WARNING", "ERROR", "DEBUG"
	Message string `json:"message"`
	Fields map[string]interface{} `json:"fields,omitempty"` // set by LogWithFields
	Progress *float64 `json:"progress,omitempty"` // set only by LogProgress
	Status *string `json:"status,omitempty"` // set only by LogStatus
}
- // DefaultTaskLoggerConfig returns default configuration
- func DefaultTaskLoggerConfig() TaskLoggerConfig {
- return TaskLoggerConfig{
- BaseLogDir: "/data/task_logs", // Use persistent data directory
- MaxTasks: 100, // Keep last 100 task logs
- MaxLogSizeMB: 10,
- EnableConsole: true,
- }
- }
- // NewTaskLogger creates a new file-based task logger
- func NewTaskLogger(taskID string, taskType types.TaskType, workerID string, params types.TaskParams, config TaskLoggerConfig) (TaskLogger, error) {
- // Create unique directory name with timestamp
- timestamp := time.Now().Format("20060102_150405")
- dirName := fmt.Sprintf("%s_%s_%s_%s", taskID, taskType, workerID, timestamp)
- logDir := filepath.Join(config.BaseLogDir, dirName)
- // Create log directory
- if err := os.MkdirAll(logDir, 0755); err != nil {
- return nil, fmt.Errorf("failed to create log directory %s: %w", logDir, err)
- }
- // Create log file
- logFilePath := filepath.Join(logDir, "task.log")
- logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
- if err != nil {
- return nil, fmt.Errorf("failed to create log file %s: %w", logFilePath, err)
- }
- // Create metadata
- metadata := &TaskLogMetadata{
- TaskID: taskID,
- TaskType: string(taskType),
- WorkerID: workerID,
- StartTime: time.Now(),
- Status: "started",
- Progress: 0.0,
- VolumeID: params.VolumeID,
- Server: getServerFromSources(params.TypedParams.Sources),
- Collection: params.Collection,
- CustomData: make(map[string]interface{}),
- LogFilePath: logFilePath,
- CreatedAt: time.Now(),
- }
- logger := &FileTaskLogger{
- taskID: taskID,
- taskType: taskType,
- workerID: workerID,
- logDir: logDir,
- logFile: logFile,
- config: config,
- metadata: metadata,
- closed: false,
- }
- // Write initial log entry
- logger.Info("Task logger initialized for %s (type: %s, worker: %s)", taskID, taskType, workerID)
- logger.LogWithFields("INFO", "Task parameters", map[string]interface{}{
- "volume_id": params.VolumeID,
- "server": getServerFromSources(params.TypedParams.Sources),
- "collection": params.Collection,
- })
- // Save initial metadata
- if err := logger.saveMetadata(); err != nil {
- glog.Warningf("Failed to save initial task metadata: %v", err)
- }
- // Clean up old task logs
- go logger.cleanupOldLogs()
- return logger, nil
- }
- // Info logs an info message
- func (l *FileTaskLogger) Info(message string, args ...interface{}) {
- l.log("INFO", message, args...)
- }
- // Warning logs a warning message
- func (l *FileTaskLogger) Warning(message string, args ...interface{}) {
- l.log("WARNING", message, args...)
- }
- // Error logs an error message
- func (l *FileTaskLogger) Error(message string, args ...interface{}) {
- l.log("ERROR", message, args...)
- }
- // Debug logs a debug message
- func (l *FileTaskLogger) Debug(message string, args ...interface{}) {
- l.log("DEBUG", message, args...)
- }
- // LogProgress logs task progress
- func (l *FileTaskLogger) LogProgress(progress float64, message string) {
- l.mutex.Lock()
- l.metadata.Progress = progress
- l.mutex.Unlock()
- entry := TaskLogEntry{
- Timestamp: time.Now(),
- Level: "INFO",
- Message: message,
- Progress: &progress,
- }
- l.writeLogEntry(entry)
- l.saveMetadata() // Update metadata with new progress
- }
- // LogStatus logs task status change
- func (l *FileTaskLogger) LogStatus(status string, message string) {
- l.mutex.Lock()
- l.metadata.Status = status
- l.mutex.Unlock()
- entry := TaskLogEntry{
- Timestamp: time.Now(),
- Level: "INFO",
- Message: message,
- Status: &status,
- }
- l.writeLogEntry(entry)
- l.saveMetadata() // Update metadata with new status
- }
- // LogWithFields logs a message with structured fields
- func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[string]interface{}) {
- entry := TaskLogEntry{
- Timestamp: time.Now(),
- Level: level,
- Message: message,
- Fields: fields,
- }
- l.writeLogEntry(entry)
- }
- // Close closes the logger and finalizes metadata
- func (l *FileTaskLogger) Close() error {
- l.mutex.Lock()
- defer l.mutex.Unlock()
- if l.closed {
- return nil
- }
- // Finalize metadata
- endTime := time.Now()
- duration := endTime.Sub(l.metadata.StartTime)
- l.metadata.EndTime = &endTime
- l.metadata.Duration = &duration
- if l.metadata.Status == "started" {
- l.metadata.Status = "completed"
- }
- // Save final metadata
- l.saveMetadata()
- // Close log file
- if l.logFile != nil {
- if err := l.logFile.Close(); err != nil {
- return fmt.Errorf("failed to close log file: %w", err)
- }
- }
- l.closed = true
- l.Info("Task logger closed for %s", l.taskID)
- return nil
- }
- // GetLogDir returns the log directory path
- func (l *FileTaskLogger) GetLogDir() string {
- return l.logDir
- }
- // log is the internal logging method
- func (l *FileTaskLogger) log(level string, message string, args ...interface{}) {
- formattedMessage := fmt.Sprintf(message, args...)
- entry := TaskLogEntry{
- Timestamp: time.Now(),
- Level: level,
- Message: formattedMessage,
- }
- l.writeLogEntry(entry)
- }
- // writeLogEntry writes a log entry to the file
- func (l *FileTaskLogger) writeLogEntry(entry TaskLogEntry) {
- l.mutex.Lock()
- defer l.mutex.Unlock()
- if l.closed || l.logFile == nil {
- return
- }
- // Format as JSON line
- jsonData, err := json.Marshal(entry)
- if err != nil {
- glog.Errorf("Failed to marshal log entry: %v", err)
- return
- }
- // Write to file
- if _, err := l.logFile.WriteString(string(jsonData) + "\n"); err != nil {
- glog.Errorf("Failed to write log entry: %v", err)
- return
- }
- // Flush to disk
- if err := l.logFile.Sync(); err != nil {
- glog.Errorf("Failed to sync log file: %v", err)
- }
- // Also log to console and stderr if enabled
- if l.config.EnableConsole {
- // Log to glog with proper call depth to show actual source location
- // We need depth 3 to skip: writeLogEntry -> log -> Info/Warning/Error calls to reach the original caller
- formattedMsg := fmt.Sprintf("[TASK-%s] %s: %s", l.taskID, entry.Level, entry.Message)
- switch entry.Level {
- case "ERROR":
- glog.ErrorDepth(3, formattedMsg)
- case "WARNING":
- glog.WarningDepth(3, formattedMsg)
- default: // INFO, DEBUG, etc.
- glog.InfoDepth(3, formattedMsg)
- }
- // Also log to stderr for immediate visibility
- fmt.Fprintf(os.Stderr, "[TASK-%s] %s: %s\n", l.taskID, entry.Level, entry.Message)
- }
- }
- // saveMetadata saves task metadata to file
- func (l *FileTaskLogger) saveMetadata() error {
- metadataPath := filepath.Join(l.logDir, "metadata.json")
- data, err := json.MarshalIndent(l.metadata, "", " ")
- if err != nil {
- return fmt.Errorf("failed to marshal metadata: %w", err)
- }
- return os.WriteFile(metadataPath, data, 0644)
- }
- // cleanupOldLogs removes old task log directories to maintain the limit
- func (l *FileTaskLogger) cleanupOldLogs() {
- baseDir := l.config.BaseLogDir
- entries, err := os.ReadDir(baseDir)
- if err != nil {
- glog.Warningf("Failed to read log directory %s: %v", baseDir, err)
- return
- }
- // Filter for directories only
- var dirs []os.DirEntry
- for _, entry := range entries {
- if entry.IsDir() {
- dirs = append(dirs, entry)
- }
- }
- // If we're under the limit, nothing to clean
- if len(dirs) <= l.config.MaxTasks {
- return
- }
- // Sort by modification time (oldest first)
- sort.Slice(dirs, func(i, j int) bool {
- infoI, errI := dirs[i].Info()
- infoJ, errJ := dirs[j].Info()
- if errI != nil || errJ != nil {
- return false
- }
- return infoI.ModTime().Before(infoJ.ModTime())
- })
- // Remove oldest directories
- numToRemove := len(dirs) - l.config.MaxTasks
- for i := 0; i < numToRemove; i++ {
- dirPath := filepath.Join(baseDir, dirs[i].Name())
- if err := os.RemoveAll(dirPath); err != nil {
- glog.Warningf("Failed to remove old log directory %s: %v", dirPath, err)
- } else {
- glog.V(1).Infof("Cleaned up old task log directory: %s", dirPath)
- }
- }
- glog.V(1).Infof("Task log cleanup completed: removed %d old directories", numToRemove)
- }
- // GetTaskLogMetadata reads metadata from a task log directory
- func GetTaskLogMetadata(logDir string) (*TaskLogMetadata, error) {
- metadataPath := filepath.Join(logDir, "metadata.json")
- data, err := os.ReadFile(metadataPath)
- if err != nil {
- return nil, fmt.Errorf("failed to read metadata file: %w", err)
- }
- var metadata TaskLogMetadata
- if err := json.Unmarshal(data, &metadata); err != nil {
- return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
- }
- return &metadata, nil
- }
- // ReadTaskLogs reads all log entries from a task log file
- func ReadTaskLogs(logDir string) ([]TaskLogEntry, error) {
- logPath := filepath.Join(logDir, "task.log")
- file, err := os.Open(logPath)
- if err != nil {
- return nil, fmt.Errorf("failed to open log file: %w", err)
- }
- defer file.Close()
- var entries []TaskLogEntry
- decoder := json.NewDecoder(file)
- for {
- var entry TaskLogEntry
- if err := decoder.Decode(&entry); err != nil {
- if err == io.EOF {
- break
- }
- return nil, fmt.Errorf("failed to decode log entry: %w", err)
- }
- entries = append(entries, entry)
- }
- return entries, nil
- }
|