| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- package erasure_coding
- import (
- "sync"
- "time"
- )
// ErasureCodingMetrics aggregates monitoring data for erasure coding tasks:
// cumulative counters, smoothed averages, shard placement distribution and
// information about the work currently in progress.
//
// The exported fields are serialized to JSON under the names given in their
// tags. The unexported mutex is taken by the methods of this type before they
// read or write any field.
type ErasureCodingMetrics struct {
	// Execution metrics: cumulative totals over all recorded encodings.
	VolumesEncoded      int64     `json:"volumes_encoded"`
	TotalShardsCreated  int64     `json:"total_shards_created"`
	TotalDataProcessed  int64     `json:"total_data_processed"`
	TotalSourcesRemoved int64     `json:"total_sources_removed"`
	LastEncodingTime    time.Time `json:"last_encoding_time"`

	// Performance metrics: smoothed averages plus success/failure counts.
	// AverageEncodingTime is expressed in whole seconds.
	AverageEncodingTime  int64 `json:"average_encoding_time_seconds"`
	AverageShardSize     int64 `json:"average_shard_size"`
	AverageDataShards    int   `json:"average_data_shards"`
	AverageParityShards  int   `json:"average_parity_shards"`
	SuccessfulOperations int64 `json:"successful_operations"`
	FailedOperations     int64 `json:"failed_operations"`

	// Distribution metrics: shard counts keyed by data center and by
	// "dataCenter:rack", plus a smoothed placement success rate.
	ShardsPerDataCenter  map[string]int64 `json:"shards_per_datacenter"`
	ShardsPerRack        map[string]int64 `json:"shards_per_rack"`
	PlacementSuccessRate float64          `json:"placement_success_rate"`

	// Current task metrics: state of the work in progress and the backlog.
	CurrentVolumeSize      int64 `json:"current_volume_size"`
	CurrentShardCount      int   `json:"current_shard_count"`
	VolumesPendingEncoding int   `json:"volumes_pending_encoding"`

	// mutex guards every field above; it is unexported and therefore not
	// part of the JSON representation.
	mutex sync.RWMutex
}
- // NewErasureCodingMetrics creates a new erasure coding metrics instance
- func NewErasureCodingMetrics() *ErasureCodingMetrics {
- return &ErasureCodingMetrics{
- LastEncodingTime: time.Now(),
- ShardsPerDataCenter: make(map[string]int64),
- ShardsPerRack: make(map[string]int64),
- }
- }
- // RecordVolumeEncoded records a successful volume encoding operation
- func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.VolumesEncoded++
- m.TotalShardsCreated += int64(shardsCreated)
- m.TotalDataProcessed += volumeSize
- m.SuccessfulOperations++
- m.LastEncodingTime = time.Now()
- if sourceRemoved {
- m.TotalSourcesRemoved++
- }
- // Update average encoding time
- if m.AverageEncodingTime == 0 {
- m.AverageEncodingTime = int64(encodingTime.Seconds())
- } else {
- // Exponential moving average
- newTime := int64(encodingTime.Seconds())
- m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5
- }
- // Update average shard size
- if shardsCreated > 0 {
- avgShardSize := volumeSize / int64(shardsCreated)
- if m.AverageShardSize == 0 {
- m.AverageShardSize = avgShardSize
- } else {
- m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5
- }
- }
- // Update average data/parity shards
- if m.AverageDataShards == 0 {
- m.AverageDataShards = dataShards
- m.AverageParityShards = parityShards
- } else {
- m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
- m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
- }
- }
- // RecordFailure records a failed erasure coding operation
- func (m *ErasureCodingMetrics) RecordFailure() {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.FailedOperations++
- }
- // RecordShardPlacement records shard placement for distribution tracking
- func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.ShardsPerDataCenter[dataCenter]++
- rackKey := dataCenter + ":" + rack
- m.ShardsPerRack[rackKey]++
- }
- // UpdateCurrentVolumeInfo updates current volume processing information
- func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.CurrentVolumeSize = volumeSize
- m.CurrentShardCount = shardCount
- }
- // SetVolumesPendingEncoding sets the number of volumes pending encoding
- func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.VolumesPendingEncoding = count
- }
- // UpdatePlacementSuccessRate updates the placement success rate
- func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- if m.PlacementSuccessRate == 0 {
- m.PlacementSuccessRate = rate
- } else {
- // Exponential moving average
- m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
- }
- }
- // GetMetrics returns a copy of the current metrics (without the mutex)
- func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- // Create deep copy of maps
- shardsPerDC := make(map[string]int64)
- for k, v := range m.ShardsPerDataCenter {
- shardsPerDC[k] = v
- }
- shardsPerRack := make(map[string]int64)
- for k, v := range m.ShardsPerRack {
- shardsPerRack[k] = v
- }
- // Create a copy without the mutex to avoid copying lock value
- return ErasureCodingMetrics{
- VolumesEncoded: m.VolumesEncoded,
- TotalShardsCreated: m.TotalShardsCreated,
- TotalDataProcessed: m.TotalDataProcessed,
- TotalSourcesRemoved: m.TotalSourcesRemoved,
- LastEncodingTime: m.LastEncodingTime,
- AverageEncodingTime: m.AverageEncodingTime,
- AverageShardSize: m.AverageShardSize,
- AverageDataShards: m.AverageDataShards,
- AverageParityShards: m.AverageParityShards,
- SuccessfulOperations: m.SuccessfulOperations,
- FailedOperations: m.FailedOperations,
- ShardsPerDataCenter: shardsPerDC,
- ShardsPerRack: shardsPerRack,
- PlacementSuccessRate: m.PlacementSuccessRate,
- CurrentVolumeSize: m.CurrentVolumeSize,
- CurrentShardCount: m.CurrentShardCount,
- VolumesPendingEncoding: m.VolumesPendingEncoding,
- }
- }
- // GetSuccessRate returns the success rate as a percentage
- func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- total := m.SuccessfulOperations + m.FailedOperations
- if total == 0 {
- return 100.0
- }
- return float64(m.SuccessfulOperations) / float64(total) * 100.0
- }
- // GetAverageDataProcessed returns the average data processed per volume
- func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- if m.VolumesEncoded == 0 {
- return 0
- }
- return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
- }
- // GetSourceRemovalRate returns the percentage of sources removed after encoding
- func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- if m.VolumesEncoded == 0 {
- return 0
- }
- return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
- }
- // Reset resets all metrics to zero
- func (m *ErasureCodingMetrics) Reset() {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- *m = ErasureCodingMetrics{
- LastEncodingTime: time.Now(),
- ShardsPerDataCenter: make(map[string]int64),
- ShardsPerRack: make(map[string]int64),
- }
- }
- // Global metrics instance for erasure coding tasks
- var globalErasureCodingMetrics = NewErasureCodingMetrics()
- // GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance
- func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
- return globalErasureCodingMetrics
- }
|