- package storage
- import (
- "sync"
- "time"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
- "github.com/seaweedfs/seaweedfs/telemetry/proto"
- )
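- // PrometheusStorage exposes received telemetry as Prometheus metrics and keeps
- // the latest report per cluster in memory for the API endpoints.
- type PrometheusStorage struct {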
- // Prometheus metrics
- totalClusters prometheus.Gauge
- activeClusters prometheus.Gauge
- volumeServerCount *prometheus.GaugeVec
- totalDiskBytes *prometheus.GaugeVec
- totalVolumeCount *prometheus.GaugeVec
- filerCount *prometheus.GaugeVec
- brokerCount *prometheus.GaugeVec
- clusterInfo *prometheus.GaugeVec
- telemetryReceived prometheus.Counter
- // Latest report per cluster and cached aggregate stats for the API endpoints; guarded by mu
- mu sync.RWMutex
- instances map[string]*telemetryData
- stats map[string]interface{}
- }
- // telemetryData is an internal struct that includes the received timestamp
- type telemetryData struct {
- *proto.TelemetryData
- ReceivedAt time.Time `json:"received_at"`
- }
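- // NewPrometheusStorage creates the storage and registers its metrics.
- // promauto registers on the default Prometheus registry and panics on a
- // duplicate registration, so call this once per process.
- func NewPrometheusStorage() *PrometheusStorage {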
- return &PrometheusStorage{
- totalClusters: promauto.NewGauge(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_total_clusters",
- Help: "Total number of unique SeaweedFS clusters (last 30 days)",
- }),
- activeClusters: promauto.NewGauge(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_active_clusters",
- Help: "Number of active SeaweedFS clusters (last 7 days)",
- }),
- volumeServerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_volume_servers",
- Help: "Number of volume servers per cluster",
- }, []string{"cluster_id", "version", "os"}),
- totalDiskBytes: promauto.NewGaugeVec(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_disk_bytes",
- Help: "Total disk usage in bytes per cluster",
- }, []string{"cluster_id", "version", "os"}),
- totalVolumeCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_volume_count",
- Help: "Total number of volumes per cluster",
- }, []string{"cluster_id", "version", "os"}),
- filerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_filer_count",
- Help: "Number of filer servers per cluster",
- }, []string{"cluster_id", "version", "os"}),
- brokerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_broker_count",
- Help: "Number of broker servers per cluster",
- }, []string{"cluster_id", "version", "os"}),
- clusterInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{
- Name: "seaweedfs_telemetry_cluster_info",
- Help: "Cluster information (always 1, labels contain metadata)",
- }, []string{"cluster_id", "version", "os"}),
- telemetryReceived: promauto.NewCounter(prometheus.CounterOpts{
- Name: "seaweedfs_telemetry_reports_received_total",
- Help: "Total number of telemetry reports received",
- }),
- instances: make(map[string]*telemetryData),
- stats: make(map[string]interface{}),
- }
- }
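- // StoreTelemetry updates the per-cluster metrics from one report and records
- // the report as the latest one for its cluster.
- func (s *PrometheusStorage) StoreTelemetry(data *proto.TelemetryData) error {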
- s.mu.Lock()
- defer s.mu.Unlock()
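- // Drop series left behind when a cluster reports a new version or OS;
- // otherwise the old label set would keep exporting its last values.
- if prev, ok := s.instances[data.ClusterId]; ok && (prev.Version != data.Version || prev.Os != data.Os) {
- stale := prometheus.Labels{"cluster_id": prev.ClusterId, "version": prev.Version, "os": prev.Os}
- s.volumeServerCount.Delete(stale)
- s.totalDiskBytes.Delete(stale)
- s.totalVolumeCount.Delete(stale)
- s.filerCount.Delete(stale)
- s.brokerCount.Delete(stale)
- s.clusterInfo.Delete(stale)
- }
- // Update Prometheus metrics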
- labels := prometheus.Labels{
- "cluster_id": data.ClusterId,
- "version": data.Version,
- "os": data.Os,
- }
- s.volumeServerCount.With(labels).Set(float64(data.VolumeServerCount))
- s.totalDiskBytes.With(labels).Set(float64(data.TotalDiskBytes))
- s.totalVolumeCount.With(labels).Set(float64(data.TotalVolumeCount))
- s.filerCount.With(labels).Set(float64(data.FilerCount))
- s.brokerCount.With(labels).Set(float64(data.BrokerCount))
- s.clusterInfo.With(labels).Set(1)
- s.telemetryReceived.Inc()
- // Store in memory for API endpoints
- s.instances[data.ClusterId] = &telemetryData{
- TelemetryData: data,
- ReceivedAt: time.Now().UTC(),
- }
- // Update aggregated stats
- s.updateStats()
- return nil
- }
- func (s *PrometheusStorage) GetStats() (map[string]interface{}, error) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- // Return a shallow copy of the cached stats so callers cannot modify s.stats
- result := make(map[string]interface{})
- for k, v := range s.stats {
- result[k] = v
- }
- return result, nil
- }
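- // GetInstances returns up to limit stored reports. Go map iteration order is
- // unspecified, so which reports are returned, and in what order, can vary.
- func (s *PrometheusStorage) GetInstances(limit int) ([]*telemetryData, error) {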
- s.mu.RLock()
- defer s.mu.RUnlock()
- var instances []*telemetryData
- count := 0
- for _, instance := range s.instances {
- if count >= limit {
- break
- }
- instances = append(instances, instance)
- count++
- }
- return instances, nil
- }
- func (s *PrometheusStorage) GetMetrics(days int) (map[string]interface{}, error) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- // Return current metrics from in-memory storage
- // Historical data should be queried from Prometheus directly
- cutoff := time.Now().AddDate(0, 0, -days)
- var volumeServers []map[string]interface{}
- var diskUsage []map[string]interface{}
- for _, instance := range s.instances {
- if instance.ReceivedAt.After(cutoff) {
- volumeServers = append(volumeServers, map[string]interface{}{
- "date": instance.ReceivedAt.Format("2006-01-02"),
- "value": instance.TelemetryData.VolumeServerCount,
- })
- diskUsage = append(diskUsage, map[string]interface{}{
- "date": instance.ReceivedAt.Format("2006-01-02"),
- "value": instance.TelemetryData.TotalDiskBytes,
- })
- }
- }
- return map[string]interface{}{
- "volume_servers": volumeServers,
- "disk_usage": diskUsage,
- }, nil
- }
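- // updateStats recomputes the 30-day and 7-day aggregates from s.instances.
- // The caller must hold s.mu for writing.
- func (s *PrometheusStorage) updateStats() {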
- now := time.Now()
- last7Days := now.AddDate(0, 0, -7)
- last30Days := now.AddDate(0, 0, -30)
- totalInstances := 0
- activeInstances := 0
- versions := make(map[string]int)
- osDistribution := make(map[string]int)
- for _, instance := range s.instances {
- if instance.ReceivedAt.After(last30Days) {
- totalInstances++
- }
- if instance.ReceivedAt.After(last7Days) {
- activeInstances++
- versions[instance.TelemetryData.Version]++
- osDistribution[instance.TelemetryData.Os]++
- }
- }
- // Update Prometheus gauges
- s.totalClusters.Set(float64(totalInstances))
- s.activeClusters.Set(float64(activeInstances))
- // Update cached stats for API
- s.stats = map[string]interface{}{
- "total_instances": totalInstances,
- "active_instances": activeInstances,
- "versions": versions,
- "os_distribution": osDistribution,
- }
- }
- // CleanupOldInstances removes instances older than the specified duration
- func (s *PrometheusStorage) CleanupOldInstances(maxAge time.Duration) {
- s.mu.Lock()
- defer s.mu.Unlock()
- cutoff := time.Now().Add(-maxAge)
- for instanceID, instance := range s.instances {
- if instance.ReceivedAt.Before(cutoff) {
- delete(s.instances, instanceID)
- // Remove from Prometheus metrics
- labels := prometheus.Labels{
- "cluster_id": instance.TelemetryData.ClusterId,
- "version": instance.TelemetryData.Version,
- "os": instance.TelemetryData.Os,
- }
- s.volumeServerCount.Delete(labels)
- s.totalDiskBytes.Delete(labels)
- s.totalVolumeCount.Delete(labels)
- s.filerCount.Delete(labels)
- s.brokerCount.Delete(labels)
- s.clusterInfo.Delete(labels)
- }
- }
- s.updateStats()
- }
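The 30-day and 7-day windows in `updateStats` read the clock through `time.Now()`, which makes the counting hard to test in isolation. The following is a minimal sketch, not the project's implementation, of the same window logic with the reference time passed in as a parameter. The names `report`, `summary`, `summarize`, `sampleNow`, and `sampleReports` are hypothetical, the version strings are made-up sample data, and `report` keeps only the fields the window logic reads.

```go
package main

import (
	"fmt"
	"time"
)

// day is a convenience constant for building sample timestamps.
const day = 24 * time.Hour

// report is a hypothetical stand-in for telemetryData: only the fields
// that the window logic reads are kept.
type report struct {
	Version    string
	ReceivedAt time.Time
}

// summary holds a subset of the values that updateStats caches.
type summary struct {
	Total    int            // clusters seen in the last 30 days
	Active   int            // clusters seen in the last 7 days
	Versions map[string]int // version histogram over active clusters
}

// summarize counts reports inside the 30-day and 7-day windows relative
// to now. Taking now as a parameter keeps the result deterministic.
func summarize(reports map[string]report, now time.Time) summary {
	last7Days := now.AddDate(0, 0, -7)
	last30Days := now.AddDate(0, 0, -30)
	s := summary{Versions: make(map[string]int)}
	for _, r := range reports {
		if r.ReceivedAt.After(last30Days) {
			s.Total++
		}
		if r.ReceivedAt.After(last7Days) {
			s.Active++
			s.Versions[r.Version]++
		}
	}
	return s
}

// sampleNow is a fixed reference time (assumed for illustration).
var sampleNow = time.Date(2024, time.June, 30, 12, 0, 0, 0, time.UTC)

// sampleReports returns three made-up clusters: one reported yesterday,
// one 10 days ago, and one 45 days ago.
func sampleReports() map[string]report {
	return map[string]report{
		"cluster-a": {Version: "3.60", ReceivedAt: sampleNow.Add(-1 * day)},
		"cluster-b": {Version: "3.59", ReceivedAt: sampleNow.Add(-10 * day)},
		"cluster-c": {Version: "3.59", ReceivedAt: sampleNow.Add(-45 * day)},
	}
}

func main() {
	s := summarize(sampleReports(), sampleNow)
	fmt.Printf("total=%d active=%d versions=%v\n", s.Total, s.Active, s.Versions)
}
```

As in the original, both comparisons use `After`, so a report received exactly 7 days before the reference time is counted in the 30-day total but not as active.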