prometheus.go 6.8 KB


  1. package storage
import (
	"errors"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/seaweedfs/seaweedfs/telemetry/proto"
)
  9. type PrometheusStorage struct {
  10. // Prometheus metrics
  11. totalClusters prometheus.Gauge
  12. activeClusters prometheus.Gauge
  13. volumeServerCount *prometheus.GaugeVec
  14. totalDiskBytes *prometheus.GaugeVec
  15. totalVolumeCount *prometheus.GaugeVec
  16. filerCount *prometheus.GaugeVec
  17. brokerCount *prometheus.GaugeVec
  18. clusterInfo *prometheus.GaugeVec
  19. telemetryReceived prometheus.Counter
  20. // In-memory storage for API endpoints (if needed)
  21. mu sync.RWMutex
  22. instances map[string]*telemetryData
  23. stats map[string]interface{}
  24. }
  25. // telemetryData is an internal struct that includes the received timestamp
  26. type telemetryData struct {
  27. *proto.TelemetryData
  28. ReceivedAt time.Time `json:"received_at"`
  29. }
  30. func NewPrometheusStorage() *PrometheusStorage {
  31. return &PrometheusStorage{
  32. totalClusters: promauto.NewGauge(prometheus.GaugeOpts{
  33. Name: "seaweedfs_telemetry_total_clusters",
  34. Help: "Total number of unique SeaweedFS clusters (last 30 days)",
  35. }),
  36. activeClusters: promauto.NewGauge(prometheus.GaugeOpts{
  37. Name: "seaweedfs_telemetry_active_clusters",
  38. Help: "Number of active SeaweedFS clusters (last 7 days)",
  39. }),
  40. volumeServerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
  41. Name: "seaweedfs_telemetry_volume_servers",
  42. Help: "Number of volume servers per cluster",
  43. }, []string{"cluster_id", "version", "os"}),
  44. totalDiskBytes: promauto.NewGaugeVec(prometheus.GaugeOpts{
  45. Name: "seaweedfs_telemetry_disk_bytes",
  46. Help: "Total disk usage in bytes per cluster",
  47. }, []string{"cluster_id", "version", "os"}),
  48. totalVolumeCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
  49. Name: "seaweedfs_telemetry_volume_count",
  50. Help: "Total number of volumes per cluster",
  51. }, []string{"cluster_id", "version", "os"}),
  52. filerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
  53. Name: "seaweedfs_telemetry_filer_count",
  54. Help: "Number of filer servers per cluster",
  55. }, []string{"cluster_id", "version", "os"}),
  56. brokerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
  57. Name: "seaweedfs_telemetry_broker_count",
  58. Help: "Number of broker servers per cluster",
  59. }, []string{"cluster_id", "version", "os"}),
  60. clusterInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{
  61. Name: "seaweedfs_telemetry_cluster_info",
  62. Help: "Cluster information (always 1, labels contain metadata)",
  63. }, []string{"cluster_id", "version", "os"}),
  64. telemetryReceived: promauto.NewCounter(prometheus.CounterOpts{
  65. Name: "seaweedfs_telemetry_reports_received_total",
  66. Help: "Total number of telemetry reports received",
  67. }),
  68. instances: make(map[string]*telemetryData),
  69. stats: make(map[string]interface{}),
  70. }
  71. }
  72. func (s *PrometheusStorage) StoreTelemetry(data *proto.TelemetryData) error {
  73. s.mu.Lock()
  74. defer s.mu.Unlock()
  75. // Update Prometheus metrics
  76. labels := prometheus.Labels{
  77. "cluster_id": data.ClusterId,
  78. "version": data.Version,
  79. "os": data.Os,
  80. }
  81. s.volumeServerCount.With(labels).Set(float64(data.VolumeServerCount))
  82. s.totalDiskBytes.With(labels).Set(float64(data.TotalDiskBytes))
  83. s.totalVolumeCount.With(labels).Set(float64(data.TotalVolumeCount))
  84. s.filerCount.With(labels).Set(float64(data.FilerCount))
  85. s.brokerCount.With(labels).Set(float64(data.BrokerCount))
  86. infoLabels := prometheus.Labels{
  87. "cluster_id": data.ClusterId,
  88. "version": data.Version,
  89. "os": data.Os,
  90. }
  91. s.clusterInfo.With(infoLabels).Set(1)
  92. s.telemetryReceived.Inc()
  93. // Store in memory for API endpoints
  94. s.instances[data.ClusterId] = &telemetryData{
  95. TelemetryData: data,
  96. ReceivedAt: time.Now().UTC(),
  97. }
  98. // Update aggregated stats
  99. s.updateStats()
  100. return nil
  101. }
  102. func (s *PrometheusStorage) GetStats() (map[string]interface{}, error) {
  103. s.mu.RLock()
  104. defer s.mu.RUnlock()
  105. // Return cached stats
  106. result := make(map[string]interface{})
  107. for k, v := range s.stats {
  108. result[k] = v
  109. }
  110. return result, nil
  111. }
  112. func (s *PrometheusStorage) GetInstances(limit int) ([]*telemetryData, error) {
  113. s.mu.RLock()
  114. defer s.mu.RUnlock()
  115. var instances []*telemetryData
  116. count := 0
  117. for _, instance := range s.instances {
  118. if count >= limit {
  119. break
  120. }
  121. instances = append(instances, instance)
  122. count++
  123. }
  124. return instances, nil
  125. }
  126. func (s *PrometheusStorage) GetMetrics(days int) (map[string]interface{}, error) {
  127. s.mu.RLock()
  128. defer s.mu.RUnlock()
  129. // Return current metrics from in-memory storage
  130. // Historical data should be queried from Prometheus directly
  131. cutoff := time.Now().AddDate(0, 0, -days)
  132. var volumeServers []map[string]interface{}
  133. var diskUsage []map[string]interface{}
  134. for _, instance := range s.instances {
  135. if instance.ReceivedAt.After(cutoff) {
  136. volumeServers = append(volumeServers, map[string]interface{}{
  137. "date": instance.ReceivedAt.Format("2006-01-02"),
  138. "value": instance.TelemetryData.VolumeServerCount,
  139. })
  140. diskUsage = append(diskUsage, map[string]interface{}{
  141. "date": instance.ReceivedAt.Format("2006-01-02"),
  142. "value": instance.TelemetryData.TotalDiskBytes,
  143. })
  144. }
  145. }
  146. return map[string]interface{}{
  147. "volume_servers": volumeServers,
  148. "disk_usage": diskUsage,
  149. }, nil
  150. }
  151. func (s *PrometheusStorage) updateStats() {
  152. now := time.Now()
  153. last7Days := now.AddDate(0, 0, -7)
  154. last30Days := now.AddDate(0, 0, -30)
  155. totalInstances := 0
  156. activeInstances := 0
  157. versions := make(map[string]int)
  158. osDistribution := make(map[string]int)
  159. for _, instance := range s.instances {
  160. if instance.ReceivedAt.After(last30Days) {
  161. totalInstances++
  162. }
  163. if instance.ReceivedAt.After(last7Days) {
  164. activeInstances++
  165. versions[instance.TelemetryData.Version]++
  166. osDistribution[instance.TelemetryData.Os]++
  167. }
  168. }
  169. // Update Prometheus gauges
  170. s.totalClusters.Set(float64(totalInstances))
  171. s.activeClusters.Set(float64(activeInstances))
  172. // Update cached stats for API
  173. s.stats = map[string]interface{}{
  174. "total_instances": totalInstances,
  175. "active_instances": activeInstances,
  176. "versions": versions,
  177. "os_distribution": osDistribution,
  178. }
  179. }
  180. // CleanupOldInstances removes instances older than the specified duration
  181. func (s *PrometheusStorage) CleanupOldInstances(maxAge time.Duration) {
  182. s.mu.Lock()
  183. defer s.mu.Unlock()
  184. cutoff := time.Now().Add(-maxAge)
  185. for instanceID, instance := range s.instances {
  186. if instance.ReceivedAt.Before(cutoff) {
  187. delete(s.instances, instanceID)
  188. // Remove from Prometheus metrics
  189. labels := prometheus.Labels{
  190. "cluster_id": instance.TelemetryData.ClusterId,
  191. "version": instance.TelemetryData.Version,
  192. "os": instance.TelemetryData.Os,
  193. }
  194. s.volumeServerCount.Delete(labels)
  195. s.totalDiskBytes.Delete(labels)
  196. s.totalVolumeCount.Delete(labels)
  197. s.filerCount.Delete(labels)
  198. s.brokerCount.Delete(labels)
  199. }
  200. }
  201. s.updateStats()
  202. }