registry.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package worker
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  7. )
  8. // Registry manages workers and their statistics
  9. type Registry struct {
  10. workers map[string]*types.WorkerData
  11. stats *types.RegistryStats
  12. mutex sync.RWMutex
  13. }
  14. // NewRegistry creates a new worker registry
  15. func NewRegistry() *Registry {
  16. return &Registry{
  17. workers: make(map[string]*types.WorkerData),
  18. stats: &types.RegistryStats{
  19. TotalWorkers: 0,
  20. ActiveWorkers: 0,
  21. BusyWorkers: 0,
  22. IdleWorkers: 0,
  23. TotalTasks: 0,
  24. CompletedTasks: 0,
  25. FailedTasks: 0,
  26. StartTime: time.Now(),
  27. },
  28. }
  29. }
  30. // RegisterWorker registers a new worker
  31. func (r *Registry) RegisterWorker(worker *types.WorkerData) error {
  32. r.mutex.Lock()
  33. defer r.mutex.Unlock()
  34. if _, exists := r.workers[worker.ID]; exists {
  35. return fmt.Errorf("worker %s already registered", worker.ID)
  36. }
  37. r.workers[worker.ID] = worker
  38. r.updateStats()
  39. return nil
  40. }
  41. // UnregisterWorker removes a worker from the registry
  42. func (r *Registry) UnregisterWorker(workerID string) error {
  43. r.mutex.Lock()
  44. defer r.mutex.Unlock()
  45. if _, exists := r.workers[workerID]; !exists {
  46. return fmt.Errorf("worker %s not found", workerID)
  47. }
  48. delete(r.workers, workerID)
  49. r.updateStats()
  50. return nil
  51. }
  52. // GetWorker returns a worker by ID
  53. func (r *Registry) GetWorker(workerID string) (*types.WorkerData, bool) {
  54. r.mutex.RLock()
  55. defer r.mutex.RUnlock()
  56. worker, exists := r.workers[workerID]
  57. return worker, exists
  58. }
  59. // ListWorkers returns all registered workers
  60. func (r *Registry) ListWorkers() []*types.WorkerData {
  61. r.mutex.RLock()
  62. defer r.mutex.RUnlock()
  63. workers := make([]*types.WorkerData, 0, len(r.workers))
  64. for _, worker := range r.workers {
  65. workers = append(workers, worker)
  66. }
  67. return workers
  68. }
  69. // GetWorkersByCapability returns workers that support a specific capability
  70. func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.WorkerData {
  71. r.mutex.RLock()
  72. defer r.mutex.RUnlock()
  73. var workers []*types.WorkerData
  74. for _, worker := range r.workers {
  75. for _, cap := range worker.Capabilities {
  76. if cap == capability {
  77. workers = append(workers, worker)
  78. break
  79. }
  80. }
  81. }
  82. return workers
  83. }
  84. // GetAvailableWorkers returns workers that are available for new tasks
  85. func (r *Registry) GetAvailableWorkers() []*types.WorkerData {
  86. r.mutex.RLock()
  87. defer r.mutex.RUnlock()
  88. var workers []*types.WorkerData
  89. for _, worker := range r.workers {
  90. if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
  91. workers = append(workers, worker)
  92. }
  93. }
  94. return workers
  95. }
  96. // GetBestWorkerForTask returns the best worker for a specific task
  97. func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.WorkerData {
  98. r.mutex.RLock()
  99. defer r.mutex.RUnlock()
  100. var bestWorker *types.WorkerData
  101. var bestScore float64
  102. for _, worker := range r.workers {
  103. // Check if worker supports this task type
  104. supportsTask := false
  105. for _, cap := range worker.Capabilities {
  106. if cap == taskType {
  107. supportsTask = true
  108. break
  109. }
  110. }
  111. if !supportsTask {
  112. continue
  113. }
  114. // Check if worker is available
  115. if worker.Status != "active" || worker.CurrentLoad >= worker.MaxConcurrent {
  116. continue
  117. }
  118. // Calculate score based on current load and capacity
  119. score := float64(worker.MaxConcurrent-worker.CurrentLoad) / float64(worker.MaxConcurrent)
  120. if bestWorker == nil || score > bestScore {
  121. bestWorker = worker
  122. bestScore = score
  123. }
  124. }
  125. return bestWorker
  126. }
  127. // UpdateWorkerHeartbeat updates the last heartbeat time for a worker
  128. func (r *Registry) UpdateWorkerHeartbeat(workerID string) error {
  129. r.mutex.Lock()
  130. defer r.mutex.Unlock()
  131. worker, exists := r.workers[workerID]
  132. if !exists {
  133. return fmt.Errorf("worker %s not found", workerID)
  134. }
  135. worker.LastHeartbeat = time.Now()
  136. return nil
  137. }
  138. // UpdateWorkerLoad updates the current load for a worker
  139. func (r *Registry) UpdateWorkerLoad(workerID string, load int) error {
  140. r.mutex.Lock()
  141. defer r.mutex.Unlock()
  142. worker, exists := r.workers[workerID]
  143. if !exists {
  144. return fmt.Errorf("worker %s not found", workerID)
  145. }
  146. worker.CurrentLoad = load
  147. if load >= worker.MaxConcurrent {
  148. worker.Status = "busy"
  149. } else {
  150. worker.Status = "active"
  151. }
  152. r.updateStats()
  153. return nil
  154. }
  155. // UpdateWorkerStatus updates the status of a worker
  156. func (r *Registry) UpdateWorkerStatus(workerID string, status string) error {
  157. r.mutex.Lock()
  158. defer r.mutex.Unlock()
  159. worker, exists := r.workers[workerID]
  160. if !exists {
  161. return fmt.Errorf("worker %s not found", workerID)
  162. }
  163. worker.Status = status
  164. r.updateStats()
  165. return nil
  166. }
  167. // CleanupStaleWorkers removes workers that haven't sent heartbeats recently
  168. func (r *Registry) CleanupStaleWorkers(timeout time.Duration) int {
  169. r.mutex.Lock()
  170. defer r.mutex.Unlock()
  171. var removedCount int
  172. cutoff := time.Now().Add(-timeout)
  173. for workerID, worker := range r.workers {
  174. if worker.LastHeartbeat.Before(cutoff) {
  175. delete(r.workers, workerID)
  176. removedCount++
  177. }
  178. }
  179. if removedCount > 0 {
  180. r.updateStats()
  181. }
  182. return removedCount
  183. }
  184. // GetStats returns current registry statistics
  185. func (r *Registry) GetStats() *types.RegistryStats {
  186. r.mutex.RLock()
  187. defer r.mutex.RUnlock()
  188. // Create a copy of the stats to avoid race conditions
  189. stats := *r.stats
  190. return &stats
  191. }
  192. // updateStats updates the registry statistics (must be called with lock held)
  193. func (r *Registry) updateStats() {
  194. r.stats.TotalWorkers = len(r.workers)
  195. r.stats.ActiveWorkers = 0
  196. r.stats.BusyWorkers = 0
  197. r.stats.IdleWorkers = 0
  198. for _, worker := range r.workers {
  199. switch worker.Status {
  200. case "active":
  201. if worker.CurrentLoad > 0 {
  202. r.stats.ActiveWorkers++
  203. } else {
  204. r.stats.IdleWorkers++
  205. }
  206. case "busy":
  207. r.stats.BusyWorkers++
  208. }
  209. }
  210. r.stats.Uptime = time.Since(r.stats.StartTime)
  211. r.stats.LastUpdated = time.Now()
  212. }
  213. // GetTaskCapabilities returns all task capabilities available in the registry
  214. func (r *Registry) GetTaskCapabilities() []types.TaskType {
  215. r.mutex.RLock()
  216. defer r.mutex.RUnlock()
  217. capabilitySet := make(map[types.TaskType]bool)
  218. for _, worker := range r.workers {
  219. for _, cap := range worker.Capabilities {
  220. capabilitySet[cap] = true
  221. }
  222. }
  223. var capabilities []types.TaskType
  224. for cap := range capabilitySet {
  225. capabilities = append(capabilities, cap)
  226. }
  227. return capabilities
  228. }
  229. // GetWorkersByStatus returns workers filtered by status
  230. func (r *Registry) GetWorkersByStatus(status string) []*types.WorkerData {
  231. r.mutex.RLock()
  232. defer r.mutex.RUnlock()
  233. var workers []*types.WorkerData
  234. for _, worker := range r.workers {
  235. if worker.Status == status {
  236. workers = append(workers, worker)
  237. }
  238. }
  239. return workers
  240. }
  241. // GetWorkerCount returns the total number of registered workers
  242. func (r *Registry) GetWorkerCount() int {
  243. r.mutex.RLock()
  244. defer r.mutex.RUnlock()
  245. return len(r.workers)
  246. }
  247. // GetWorkerIDs returns all worker IDs
  248. func (r *Registry) GetWorkerIDs() []string {
  249. r.mutex.RLock()
  250. defer r.mutex.RUnlock()
  251. ids := make([]string, 0, len(r.workers))
  252. for id := range r.workers {
  253. ids = append(ids, id)
  254. }
  255. return ids
  256. }
  257. // GetWorkerSummary returns a summary of all workers
  258. func (r *Registry) GetWorkerSummary() *types.WorkerSummary {
  259. r.mutex.RLock()
  260. defer r.mutex.RUnlock()
  261. summary := &types.WorkerSummary{
  262. TotalWorkers: len(r.workers),
  263. ByStatus: make(map[string]int),
  264. ByCapability: make(map[types.TaskType]int),
  265. TotalLoad: 0,
  266. MaxCapacity: 0,
  267. }
  268. for _, worker := range r.workers {
  269. summary.ByStatus[worker.Status]++
  270. summary.TotalLoad += worker.CurrentLoad
  271. summary.MaxCapacity += worker.MaxConcurrent
  272. for _, cap := range worker.Capabilities {
  273. summary.ByCapability[cap]++
  274. }
  275. }
  276. return summary
  277. }
  278. // Default global registry instance
  279. var defaultRegistry *Registry
  280. var registryOnce sync.Once
  281. // GetDefaultRegistry returns the default global registry
  282. func GetDefaultRegistry() *Registry {
  283. registryOnce.Do(func() {
  284. defaultRegistry = NewRegistry()
  285. })
  286. return defaultRegistry
  287. }