monitoring.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package balance
  2. import (
  3. "sync"
  4. "time"
  5. )
  // BalanceMetrics aggregates monitoring data for volume balance tasks.
//
// The methods on *BalanceMetrics serialize access through the unexported
// mutex field. The data fields themselves are exported (and JSON-tagged), so
// code that reads or writes them directly bypasses that lock.
type BalanceMetrics struct {
	// ---- execution ----

	// VolumesBalanced is incremented by RecordVolumeBalanced.
	VolumesBalanced int64 `json:"volumes_balanced"`
	// TotalDataTransferred is the running sum of the volumeSize values given
	// to RecordVolumeBalanced. Presumably bytes, since the speed calculation
	// divides by 1024*1024 to get MB/s — confirm against callers.
	TotalDataTransferred int64 `json:"total_data_transferred"`
	// AverageImbalance is an exponential moving average of the scores given
	// to UpdateImbalanceScore.
	AverageImbalance float64 `json:"average_imbalance"`
	// LastBalanceTime is stamped at construction, on Reset, and on every
	// RecordVolumeBalanced call.
	LastBalanceTime time.Time `json:"last_balance_time"`

	// ---- performance ----

	// AverageTransferSpeed is an exponential moving average of volumeSize
	// divided by 1024*1024 per second of transfer time. Despite the "mbps"
	// JSON key this is a size-based rate (MiB/s if sizes are bytes), not
	// megabits per second.
	AverageTransferSpeed float64 `json:"average_transfer_speed_mbps"`
	// TotalExecutionTime accumulates transfer durations in whole seconds;
	// each duration is truncated before it is added.
	TotalExecutionTime int64 `json:"total_execution_time_seconds"`
	// SuccessfulOperations and FailedOperations feed GetSuccessRate.
	SuccessfulOperations int64 `json:"successful_operations"`
	FailedOperations     int64 `json:"failed_operations"`

	// ---- current task ----

	CurrentImbalanceScore float64 `json:"current_imbalance_score"`
	PlannedDestinations   int     `json:"planned_destinations"`

	// mutex guards the fields above for the methods on *BalanceMetrics.
	mutex sync.RWMutex
}
  23. // NewBalanceMetrics creates a new balance metrics instance
  24. func NewBalanceMetrics() *BalanceMetrics {
  25. return &BalanceMetrics{
  26. LastBalanceTime: time.Now(),
  27. }
  28. }
  29. // RecordVolumeBalanced records a successful volume balance operation
  30. func (m *BalanceMetrics) RecordVolumeBalanced(volumeSize int64, transferTime time.Duration) {
  31. m.mutex.Lock()
  32. defer m.mutex.Unlock()
  33. m.VolumesBalanced++
  34. m.TotalDataTransferred += volumeSize
  35. m.SuccessfulOperations++
  36. m.LastBalanceTime = time.Now()
  37. m.TotalExecutionTime += int64(transferTime.Seconds())
  38. // Calculate average transfer speed (MB/s)
  39. if transferTime > 0 {
  40. speedMBps := float64(volumeSize) / (1024 * 1024) / transferTime.Seconds()
  41. if m.AverageTransferSpeed == 0 {
  42. m.AverageTransferSpeed = speedMBps
  43. } else {
  44. // Exponential moving average
  45. m.AverageTransferSpeed = 0.8*m.AverageTransferSpeed + 0.2*speedMBps
  46. }
  47. }
  48. }
  49. // RecordFailure records a failed balance operation
  50. func (m *BalanceMetrics) RecordFailure() {
  51. m.mutex.Lock()
  52. defer m.mutex.Unlock()
  53. m.FailedOperations++
  54. }
  55. // UpdateImbalanceScore updates the current cluster imbalance score
  56. func (m *BalanceMetrics) UpdateImbalanceScore(score float64) {
  57. m.mutex.Lock()
  58. defer m.mutex.Unlock()
  59. m.CurrentImbalanceScore = score
  60. // Update average imbalance with exponential moving average
  61. if m.AverageImbalance == 0 {
  62. m.AverageImbalance = score
  63. } else {
  64. m.AverageImbalance = 0.9*m.AverageImbalance + 0.1*score
  65. }
  66. }
  67. // SetPlannedDestinations sets the number of planned destinations
  68. func (m *BalanceMetrics) SetPlannedDestinations(count int) {
  69. m.mutex.Lock()
  70. defer m.mutex.Unlock()
  71. m.PlannedDestinations = count
  72. }
  73. // GetMetrics returns a copy of the current metrics (without the mutex)
  74. func (m *BalanceMetrics) GetMetrics() BalanceMetrics {
  75. m.mutex.RLock()
  76. defer m.mutex.RUnlock()
  77. // Create a copy without the mutex to avoid copying lock value
  78. return BalanceMetrics{
  79. VolumesBalanced: m.VolumesBalanced,
  80. TotalDataTransferred: m.TotalDataTransferred,
  81. AverageImbalance: m.AverageImbalance,
  82. LastBalanceTime: m.LastBalanceTime,
  83. AverageTransferSpeed: m.AverageTransferSpeed,
  84. TotalExecutionTime: m.TotalExecutionTime,
  85. SuccessfulOperations: m.SuccessfulOperations,
  86. FailedOperations: m.FailedOperations,
  87. CurrentImbalanceScore: m.CurrentImbalanceScore,
  88. PlannedDestinations: m.PlannedDestinations,
  89. }
  90. }
  91. // GetSuccessRate returns the success rate as a percentage
  92. func (m *BalanceMetrics) GetSuccessRate() float64 {
  93. m.mutex.RLock()
  94. defer m.mutex.RUnlock()
  95. total := m.SuccessfulOperations + m.FailedOperations
  96. if total == 0 {
  97. return 100.0
  98. }
  99. return float64(m.SuccessfulOperations) / float64(total) * 100.0
  100. }
  101. // Reset resets all metrics to zero
  102. func (m *BalanceMetrics) Reset() {
  103. m.mutex.Lock()
  104. defer m.mutex.Unlock()
  105. *m = BalanceMetrics{
  106. LastBalanceTime: time.Now(),
  107. }
  108. }
  109. // Global metrics instance for balance tasks
  110. var globalBalanceMetrics = NewBalanceMetrics()
  111. // GetGlobalBalanceMetrics returns the global balance metrics instance
  112. func GetGlobalBalanceMetrics() *BalanceMetrics {
  113. return globalBalanceMetrics
  114. }