task_definition.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package base
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/admin/config"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  10. )
  11. // TaskDefinition encapsulates everything needed to define a complete task type
  12. type TaskDefinition struct {
  13. // Basic task information
  14. Type types.TaskType
  15. Name string
  16. DisplayName string
  17. Description string
  18. Icon string
  19. Capabilities []string
  20. // Task configuration
  21. Config TaskConfig
  22. ConfigSpec ConfigSpec
  23. // Task creation
  24. CreateTask func(params *worker_pb.TaskParams) (types.Task, error)
  25. // Detection logic
  26. DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error)
  27. ScanInterval time.Duration
  28. // Scheduling logic
  29. SchedulingFunc func(task *types.TaskInput, running []*types.TaskInput, workers []*types.WorkerData, config TaskConfig) bool
  30. MaxConcurrent int
  31. RepeatInterval time.Duration
  32. }
  33. // TaskConfig provides a configuration interface that supports type-safe defaults
  34. type TaskConfig interface {
  35. config.ConfigWithDefaults // Extends ConfigWithDefaults for type-safe schema operations
  36. IsEnabled() bool
  37. SetEnabled(bool)
  38. ToTaskPolicy() *worker_pb.TaskPolicy
  39. FromTaskPolicy(policy *worker_pb.TaskPolicy) error
  40. }
  41. // ConfigSpec defines the configuration schema
  42. type ConfigSpec struct {
  43. Fields []*config.Field
  44. }
  45. // BaseConfig provides common configuration fields with reflection-based serialization
  46. type BaseConfig struct {
  47. Enabled bool `json:"enabled"`
  48. ScanIntervalSeconds int `json:"scan_interval_seconds"`
  49. MaxConcurrent int `json:"max_concurrent"`
  50. }
  51. // IsEnabled returns whether the task is enabled
  52. func (c *BaseConfig) IsEnabled() bool {
  53. return c.Enabled
  54. }
  55. // SetEnabled sets whether the task is enabled
  56. func (c *BaseConfig) SetEnabled(enabled bool) {
  57. c.Enabled = enabled
  58. }
  59. // Validate validates the base configuration
  60. func (c *BaseConfig) Validate() error {
  61. // Common validation logic
  62. return nil
  63. }
  64. // StructToMap converts any struct to a map using reflection
  65. func StructToMap(obj interface{}) map[string]interface{} {
  66. result := make(map[string]interface{})
  67. val := reflect.ValueOf(obj)
  68. // Handle pointer to struct
  69. if val.Kind() == reflect.Ptr {
  70. val = val.Elem()
  71. }
  72. if val.Kind() != reflect.Struct {
  73. return result
  74. }
  75. typ := val.Type()
  76. for i := 0; i < val.NumField(); i++ {
  77. field := val.Field(i)
  78. fieldType := typ.Field(i)
  79. // Skip unexported fields
  80. if !field.CanInterface() {
  81. continue
  82. }
  83. // Handle embedded structs recursively (before JSON tag check)
  84. if field.Kind() == reflect.Struct && fieldType.Anonymous {
  85. embeddedMap := StructToMap(field.Interface())
  86. for k, v := range embeddedMap {
  87. result[k] = v
  88. }
  89. continue
  90. }
  91. // Get JSON tag name
  92. jsonTag := fieldType.Tag.Get("json")
  93. if jsonTag == "" || jsonTag == "-" {
  94. continue
  95. }
  96. // Remove options like ",omitempty"
  97. if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
  98. jsonTag = jsonTag[:commaIdx]
  99. }
  100. result[jsonTag] = field.Interface()
  101. }
  102. return result
  103. }
  104. // MapToStruct loads data from map into struct using reflection
  105. func MapToStruct(data map[string]interface{}, obj interface{}) error {
  106. val := reflect.ValueOf(obj)
  107. // Must be pointer to struct
  108. if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct {
  109. return fmt.Errorf("obj must be pointer to struct")
  110. }
  111. val = val.Elem()
  112. typ := val.Type()
  113. for i := 0; i < val.NumField(); i++ {
  114. field := val.Field(i)
  115. fieldType := typ.Field(i)
  116. // Skip unexported fields
  117. if !field.CanSet() {
  118. continue
  119. }
  120. // Handle embedded structs recursively (before JSON tag check)
  121. if field.Kind() == reflect.Struct && fieldType.Anonymous {
  122. err := MapToStruct(data, field.Addr().Interface())
  123. if err != nil {
  124. return err
  125. }
  126. continue
  127. }
  128. // Get JSON tag name
  129. jsonTag := fieldType.Tag.Get("json")
  130. if jsonTag == "" || jsonTag == "-" {
  131. continue
  132. }
  133. // Remove options like ",omitempty"
  134. if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
  135. jsonTag = jsonTag[:commaIdx]
  136. }
  137. if value, exists := data[jsonTag]; exists {
  138. err := setFieldValue(field, value)
  139. if err != nil {
  140. return fmt.Errorf("failed to set field %s: %v", jsonTag, err)
  141. }
  142. }
  143. }
  144. return nil
  145. }
  146. // ToMap converts config to map using reflection
  147. // ToTaskPolicy converts BaseConfig to protobuf (partial implementation)
  148. // Note: Concrete implementations should override this to include task-specific config
  149. func (c *BaseConfig) ToTaskPolicy() *worker_pb.TaskPolicy {
  150. return &worker_pb.TaskPolicy{
  151. Enabled: c.Enabled,
  152. MaxConcurrent: int32(c.MaxConcurrent),
  153. RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
  154. CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
  155. // TaskConfig field should be set by concrete implementations
  156. }
  157. }
  158. // FromTaskPolicy loads BaseConfig from protobuf (partial implementation)
  159. // Note: Concrete implementations should override this to handle task-specific config
  160. func (c *BaseConfig) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
  161. if policy == nil {
  162. return fmt.Errorf("policy is nil")
  163. }
  164. c.Enabled = policy.Enabled
  165. c.MaxConcurrent = int(policy.MaxConcurrent)
  166. c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
  167. return nil
  168. }
  169. // ApplySchemaDefaults applies default values from schema using reflection
  170. func (c *BaseConfig) ApplySchemaDefaults(schema *config.Schema) error {
  171. // Use reflection-based approach for BaseConfig since it needs to handle embedded structs
  172. return schema.ApplyDefaultsToProtobuf(c)
  173. }
  174. // setFieldValue sets a field value with type conversion
  175. func setFieldValue(field reflect.Value, value interface{}) error {
  176. if value == nil {
  177. return nil
  178. }
  179. valueVal := reflect.ValueOf(value)
  180. fieldType := field.Type()
  181. valueType := valueVal.Type()
  182. // Direct assignment if types match
  183. if valueType.AssignableTo(fieldType) {
  184. field.Set(valueVal)
  185. return nil
  186. }
  187. // Type conversion for common cases
  188. switch fieldType.Kind() {
  189. case reflect.Bool:
  190. if b, ok := value.(bool); ok {
  191. field.SetBool(b)
  192. } else {
  193. return fmt.Errorf("cannot convert %T to bool", value)
  194. }
  195. case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
  196. switch v := value.(type) {
  197. case int:
  198. field.SetInt(int64(v))
  199. case int32:
  200. field.SetInt(int64(v))
  201. case int64:
  202. field.SetInt(v)
  203. case float64:
  204. field.SetInt(int64(v))
  205. default:
  206. return fmt.Errorf("cannot convert %T to int", value)
  207. }
  208. case reflect.Float32, reflect.Float64:
  209. switch v := value.(type) {
  210. case float32:
  211. field.SetFloat(float64(v))
  212. case float64:
  213. field.SetFloat(v)
  214. case int:
  215. field.SetFloat(float64(v))
  216. case int64:
  217. field.SetFloat(float64(v))
  218. default:
  219. return fmt.Errorf("cannot convert %T to float", value)
  220. }
  221. case reflect.String:
  222. if s, ok := value.(string); ok {
  223. field.SetString(s)
  224. } else {
  225. return fmt.Errorf("cannot convert %T to string", value)
  226. }
  227. default:
  228. return fmt.Errorf("unsupported field type %s", fieldType.Kind())
  229. }
  230. return nil
  231. }