| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- package worker
- import (
- "fmt"
- "sync"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
- )
- // Registry manages workers and their statistics
- type Registry struct {
- workers map[string]*types.WorkerData
- stats *types.RegistryStats
- mutex sync.RWMutex
- }
- // NewRegistry creates a new worker registry
- func NewRegistry() *Registry {
- return &Registry{
- workers: make(map[string]*types.WorkerData),
- stats: &types.RegistryStats{
- TotalWorkers: 0,
- ActiveWorkers: 0,
- BusyWorkers: 0,
- IdleWorkers: 0,
- TotalTasks: 0,
- CompletedTasks: 0,
- FailedTasks: 0,
- StartTime: time.Now(),
- },
- }
- }
- // RegisterWorker registers a new worker
- func (r *Registry) RegisterWorker(worker *types.WorkerData) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- if _, exists := r.workers[worker.ID]; exists {
- return fmt.Errorf("worker %s already registered", worker.ID)
- }
- r.workers[worker.ID] = worker
- r.updateStats()
- return nil
- }
- // UnregisterWorker removes a worker from the registry
- func (r *Registry) UnregisterWorker(workerID string) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- if _, exists := r.workers[workerID]; !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- delete(r.workers, workerID)
- r.updateStats()
- return nil
- }
- // GetWorker returns a worker by ID
- func (r *Registry) GetWorker(workerID string) (*types.WorkerData, bool) {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- worker, exists := r.workers[workerID]
- return worker, exists
- }
- // ListWorkers returns all registered workers
- func (r *Registry) ListWorkers() []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- workers := make([]*types.WorkerData, 0, len(r.workers))
- for _, worker := range r.workers {
- workers = append(workers, worker)
- }
- return workers
- }
- // GetWorkersByCapability returns workers that support a specific capability
- func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var workers []*types.WorkerData
- for _, worker := range r.workers {
- for _, cap := range worker.Capabilities {
- if cap == capability {
- workers = append(workers, worker)
- break
- }
- }
- }
- return workers
- }
- // GetAvailableWorkers returns workers that are available for new tasks
- func (r *Registry) GetAvailableWorkers() []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var workers []*types.WorkerData
- for _, worker := range r.workers {
- if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
- workers = append(workers, worker)
- }
- }
- return workers
- }
- // GetBestWorkerForTask returns the best worker for a specific task
- func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var bestWorker *types.WorkerData
- var bestScore float64
- for _, worker := range r.workers {
- // Check if worker supports this task type
- supportsTask := false
- for _, cap := range worker.Capabilities {
- if cap == taskType {
- supportsTask = true
- break
- }
- }
- if !supportsTask {
- continue
- }
- // Check if worker is available
- if worker.Status != "active" || worker.CurrentLoad >= worker.MaxConcurrent {
- continue
- }
- // Calculate score based on current load and capacity
- score := float64(worker.MaxConcurrent-worker.CurrentLoad) / float64(worker.MaxConcurrent)
- if bestWorker == nil || score > bestScore {
- bestWorker = worker
- bestScore = score
- }
- }
- return bestWorker
- }
- // UpdateWorkerHeartbeat updates the last heartbeat time for a worker
- func (r *Registry) UpdateWorkerHeartbeat(workerID string) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- worker, exists := r.workers[workerID]
- if !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- worker.LastHeartbeat = time.Now()
- return nil
- }
- // UpdateWorkerLoad updates the current load for a worker
- func (r *Registry) UpdateWorkerLoad(workerID string, load int) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- worker, exists := r.workers[workerID]
- if !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- worker.CurrentLoad = load
- if load >= worker.MaxConcurrent {
- worker.Status = "busy"
- } else {
- worker.Status = "active"
- }
- r.updateStats()
- return nil
- }
- // UpdateWorkerStatus updates the status of a worker
- func (r *Registry) UpdateWorkerStatus(workerID string, status string) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- worker, exists := r.workers[workerID]
- if !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- worker.Status = status
- r.updateStats()
- return nil
- }
- // CleanupStaleWorkers removes workers that haven't sent heartbeats recently
- func (r *Registry) CleanupStaleWorkers(timeout time.Duration) int {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- var removedCount int
- cutoff := time.Now().Add(-timeout)
- for workerID, worker := range r.workers {
- if worker.LastHeartbeat.Before(cutoff) {
- delete(r.workers, workerID)
- removedCount++
- }
- }
- if removedCount > 0 {
- r.updateStats()
- }
- return removedCount
- }
- // GetStats returns current registry statistics
- func (r *Registry) GetStats() *types.RegistryStats {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- // Create a copy of the stats to avoid race conditions
- stats := *r.stats
- return &stats
- }
- // updateStats updates the registry statistics (must be called with lock held)
- func (r *Registry) updateStats() {
- r.stats.TotalWorkers = len(r.workers)
- r.stats.ActiveWorkers = 0
- r.stats.BusyWorkers = 0
- r.stats.IdleWorkers = 0
- for _, worker := range r.workers {
- switch worker.Status {
- case "active":
- if worker.CurrentLoad > 0 {
- r.stats.ActiveWorkers++
- } else {
- r.stats.IdleWorkers++
- }
- case "busy":
- r.stats.BusyWorkers++
- }
- }
- r.stats.Uptime = time.Since(r.stats.StartTime)
- r.stats.LastUpdated = time.Now()
- }
- // GetTaskCapabilities returns all task capabilities available in the registry
- func (r *Registry) GetTaskCapabilities() []types.TaskType {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- capabilitySet := make(map[types.TaskType]bool)
- for _, worker := range r.workers {
- for _, cap := range worker.Capabilities {
- capabilitySet[cap] = true
- }
- }
- var capabilities []types.TaskType
- for cap := range capabilitySet {
- capabilities = append(capabilities, cap)
- }
- return capabilities
- }
- // GetWorkersByStatus returns workers filtered by status
- func (r *Registry) GetWorkersByStatus(status string) []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var workers []*types.WorkerData
- for _, worker := range r.workers {
- if worker.Status == status {
- workers = append(workers, worker)
- }
- }
- return workers
- }
- // GetWorkerCount returns the total number of registered workers
- func (r *Registry) GetWorkerCount() int {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- return len(r.workers)
- }
- // GetWorkerIDs returns all worker IDs
- func (r *Registry) GetWorkerIDs() []string {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- ids := make([]string, 0, len(r.workers))
- for id := range r.workers {
- ids = append(ids, id)
- }
- return ids
- }
- // GetWorkerSummary returns a summary of all workers
- func (r *Registry) GetWorkerSummary() *types.WorkerSummary {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- summary := &types.WorkerSummary{
- TotalWorkers: len(r.workers),
- ByStatus: make(map[string]int),
- ByCapability: make(map[types.TaskType]int),
- TotalLoad: 0,
- MaxCapacity: 0,
- }
- for _, worker := range r.workers {
- summary.ByStatus[worker.Status]++
- summary.TotalLoad += worker.CurrentLoad
- summary.MaxCapacity += worker.MaxConcurrent
- for _, cap := range worker.Capabilities {
- summary.ByCapability[cap]++
- }
- }
- return summary
- }
- // Default global registry instance
- var defaultRegistry *Registry
- var registryOnce sync.Once
- // GetDefaultRegistry returns the default global registry
- func GetDefaultRegistry() *Registry {
- registryOnce.Do(func() {
- defaultRegistry = NewRegistry()
- })
- return defaultRegistry
- }
|