maintenance_config_proto.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package maintenance
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  6. )
  7. // MaintenanceConfigManager handles protobuf-based configuration
  8. type MaintenanceConfigManager struct {
  9. config *worker_pb.MaintenanceConfig
  10. }
  11. // NewMaintenanceConfigManager creates a new config manager with defaults
  12. func NewMaintenanceConfigManager() *MaintenanceConfigManager {
  13. return &MaintenanceConfigManager{
  14. config: DefaultMaintenanceConfigProto(),
  15. }
  16. }
  17. // DefaultMaintenanceConfigProto returns default configuration as protobuf
  18. func DefaultMaintenanceConfigProto() *worker_pb.MaintenanceConfig {
  19. return &worker_pb.MaintenanceConfig{
  20. Enabled: true,
  21. ScanIntervalSeconds: 30 * 60, // 30 minutes
  22. WorkerTimeoutSeconds: 5 * 60, // 5 minutes
  23. TaskTimeoutSeconds: 2 * 60 * 60, // 2 hours
  24. RetryDelaySeconds: 15 * 60, // 15 minutes
  25. MaxRetries: 3,
  26. CleanupIntervalSeconds: 24 * 60 * 60, // 24 hours
  27. TaskRetentionSeconds: 7 * 24 * 60 * 60, // 7 days
  28. // Policy field will be populated dynamically from separate task configuration files
  29. Policy: nil,
  30. }
  31. }
  32. // GetConfig returns the current configuration
  33. func (mcm *MaintenanceConfigManager) GetConfig() *worker_pb.MaintenanceConfig {
  34. return mcm.config
  35. }
  36. // Type-safe configuration accessors
  37. // GetVacuumConfig returns vacuum-specific configuration for a task type
  38. func (mcm *MaintenanceConfigManager) GetVacuumConfig(taskType string) *worker_pb.VacuumTaskConfig {
  39. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  40. if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil {
  41. return vacuumConfig
  42. }
  43. }
  44. // Return defaults if not configured
  45. return &worker_pb.VacuumTaskConfig{
  46. GarbageThreshold: 0.3,
  47. MinVolumeAgeHours: 24,
  48. MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
  49. }
  50. }
  51. // GetErasureCodingConfig returns EC-specific configuration for a task type
  52. func (mcm *MaintenanceConfigManager) GetErasureCodingConfig(taskType string) *worker_pb.ErasureCodingTaskConfig {
  53. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  54. if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
  55. return ecConfig
  56. }
  57. }
  58. // Return defaults if not configured
  59. return &worker_pb.ErasureCodingTaskConfig{
  60. FullnessRatio: 0.95,
  61. QuietForSeconds: 3600,
  62. MinVolumeSizeMb: 100,
  63. CollectionFilter: "",
  64. }
  65. }
  66. // GetBalanceConfig returns balance-specific configuration for a task type
  67. func (mcm *MaintenanceConfigManager) GetBalanceConfig(taskType string) *worker_pb.BalanceTaskConfig {
  68. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  69. if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil {
  70. return balanceConfig
  71. }
  72. }
  73. // Return defaults if not configured
  74. return &worker_pb.BalanceTaskConfig{
  75. ImbalanceThreshold: 0.2,
  76. MinServerCount: 2,
  77. }
  78. }
  79. // GetReplicationConfig returns replication-specific configuration for a task type
  80. func (mcm *MaintenanceConfigManager) GetReplicationConfig(taskType string) *worker_pb.ReplicationTaskConfig {
  81. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  82. if replicationConfig := policy.GetReplicationConfig(); replicationConfig != nil {
  83. return replicationConfig
  84. }
  85. }
  86. // Return defaults if not configured
  87. return &worker_pb.ReplicationTaskConfig{
  88. TargetReplicaCount: 2,
  89. }
  90. }
  91. // Typed convenience methods for getting task configurations
  92. // GetVacuumTaskConfigForType returns vacuum configuration for a specific task type
  93. func (mcm *MaintenanceConfigManager) GetVacuumTaskConfigForType(taskType string) *worker_pb.VacuumTaskConfig {
  94. return GetVacuumTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
  95. }
  96. // GetErasureCodingTaskConfigForType returns erasure coding configuration for a specific task type
  97. func (mcm *MaintenanceConfigManager) GetErasureCodingTaskConfigForType(taskType string) *worker_pb.ErasureCodingTaskConfig {
  98. return GetErasureCodingTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
  99. }
  100. // GetBalanceTaskConfigForType returns balance configuration for a specific task type
  101. func (mcm *MaintenanceConfigManager) GetBalanceTaskConfigForType(taskType string) *worker_pb.BalanceTaskConfig {
  102. return GetBalanceTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
  103. }
  104. // GetReplicationTaskConfigForType returns replication configuration for a specific task type
  105. func (mcm *MaintenanceConfigManager) GetReplicationTaskConfigForType(taskType string) *worker_pb.ReplicationTaskConfig {
  106. return GetReplicationTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
  107. }
  108. // Helper methods
  109. func (mcm *MaintenanceConfigManager) getTaskPolicy(taskType string) *worker_pb.TaskPolicy {
  110. if mcm.config.Policy != nil && mcm.config.Policy.TaskPolicies != nil {
  111. return mcm.config.Policy.TaskPolicies[taskType]
  112. }
  113. return nil
  114. }
  115. // IsTaskEnabled returns whether a task type is enabled
  116. func (mcm *MaintenanceConfigManager) IsTaskEnabled(taskType string) bool {
  117. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  118. return policy.Enabled
  119. }
  120. return false
  121. }
  122. // GetMaxConcurrent returns the max concurrent limit for a task type
  123. func (mcm *MaintenanceConfigManager) GetMaxConcurrent(taskType string) int32 {
  124. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  125. return policy.MaxConcurrent
  126. }
  127. return 1 // Default
  128. }
  129. // GetRepeatInterval returns the repeat interval for a task type in seconds
  130. func (mcm *MaintenanceConfigManager) GetRepeatInterval(taskType string) int32 {
  131. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  132. return policy.RepeatIntervalSeconds
  133. }
  134. return mcm.config.Policy.DefaultRepeatIntervalSeconds
  135. }
  136. // GetCheckInterval returns the check interval for a task type in seconds
  137. func (mcm *MaintenanceConfigManager) GetCheckInterval(taskType string) int32 {
  138. if policy := mcm.getTaskPolicy(taskType); policy != nil {
  139. return policy.CheckIntervalSeconds
  140. }
  141. return mcm.config.Policy.DefaultCheckIntervalSeconds
  142. }
  143. // Duration accessor methods
  144. // GetScanInterval returns the scan interval as a time.Duration
  145. func (mcm *MaintenanceConfigManager) GetScanInterval() time.Duration {
  146. return time.Duration(mcm.config.ScanIntervalSeconds) * time.Second
  147. }
  148. // GetWorkerTimeout returns the worker timeout as a time.Duration
  149. func (mcm *MaintenanceConfigManager) GetWorkerTimeout() time.Duration {
  150. return time.Duration(mcm.config.WorkerTimeoutSeconds) * time.Second
  151. }
  152. // GetTaskTimeout returns the task timeout as a time.Duration
  153. func (mcm *MaintenanceConfigManager) GetTaskTimeout() time.Duration {
  154. return time.Duration(mcm.config.TaskTimeoutSeconds) * time.Second
  155. }
  156. // GetRetryDelay returns the retry delay as a time.Duration
  157. func (mcm *MaintenanceConfigManager) GetRetryDelay() time.Duration {
  158. return time.Duration(mcm.config.RetryDelaySeconds) * time.Second
  159. }
  160. // GetCleanupInterval returns the cleanup interval as a time.Duration
  161. func (mcm *MaintenanceConfigManager) GetCleanupInterval() time.Duration {
  162. return time.Duration(mcm.config.CleanupIntervalSeconds) * time.Second
  163. }
  164. // GetTaskRetention returns the task retention period as a time.Duration
  165. func (mcm *MaintenanceConfigManager) GetTaskRetention() time.Duration {
  166. return time.Duration(mcm.config.TaskRetentionSeconds) * time.Second
  167. }
  168. // ValidateMaintenanceConfigWithSchema validates protobuf maintenance configuration using ConfigField rules
  169. func ValidateMaintenanceConfigWithSchema(config *worker_pb.MaintenanceConfig) error {
  170. if config == nil {
  171. return fmt.Errorf("configuration cannot be nil")
  172. }
  173. // Get the schema to access field validation rules
  174. schema := GetMaintenanceConfigSchema()
  175. // Validate each field individually using the ConfigField rules
  176. if err := validateFieldWithSchema(schema, "enabled", config.Enabled); err != nil {
  177. return err
  178. }
  179. if err := validateFieldWithSchema(schema, "scan_interval_seconds", int(config.ScanIntervalSeconds)); err != nil {
  180. return err
  181. }
  182. if err := validateFieldWithSchema(schema, "worker_timeout_seconds", int(config.WorkerTimeoutSeconds)); err != nil {
  183. return err
  184. }
  185. if err := validateFieldWithSchema(schema, "task_timeout_seconds", int(config.TaskTimeoutSeconds)); err != nil {
  186. return err
  187. }
  188. if err := validateFieldWithSchema(schema, "retry_delay_seconds", int(config.RetryDelaySeconds)); err != nil {
  189. return err
  190. }
  191. if err := validateFieldWithSchema(schema, "max_retries", int(config.MaxRetries)); err != nil {
  192. return err
  193. }
  194. if err := validateFieldWithSchema(schema, "cleanup_interval_seconds", int(config.CleanupIntervalSeconds)); err != nil {
  195. return err
  196. }
  197. if err := validateFieldWithSchema(schema, "task_retention_seconds", int(config.TaskRetentionSeconds)); err != nil {
  198. return err
  199. }
  200. // Validate policy fields if present
  201. if config.Policy != nil {
  202. // Note: These field names might need to be adjusted based on the actual schema
  203. if err := validatePolicyField("global_max_concurrent", int(config.Policy.GlobalMaxConcurrent)); err != nil {
  204. return err
  205. }
  206. if err := validatePolicyField("default_repeat_interval_seconds", int(config.Policy.DefaultRepeatIntervalSeconds)); err != nil {
  207. return err
  208. }
  209. if err := validatePolicyField("default_check_interval_seconds", int(config.Policy.DefaultCheckIntervalSeconds)); err != nil {
  210. return err
  211. }
  212. }
  213. return nil
  214. }
  215. // validateFieldWithSchema validates a single field using its ConfigField definition
  216. func validateFieldWithSchema(schema *MaintenanceConfigSchema, fieldName string, value interface{}) error {
  217. field := schema.GetFieldByName(fieldName)
  218. if field == nil {
  219. // Field not in schema, skip validation
  220. return nil
  221. }
  222. return field.ValidateValue(value)
  223. }
  224. // validatePolicyField validates policy fields (simplified validation for now)
  225. func validatePolicyField(fieldName string, value int) error {
  226. switch fieldName {
  227. case "global_max_concurrent":
  228. if value < 1 || value > 20 {
  229. return fmt.Errorf("Global Max Concurrent must be between 1 and 20, got %d", value)
  230. }
  231. case "default_repeat_interval":
  232. if value < 1 || value > 168 {
  233. return fmt.Errorf("Default Repeat Interval must be between 1 and 168 hours, got %d", value)
  234. }
  235. case "default_check_interval":
  236. if value < 1 || value > 168 {
  237. return fmt.Errorf("Default Check Interval must be between 1 and 168 hours, got %d", value)
  238. }
  239. }
  240. return nil
  241. }