||
- package worker
- import (
- "fmt"
- "sync"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
- )
- // Registry manages workers and their statistics
- type Registry struct {
- workers map[string]*types.WorkerData
- stats *types.RegistryStats
- mutex sync.RWMutex
- }
- // NewRegistry creates a new worker registry
- func NewRegistry() *Registry {
- return &Registry{
- workers: make(map[string]*types.WorkerData),
- stats: &types.RegistryStats{
- TotalWorkers: 0,
- ActiveWorkers: 0,
- BusyWorkers: 0,
- IdleWorkers: 0,
- TotalTasks: 0,
- CompletedTasks: 0,
- FailedTasks: 0,
- StartTime: time.Now(),
- },
- }
- }
- // RegisterWorker registers a new worker
- func (r *Registry) RegisterWorker(worker *types.WorkerData) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- if _, exists := r.workers[worker.ID]; exists {
- return fmt.Errorf("worker %s already registered", worker.ID)
- }
- r.workers[worker.ID] = worker
- r.updateStats()
- return nil
- }
- // UnregisterWorker removes a worker from the registry
- func (r *Registry) UnregisterWorker(workerID string) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- if _, exists := r.workers[workerID]; !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- delete(r.workers, workerID)
- r.updateStats()
- return nil
- }
- // GetWorker returns a worker by ID
- func (r *Registry) GetWorker(workerID string) (*types.WorkerData, bool) {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- worker, exists := r.workers[workerID]
- return worker, exists
- }
- // ListWorkers returns all registered workers
- func (r *Registry) ListWorkers() []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- workers := make([]*types.WorkerData, 0, len(r.workers))
- for _, worker := range r.workers {
- workers = append(workers, worker)
- }
- return workers
- }
- // GetWorkersByCapability returns workers that support a specific capability
- func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var workers []*types.WorkerData
- for _, worker := range r.workers {
- for _, cap := range worker.Capabilities {
- if cap == capability {
- workers = append(workers, worker)
- break
- }
- }
- }
- return workers
- }
- // GetAvailableWorkers returns workers that are available for new tasks
- func (r *Registry) GetAvailableWorkers() []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var workers []*types.WorkerData
- for _, worker := range r.workers {
- if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
- workers = append(workers, worker)
- }
- }
- return workers
- }
- // GetBestWorkerForTask returns the best worker for a specific task
- func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var bestWorker *types.WorkerData
- var bestScore float64
- for _, worker := range r.workers {
- // Check if worker supports this task type
- supportsTask := false
- for _, cap := range worker.Capabilities {
- if cap == taskType {
- supportsTask = true
- break
- }
- }
- if !supportsTask {
- continue
- }
- // Check if worker is available
- if worker.Status != "active" || worker.CurrentLoad >= worker.MaxConcurrent {
- continue
- }
- // Calculate score based on current load and capacity
- score := float64(worker.MaxConcurrent-worker.CurrentLoad) / float64(worker.MaxConcurrent)
- if bestWorker == nil || score > bestScore {
- bestWorker = worker
- bestScore = score
- }
- }
- return bestWorker
- }
- // UpdateWorkerHeartbeat updates the last heartbeat time for a worker
- func (r *Registry) UpdateWorkerHeartbeat(workerID string) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- worker, exists := r.workers[workerID]
- if !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- worker.LastHeartbeat = time.Now()
- return nil
- }
- // UpdateWorkerLoad updates the current load for a worker
- func (r *Registry) UpdateWorkerLoad(workerID string, load int) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- worker, exists := r.workers[workerID]
- if !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- worker.CurrentLoad = load
- if load >= worker.MaxConcurrent {
- worker.Status = "busy"
- } else {
- worker.Status = "active"
- }
- r.updateStats()
- return nil
- }
- // UpdateWorkerStatus updates the status of a worker
- func (r *Registry) UpdateWorkerStatus(workerID string, status string) error {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- worker, exists := r.workers[workerID]
- if !exists {
- return fmt.Errorf("worker %s not found", workerID)
- }
- worker.Status = status
- r.updateStats()
- return nil
- }
- // CleanupStaleWorkers removes workers that haven't sent heartbeats recently
- func (r *Registry) CleanupStaleWorkers(timeout time.Duration) int {
- r.mutex.Lock()
- defer r.mutex.Unlock()
- var removedCount int
- cutoff := time.Now().Add(-timeout)
- for workerID, worker := range r.workers {
- if worker.LastHeartbeat.Before(cutoff) {
- delete(r.workers, workerID)
- removedCount++
- }
- }
- if removedCount > 0 {
- r.updateStats()
- }
- return removedCount
- }
- // GetStats returns current registry statistics
- func (r *Registry) GetStats() *types.RegistryStats {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- // Create a copy of the stats to avoid race conditions
- stats := *r.stats
- return &stats
- }
- // updateStats updates the registry statistics (must be called with lock held)
- func (r *Registry) updateStats() {
- r.stats.TotalWorkers = len(r.workers)
- r.stats.ActiveWorkers = 0
- r.stats.BusyWorkers = 0
- r.stats.IdleWorkers = 0
- for _, worker := range r.workers {
- switch worker.Status {
- case "active":
- if worker.CurrentLoad > 0 {
- r.stats.ActiveWorkers++
- } else {
- r.stats.IdleWorkers++
- }
- case "busy":
- r.stats.BusyWorkers++
- }
- }
- r.stats.Uptime = time.Since(r.stats.StartTime)
- r.stats.LastUpdated = time.Now()
- }
- // GetTaskCapabilities returns all task capabilities available in the registry
- func (r *Registry) GetTaskCapabilities() []types.TaskType {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- capabilitySet := make(map[types.TaskType]bool)
- for _, worker := range r.workers {
- for _, cap := range worker.Capabilities {
- capabilitySet[cap] = true
- }
- }
- var capabilities []types.TaskType
- for cap := range capabilitySet {
- capabilities = append(capabilities, cap)
- }
- return capabilities
- }
- // GetWorkersByStatus returns workers filtered by status
- func (r *Registry) GetWorkersByStatus(status string) []*types.WorkerData {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- var workers []*types.WorkerData
- for _, worker := range r.workers {
- if worker.Status == status {
- workers = append(workers, worker)
- }
- }
- return workers
- }
- // GetWorkerCount returns the total number of registered workers
- func (r *Registry) GetWorkerCount() int {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- return len(r.workers)
- }
- // GetWorkerIDs returns all worker IDs
- func (r *Registry) GetWorkerIDs() []string {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- ids := make([]string, 0, len(r.workers))
- for id := range r.workers {
- ids = append(ids, id)
- }
- return ids
- }
- // GetWorkerSummary returns a summary of all workers
- func (r *Registry) GetWorkerSummary() *types.WorkerSummary {
- r.mutex.RLock()
- defer r.mutex.RUnlock()
- summary := &types.WorkerSummary{
- TotalWorkers: len(r.workers),
- ByStatus: make(map[string]int),
- ByCapability: make(map[types.TaskType]int),
- TotalLoad: 0,
- MaxCapacity: 0,
- }
- for _, worker := range r.workers {
- summary.ByStatus[worker.Status]++
- summary.TotalLoad += worker.CurrentLoad
- summary.MaxCapacity += worker.MaxConcurrent
- for _, cap := range worker.Capabilities {
- summary.ByCapability[cap]++
- }
- }
- return summary
- }
- // Default global registry instance
- var defaultRegistry *Registry
- var registryOnce sync.Once
- // GetDefaultRegistry returns the default global registry
- func GetDefaultRegistry() *Registry {
- registryOnce.Do(func() {
- defaultRegistry = NewRegistry()
- })
- return defaultRegistry
- }
|