This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Master │◄──►│ Admin Server │◄──►│ Workers │
│ │ │ │ │ │
│ - Volume Info │ │ - Task Discovery │ │ - Task Exec │
│ - Shard Status │ │ - Task Assign │ │ - Progress │
│ - Heartbeats │ │ - Progress Track │ │ - Error Report │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Volume Servers │ │ Volume Monitor │ │ Task Execution │
│ │ │ │ │ │
│ - Store Volumes │ │ - Health Check │ │ - EC Convert │
│ - EC Shards │ │ - Usage Stats │ │ - Vacuum Clean │
│ - Report Status │ │ - State Sync │ │ - Status Report │
└─────────────────┘ └──────────────────┘ └─────────────────┘
type TaskDiscoveryEngine struct {
masterClient MasterClient
volumeScanner VolumeScanner
taskDetectors map[TaskType]TaskDetector
scanInterval time.Duration
}
type VolumeCandidate struct {
VolumeID uint32
Server string
Collection string
TaskType TaskType
Priority TaskPriority
Reason string
DetectedAt time.Time
Parameters map[string]interface{}
}
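A minimal sketch of how the discovery engine could drive its detectors on each scan tick. The ScanVolumes and Detect method shapes, and the VolumeInfo type they exchange, are assumptions for illustration rather than part of the design above.

func (e *TaskDiscoveryEngine) runScanLoop(ctx context.Context, out chan<- *VolumeCandidate) {
    ticker := time.NewTicker(e.scanInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Assumed scanner/detector method shapes; only the struct fields above are given.
            volumes, err := e.volumeScanner.ScanVolumes()
            if err != nil {
                continue // transient scan failure; try again on the next tick
            }
            for _, v := range volumes {
                for taskType, detector := range e.taskDetectors {
                    if candidate := detector.Detect(v); candidate != nil {
                        candidate.TaskType = taskType
                        candidate.DetectedAt = time.Now()
                        out <- candidate
                    }
                }
            }
        }
    }
}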
EC Detection Logic:
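A plausible EC detector, sketched with an assumed VolumeInfo type and illustrative thresholds (read-only status plus a 24-hour quiet period); the actual heuristics are a tuning decision.

type ECDetector struct{}

func (d *ECDetector) Detect(v *VolumeInfo) *VolumeCandidate {
    // Illustrative rule: only volumes that are already read-only (full) and have
    // been quiet for a day are worth converting to EC shards.
    if !v.ReadOnly || time.Since(v.LastModified) < 24*time.Hour {
        return nil
    }
    return &VolumeCandidate{
        VolumeID:   v.ID,
        Server:     v.Server,
        Collection: v.Collection,
        TaskType:   TaskTypeErasureCoding,
        Priority:   PriorityLow, // assumed priority constant
        Reason:     "volume is read-only and cold; convert to EC shards",
        DetectedAt: time.Now(),
    }
}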
Vacuum Detection Logic:
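A matching vacuum detector sketch; the 30% garbage-ratio threshold and the VolumeInfo fields are likewise assumptions.

type VacuumDetector struct{}

func (d *VacuumDetector) Detect(v *VolumeInfo) *VolumeCandidate {
    if v.Size == 0 {
        return nil
    }
    // Illustrative rule: reclaim space once deleted entries exceed 30% of the volume.
    garbageRatio := float64(v.DeletedBytes) / float64(v.Size)
    if garbageRatio < 0.3 {
        return nil
    }
    return &VolumeCandidate{
        VolumeID:   v.ID,
        Server:     v.Server,
        Collection: v.Collection,
        TaskType:   TaskTypeVacuum,
        Priority:   PriorityNormal, // assumed priority constant
        Reason:     fmt.Sprintf("garbage ratio %.0f%% exceeds threshold", garbageRatio*100),
        DetectedAt: time.Now(),
    }
}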
type WorkerRegistry struct {
workers map[string]*Worker
capabilities map[TaskType][]*Worker
lastHeartbeat map[string]time.Time
taskAssignment map[string]*Task
mutex sync.RWMutex
}
type Worker struct {
ID string
Address string
Capabilities []TaskType
MaxConcurrent int
CurrentLoad int
Status WorkerStatus
LastSeen time.Time
Performance WorkerMetrics
}
type TaskScheduler struct {
registry *WorkerRegistry
taskQueue *PriorityQueue
inProgressTasks map[string]*InProgressTask
volumeReservations map[uint32]*VolumeReservation
}
// Worker Selection Criteria (a selection sketch follows this list):
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
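A sketch of how the scheduler could apply those criteria; WorkerStatusActive and the Performance.SuccessRate helper are assumed names, and the scoring weights are illustrative.

func (s *TaskScheduler) selectWorker(task *Task) *Worker {
    s.registry.mutex.RLock()
    defer s.registry.mutex.RUnlock()
    candidates := s.registry.capabilities[task.Type]
    var best *Worker
    bestScore := -1.0
    for _, w := range candidates {
        // Criteria 1 and 2: required capability (implied by the candidate list) and free capacity.
        if w.Status != WorkerStatusActive || w.CurrentLoad >= w.MaxConcurrent {
            continue
        }
        // Criteria 3 and 4: favor a good history for this task type and a low current load.
        score := w.Performance.SuccessRate(task.Type) - float64(w.CurrentLoad)/float64(w.MaxConcurrent)
        if score > bestScore {
            best, bestScore = w, score
        }
    }
    return best // nil means no eligible worker; the task stays queued
}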
type MaintenanceWorker struct {
id string
config *WorkerConfig
adminClient AdminClient
taskExecutors map[TaskType]TaskExecutor
currentTasks map[string]*RunningTask
registry *TaskRegistry
heartbeatTicker *time.Ticker
requestTicker *time.Ticker
}
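A sketch of the worker's main loop built on the struct above; SendHeartbeat, RequestTask, the executeTask helper, and config.MaxConcurrent are assumed names.

func (w *MaintenanceWorker) run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-w.heartbeatTicker.C:
            // Report liveness and current load to the admin server (assumed RPC).
            w.adminClient.SendHeartbeat(w.id, len(w.currentTasks))
        case <-w.requestTicker.C:
            if len(w.currentTasks) >= w.config.MaxConcurrent {
                continue // at capacity; do not ask for more work
            }
            task, err := w.adminClient.RequestTask(w.id) // assumed RPC
            if err != nil || task == nil {
                continue // nothing assigned this round
            }
            // executeTask (assumed helper) picks the executor from taskExecutors by task.Type.
            go w.executeTask(ctx, task)
        }
    }
}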
type TaskExecutor interface {
Execute(ctx context.Context, task *Task) error
EstimateTime(task *Task) time.Duration
ValidateResources(task *Task) error
GetProgress() float64
Cancel() error
}
type ErasureCodingExecutor struct {
volumeClient VolumeServerClient
progress float64
cancelled bool
}
type VacuumExecutor struct {
volumeClient VolumeServerClient
progress float64
cancelled bool
}
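A sketch of how VacuumExecutor could satisfy the TaskExecutor interface; the VacuumCompact and VacuumCommit calls stand in for whatever the volume server client actually exposes.

func (e *VacuumExecutor) Execute(ctx context.Context, task *Task) error {
    e.progress = 0
    // Phase 1: compact live entries into a fresh file set (assumed client call).
    if err := e.volumeClient.VacuumCompact(ctx, task.VolumeID); err != nil {
        return fmt.Errorf("compact volume %d: %w", task.VolumeID, err)
    }
    e.progress = 0.8
    if e.cancelled {
        return context.Canceled
    }
    // Phase 2: commit the compacted files and drop the old ones (assumed client call).
    if err := e.volumeClient.VacuumCommit(ctx, task.VolumeID); err != nil {
        return fmt.Errorf("commit vacuum on volume %d: %w", task.VolumeID, err)
    }
    e.progress = 1.0
    return nil
}

func (e *VacuumExecutor) GetProgress() float64 { return e.progress }

func (e *VacuumExecutor) Cancel() error {
    e.cancelled = true
    return nil
}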
type WorkerCapabilities struct {
SupportedTasks []TaskType
MaxConcurrent int
ResourceLimits ResourceLimits
PreferredServers []string // Affinity for specific volume servers
}
type ResourceLimits struct {
MaxMemoryMB int64
MaxDiskSpaceMB int64
MaxNetworkMbps int64
MaxCPUPercent float64
}
type TaskState string
const (
TaskStatePending TaskState = "pending"
TaskStateAssigned TaskState = "assigned"
TaskStateInProgress TaskState = "in_progress"
TaskStateCompleted TaskState = "completed"
TaskStateFailed TaskState = "failed"
TaskStateCancelled TaskState = "cancelled"
TaskStateStuck TaskState = "stuck" // Taking too long
TaskStateDuplicate TaskState = "duplicate" // Detected duplicate
)
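The state set above implies a transition graph; a minimal guard could look like this sketch (the allowed set shown is an assumption, not a specification):

var allowedTransitions = map[TaskState][]TaskState{
    TaskStatePending:    {TaskStateAssigned, TaskStateCancelled, TaskStateDuplicate},
    TaskStateAssigned:   {TaskStateInProgress, TaskStateFailed, TaskStateCancelled},
    TaskStateInProgress: {TaskStateCompleted, TaskStateFailed, TaskStateCancelled, TaskStateStuck},
    TaskStateStuck:      {TaskStateInProgress, TaskStateFailed, TaskStateCancelled},
}

func canTransition(from, to TaskState) bool {
    for _, next := range allowedTransitions[from] {
        if next == to {
            return true
        }
    }
    return false
}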
type InProgressTask struct {
Task *Task
WorkerID string
StartedAt time.Time
LastUpdate time.Time
Progress float64
EstimatedEnd time.Time
VolumeReserved bool // Reserved for capacity planning
}
type TaskMonitor struct {
inProgressTasks map[string]*InProgressTask
timeoutChecker *time.Ticker
stuckDetector *time.Ticker
duplicateChecker *time.Ticker
}
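A sketch of the stuck detector's periodic check; the 10-minute progress timeout and the Task.State field are assumptions.

func (m *TaskMonitor) checkStuckTasks(reschedule func(*InProgressTask)) {
    const progressTimeout = 10 * time.Minute // illustrative threshold
    for id, t := range m.inProgressTasks {
        if time.Since(t.LastUpdate) > progressTimeout {
            // No progress report within the window: mark stuck and hand back for recovery.
            t.Task.State = TaskStateStuck // assumes Task carries a State field
            delete(m.inProgressTasks, id)
            reschedule(t)
        }
    }
}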
type VolumeStateManager struct {
masterClient MasterClient
inProgressTasks map[uint32]*InProgressTask // VolumeID -> Task
committedChanges map[uint32]*VolumeChange // Changes not yet in master
reconcileInterval time.Duration
}
type VolumeChange struct {
VolumeID uint32
ChangeType ChangeType // "ec_encoding", "vacuum_completed"
OldCapacity int64
NewCapacity int64
TaskID string
CompletedAt time.Time
ReportedToMaster bool
}
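A sketch of the reconcile pass that pushes committed changes to the master; ReportVolumeChange is an assumed master RPC.

func (m *VolumeStateManager) reconcile(ctx context.Context) {
    for volumeID, change := range m.committedChanges {
        if change.ReportedToMaster {
            delete(m.committedChanges, volumeID) // master already has it; drop the local record
            continue
        }
        if err := m.masterClient.ReportVolumeChange(ctx, change); err != nil {
            continue // keep the change and retry on the next reconcile interval
        }
        change.ReportedToMaster = true
        delete(m.committedChanges, volumeID)
    }
}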
When the master needs to assign shards, it must consider:
In-progress tasks that will change a volume's effective capacity
Committed but unreported changes from the admin server
type CapacityOracle struct {
adminServer AdminServerClient
masterState *MasterVolumeState
updateFreq time.Duration
}
func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
baseCapacity := o.masterState.GetCapacity(volumeID)
// Adjust for in-progress tasks
if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
switch task.Type {
case TaskTypeErasureCoding:
// EC reduces effective capacity
return baseCapacity / 2 // Simplified
case TaskTypeVacuum:
// Vacuum may increase available space
return baseCapacity + int64(float64(baseCapacity) * 0.3)
}
}
// Adjust for completed but unreported changes
if change := o.adminServer.GetPendingChange(volumeID); change != nil {
return change.NewCapacity
}
return baseCapacity
}
type FailureHandler struct {
taskRescheduler *TaskRescheduler
workerMonitor *WorkerMonitor
alertManager *AlertManager
}
// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion
Worker Timeout Recovery:
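A sketch of the timeout path, assuming taskAssignment is keyed by worker ID and that WorkerStatusUnresponsive and PriorityQueue.Push exist with these shapes.

func (h *FailureHandler) handleWorkerTimeout(workerID string, registry *WorkerRegistry, scheduler *TaskScheduler) {
    registry.mutex.Lock()
    defer registry.mutex.Unlock()
    worker := registry.workers[workerID]
    if worker == nil {
        return
    }
    worker.Status = WorkerStatusUnresponsive // assumed status constant
    // Return whatever this worker was running to the queue so another worker can pick it up.
    if task := registry.taskAssignment[workerID]; task != nil {
        task.State = TaskStatePending
        scheduler.taskQueue.Push(task)
        delete(registry.taskAssignment, workerID)
    }
}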
Task Stuck Recovery:
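A sketch of stuck-task recovery; the CancelTask call, the retry counter, and the alert hook are assumed names.

func (h *FailureHandler) handleStuckTask(t *InProgressTask, scheduler *TaskScheduler) {
    // Ask the worker to abandon the task before reassigning it (assumed call).
    _ = h.workerMonitor.CancelTask(t.WorkerID, t.Task.ID)
    if t.Task.RetryCount >= 3 { // illustrative retry limit
        t.Task.State = TaskStateFailed
        h.alertManager.Notify("task repeatedly stuck", t.Task.ID) // assumed alert hook
        return
    }
    t.Task.RetryCount++
    t.Task.State = TaskStatePending
    scheduler.taskQueue.Push(t.Task)
}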
Duplicate Task Prevention:
type DuplicateDetector struct {
activeFingerprints map[string]bool // VolumeID+TaskType
recentCompleted *LRUCache // Recently completed tasks
}
func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
return d.activeFingerprints[fingerprint] ||
d.recentCompleted.Contains(fingerprint)
}
type TaskSimulator struct {
scenarios map[string]SimulationScenario
}
type SimulationScenario struct {
Name string
WorkerCount int
VolumeCount int
FailurePatterns []FailurePattern
Duration time.Duration
}
type FailurePattern struct {
Type FailureType // "worker_timeout", "task_stuck", "duplicate"
Probability float64 // 0.0 to 1.0
Timing TimingSpec // When during task execution
Duration time.Duration
}
Scenario 1: Worker Timeout During EC
Scenario 2: Stuck Vacuum Task
Scenario 3: Duplicate Task Prevention
Scenario 4: Master-Admin State Divergence
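Scenario 1 expressed with the simulation types above, as a sketch; the counts, probability, and the string value for FailureType are illustrative, and Timing is left at its zero value because TimingSpec's fields are not specified here.

var workerTimeoutDuringEC = SimulationScenario{
    Name:        "worker timeout during EC",
    WorkerCount: 3,
    VolumeCount: 50,
    Duration:    30 * time.Minute,
    FailurePatterns: []FailurePattern{
        {
            Type:        "worker_timeout", // assumes FailureType has a string underlying type
            Probability: 0.2,
            Duration:    2 * time.Minute,
            // Timing omitted: TimingSpec fields are not specified in this document.
        },
    },
}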
type SystemMetrics struct {
TasksPerSecond float64
WorkerUtilization float64
AverageTaskTime time.Duration
FailureRate float64
QueueDepth int
VolumeStatesSync bool
}
This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.