maintenance_scanner.go

package maintenance

import (
	"context"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// NewMaintenanceScanner creates a new maintenance scanner
func NewMaintenanceScanner(adminClient AdminClient, policy *MaintenancePolicy, queue *MaintenanceQueue) *MaintenanceScanner {
	scanner := &MaintenanceScanner{
		adminClient: adminClient,
		policy:      policy,
		queue:       queue,
		lastScan:    make(map[MaintenanceTaskType]time.Time),
	}

	// Initialize integration
	scanner.integration = NewMaintenanceIntegration(queue, policy)

	// Set up bidirectional relationship
	queue.SetIntegration(scanner.integration)

	glog.V(1).Infof("Initialized maintenance scanner with task system")
	return scanner
}
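
// Example usage (an illustrative sketch, not part of the original file):
// construct the scanner once and trigger scans on a fixed interval. The
// adminClient, policy, and queue values are assumed to be created by the
// surrounding admin server; the five-minute interval is arbitrary.
//
//	scanner := NewMaintenanceScanner(adminClient, policy, queue)
//	ticker := time.NewTicker(5 * time.Minute)
//	defer ticker.Stop()
//	for range ticker.C {
//		results, err := scanner.ScanForMaintenanceTasks()
//		if err != nil {
//			glog.Errorf("maintenance scan failed: %v", err)
//			continue
//		}
//		glog.V(1).Infof("detected %d maintenance tasks", len(results))
//	}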
// ScanForMaintenanceTasks analyzes the cluster and generates maintenance tasks
func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, error) {
	// Get volume health metrics
	volumeMetrics, err := ms.getVolumeHealthMetrics()
	if err != nil {
		return nil, fmt.Errorf("failed to get volume health metrics: %w", err)
	}

	// Use the task system for all task types
	if ms.integration != nil {
		// Convert metrics to the task system format
		taskMetrics := ms.convertToTaskMetrics(volumeMetrics)

		// Update topology information for a complete cluster view (including empty servers).
		// This must happen before task detection so that EC placement can consider all servers.
		if ms.lastTopologyInfo != nil {
			if err := ms.integration.UpdateTopologyInfo(ms.lastTopologyInfo); err != nil {
				glog.Errorf("Failed to update topology info for empty servers: %v", err)
				// Don't fail the scan - continue with just the volume-bearing servers
			} else {
				glog.V(1).Infof("Updated topology info for complete cluster view including empty servers")
			}
		}

		// Run task detection with the complete cluster information
		results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
		if err != nil {
			glog.Errorf("Task scanning failed: %v", err)
			return nil, err
		}

		glog.V(1).Infof("Maintenance scan completed: found %d tasks", len(results))
		return results, nil
	}

	// No integration available; nothing can be scheduled
	glog.Warningf("No integration available, no tasks will be scheduled")
	return []*TaskDetectionResult{}, nil
}
// getVolumeHealthMetrics collects health information for all volumes
func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
	var metrics []*VolumeHealthMetrics

	glog.V(1).Infof("Collecting volume health metrics from master")
	err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}
		if resp.TopologyInfo == nil {
			glog.Warningf("No topology info received from master")
			return nil
		}

		volumeSizeLimitBytes := uint64(resp.VolumeSizeLimitMb) * 1024 * 1024 // Convert MB to bytes

		// Track all nodes discovered in the topology
		var allNodesInTopology []string
		var nodesWithVolumes []string
		var nodesWithoutVolumes []string

		for _, dc := range resp.TopologyInfo.DataCenterInfos {
			glog.V(2).Infof("Processing datacenter: %s", dc.Id)
			for _, rack := range dc.RackInfos {
				glog.V(2).Infof("Processing rack: %s in datacenter: %s", rack.Id, dc.Id)
				for _, node := range rack.DataNodeInfos {
					allNodesInTopology = append(allNodesInTopology, node.Id)
					glog.V(2).Infof("Found volume server in topology: %s (disks: %d)", node.Id, len(node.DiskInfos))

					hasVolumes := false

					// Process each disk on this node
					for diskType, diskInfo := range node.DiskInfos {
						if len(diskInfo.VolumeInfos) > 0 {
							hasVolumes = true
							glog.V(2).Infof("Volume server %s disk %s has %d volumes", node.Id, diskType, len(diskInfo.VolumeInfos))
						}

						// Process volumes on this specific disk
						for _, volInfo := range diskInfo.VolumeInfos {
							// Decode the replica placement byte (e.g. "010" encodes as 10) to get
							// the expected copy count; casting the raw encoded value directly
							// would misreport the expected replica count.
							expectedReplicas := 1
							if rp, rpErr := super_block.NewReplicaPlacementFromByte(byte(volInfo.ReplicaPlacement)); rpErr == nil {
								expectedReplicas = rp.GetCopyCount()
							}

							metric := &VolumeHealthMetrics{
								VolumeID:         volInfo.Id,
								Server:           node.Id,
								DiskType:         diskType,       // Track which disk this volume is on
								DiskId:           volInfo.DiskId, // Use disk ID from volume info
								DataCenter:       dc.Id,          // Data center from current loop
								Rack:             rack.Id,        // Rack from current loop
								Collection:       volInfo.Collection,
								Size:             volInfo.Size,
								DeletedBytes:     volInfo.DeletedByteCount,
								LastModified:     time.Unix(int64(volInfo.ModifiedAtSecond), 0),
								IsReadOnly:       volInfo.ReadOnly,
								IsECVolume:       false, // Will be determined from volume structure
								ReplicaCount:     1,     // Will be counted in enrichVolumeMetrics
								ExpectedReplicas: expectedReplicas,
							}
							// Calculate derived metrics
							if metric.Size > 0 {
								metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
								// Calculate fullness ratio using the actual volume size limit from the master
								metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes)
							}
							metric.Age = time.Since(metric.LastModified)

							glog.V(3).Infof("Volume %d on %s:%s (disk ID %d): size=%d, limit=%d, fullness=%.2f",
								metric.VolumeID, metric.Server, metric.DiskType, metric.DiskId, metric.Size, volumeSizeLimitBytes, metric.FullnessRatio)

							metrics = append(metrics, metric)
						}
					}

					if hasVolumes {
						nodesWithVolumes = append(nodesWithVolumes, node.Id)
					} else {
						nodesWithoutVolumes = append(nodesWithoutVolumes, node.Id)
						glog.V(1).Infof("Volume server %s found in topology but has no volumes", node.Id)
					}
				}
			}
		}

		glog.Infof("Topology discovery complete:")
		glog.Infof(" - Total volume servers in topology: %d (%v)", len(allNodesInTopology), allNodesInTopology)
		glog.Infof(" - Volume servers with volumes: %d (%v)", len(nodesWithVolumes), nodesWithVolumes)
		glog.Infof(" - Volume servers without volumes: %d (%v)", len(nodesWithoutVolumes), nodesWithoutVolumes)

		// Store topology info for the volume shard tracker
		ms.lastTopologyInfo = resp.TopologyInfo
		return nil
	})
	if err != nil {
		glog.Errorf("Failed to get volume health metrics: %v", err)
		return nil, err
	}

	glog.V(1).Infof("Successfully collected metrics for %d volumes with disk ID information", len(metrics))

	// Count actual replicas and identify EC volumes
	ms.enrichVolumeMetrics(metrics)

	return metrics, nil
}
// enrichVolumeMetrics adds additional information such as replica counts
func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
	// Group volumes by ID to count replicas
	volumeGroups := make(map[uint32][]*VolumeHealthMetrics)
	for _, metric := range metrics {
		volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
	}

	// Update replica counts for actual volumes
	for volumeID, replicas := range volumeGroups {
		replicaCount := len(replicas)
		for _, replica := range replicas {
			replica.ReplicaCount = replicaCount
		}
		glog.V(3).Infof("Volume %d has %d replicas", volumeID, replicaCount)
	}

	// TODO: Identify EC volumes by checking volume structure.
	// This would require querying volume servers for EC shard information;
	// a topology-based alternative is sketched below.
}
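
// markECVolumesFromTopology is an illustrative sketch of how the TODO above
// could be addressed without extra RPCs: the master topology response already
// carries per-disk EC shard information (DiskInfo.EcShardInfos in master_pb),
// so any volume ID that appears there can be flagged as an EC volume. This
// helper is an assumption-labeled sketch, not part of the original file.
func (ms *MaintenanceScanner) markECVolumesFromTopology(metrics []*VolumeHealthMetrics) {
	if ms.lastTopologyInfo == nil {
		return
	}

	// Collect the IDs of all volumes that have EC shards anywhere in the cluster
	ecVolumeIDs := make(map[uint32]bool)
	for _, dc := range ms.lastTopologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, node := range rack.DataNodeInfos {
				for _, diskInfo := range node.DiskInfos {
					for _, ecShard := range diskInfo.EcShardInfos {
						ecVolumeIDs[ecShard.Id] = true
					}
				}
			}
		}
	}

	// Flag the metrics whose volume IDs carry EC shards
	for _, metric := range metrics {
		if ecVolumeIDs[metric.VolumeID] {
			metric.IsECVolume = true
		}
	}
}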
// convertToTaskMetrics converts existing volume metrics to the task system format
func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetrics) []*types.VolumeHealthMetrics {
	var simplified []*types.VolumeHealthMetrics
	for _, metric := range metrics {
		simplified = append(simplified, &types.VolumeHealthMetrics{
			VolumeID:         metric.VolumeID,
			Server:           metric.Server,
			DiskType:         metric.DiskType,
			DiskId:           metric.DiskId,
			DataCenter:       metric.DataCenter,
			Rack:             metric.Rack,
			Collection:       metric.Collection,
			Size:             metric.Size,
			DeletedBytes:     metric.DeletedBytes,
			GarbageRatio:     metric.GarbageRatio,
			LastModified:     metric.LastModified,
			Age:              metric.Age,
			ReplicaCount:     metric.ReplicaCount,
			ExpectedReplicas: metric.ExpectedReplicas,
			IsReadOnly:       metric.IsReadOnly,
			HasRemoteCopy:    metric.HasRemoteCopy,
			IsECVolume:       metric.IsECVolume,
			FullnessRatio:    metric.FullnessRatio,
		})
	}

	glog.V(2).Infof("Converted %d volume metrics with disk ID information for task detection", len(simplified))
	return simplified
}