maintenance_worker.go 13 KB


  1. package maintenance
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/worker"
  10. "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
  11. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  12. // Import task packages to trigger their auto-registration
  13. _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
  14. _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
  15. _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
  16. )
  17. // MaintenanceWorkerService manages maintenance task execution
  18. // TaskExecutor defines the function signature for task execution
  19. type TaskExecutor func(*MaintenanceWorkerService, *MaintenanceTask) error
  20. // TaskExecutorFactory creates a task executor for a given worker service
  21. type TaskExecutorFactory func() TaskExecutor
  22. // Global registry for task executor factories
  23. var taskExecutorFactories = make(map[MaintenanceTaskType]TaskExecutorFactory)
  24. var executorRegistryMutex sync.RWMutex
  25. var executorRegistryInitOnce sync.Once
  26. // initializeExecutorFactories dynamically registers executor factories for all auto-registered task types
  27. func initializeExecutorFactories() {
  28. executorRegistryInitOnce.Do(func() {
  29. // Get all registered task types from the global registry
  30. typesRegistry := tasks.GetGlobalTypesRegistry()
  31. var taskTypes []MaintenanceTaskType
  32. for workerTaskType := range typesRegistry.GetAllDetectors() {
  33. // Convert types.TaskType to MaintenanceTaskType by string conversion
  34. maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
  35. taskTypes = append(taskTypes, maintenanceTaskType)
  36. }
  37. // Register generic executor for all task types
  38. for _, taskType := range taskTypes {
  39. RegisterTaskExecutorFactory(taskType, createGenericTaskExecutor)
  40. }
  41. glog.V(1).Infof("Dynamically registered generic task executor for %d task types: %v", len(taskTypes), taskTypes)
  42. })
  43. }
  44. // RegisterTaskExecutorFactory registers a factory function for creating task executors
  45. func RegisterTaskExecutorFactory(taskType MaintenanceTaskType, factory TaskExecutorFactory) {
  46. executorRegistryMutex.Lock()
  47. defer executorRegistryMutex.Unlock()
  48. taskExecutorFactories[taskType] = factory
  49. glog.V(2).Infof("Registered executor factory for task type: %s", taskType)
  50. }
  51. // GetTaskExecutorFactory returns the factory for a task type
  52. func GetTaskExecutorFactory(taskType MaintenanceTaskType) (TaskExecutorFactory, bool) {
  53. // Ensure executor factories are initialized
  54. initializeExecutorFactories()
  55. executorRegistryMutex.RLock()
  56. defer executorRegistryMutex.RUnlock()
  57. factory, exists := taskExecutorFactories[taskType]
  58. return factory, exists
  59. }
  60. // GetSupportedExecutorTaskTypes returns all task types with registered executor factories
  61. func GetSupportedExecutorTaskTypes() []MaintenanceTaskType {
  62. // Ensure executor factories are initialized
  63. initializeExecutorFactories()
  64. executorRegistryMutex.RLock()
  65. defer executorRegistryMutex.RUnlock()
  66. taskTypes := make([]MaintenanceTaskType, 0, len(taskExecutorFactories))
  67. for taskType := range taskExecutorFactories {
  68. taskTypes = append(taskTypes, taskType)
  69. }
  70. return taskTypes
  71. }
  72. // createGenericTaskExecutor creates a generic task executor that uses the task registry
  73. func createGenericTaskExecutor() TaskExecutor {
  74. return func(mws *MaintenanceWorkerService, task *MaintenanceTask) error {
  75. return mws.executeGenericTask(task)
  76. }
  77. }
  78. // init does minimal initialization - actual registration happens lazily
  79. func init() {
  80. // Executor factory registration will happen lazily when first accessed
  81. glog.V(1).Infof("Maintenance worker initialized - executor factories will be registered on first access")
  82. }
  83. type MaintenanceWorkerService struct {
  84. workerID string
  85. address string
  86. adminServer string
  87. capabilities []MaintenanceTaskType
  88. maxConcurrent int
  89. currentTasks map[string]*MaintenanceTask
  90. queue *MaintenanceQueue
  91. adminClient AdminClient
  92. running bool
  93. stopChan chan struct{}
  94. // Task execution registry
  95. taskExecutors map[MaintenanceTaskType]TaskExecutor
  96. // Task registry for creating task instances
  97. taskRegistry *tasks.TaskRegistry
  98. }
  99. // NewMaintenanceWorkerService creates a new maintenance worker service
  100. func NewMaintenanceWorkerService(workerID, address, adminServer string) *MaintenanceWorkerService {
  101. // Get all registered maintenance task types dynamically
  102. capabilities := GetRegisteredMaintenanceTaskTypes()
  103. worker := &MaintenanceWorkerService{
  104. workerID: workerID,
  105. address: address,
  106. adminServer: adminServer,
  107. capabilities: capabilities,
  108. maxConcurrent: 2, // Default concurrent task limit
  109. currentTasks: make(map[string]*MaintenanceTask),
  110. stopChan: make(chan struct{}),
  111. taskExecutors: make(map[MaintenanceTaskType]TaskExecutor),
  112. taskRegistry: tasks.GetGlobalTaskRegistry(), // Use global registry with auto-registered tasks
  113. }
  114. // Initialize task executor registry
  115. worker.initializeTaskExecutors()
  116. glog.V(1).Infof("Created maintenance worker with %d registered task types", len(worker.taskRegistry.GetAll()))
  117. return worker
  118. }
  119. // executeGenericTask executes a task using the task registry instead of hardcoded methods
  120. func (mws *MaintenanceWorkerService) executeGenericTask(task *MaintenanceTask) error {
  121. glog.V(2).Infof("Executing generic task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
  122. // Validate that task has proper typed parameters
  123. if task.TypedParams == nil {
  124. return fmt.Errorf("task %s has no typed parameters - task was not properly planned (insufficient destinations)", task.ID)
  125. }
  126. // Convert MaintenanceTask to types.TaskType
  127. taskType := types.TaskType(string(task.Type))
  128. // Create task instance using the registry
  129. taskInstance, err := mws.taskRegistry.Get(taskType).Create(task.TypedParams)
  130. if err != nil {
  131. return fmt.Errorf("failed to create task instance: %w", err)
  132. }
  133. // Update progress to show task has started
  134. mws.updateTaskProgress(task.ID, 5)
  135. // Execute the task
  136. err = taskInstance.Execute(context.Background(), task.TypedParams)
  137. if err != nil {
  138. return fmt.Errorf("task execution failed: %w", err)
  139. }
  140. // Update progress to show completion
  141. mws.updateTaskProgress(task.ID, 100)
  142. glog.V(2).Infof("Generic task %s completed successfully", task.ID)
  143. return nil
  144. }
  145. // initializeTaskExecutors sets up the task execution registry dynamically
  146. func (mws *MaintenanceWorkerService) initializeTaskExecutors() {
  147. mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor)
  148. // Get all registered executor factories and create executors
  149. executorRegistryMutex.RLock()
  150. defer executorRegistryMutex.RUnlock()
  151. for taskType, factory := range taskExecutorFactories {
  152. executor := factory()
  153. mws.taskExecutors[taskType] = executor
  154. glog.V(3).Infof("Initialized executor for task type: %s", taskType)
  155. }
  156. glog.V(2).Infof("Initialized %d task executors", len(mws.taskExecutors))
  157. }
  158. // RegisterTaskExecutor allows dynamic registration of new task executors
  159. func (mws *MaintenanceWorkerService) RegisterTaskExecutor(taskType MaintenanceTaskType, executor TaskExecutor) {
  160. if mws.taskExecutors == nil {
  161. mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor)
  162. }
  163. mws.taskExecutors[taskType] = executor
  164. glog.V(1).Infof("Registered executor for task type: %s", taskType)
  165. }
  166. // GetSupportedTaskTypes returns all task types that this worker can execute
  167. func (mws *MaintenanceWorkerService) GetSupportedTaskTypes() []MaintenanceTaskType {
  168. return GetSupportedExecutorTaskTypes()
  169. }
  170. // Start begins the worker service
  171. func (mws *MaintenanceWorkerService) Start() error {
  172. mws.running = true
  173. // Register with admin server
  174. worker := &MaintenanceWorker{
  175. ID: mws.workerID,
  176. Address: mws.address,
  177. Capabilities: mws.capabilities,
  178. MaxConcurrent: mws.maxConcurrent,
  179. }
  180. if mws.queue != nil {
  181. mws.queue.RegisterWorker(worker)
  182. }
  183. // Start worker loop
  184. go mws.workerLoop()
  185. glog.Infof("Maintenance worker %s started at %s", mws.workerID, mws.address)
  186. return nil
  187. }
  188. // Stop terminates the worker service
  189. func (mws *MaintenanceWorkerService) Stop() {
  190. mws.running = false
  191. close(mws.stopChan)
  192. // Wait for current tasks to complete or timeout
  193. timeout := time.NewTimer(30 * time.Second)
  194. defer timeout.Stop()
  195. for len(mws.currentTasks) > 0 {
  196. select {
  197. case <-timeout.C:
  198. glog.Warningf("Worker %s stopping with %d tasks still running", mws.workerID, len(mws.currentTasks))
  199. return
  200. case <-time.After(time.Second):
  201. // Check again
  202. }
  203. }
  204. glog.Infof("Maintenance worker %s stopped", mws.workerID)
  205. }
  206. // workerLoop is the main worker event loop
  207. func (mws *MaintenanceWorkerService) workerLoop() {
  208. heartbeatTicker := time.NewTicker(30 * time.Second)
  209. defer heartbeatTicker.Stop()
  210. taskRequestTicker := time.NewTicker(5 * time.Second)
  211. defer taskRequestTicker.Stop()
  212. for mws.running {
  213. select {
  214. case <-mws.stopChan:
  215. return
  216. case <-heartbeatTicker.C:
  217. mws.sendHeartbeat()
  218. case <-taskRequestTicker.C:
  219. mws.requestTasks()
  220. }
  221. }
  222. }
  223. // sendHeartbeat sends heartbeat to admin server
  224. func (mws *MaintenanceWorkerService) sendHeartbeat() {
  225. if mws.queue != nil {
  226. mws.queue.UpdateWorkerHeartbeat(mws.workerID)
  227. }
  228. }
  229. // requestTasks requests new tasks from the admin server
  230. func (mws *MaintenanceWorkerService) requestTasks() {
  231. if len(mws.currentTasks) >= mws.maxConcurrent {
  232. return // Already at capacity
  233. }
  234. if mws.queue != nil {
  235. task := mws.queue.GetNextTask(mws.workerID, mws.capabilities)
  236. if task != nil {
  237. mws.executeTask(task)
  238. }
  239. }
  240. }
  241. // executeTask executes a maintenance task
  242. func (mws *MaintenanceWorkerService) executeTask(task *MaintenanceTask) {
  243. mws.currentTasks[task.ID] = task
  244. go func() {
  245. defer func() {
  246. delete(mws.currentTasks, task.ID)
  247. }()
  248. glog.Infof("Worker %s executing task %s: %s", mws.workerID, task.ID, task.Type)
  249. // Execute task using dynamic executor registry
  250. var err error
  251. if executor, exists := mws.taskExecutors[task.Type]; exists {
  252. err = executor(mws, task)
  253. } else {
  254. err = fmt.Errorf("unsupported task type: %s", task.Type)
  255. glog.Errorf("No executor registered for task type: %s", task.Type)
  256. }
  257. // Report task completion
  258. if mws.queue != nil {
  259. errorMsg := ""
  260. if err != nil {
  261. errorMsg = err.Error()
  262. }
  263. mws.queue.CompleteTask(task.ID, errorMsg)
  264. }
  265. if err != nil {
  266. glog.Errorf("Worker %s failed to execute task %s: %v", mws.workerID, task.ID, err)
  267. } else {
  268. glog.Infof("Worker %s completed task %s successfully", mws.workerID, task.ID)
  269. }
  270. }()
  271. }
  272. // updateTaskProgress updates the progress of a task
  273. func (mws *MaintenanceWorkerService) updateTaskProgress(taskID string, progress float64) {
  274. if mws.queue != nil {
  275. mws.queue.UpdateTaskProgress(taskID, progress)
  276. }
  277. }
  278. // GetStatus returns the current status of the worker
  279. func (mws *MaintenanceWorkerService) GetStatus() map[string]interface{} {
  280. return map[string]interface{}{
  281. "worker_id": mws.workerID,
  282. "address": mws.address,
  283. "running": mws.running,
  284. "capabilities": mws.capabilities,
  285. "max_concurrent": mws.maxConcurrent,
  286. "current_tasks": len(mws.currentTasks),
  287. "task_details": mws.currentTasks,
  288. }
  289. }
  290. // SetQueue sets the maintenance queue for the worker
  291. func (mws *MaintenanceWorkerService) SetQueue(queue *MaintenanceQueue) {
  292. mws.queue = queue
  293. }
  294. // SetAdminClient sets the admin client for the worker
  295. func (mws *MaintenanceWorkerService) SetAdminClient(client AdminClient) {
  296. mws.adminClient = client
  297. }
  298. // SetCapabilities sets the worker capabilities
  299. func (mws *MaintenanceWorkerService) SetCapabilities(capabilities []MaintenanceTaskType) {
  300. mws.capabilities = capabilities
  301. }
  302. // SetMaxConcurrent sets the maximum concurrent tasks
  303. func (mws *MaintenanceWorkerService) SetMaxConcurrent(max int) {
  304. mws.maxConcurrent = max
  305. }
  306. // SetHeartbeatInterval sets the heartbeat interval (placeholder for future use)
  307. func (mws *MaintenanceWorkerService) SetHeartbeatInterval(interval time.Duration) {
  308. // Future implementation for configurable heartbeat
  309. }
  310. // SetTaskRequestInterval sets the task request interval (placeholder for future use)
  311. func (mws *MaintenanceWorkerService) SetTaskRequestInterval(interval time.Duration) {
  312. // Future implementation for configurable task requests
  313. }
  314. // MaintenanceWorkerCommand represents a standalone maintenance worker command
  315. type MaintenanceWorkerCommand struct {
  316. workerService *MaintenanceWorkerService
  317. }
  318. // NewMaintenanceWorkerCommand creates a new worker command
  319. func NewMaintenanceWorkerCommand(workerID, address, adminServer string) *MaintenanceWorkerCommand {
  320. return &MaintenanceWorkerCommand{
  321. workerService: NewMaintenanceWorkerService(workerID, address, adminServer),
  322. }
  323. }
  324. // Run starts the maintenance worker as a standalone service
  325. func (mwc *MaintenanceWorkerCommand) Run() error {
  326. // Generate or load persistent worker ID if not provided
  327. if mwc.workerService.workerID == "" {
  328. // Get current working directory for worker ID persistence
  329. wd, err := os.Getwd()
  330. if err != nil {
  331. return fmt.Errorf("failed to get working directory: %w", err)
  332. }
  333. workerID, err := worker.GenerateOrLoadWorkerID(wd)
  334. if err != nil {
  335. return fmt.Errorf("failed to generate or load worker ID: %w", err)
  336. }
  337. mwc.workerService.workerID = workerID
  338. }
  339. // Start the worker service
  340. err := mwc.workerService.Start()
  341. if err != nil {
  342. return fmt.Errorf("failed to start maintenance worker: %w", err)
  343. }
  344. // Wait for interrupt signal
  345. select {}
  346. }