collector.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package telemetry
  2. import (
  3. "time"
  4. "github.com/seaweedfs/seaweedfs/telemetry/proto"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/topology"
  8. )
  9. // Collector gathers telemetry data from a SeaweedFS cluster
  10. // Only the leader master will send telemetry to avoid duplicates
  11. type Collector struct {
  12. client *Client
  13. topo *topology.Topology
  14. cluster *cluster.Cluster
  15. masterServer interface{} // Will be set to *weed_server.MasterServer to access client tracking
  16. version string
  17. os string
  18. }
  19. // NewCollector creates a new telemetry collector
  20. func NewCollector(client *Client, topo *topology.Topology, cluster *cluster.Cluster) *Collector {
  21. return &Collector{
  22. client: client,
  23. topo: topo,
  24. cluster: cluster,
  25. masterServer: nil,
  26. version: "unknown",
  27. os: "unknown",
  28. }
  29. }
  30. // SetVersion sets the SeaweedFS version
  31. func (c *Collector) SetVersion(version string) {
  32. c.version = version
  33. }
  34. // SetOS sets the operating system information
  35. func (c *Collector) SetOS(os string) {
  36. c.os = os
  37. }
  38. // SetMasterServer sets a reference to the master server for client tracking
  39. func (c *Collector) SetMasterServer(masterServer interface{}) {
  40. c.masterServer = masterServer
  41. }
  42. // isLeader checks if this master is the leader
  43. func (c *Collector) isLeader() bool {
  44. if c.topo == nil {
  45. return false
  46. }
  47. return c.topo.IsLeader()
  48. }
  49. // CollectAndSendAsync collects telemetry data and sends it asynchronously
  50. // Only sends telemetry if this master is the leader
  51. func (c *Collector) CollectAndSendAsync() {
  52. if !c.client.IsEnabled() {
  53. return
  54. }
  55. go func() {
  56. data := c.collectData()
  57. c.client.SendTelemetryAsync(data)
  58. }()
  59. }
  60. // StartPeriodicCollection starts sending telemetry data periodically
  61. func (c *Collector) StartPeriodicCollection(interval time.Duration) {
  62. if !c.client.IsEnabled() {
  63. glog.V(1).Infof("Telemetry is disabled, skipping periodic collection")
  64. return
  65. }
  66. glog.V(0).Infof("Starting telemetry collection every %v", interval)
  67. // Send initial telemetry after a short delay
  68. go func() {
  69. time.Sleep(61 * time.Second) // Wait for cluster to stabilize
  70. if c.isLeader() {
  71. c.CollectAndSendAsync()
  72. } else {
  73. glog.V(2).Infof("Skipping initial telemetry collection - not the leader master")
  74. }
  75. }()
  76. // Start periodic collection
  77. ticker := time.NewTicker(interval)
  78. go func() {
  79. defer ticker.Stop()
  80. for range ticker.C {
  81. // Check leadership before each collection
  82. if c.isLeader() {
  83. c.CollectAndSendAsync()
  84. } else {
  85. glog.V(2).Infof("Skipping periodic telemetry collection - not the leader master")
  86. }
  87. }
  88. }()
  89. }
  90. // collectData gathers telemetry data from the topology
  91. func (c *Collector) collectData() *proto.TelemetryData {
  92. data := &proto.TelemetryData{
  93. Version: c.version,
  94. Os: c.os,
  95. Timestamp: time.Now().Unix(),
  96. }
  97. if c.topo != nil {
  98. // Collect volume server count
  99. data.VolumeServerCount = int32(c.countVolumeServers())
  100. // Collect total disk usage and volume count
  101. diskBytes, volumeCount := c.collectVolumeStats()
  102. data.TotalDiskBytes = diskBytes
  103. data.TotalVolumeCount = int32(volumeCount)
  104. }
  105. if c.cluster != nil {
  106. // Collect filer and broker counts
  107. data.FilerCount = int32(c.countFilers())
  108. data.BrokerCount = int32(c.countBrokers())
  109. }
  110. return data
  111. }
  112. // countVolumeServers counts the number of active volume servers
  113. func (c *Collector) countVolumeServers() int {
  114. count := 0
  115. for _, dcNode := range c.topo.Children() {
  116. dc := dcNode.(*topology.DataCenter)
  117. for _, rackNode := range dc.Children() {
  118. rack := rackNode.(*topology.Rack)
  119. for range rack.Children() {
  120. count++
  121. }
  122. }
  123. }
  124. return count
  125. }
  126. // collectVolumeStats collects total disk usage and volume count
  127. func (c *Collector) collectVolumeStats() (uint64, int) {
  128. var totalDiskBytes uint64
  129. var totalVolumeCount int
  130. for _, dcNode := range c.topo.Children() {
  131. dc := dcNode.(*topology.DataCenter)
  132. for _, rackNode := range dc.Children() {
  133. rack := rackNode.(*topology.Rack)
  134. for _, dnNode := range rack.Children() {
  135. dn := dnNode.(*topology.DataNode)
  136. volumes := dn.GetVolumes()
  137. for _, volumeInfo := range volumes {
  138. totalVolumeCount++
  139. totalDiskBytes += volumeInfo.Size
  140. }
  141. }
  142. }
  143. }
  144. return totalDiskBytes, totalVolumeCount
  145. }
  146. // countFilers counts the number of active filer servers across all groups
  147. func (c *Collector) countFilers() int {
  148. // Count all filer-type nodes in the cluster
  149. // This includes both pure filer servers and S3 servers (which register as filers)
  150. count := 0
  151. for _, groupName := range c.getAllFilerGroups() {
  152. nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.FilerType)
  153. count += len(nodes)
  154. }
  155. return count
  156. }
  157. // countBrokers counts the number of active broker servers
  158. func (c *Collector) countBrokers() int {
  159. // Count brokers across all broker groups
  160. count := 0
  161. for _, groupName := range c.getAllBrokerGroups() {
  162. nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.BrokerType)
  163. count += len(nodes)
  164. }
  165. return count
  166. }
  167. // getAllFilerGroups returns all filer group names
  168. func (c *Collector) getAllFilerGroups() []string {
  169. // For simplicity, we check the default group
  170. // In a more sophisticated implementation, we could enumerate all groups
  171. return []string{""}
  172. }
  173. // getAllBrokerGroups returns all broker group names
  174. func (c *Collector) getAllBrokerGroups() []string {
  175. // For simplicity, we check the default group
  176. // In a more sophisticated implementation, we could enumerate all groups
  177. return []string{""}
  178. }