worker.go

package worker

import (
	"context"
	"crypto/rand"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"

	// Import task packages to trigger their auto-registration
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// Worker represents a maintenance worker instance
type Worker struct {
	id              string
	config          *types.WorkerConfig
	registry        *tasks.TaskRegistry
	currentTasks    map[string]*types.TaskInput
	adminClient     AdminClient
	running         bool
	stopChan        chan struct{}
	mutex           sync.RWMutex
	startTime       time.Time
	tasksCompleted  int
	tasksFailed     int
	heartbeatTicker *time.Ticker
	requestTicker   *time.Ticker
	taskLogHandler  *tasks.TaskLogHandler
}

// AdminClient defines the interface for communicating with the admin server
type AdminClient interface {
	Connect() error
	Disconnect() error
	RegisterWorker(worker *types.WorkerData) error
	SendHeartbeat(workerID string, status *types.WorkerStatus) error
	RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error)
	CompleteTask(taskID string, success bool, errorMsg string) error
	CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error
	UpdateTaskProgress(taskID string, progress float64) error
	IsConnected() bool
}
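
// The sketch below is illustrative only and is not part of the worker API:
// a minimal no-op AdminClient that satisfies the interface above, the kind
// of stub a test might inject via SetAdminClient. The method bodies are
// assumptions for demonstration, not real admin behavior; note that
// messageProcessingLoop only processes pushed admin messages when the
// client is this package's *GrpcAdminClient.
//
//	type noopAdminClient struct{ connected bool }
//
//	func (c *noopAdminClient) Connect() error    { c.connected = true; return nil }
//	func (c *noopAdminClient) Disconnect() error { c.connected = false; return nil }
//	func (c *noopAdminClient) RegisterWorker(worker *types.WorkerData) error { return nil }
//	func (c *noopAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
//		return nil
//	}
//	func (c *noopAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
//		return nil, nil // no task available
//	}
//	func (c *noopAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
//		return nil
//	}
//	func (c *noopAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
//		return nil
//	}
//	func (c *noopAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
//		return nil
//	}
//	func (c *noopAdminClient) IsConnected() bool { return c.connected }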

// GenerateOrLoadWorkerID generates a unique worker ID or loads existing one from working directory
func GenerateOrLoadWorkerID(workingDir string) (string, error) {
	const workerIDFile = "worker.id"

	var idFilePath string
	if workingDir != "" {
		idFilePath = filepath.Join(workingDir, workerIDFile)
	} else {
		// Use current working directory if none specified
		wd, err := os.Getwd()
		if err != nil {
			return "", fmt.Errorf("failed to get working directory: %w", err)
		}
		idFilePath = filepath.Join(wd, workerIDFile)
	}

	// Try to read existing worker ID
	if data, err := os.ReadFile(idFilePath); err == nil {
		workerID := strings.TrimSpace(string(data))
		if workerID != "" {
			glog.Infof("Loaded existing worker ID from %s: %s", idFilePath, workerID)
			return workerID, nil
		}
	}

	// Generate simplified worker ID
	hostname, _ := os.Hostname()
	if hostname == "" {
		hostname = "unknown"
	}

	// Use short hostname - take first 6 chars or last part after dots
	shortHostname := hostname
	if len(hostname) > 6 {
		if parts := strings.Split(hostname, "."); len(parts) > 1 {
			// Use last part before domain (e.g., "worker1" from "worker1.example.com")
			shortHostname = parts[0]
			if len(shortHostname) > 6 {
				shortHostname = shortHostname[:6]
			}
		} else {
			// Use first 6 characters
			shortHostname = hostname[:6]
		}
	}

	// Generate random component for uniqueness (2 bytes = 4 hex chars)
	randomBytes := make([]byte, 2)
	var workerID string
	if _, err := rand.Read(randomBytes); err != nil {
		// Fallback to short timestamp if crypto/rand fails
		timestamp := time.Now().Unix() % 10000 // last 4 digits
		workerID = fmt.Sprintf("w-%s-%04d", shortHostname, timestamp)
		glog.Infof("Generated fallback worker ID: %s", workerID)
	} else {
		// Use random hex for uniqueness
		randomHex := fmt.Sprintf("%x", randomBytes)
		workerID = fmt.Sprintf("w-%s-%s", shortHostname, randomHex)
		glog.Infof("Generated new worker ID: %s", workerID)
	}

	// Save worker ID to file
	if err := os.WriteFile(idFilePath, []byte(workerID), 0644); err != nil {
		glog.Warningf("Failed to save worker ID to %s: %v", idFilePath, err)
	} else {
		glog.Infof("Saved worker ID to %s", idFilePath)
	}

	return workerID, nil
}
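
// Illustrative example only (the directory below is hypothetical): for a
// worker whose hostname is "worker1.example.com", the IDs produced above
// look like "w-worker-a3f2" (two random bytes rendered as four hex
// characters) or "w-worker-0042" (timestamp fallback), and the chosen ID is
// cached in <workingDir>/worker.id so restarts reuse the same identity.
//
//	workerID, err := GenerateOrLoadWorkerID("/var/lib/seaweed-worker")
//	if err != nil {
//		glog.Fatalf("cannot determine worker ID: %v", err)
//	}
//	glog.Infof("worker will identify itself as %s", workerID)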

// NewWorker creates a new worker instance
func NewWorker(config *types.WorkerConfig) (*Worker, error) {
	if config == nil {
		config = types.DefaultWorkerConfig()
	}

	// Generate or load persistent worker ID
	workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir)
	if err != nil {
		return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
	}

	// Use the global unified registry that already has all tasks registered
	registry := tasks.GetGlobalTaskRegistry()

	// Initialize task log handler
	logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
	// Ensure the base task log directory exists to avoid errors when admin requests logs
	if err := os.MkdirAll(logDir, 0755); err != nil {
		glog.Warningf("Failed to create task log base directory %s: %v", logDir, err)
	}
	taskLogHandler := tasks.NewTaskLogHandler(logDir)

	worker := &Worker{
		id:             workerID,
		config:         config,
		registry:       registry,
		currentTasks:   make(map[string]*types.TaskInput),
		stopChan:       make(chan struct{}),
		startTime:      time.Now(),
		taskLogHandler: taskLogHandler,
	}

	glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll()))

	return worker, nil
}
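
// A minimal wiring sketch (illustrative only; the working directory is a
// hypothetical path, and adminClient stands for any AdminClient value the
// caller has constructed, e.g. this package's GrpcAdminClient or the no-op
// stub shown earlier):
//
//	config := types.DefaultWorkerConfig()
//	config.BaseWorkingDir = "/var/lib/seaweed-worker" // hypothetical path
//
//	w, err := NewWorker(config)
//	if err != nil {
//		return err
//	}
//	w.SetAdminClient(adminClient) // any AdminClient implementation
//	if err := w.Start(); err != nil {
//		return err
//	}
//	defer w.Stop()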

// getTaskLoggerConfig returns the task logger configuration with worker's log directory
func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig {
	config := tasks.DefaultTaskLoggerConfig()

	// Use worker's configured log directory (BaseWorkingDir is guaranteed to be non-empty)
	logDir := filepath.Join(w.config.BaseWorkingDir, "task_logs")
	config.BaseLogDir = logDir

	return config
}

// ID returns the worker ID
func (w *Worker) ID() string {
	return w.id
}

// Start starts the worker
func (w *Worker) Start() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.running {
		return fmt.Errorf("worker is already running")
	}
	if w.adminClient == nil {
		return fmt.Errorf("admin client is not set")
	}

	w.running = true
	w.startTime = time.Now()

	// Prepare worker info for registration
	workerInfo := &types.WorkerData{
		ID:            w.id,
		Capabilities:  w.config.Capabilities,
		MaxConcurrent: w.config.MaxConcurrent,
		Status:        "active",
		CurrentLoad:   0,
		LastHeartbeat: time.Now(),
	}

	// Register worker info with client first (this stores it for use during connection)
	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
		glog.V(1).Infof("Worker info stored for registration: %v", err)
		// This is expected if not connected yet
	}

	// Start connection attempt (will register immediately if successful)
	glog.Infof("WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d",
		w.id, w.config.Capabilities, w.config.MaxConcurrent)

	// Try initial connection, but don't fail if it doesn't work immediately
	if err := w.adminClient.Connect(); err != nil {
		glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
		// Don't return error - let the reconnection loop handle it
	} else {
		glog.Infof("INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id)
	}

	// Start worker loops regardless of initial connection status.
	// They will handle connection failures gracefully.
	glog.V(1).Infof("STARTING LOOPS: Worker %s starting background loops", w.id)
	go w.heartbeatLoop()
	go w.taskRequestLoop()
	go w.connectionMonitorLoop()
	go w.messageProcessingLoop()

	glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
	return nil
}

// Stop stops the worker
func (w *Worker) Stop() error {
	w.mutex.Lock()
	if !w.running {
		w.mutex.Unlock()
		return nil
	}
	w.running = false
	close(w.stopChan)

	// Stop tickers
	if w.heartbeatTicker != nil {
		w.heartbeatTicker.Stop()
	}
	if w.requestTicker != nil {
		w.requestTicker.Stop()
	}

	// Release the lock before waiting: executeTask needs it to remove
	// finished tasks from currentTasks, so holding it here would keep the
	// wait below from ever observing completed tasks.
	w.mutex.Unlock()

	// Wait for current tasks to complete or timeout
	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()
waitLoop:
	for {
		w.mutex.RLock()
		remaining := len(w.currentTasks)
		w.mutex.RUnlock()
		if remaining == 0 {
			break
		}
		select {
		case <-timeout.C:
			glog.Warningf("Worker %s stopping with %d tasks still running", w.id, remaining)
			// A plain "break" would only exit the select, not the loop
			break waitLoop
		case <-time.After(time.Second):
			// Check again
		}
	}

	// Disconnect from admin server
	if w.adminClient != nil {
		if err := w.adminClient.Disconnect(); err != nil {
			glog.Errorf("Error disconnecting from admin server: %v", err)
		}
	}

	glog.Infof("Worker %s stopped", w.id)
	return nil
}

// RegisterTask registers a task factory
func (w *Worker) RegisterTask(taskType types.TaskType, factory types.TaskFactory) {
	w.registry.Register(taskType, factory)
}

// GetCapabilities returns the worker capabilities
func (w *Worker) GetCapabilities() []types.TaskType {
	return w.config.Capabilities
}

// GetStatus returns the current worker status
func (w *Worker) GetStatus() types.WorkerStatus {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	var currentTasks []types.TaskInput
	for _, task := range w.currentTasks {
		currentTasks = append(currentTasks, *task)
	}

	status := "active"
	if len(w.currentTasks) >= w.config.MaxConcurrent {
		status = "busy"
	}

	return types.WorkerStatus{
		WorkerID:       w.id,
		Status:         status,
		Capabilities:   w.config.Capabilities,
		MaxConcurrent:  w.config.MaxConcurrent,
		CurrentLoad:    len(w.currentTasks),
		LastHeartbeat:  time.Now(),
		CurrentTasks:   currentTasks,
		Uptime:         time.Since(w.startTime),
		TasksCompleted: w.tasksCompleted,
		TasksFailed:    w.tasksFailed,
	}
}

// HandleTask handles a task execution
func (w *Worker) HandleTask(task *types.TaskInput) error {
	glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
		w.id, task.ID, task.Type, task.VolumeID)

	w.mutex.Lock()
	currentLoad := len(w.currentTasks)
	if currentLoad >= w.config.MaxConcurrent {
		w.mutex.Unlock()
		glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
			w.id, currentLoad, w.config.MaxConcurrent, task.ID)
		return fmt.Errorf("worker is at capacity")
	}
	w.currentTasks[task.ID] = task
	newLoad := len(w.currentTasks)
	w.mutex.Unlock()

	glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
		w.id, task.ID, newLoad, w.config.MaxConcurrent)

	// Execute task in goroutine
	go w.executeTask(task)
	return nil
}

// SetCapabilities sets the worker capabilities
func (w *Worker) SetCapabilities(capabilities []types.TaskType) {
	w.config.Capabilities = capabilities
}

// SetMaxConcurrent sets the maximum concurrent tasks
func (w *Worker) SetMaxConcurrent(max int) {
	w.config.MaxConcurrent = max
}

// SetHeartbeatInterval sets the heartbeat interval
func (w *Worker) SetHeartbeatInterval(interval time.Duration) {
	w.config.HeartbeatInterval = interval
}

// SetTaskRequestInterval sets the task request interval
func (w *Worker) SetTaskRequestInterval(interval time.Duration) {
	w.config.TaskRequestInterval = interval
}

// SetAdminClient sets the admin client
func (w *Worker) SetAdminClient(client AdminClient) {
	w.adminClient = client
}

// executeTask executes a task
func (w *Worker) executeTask(task *types.TaskInput) {
	startTime := time.Now()
	defer func() {
		w.mutex.Lock()
		delete(w.currentTasks, task.ID)
		currentLoad := len(w.currentTasks)
		w.mutex.Unlock()

		duration := time.Since(startTime)
		glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
			w.id, task.ID, duration, currentLoad, w.config.MaxConcurrent)
	}()

	glog.Infof("TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v",
		w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))

	// Report task start to admin server
	if err := w.adminClient.UpdateTaskProgress(task.ID, 0.0); err != nil {
		glog.V(1).Infof("Failed to report task start to admin: %v", err)
	}

	// Determine task-specific working directory (BaseWorkingDir is guaranteed to be non-empty)
	taskWorkingDir := filepath.Join(w.config.BaseWorkingDir, string(task.Type))
	glog.V(2).Infof("📁 WORKING DIRECTORY: Task %s using working directory: %s", task.ID, taskWorkingDir)

	// Check if we have typed protobuf parameters
	if task.TypedParams == nil {
		w.completeTask(task.ID, false, "task has no typed parameters - task was not properly planned")
		glog.Errorf("Worker %s rejecting task %s: no typed parameters", w.id, task.ID)
		return
	}

	// Use new task execution system with unified Task interface
	glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID)

	// Initialize a file-based task logger so admin can retrieve logs.
	// Build minimal params for logger metadata.
	loggerParams := types.TaskParams{
		VolumeID:    task.VolumeID,
		Collection:  task.Collection,
		TypedParams: task.TypedParams,
	}
	loggerConfig := w.getTaskLoggerConfig()
	fileLogger, logErr := tasks.NewTaskLogger(task.ID, task.Type, w.id, loggerParams, loggerConfig)
	if logErr != nil {
		glog.Warningf("Failed to initialize file logger for task %s: %v", task.ID, logErr)
	} else {
		defer func() {
			if err := fileLogger.Close(); err != nil {
				glog.V(1).Infof("Failed to close task logger for %s: %v", task.ID, err)
			}
		}()
		fileLogger.Info("Task %s started (type=%s, server=%s, collection=%s)", task.ID, task.Type, task.Server, task.Collection)
	}

	taskFactory := w.registry.Get(task.Type)
	if taskFactory == nil {
		w.completeTask(task.ID, false, fmt.Sprintf("task factory not available for %s: task type not found", task.Type))
		glog.Errorf("Worker %s failed to get task factory for %s type %v", w.id, task.ID, task.Type)
		// Log supported task types for debugging
		allFactories := w.registry.GetAll()
		glog.Errorf("Available task types: %d", len(allFactories))
		for taskType := range allFactories {
			glog.Errorf("Supported task type: %v", taskType)
		}
		return
	}

	taskInstance, err := taskFactory.Create(task.TypedParams)
	if err != nil {
		w.completeTask(task.ID, false, fmt.Sprintf("failed to create task for %s: %v", task.Type, err))
		glog.Errorf("Worker %s failed to create task %s type %v: %v", w.id, task.ID, task.Type, err)
		return
	}

	// Task execution uses the new unified Task interface
	glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)

	// If we have a file logger, adapt it so task WithFields logs are captured into file
	if fileLogger != nil {
		if withLogger, ok := taskInstance.(interface{ SetLogger(types.Logger) }); ok {
			withLogger.SetLogger(newTaskLoggerAdapter(fileLogger))
		}
	}

	// Set progress callback that reports to admin server
	taskInstance.SetProgressCallback(func(progress float64, stage string) {
		// Report progress updates to admin server
		glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage)
		if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
			glog.V(1).Infof("Failed to report task progress to admin: %v", err)
		}
		if fileLogger != nil {
			// Use meaningful stage description or fallback to generic message
			message := stage
			if message == "" {
				message = fmt.Sprintf("Progress: %.1f%%", progress)
			}
			fileLogger.LogProgress(progress, message)
		}
	})

	// Execute task with context
	ctx := context.Background()
	err = taskInstance.Execute(ctx, task.TypedParams)

	// Report completion
	if err != nil {
		w.completeTask(task.ID, false, err.Error())
		w.tasksFailed++
		glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
		if fileLogger != nil {
			fileLogger.LogStatus("failed", err.Error())
			fileLogger.Error("Task %s failed: %v", task.ID, err)
		}
	} else {
		w.completeTask(task.ID, true, "")
		w.tasksCompleted++
		glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
		if fileLogger != nil {
			fileLogger.Info("Task %s completed successfully", task.ID)
		}
	}
}

// completeTask reports task completion to admin server
func (w *Worker) completeTask(taskID string, success bool, errorMsg string) {
	if w.adminClient != nil {
		if err := w.adminClient.CompleteTask(taskID, success, errorMsg); err != nil {
			glog.Errorf("Failed to report task completion: %v", err)
		}
	}
}

// heartbeatLoop sends periodic heartbeats to the admin server
func (w *Worker) heartbeatLoop() {
	w.heartbeatTicker = time.NewTicker(w.config.HeartbeatInterval)
	defer w.heartbeatTicker.Stop()

	for {
		select {
		case <-w.stopChan:
			return
		case <-w.heartbeatTicker.C:
			w.sendHeartbeat()
		}
	}
}

// taskRequestLoop periodically requests new tasks from the admin server
func (w *Worker) taskRequestLoop() {
	w.requestTicker = time.NewTicker(w.config.TaskRequestInterval)
	defer w.requestTicker.Stop()

	for {
		select {
		case <-w.stopChan:
			return
		case <-w.requestTicker.C:
			w.requestTasks()
		}
	}
}

// sendHeartbeat sends heartbeat to admin server
func (w *Worker) sendHeartbeat() {
	if w.adminClient != nil {
		if err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{
			WorkerID:      w.id,
			Status:        "active",
			Capabilities:  w.config.Capabilities,
			MaxConcurrent: w.config.MaxConcurrent,
			CurrentLoad:   len(w.currentTasks),
			LastHeartbeat: time.Now(),
		}); err != nil {
			glog.Warningf("Failed to send heartbeat: %v", err)
		}
	}
}

// requestTasks requests new tasks from the admin server
func (w *Worker) requestTasks() {
	w.mutex.RLock()
	currentLoad := len(w.currentTasks)
	w.mutex.RUnlock()

	if currentLoad >= w.config.MaxConcurrent {
		glog.V(3).Infof("TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
			w.id, currentLoad, w.config.MaxConcurrent)
		return // Already at capacity
	}

	if w.adminClient != nil {
		glog.V(3).Infof("REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
			w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)
		task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
		if err != nil {
			glog.V(2).Infof("TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
			return
		}
		if task != nil {
			glog.Infof("TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s",
				w.id, task.ID, task.Type)
			if err := w.HandleTask(task); err != nil {
				glog.Errorf("TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err)
			}
		} else {
			glog.V(3).Infof("NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id)
		}
	}
}

// GetTaskRegistry returns the task registry
func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry {
	return w.registry
}

// GetCurrentTasks returns the current tasks
func (w *Worker) GetCurrentTasks() map[string]*types.TaskInput {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	tasks := make(map[string]*types.TaskInput)
	for id, task := range w.currentTasks {
		tasks[id] = task
	}
	return tasks
}

// registerWorker registers the worker with the admin server
func (w *Worker) registerWorker() {
	workerInfo := &types.WorkerData{
		ID:            w.id,
		Capabilities:  w.config.Capabilities,
		MaxConcurrent: w.config.MaxConcurrent,
		Status:        "active",
		CurrentLoad:   0,
		LastHeartbeat: time.Now(),
	}
	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
		glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
	} else {
		glog.Infof("Worker %s registered successfully with admin server", w.id)
	}
}

// connectionMonitorLoop monitors connection status
func (w *Worker) connectionMonitorLoop() {
	ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
	defer ticker.Stop()

	lastConnectionStatus := false

	for {
		select {
		case <-w.stopChan:
			glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
			return
		case <-ticker.C:
			// Monitor connection status and log changes
			currentConnectionStatus := w.adminClient != nil && w.adminClient.IsConnected()
			if currentConnectionStatus != lastConnectionStatus {
				if currentConnectionStatus {
					glog.Infof("CONNECTION RESTORED: Worker %s connection status changed: connected", w.id)
				} else {
					glog.Warningf("CONNECTION LOST: Worker %s connection status changed: disconnected", w.id)
				}
				lastConnectionStatus = currentConnectionStatus
			} else {
				if currentConnectionStatus {
					glog.V(3).Infof("CONNECTION OK: Worker %s connection status: connected", w.id)
				} else {
					glog.V(1).Infof("CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id)
				}
			}
		}
	}
}

// GetConfig returns the worker configuration
func (w *Worker) GetConfig() *types.WorkerConfig {
	return w.config
}

// GetPerformanceMetrics returns performance metrics
func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	uptime := time.Since(w.startTime)
	var successRate float64
	totalTasks := w.tasksCompleted + w.tasksFailed
	if totalTasks > 0 {
		successRate = float64(w.tasksCompleted) / float64(totalTasks) * 100
	}

	return &types.WorkerPerformance{
		TasksCompleted:  w.tasksCompleted,
		TasksFailed:     w.tasksFailed,
		AverageTaskTime: 0, // Would need to track this
		Uptime:          uptime,
		SuccessRate:     successRate,
	}
}
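
// For illustration: with 9 completed tasks and 1 failed task the metrics
// above report SuccessRate = 9 / (9 + 1) * 100 = 90.0; when no tasks have
// run yet, totalTasks is 0 and the success rate stays at its zero value
// rather than dividing by zero.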

// messageProcessingLoop processes incoming admin messages
func (w *Worker) messageProcessingLoop() {
	glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)

	// Get access to the incoming message channel from gRPC client
	grpcClient, ok := w.adminClient.(*GrpcAdminClient)
	if !ok {
		glog.Warningf("MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
		return
	}

	incomingChan := grpcClient.GetIncomingChannel()
	glog.V(1).Infof("MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)

	for {
		select {
		case <-w.stopChan:
			glog.Infof("MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
			return
		case message := <-incomingChan:
			if message != nil {
				glog.V(3).Infof("MESSAGE PROCESSING: Worker %s processing incoming message", w.id)
				w.processAdminMessage(message)
			} else {
				glog.V(3).Infof("NULL MESSAGE: Worker %s received nil message", w.id)
			}
		}
	}
}

// processAdminMessage processes different types of admin messages
func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
	glog.V(4).Infof("ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message)

	switch msg := message.Message.(type) {
	case *worker_pb.AdminMessage_RegistrationResponse:
		glog.V(2).Infof("REGISTRATION RESPONSE: Worker %s received registration response", w.id)
		w.handleRegistrationResponse(msg.RegistrationResponse)
	case *worker_pb.AdminMessage_HeartbeatResponse:
		glog.V(3).Infof("HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id)
		w.handleHeartbeatResponse(msg.HeartbeatResponse)
	case *worker_pb.AdminMessage_TaskLogRequest:
		glog.V(1).Infof("TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId)
		w.handleTaskLogRequest(msg.TaskLogRequest)
	case *worker_pb.AdminMessage_TaskAssignment:
		taskAssign := msg.TaskAssignment
		glog.V(1).Infof("Worker %s received direct task assignment %s (type: %s, volume: %d)",
			w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
		// Convert to task and handle it
		task := &types.TaskInput{
			ID:          taskAssign.TaskId,
			Type:        types.TaskType(taskAssign.TaskType),
			Status:      types.TaskStatusAssigned,
			VolumeID:    taskAssign.Params.VolumeId,
			Server:      getServerFromParams(taskAssign.Params),
			Collection:  taskAssign.Params.Collection,
			Priority:    types.TaskPriority(taskAssign.Priority),
			CreatedAt:   time.Unix(taskAssign.CreatedTime, 0),
			TypedParams: taskAssign.Params,
		}
		if err := w.HandleTask(task); err != nil {
			glog.Errorf("DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err)
		}
	case *worker_pb.AdminMessage_TaskCancellation:
		glog.Infof("TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId)
		w.handleTaskCancellation(msg.TaskCancellation)
	case *worker_pb.AdminMessage_AdminShutdown:
		glog.Infof("ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id)
		w.handleAdminShutdown(msg.AdminShutdown)
	default:
		glog.V(1).Infof("UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message)
	}
}

// handleTaskLogRequest processes task log requests from admin server
func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) {
	glog.V(1).Infof("Worker %s handling task log request for task %s", w.id, request.TaskId)

	// Use the task log handler to process the request
	response := w.taskLogHandler.HandleLogRequest(request)

	// Send response back to admin server
	responseMsg := &worker_pb.WorkerMessage{
		WorkerId:  w.id,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskLogResponse{
			TaskLogResponse: response,
		},
	}

	grpcClient, ok := w.adminClient.(*GrpcAdminClient)
	if !ok {
		glog.Errorf("Cannot send task log response: admin client is not gRPC client")
		return
	}

	select {
	case grpcClient.outgoing <- responseMsg:
		glog.V(1).Infof("Task log response sent for task %s", request.TaskId)
	case <-time.After(5 * time.Second):
		glog.Errorf("Failed to send task log response for task %s: timeout", request.TaskId)
	}
}

// handleTaskCancellation processes task cancellation requests
func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
	glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId)

	w.mutex.Lock()
	defer w.mutex.Unlock()

	if task, exists := w.currentTasks[cancellation.TaskId]; exists {
		// TODO: Implement task cancellation logic
		glog.Infof("Cancelling task %s", task.ID)
	} else {
		glog.Warningf("Cannot cancel task %s: task not found", cancellation.TaskId)
	}
}

// handleAdminShutdown processes admin shutdown notifications
func (w *Worker) handleAdminShutdown(shutdown *worker_pb.AdminShutdown) {
	glog.Infof("Worker %s received admin shutdown notification: %s", w.id, shutdown.Reason)

	gracefulSeconds := shutdown.GracefulShutdownSeconds
	if gracefulSeconds > 0 {
		glog.Infof("Graceful shutdown in %d seconds", gracefulSeconds)
		time.AfterFunc(time.Duration(gracefulSeconds)*time.Second, func() {
			w.Stop()
		})
	} else {
		// Immediate shutdown
		go w.Stop()
	}
}

// handleRegistrationResponse processes registration response from admin server
func (w *Worker) handleRegistrationResponse(response *worker_pb.RegistrationResponse) {
	glog.V(2).Infof("Worker %s processed registration response: success=%v", w.id, response.Success)
	if !response.Success {
		glog.Warningf("Worker %s registration failed: %s", w.id, response.Message)
	}
	// Registration responses are typically handled by the gRPC client during connection setup.
	// No additional action needed here.
}

// handleHeartbeatResponse processes heartbeat response from admin server
func (w *Worker) handleHeartbeatResponse(response *worker_pb.HeartbeatResponse) {
	glog.V(4).Infof("Worker %s processed heartbeat response", w.id)
	// Heartbeat responses are mainly for keeping the connection alive.
	// The admin may include configuration updates or status information in the future.
	// For now, just acknowledge receipt.
}