task_log_handler.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package tasks
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "strings"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  10. )
  11. // TaskLogHandler handles task log requests from admin server
  12. type TaskLogHandler struct {
  13. baseLogDir string
  14. }
  15. // NewTaskLogHandler creates a new task log handler
  16. func NewTaskLogHandler(baseLogDir string) *TaskLogHandler {
  17. if baseLogDir == "" {
  18. baseLogDir = "/tmp/seaweedfs/task_logs"
  19. }
  20. // Best-effort ensure the base directory exists so reads don't fail due to missing dir
  21. if err := os.MkdirAll(baseLogDir, 0755); err != nil {
  22. glog.Warningf("Failed to create base task log directory %s: %v", baseLogDir, err)
  23. }
  24. return &TaskLogHandler{
  25. baseLogDir: baseLogDir,
  26. }
  27. }
  28. // HandleLogRequest processes a task log request and returns the response
  29. func (h *TaskLogHandler) HandleLogRequest(request *worker_pb.TaskLogRequest) *worker_pb.TaskLogResponse {
  30. response := &worker_pb.TaskLogResponse{
  31. TaskId: request.TaskId,
  32. WorkerId: request.WorkerId,
  33. Success: false,
  34. }
  35. // Find the task log directory
  36. logDir, err := h.findTaskLogDirectory(request.TaskId)
  37. if err != nil {
  38. response.ErrorMessage = fmt.Sprintf("Task log directory not found: %v", err)
  39. glog.Warningf("Task log request failed for %s: %v", request.TaskId, err)
  40. // Add diagnostic information to help debug the issue
  41. response.LogEntries = []*worker_pb.TaskLogEntry{
  42. {
  43. Timestamp: time.Now().Unix(),
  44. Level: "WARNING",
  45. Message: fmt.Sprintf("Task logs not available: %v", err),
  46. Fields: map[string]string{"source": "task_log_handler"},
  47. },
  48. {
  49. Timestamp: time.Now().Unix(),
  50. Level: "INFO",
  51. Message: fmt.Sprintf("This usually means the task was never executed on this worker or logs were cleaned up. Base log directory: %s", h.baseLogDir),
  52. Fields: map[string]string{"source": "task_log_handler"},
  53. },
  54. }
  55. // response.Success remains false to indicate logs were not found
  56. return response
  57. }
  58. // Read metadata if requested
  59. if request.IncludeMetadata {
  60. metadata, err := h.readTaskMetadata(logDir)
  61. if err != nil {
  62. response.ErrorMessage = fmt.Sprintf("Failed to read task metadata: %v", err)
  63. glog.Warningf("Failed to read metadata for task %s: %v", request.TaskId, err)
  64. return response
  65. }
  66. response.Metadata = metadata
  67. }
  68. // Read log entries
  69. logEntries, err := h.readTaskLogEntries(logDir, request)
  70. if err != nil {
  71. response.ErrorMessage = fmt.Sprintf("Failed to read task logs: %v", err)
  72. glog.Warningf("Failed to read logs for task %s: %v", request.TaskId, err)
  73. return response
  74. }
  75. response.LogEntries = logEntries
  76. response.Success = true
  77. glog.V(1).Infof("Successfully retrieved %d log entries for task %s", len(logEntries), request.TaskId)
  78. return response
  79. }
  80. // findTaskLogDirectory searches for the task log directory by task ID
  81. func (h *TaskLogHandler) findTaskLogDirectory(taskID string) (string, error) {
  82. entries, err := os.ReadDir(h.baseLogDir)
  83. if err != nil {
  84. return "", fmt.Errorf("failed to read base log directory %s: %w", h.baseLogDir, err)
  85. }
  86. // Look for directories that start with the task ID
  87. var candidateDirs []string
  88. for _, entry := range entries {
  89. if entry.IsDir() {
  90. candidateDirs = append(candidateDirs, entry.Name())
  91. if strings.HasPrefix(entry.Name(), taskID+"_") {
  92. return filepath.Join(h.baseLogDir, entry.Name()), nil
  93. }
  94. }
  95. }
  96. // Enhanced error message with diagnostic information
  97. return "", fmt.Errorf("task log directory not found for task ID: %s (searched %d directories in %s, directories found: %v)",
  98. taskID, len(candidateDirs), h.baseLogDir, candidateDirs)
  99. }
  100. // readTaskMetadata reads task metadata from the log directory
  101. func (h *TaskLogHandler) readTaskMetadata(logDir string) (*worker_pb.TaskLogMetadata, error) {
  102. metadata, err := GetTaskLogMetadata(logDir)
  103. if err != nil {
  104. return nil, err
  105. }
  106. // Convert to protobuf metadata
  107. pbMetadata := &worker_pb.TaskLogMetadata{
  108. TaskId: metadata.TaskID,
  109. TaskType: metadata.TaskType,
  110. WorkerId: metadata.WorkerID,
  111. StartTime: metadata.StartTime.Unix(),
  112. Status: metadata.Status,
  113. Progress: float32(metadata.Progress),
  114. VolumeId: metadata.VolumeID,
  115. Server: metadata.Server,
  116. Collection: metadata.Collection,
  117. LogFilePath: metadata.LogFilePath,
  118. CreatedAt: metadata.CreatedAt.Unix(),
  119. CustomData: make(map[string]string),
  120. }
  121. // Set end time and duration if available
  122. if metadata.EndTime != nil {
  123. pbMetadata.EndTime = metadata.EndTime.Unix()
  124. }
  125. if metadata.Duration != nil {
  126. pbMetadata.DurationMs = metadata.Duration.Milliseconds()
  127. }
  128. // Convert custom data
  129. for key, value := range metadata.CustomData {
  130. if strValue, ok := value.(string); ok {
  131. pbMetadata.CustomData[key] = strValue
  132. } else {
  133. pbMetadata.CustomData[key] = fmt.Sprintf("%v", value)
  134. }
  135. }
  136. return pbMetadata, nil
  137. }
  138. // readTaskLogEntries reads and filters log entries based on the request
  139. func (h *TaskLogHandler) readTaskLogEntries(logDir string, request *worker_pb.TaskLogRequest) ([]*worker_pb.TaskLogEntry, error) {
  140. entries, err := ReadTaskLogs(logDir)
  141. if err != nil {
  142. return nil, err
  143. }
  144. // Apply filters
  145. var filteredEntries []TaskLogEntry
  146. for _, entry := range entries {
  147. // Filter by log level
  148. if request.LogLevel != "" && !strings.EqualFold(entry.Level, request.LogLevel) {
  149. continue
  150. }
  151. // Filter by time range
  152. if request.StartTime > 0 && entry.Timestamp.Unix() < request.StartTime {
  153. continue
  154. }
  155. if request.EndTime > 0 && entry.Timestamp.Unix() > request.EndTime {
  156. continue
  157. }
  158. filteredEntries = append(filteredEntries, entry)
  159. }
  160. // Limit entries if requested
  161. if request.MaxEntries > 0 && len(filteredEntries) > int(request.MaxEntries) {
  162. // Take the most recent entries
  163. start := len(filteredEntries) - int(request.MaxEntries)
  164. filteredEntries = filteredEntries[start:]
  165. }
  166. // Convert to protobuf entries
  167. var pbEntries []*worker_pb.TaskLogEntry
  168. for _, entry := range filteredEntries {
  169. pbEntry := &worker_pb.TaskLogEntry{
  170. Timestamp: entry.Timestamp.Unix(),
  171. Level: entry.Level,
  172. Message: entry.Message,
  173. Fields: make(map[string]string),
  174. }
  175. // Set progress if available
  176. if entry.Progress != nil {
  177. pbEntry.Progress = float32(*entry.Progress)
  178. }
  179. // Set status if available
  180. if entry.Status != nil {
  181. pbEntry.Status = *entry.Status
  182. }
  183. // Convert fields
  184. for key, value := range entry.Fields {
  185. if strValue, ok := value.(string); ok {
  186. pbEntry.Fields[key] = strValue
  187. } else {
  188. pbEntry.Fields[key] = fmt.Sprintf("%v", value)
  189. }
  190. }
  191. pbEntries = append(pbEntries, pbEntry)
  192. }
  193. return pbEntries, nil
  194. }
  195. // ListAvailableTaskLogs returns a list of available task log directories
  196. func (h *TaskLogHandler) ListAvailableTaskLogs() ([]string, error) {
  197. entries, err := os.ReadDir(h.baseLogDir)
  198. if err != nil {
  199. return nil, fmt.Errorf("failed to read base log directory: %w", err)
  200. }
  201. var taskDirs []string
  202. for _, entry := range entries {
  203. if entry.IsDir() {
  204. taskDirs = append(taskDirs, entry.Name())
  205. }
  206. }
  207. return taskDirs, nil
  208. }
  209. // CleanupOldLogs removes old task logs beyond the specified limit
  210. func (h *TaskLogHandler) CleanupOldLogs(maxTasks int) error {
  211. config := TaskLoggerConfig{
  212. BaseLogDir: h.baseLogDir,
  213. MaxTasks: maxTasks,
  214. }
  215. // Create a temporary logger to trigger cleanup
  216. tempLogger := &FileTaskLogger{
  217. config: config,
  218. }
  219. tempLogger.cleanupOldLogs()
  220. return nil
  221. }