monitoring.go 6.9 KB


  1. package erasure_coding
  2. import (
  3. "sync"
  4. "time"
  5. )
  // ErasureCodingMetrics aggregates counters, smoothed averages and placement
  // statistics for erasure coding tasks.
  //
  // The exported fields are plain data with JSON tags. The methods defined on
  // *ErasureCodingMetrics acquire the unexported mutex before reading or
  // writing them. Because the struct embeds a lock, share it by pointer
  // rather than copying a live value.
  type ErasureCodingMetrics struct {
  	// ---- Execution metrics ----

  	// VolumesEncoded counts the encodings recorded via RecordVolumeEncoded.
  	VolumesEncoded int64 `json:"volumes_encoded"`
  	// TotalShardsCreated is the running sum of shards reported per encoding.
  	TotalShardsCreated int64 `json:"total_shards_created"`
  	// TotalDataProcessed is the running sum of the reported volume sizes.
  	TotalDataProcessed int64 `json:"total_data_processed"`
  	// TotalSourcesRemoved counts encodings reported with sourceRemoved set.
  	TotalSourcesRemoved int64 `json:"total_sources_removed"`
  	// LastEncodingTime is the wall-clock time of the most recent recorded encoding.
  	LastEncodingTime time.Time `json:"last_encoding_time"`

  	// ---- Performance metrics ----

  	// AverageEncodingTime is a smoothed encoding duration in whole seconds.
  	AverageEncodingTime int64 `json:"average_encoding_time_seconds"`
  	// AverageShardSize is a smoothed value of volume size divided by shard count.
  	AverageShardSize int64 `json:"average_shard_size"`
  	// AverageDataShards is a smoothed number of data shards per encoding.
  	AverageDataShards int `json:"average_data_shards"`
  	// AverageParityShards is a smoothed number of parity shards per encoding.
  	AverageParityShards int `json:"average_parity_shards"`
  	// SuccessfulOperations counts recorded successful encodings.
  	SuccessfulOperations int64 `json:"successful_operations"`
  	// FailedOperations counts failures reported via RecordFailure.
  	FailedOperations int64 `json:"failed_operations"`

  	// ---- Distribution metrics ----

  	// ShardsPerDataCenter maps a data center name to its recorded shard placements.
  	ShardsPerDataCenter map[string]int64 `json:"shards_per_datacenter"`
  	// ShardsPerRack maps a "dataCenter:rack" key to its recorded shard placements.
  	ShardsPerRack map[string]int64 `json:"shards_per_rack"`
  	// PlacementSuccessRate is a smoothed value of the rates passed to
  	// UpdatePlacementSuccessRate; its scale is whatever the caller supplies.
  	PlacementSuccessRate float64 `json:"placement_success_rate"`

  	// ---- Current task metrics ----

  	// CurrentVolumeSize is the size last set through UpdateCurrentVolumeInfo.
  	CurrentVolumeSize int64 `json:"current_volume_size"`
  	// CurrentShardCount is the shard count last set through UpdateCurrentVolumeInfo.
  	CurrentShardCount int `json:"current_shard_count"`
  	// VolumesPendingEncoding is the value last set through SetVolumesPendingEncoding.
  	VolumesPendingEncoding int `json:"volumes_pending_encoding"`

  	// mutex guards every field above; being unexported, it is skipped by encoding/json.
  	mutex sync.RWMutex
  }
  31. // NewErasureCodingMetrics creates a new erasure coding metrics instance
  32. func NewErasureCodingMetrics() *ErasureCodingMetrics {
  33. return &ErasureCodingMetrics{
  34. LastEncodingTime: time.Now(),
  35. ShardsPerDataCenter: make(map[string]int64),
  36. ShardsPerRack: make(map[string]int64),
  37. }
  38. }
  39. // RecordVolumeEncoded records a successful volume encoding operation
  40. func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
  41. m.mutex.Lock()
  42. defer m.mutex.Unlock()
  43. m.VolumesEncoded++
  44. m.TotalShardsCreated += int64(shardsCreated)
  45. m.TotalDataProcessed += volumeSize
  46. m.SuccessfulOperations++
  47. m.LastEncodingTime = time.Now()
  48. if sourceRemoved {
  49. m.TotalSourcesRemoved++
  50. }
  51. // Update average encoding time
  52. if m.AverageEncodingTime == 0 {
  53. m.AverageEncodingTime = int64(encodingTime.Seconds())
  54. } else {
  55. // Exponential moving average
  56. newTime := int64(encodingTime.Seconds())
  57. m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5
  58. }
  59. // Update average shard size
  60. if shardsCreated > 0 {
  61. avgShardSize := volumeSize / int64(shardsCreated)
  62. if m.AverageShardSize == 0 {
  63. m.AverageShardSize = avgShardSize
  64. } else {
  65. m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5
  66. }
  67. }
  68. // Update average data/parity shards
  69. if m.AverageDataShards == 0 {
  70. m.AverageDataShards = dataShards
  71. m.AverageParityShards = parityShards
  72. } else {
  73. m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
  74. m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
  75. }
  76. }
  77. // RecordFailure records a failed erasure coding operation
  78. func (m *ErasureCodingMetrics) RecordFailure() {
  79. m.mutex.Lock()
  80. defer m.mutex.Unlock()
  81. m.FailedOperations++
  82. }
  83. // RecordShardPlacement records shard placement for distribution tracking
  84. func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
  85. m.mutex.Lock()
  86. defer m.mutex.Unlock()
  87. m.ShardsPerDataCenter[dataCenter]++
  88. rackKey := dataCenter + ":" + rack
  89. m.ShardsPerRack[rackKey]++
  90. }
  91. // UpdateCurrentVolumeInfo updates current volume processing information
  92. func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
  93. m.mutex.Lock()
  94. defer m.mutex.Unlock()
  95. m.CurrentVolumeSize = volumeSize
  96. m.CurrentShardCount = shardCount
  97. }
  98. // SetVolumesPendingEncoding sets the number of volumes pending encoding
  99. func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
  100. m.mutex.Lock()
  101. defer m.mutex.Unlock()
  102. m.VolumesPendingEncoding = count
  103. }
  104. // UpdatePlacementSuccessRate updates the placement success rate
  105. func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
  106. m.mutex.Lock()
  107. defer m.mutex.Unlock()
  108. if m.PlacementSuccessRate == 0 {
  109. m.PlacementSuccessRate = rate
  110. } else {
  111. // Exponential moving average
  112. m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
  113. }
  114. }
  115. // GetMetrics returns a copy of the current metrics (without the mutex)
  116. func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
  117. m.mutex.RLock()
  118. defer m.mutex.RUnlock()
  119. // Create deep copy of maps
  120. shardsPerDC := make(map[string]int64)
  121. for k, v := range m.ShardsPerDataCenter {
  122. shardsPerDC[k] = v
  123. }
  124. shardsPerRack := make(map[string]int64)
  125. for k, v := range m.ShardsPerRack {
  126. shardsPerRack[k] = v
  127. }
  128. // Create a copy without the mutex to avoid copying lock value
  129. return ErasureCodingMetrics{
  130. VolumesEncoded: m.VolumesEncoded,
  131. TotalShardsCreated: m.TotalShardsCreated,
  132. TotalDataProcessed: m.TotalDataProcessed,
  133. TotalSourcesRemoved: m.TotalSourcesRemoved,
  134. LastEncodingTime: m.LastEncodingTime,
  135. AverageEncodingTime: m.AverageEncodingTime,
  136. AverageShardSize: m.AverageShardSize,
  137. AverageDataShards: m.AverageDataShards,
  138. AverageParityShards: m.AverageParityShards,
  139. SuccessfulOperations: m.SuccessfulOperations,
  140. FailedOperations: m.FailedOperations,
  141. ShardsPerDataCenter: shardsPerDC,
  142. ShardsPerRack: shardsPerRack,
  143. PlacementSuccessRate: m.PlacementSuccessRate,
  144. CurrentVolumeSize: m.CurrentVolumeSize,
  145. CurrentShardCount: m.CurrentShardCount,
  146. VolumesPendingEncoding: m.VolumesPendingEncoding,
  147. }
  148. }
  149. // GetSuccessRate returns the success rate as a percentage
  150. func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
  151. m.mutex.RLock()
  152. defer m.mutex.RUnlock()
  153. total := m.SuccessfulOperations + m.FailedOperations
  154. if total == 0 {
  155. return 100.0
  156. }
  157. return float64(m.SuccessfulOperations) / float64(total) * 100.0
  158. }
  159. // GetAverageDataProcessed returns the average data processed per volume
  160. func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
  161. m.mutex.RLock()
  162. defer m.mutex.RUnlock()
  163. if m.VolumesEncoded == 0 {
  164. return 0
  165. }
  166. return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
  167. }
  168. // GetSourceRemovalRate returns the percentage of sources removed after encoding
  169. func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
  170. m.mutex.RLock()
  171. defer m.mutex.RUnlock()
  172. if m.VolumesEncoded == 0 {
  173. return 0
  174. }
  175. return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
  176. }
  177. // Reset resets all metrics to zero
  178. func (m *ErasureCodingMetrics) Reset() {
  179. m.mutex.Lock()
  180. defer m.mutex.Unlock()
  181. *m = ErasureCodingMetrics{
  182. LastEncodingTime: time.Now(),
  183. ShardsPerDataCenter: make(map[string]int64),
  184. ShardsPerRack: make(map[string]int64),
  185. }
  186. }
  187. // Global metrics instance for erasure coding tasks
  188. var globalErasureCodingMetrics = NewErasureCodingMetrics()
  189. // GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance
  190. func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
  191. return globalErasureCodingMetrics
  192. }