worker.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package command
  2. import (
  3. "os"
  4. "os/signal"
  5. "path/filepath"
  6. "strings"
  7. "syscall"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/security"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/worker"
  13. "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
  14. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  15. // Import task packages to trigger their auto-registration
  16. _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
  17. _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
  18. _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
  19. )
  // cmdWorker describes the "worker" subcommand: its usage line plus the short
  // and long help texts. The Run handler is not set here; it is assigned to
  // runWorker in this file's init function.
  //
  // NOTE(review): the long help and the examples mention "remote upload" and
  // "replication" tasks, while this file only blank-imports the balance,
  // erasure_coding and vacuum task packages. Confirm those task types are
  // registered elsewhere; otherwise parseCapabilities logs them as unknown.
  var cmdWorker = &Command{
  	UsageLine: "worker -admin=<admin_server> [-capabilities=<task_types>] [-maxConcurrent=<num>] [-workingDir=<path>]",
  	Short:     "start a maintenance worker to process cluster maintenance tasks",
  	Long: `Start a maintenance worker that connects to an admin server to process
maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes.
The worker ID and address are automatically generated.
The worker connects to the admin server via gRPC (admin HTTP port + 10000).
Examples:
weed worker -admin=localhost:23646
weed worker -admin=admin.example.com:23646
weed worker -admin=localhost:23646 -capabilities=vacuum,replication
weed worker -admin=localhost:23646 -maxConcurrent=4
weed worker -admin=localhost:23646 -workingDir=/tmp/worker
`,
  }
  // Command-line flags of the "worker" subcommand, registered on cmdWorker.Flag.
  // Each variable is a pointer that runWorker dereferences when it runs.
  //
  // The -capabilities value is a comma-separated list that parseCapabilities
  // resolves; its default uses the short aliases "ec", "remote" and
  // "replication" that parseCapabilities maps to registered task types.
  //
  // Note that -heartbeat and -taskInterval are accepted but are not listed in
  // cmdWorker.UsageLine.
  var (
  	workerAdminServer         = cmdWorker.Flag.String("admin", "localhost:23646", "admin server address")
  	workerCapabilities        = cmdWorker.Flag.String("capabilities", "vacuum,ec,remote,replication,balance", "comma-separated list of task types this worker can handle")
  	workerMaxConcurrent       = cmdWorker.Flag.Int("maxConcurrent", 2, "maximum number of concurrent tasks")
  	workerHeartbeatInterval   = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval")
  	workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval")
  	workerWorkingDir          = cmdWorker.Flag.String("workingDir", "", "working directory for the worker")
  )
  43. func init() {
  44. cmdWorker.Run = runWorker
  45. // Set default capabilities from registered task types
  46. // This happens after package imports have triggered auto-registration
  47. tasks.SetDefaultCapabilitiesFromRegistry()
  48. }
  49. func runWorker(cmd *Command, args []string) bool {
  50. util.LoadConfiguration("security", false)
  51. glog.Infof("Starting maintenance worker")
  52. glog.Infof("Admin server: %s", *workerAdminServer)
  53. glog.Infof("Capabilities: %s", *workerCapabilities)
  54. // Parse capabilities
  55. capabilities := parseCapabilities(*workerCapabilities)
  56. if len(capabilities) == 0 {
  57. glog.Fatalf("No valid capabilities specified")
  58. return false
  59. }
  60. // Set working directory and create task-specific subdirectories
  61. var baseWorkingDir string
  62. if *workerWorkingDir != "" {
  63. glog.Infof("Setting working directory to: %s", *workerWorkingDir)
  64. if err := os.Chdir(*workerWorkingDir); err != nil {
  65. glog.Fatalf("Failed to change working directory: %v", err)
  66. return false
  67. }
  68. wd, err := os.Getwd()
  69. if err != nil {
  70. glog.Fatalf("Failed to get working directory: %v", err)
  71. return false
  72. }
  73. baseWorkingDir = wd
  74. glog.Infof("Current working directory: %s", baseWorkingDir)
  75. } else {
  76. // Use default working directory when not specified
  77. wd, err := os.Getwd()
  78. if err != nil {
  79. glog.Fatalf("Failed to get current working directory: %v", err)
  80. return false
  81. }
  82. baseWorkingDir = wd
  83. glog.Infof("Using current working directory: %s", baseWorkingDir)
  84. }
  85. // Create task-specific subdirectories
  86. for _, capability := range capabilities {
  87. taskDir := filepath.Join(baseWorkingDir, string(capability))
  88. if err := os.MkdirAll(taskDir, 0755); err != nil {
  89. glog.Fatalf("Failed to create task directory %s: %v", taskDir, err)
  90. return false
  91. }
  92. glog.Infof("Created task directory: %s", taskDir)
  93. }
  94. // Create gRPC dial option using TLS configuration
  95. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")
  96. // Create worker configuration
  97. config := &types.WorkerConfig{
  98. AdminServer: *workerAdminServer,
  99. Capabilities: capabilities,
  100. MaxConcurrent: *workerMaxConcurrent,
  101. HeartbeatInterval: *workerHeartbeatInterval,
  102. TaskRequestInterval: *workerTaskRequestInterval,
  103. BaseWorkingDir: baseWorkingDir,
  104. GrpcDialOption: grpcDialOption,
  105. }
  106. // Create worker instance
  107. workerInstance, err := worker.NewWorker(config)
  108. if err != nil {
  109. glog.Fatalf("Failed to create worker: %v", err)
  110. return false
  111. }
  112. adminClient, err := worker.CreateAdminClient(*workerAdminServer, workerInstance.ID(), grpcDialOption)
  113. if err != nil {
  114. glog.Fatalf("Failed to create admin client: %v", err)
  115. return false
  116. }
  117. // Set admin client
  118. workerInstance.SetAdminClient(adminClient)
  119. // Set working directory
  120. if *workerWorkingDir != "" {
  121. glog.Infof("Setting working directory to: %s", *workerWorkingDir)
  122. if err := os.Chdir(*workerWorkingDir); err != nil {
  123. glog.Fatalf("Failed to change working directory: %v", err)
  124. return false
  125. }
  126. wd, err := os.Getwd()
  127. if err != nil {
  128. glog.Fatalf("Failed to get working directory: %v", err)
  129. return false
  130. }
  131. glog.Infof("Current working directory: %s", wd)
  132. }
  133. // Start the worker
  134. err = workerInstance.Start()
  135. if err != nil {
  136. glog.Errorf("Failed to start worker: %v", err)
  137. return false
  138. }
  139. // Set up signal handling
  140. sigChan := make(chan os.Signal, 1)
  141. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  142. glog.Infof("Maintenance worker %s started successfully", workerInstance.ID())
  143. glog.Infof("Press Ctrl+C to stop the worker")
  144. // Wait for shutdown signal
  145. <-sigChan
  146. glog.Infof("Shutdown signal received, stopping worker...")
  147. // Gracefully stop the worker
  148. err = workerInstance.Stop()
  149. if err != nil {
  150. glog.Errorf("Error stopping worker: %v", err)
  151. }
  152. glog.Infof("Worker stopped")
  153. return true
  154. }
  155. // parseCapabilities converts comma-separated capability string to task types
  156. func parseCapabilities(capabilityStr string) []types.TaskType {
  157. if capabilityStr == "" {
  158. return nil
  159. }
  160. capabilityMap := map[string]types.TaskType{}
  161. // Populate capabilityMap with registered task types
  162. typesRegistry := tasks.GetGlobalTypesRegistry()
  163. for taskType := range typesRegistry.GetAllDetectors() {
  164. // Use the task type string directly as the key
  165. capabilityMap[strings.ToLower(string(taskType))] = taskType
  166. }
  167. // Add common aliases for convenience
  168. if taskType, exists := capabilityMap["erasure_coding"]; exists {
  169. capabilityMap["ec"] = taskType
  170. }
  171. if taskType, exists := capabilityMap["remote_upload"]; exists {
  172. capabilityMap["remote"] = taskType
  173. }
  174. if taskType, exists := capabilityMap["fix_replication"]; exists {
  175. capabilityMap["replication"] = taskType
  176. }
  177. var capabilities []types.TaskType
  178. parts := strings.Split(capabilityStr, ",")
  179. for _, part := range parts {
  180. part = strings.TrimSpace(part)
  181. if taskType, exists := capabilityMap[part]; exists {
  182. capabilities = append(capabilities, taskType)
  183. } else {
  184. glog.Warningf("Unknown capability: %s", part)
  185. }
  186. }
  187. return capabilities
  188. }
  // Legacy compatibility types, kept for backward compatibility.
  // These will be deprecated in future versions.

  // WorkerStatus represents the current status of a worker (deprecated).
  //
  // Every field is exported and carries a JSON tag, so the struct serializes
  // with the snake_case keys shown next to each field.
  type WorkerStatus struct {
  	WorkerID       string           `json:"worker_id"`
  	Address        string           `json:"address"`
  	Status         string           `json:"status"`
  	Capabilities   []types.TaskType `json:"capabilities"`
  	MaxConcurrent  int              `json:"max_concurrent"`
  	CurrentLoad    int              `json:"current_load"`
  	LastHeartbeat  time.Time        `json:"last_heartbeat"`
  	CurrentTasks   []types.Task     `json:"current_tasks"`
  	Uptime         time.Duration    `json:"uptime"`
  	TasksCompleted int              `json:"tasks_completed"`
  	TasksFailed    int              `json:"tasks_failed"`
  }