config_update_registry.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package tasks
  2. import (
  3. "sync"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  6. )
  7. // ConfigUpdateFunc is a function type for updating task configurations
  8. type ConfigUpdateFunc func(configPersistence interface{}) error
  9. // ConfigUpdateRegistry manages config update functions for all task types
  10. type ConfigUpdateRegistry struct {
  11. updaters map[types.TaskType]ConfigUpdateFunc
  12. mutex sync.RWMutex
  13. }
  14. var (
  15. globalConfigUpdateRegistry *ConfigUpdateRegistry
  16. configUpdateRegistryOnce sync.Once
  17. )
  18. // GetGlobalConfigUpdateRegistry returns the global config update registry (singleton)
  19. func GetGlobalConfigUpdateRegistry() *ConfigUpdateRegistry {
  20. configUpdateRegistryOnce.Do(func() {
  21. globalConfigUpdateRegistry = &ConfigUpdateRegistry{
  22. updaters: make(map[types.TaskType]ConfigUpdateFunc),
  23. }
  24. glog.V(1).Infof("Created global config update registry")
  25. })
  26. return globalConfigUpdateRegistry
  27. }
  28. // RegisterConfigUpdater registers a config update function for a task type
  29. func (r *ConfigUpdateRegistry) RegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) {
  30. r.mutex.Lock()
  31. defer r.mutex.Unlock()
  32. r.updaters[taskType] = updateFunc
  33. glog.V(1).Infof("Registered config updater for task type: %s", taskType)
  34. }
  35. // UpdateAllConfigs updates configurations for all registered task types
  36. func (r *ConfigUpdateRegistry) UpdateAllConfigs(configPersistence interface{}) {
  37. r.mutex.RLock()
  38. updaters := make(map[types.TaskType]ConfigUpdateFunc)
  39. for k, v := range r.updaters {
  40. updaters[k] = v
  41. }
  42. r.mutex.RUnlock()
  43. for taskType, updateFunc := range updaters {
  44. if err := updateFunc(configPersistence); err != nil {
  45. glog.Warningf("Failed to load %s configuration from persistence: %v", taskType, err)
  46. } else {
  47. glog.V(1).Infof("Loaded %s configuration from persistence", taskType)
  48. }
  49. }
  50. glog.V(1).Infof("All task configurations loaded from persistence")
  51. }
  52. // AutoRegisterConfigUpdater is a convenience function for registering config updaters
  53. func AutoRegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) {
  54. registry := GetGlobalConfigUpdateRegistry()
  55. registry.RegisterConfigUpdater(taskType, updateFunc)
  56. }