| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- package balance
- import (
- "sync"
- "time"
- )
- // BalanceMetrics contains balance-specific monitoring data
- type BalanceMetrics struct {
- // Execution metrics
- VolumesBalanced int64 `json:"volumes_balanced"`
- TotalDataTransferred int64 `json:"total_data_transferred"`
- AverageImbalance float64 `json:"average_imbalance"`
- LastBalanceTime time.Time `json:"last_balance_time"`
- // Performance metrics
- AverageTransferSpeed float64 `json:"average_transfer_speed_mbps"`
- TotalExecutionTime int64 `json:"total_execution_time_seconds"`
- SuccessfulOperations int64 `json:"successful_operations"`
- FailedOperations int64 `json:"failed_operations"`
- // Current task metrics
- CurrentImbalanceScore float64 `json:"current_imbalance_score"`
- PlannedDestinations int `json:"planned_destinations"`
- mutex sync.RWMutex
- }
- // NewBalanceMetrics creates a new balance metrics instance
- func NewBalanceMetrics() *BalanceMetrics {
- return &BalanceMetrics{
- LastBalanceTime: time.Now(),
- }
- }
- // RecordVolumeBalanced records a successful volume balance operation
- func (m *BalanceMetrics) RecordVolumeBalanced(volumeSize int64, transferTime time.Duration) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.VolumesBalanced++
- m.TotalDataTransferred += volumeSize
- m.SuccessfulOperations++
- m.LastBalanceTime = time.Now()
- m.TotalExecutionTime += int64(transferTime.Seconds())
- // Calculate average transfer speed (MB/s)
- if transferTime > 0 {
- speedMBps := float64(volumeSize) / (1024 * 1024) / transferTime.Seconds()
- if m.AverageTransferSpeed == 0 {
- m.AverageTransferSpeed = speedMBps
- } else {
- // Exponential moving average
- m.AverageTransferSpeed = 0.8*m.AverageTransferSpeed + 0.2*speedMBps
- }
- }
- }
- // RecordFailure records a failed balance operation
- func (m *BalanceMetrics) RecordFailure() {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.FailedOperations++
- }
- // UpdateImbalanceScore updates the current cluster imbalance score
- func (m *BalanceMetrics) UpdateImbalanceScore(score float64) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.CurrentImbalanceScore = score
- // Update average imbalance with exponential moving average
- if m.AverageImbalance == 0 {
- m.AverageImbalance = score
- } else {
- m.AverageImbalance = 0.9*m.AverageImbalance + 0.1*score
- }
- }
- // SetPlannedDestinations sets the number of planned destinations
- func (m *BalanceMetrics) SetPlannedDestinations(count int) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.PlannedDestinations = count
- }
- // GetMetrics returns a copy of the current metrics (without the mutex)
- func (m *BalanceMetrics) GetMetrics() BalanceMetrics {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- // Create a copy without the mutex to avoid copying lock value
- return BalanceMetrics{
- VolumesBalanced: m.VolumesBalanced,
- TotalDataTransferred: m.TotalDataTransferred,
- AverageImbalance: m.AverageImbalance,
- LastBalanceTime: m.LastBalanceTime,
- AverageTransferSpeed: m.AverageTransferSpeed,
- TotalExecutionTime: m.TotalExecutionTime,
- SuccessfulOperations: m.SuccessfulOperations,
- FailedOperations: m.FailedOperations,
- CurrentImbalanceScore: m.CurrentImbalanceScore,
- PlannedDestinations: m.PlannedDestinations,
- }
- }
- // GetSuccessRate returns the success rate as a percentage
- func (m *BalanceMetrics) GetSuccessRate() float64 {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- total := m.SuccessfulOperations + m.FailedOperations
- if total == 0 {
- return 100.0
- }
- return float64(m.SuccessfulOperations) / float64(total) * 100.0
- }
- // Reset resets all metrics to zero
- func (m *BalanceMetrics) Reset() {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- *m = BalanceMetrics{
- LastBalanceTime: time.Now(),
- }
- }
- // Global metrics instance for balance tasks
- var globalBalanceMetrics = NewBalanceMetrics()
- // GetGlobalBalanceMetrics returns the global balance metrics instance
- func GetGlobalBalanceMetrics() *BalanceMetrics {
- return globalBalanceMetrics
- }
|