integration.go 7.9 KB


  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net/http"
  8. "os"
  9. "os/exec"
  10. "path/filepath"
  11. "strings"
  12. "syscall"
  13. "time"
  14. "github.com/seaweedfs/seaweedfs/telemetry/proto"
  15. "github.com/seaweedfs/seaweedfs/weed/telemetry"
  16. protobuf "google.golang.org/protobuf/proto"
  17. )
  18. const (
  19. serverPort = "18080" // Use different port to avoid conflicts
  20. serverURL = "http://localhost:" + serverPort
  21. )
  22. func main() {
  23. fmt.Println("🧪 Starting SeaweedFS Telemetry Integration Test")
  24. // Start telemetry server
  25. fmt.Println("📡 Starting telemetry server...")
  26. serverCmd, err := startTelemetryServer()
  27. if err != nil {
  28. log.Fatalf("❌ Failed to start telemetry server: %v", err)
  29. }
  30. defer stopServer(serverCmd)
  31. // Wait for server to start
  32. if !waitForServer(serverURL+"/health", 15*time.Second) {
  33. log.Fatal("❌ Telemetry server failed to start")
  34. }
  35. fmt.Println("✅ Telemetry server started successfully")
  36. // Test protobuf marshaling first
  37. fmt.Println("🔧 Testing protobuf marshaling...")
  38. if err := testProtobufMarshaling(); err != nil {
  39. log.Fatalf("❌ Protobuf marshaling test failed: %v", err)
  40. }
  41. fmt.Println("✅ Protobuf marshaling test passed")
  42. // Test protobuf client
  43. fmt.Println("🔄 Testing protobuf telemetry client...")
  44. if err := testTelemetryClient(); err != nil {
  45. log.Fatalf("❌ Telemetry client test failed: %v", err)
  46. }
  47. fmt.Println("✅ Telemetry client test passed")
  48. // Test server metrics endpoint
  49. fmt.Println("📊 Testing Prometheus metrics endpoint...")
  50. if err := testMetricsEndpoint(); err != nil {
  51. log.Fatalf("❌ Metrics endpoint test failed: %v", err)
  52. }
  53. fmt.Println("✅ Metrics endpoint test passed")
  54. // Test stats API
  55. fmt.Println("📈 Testing stats API...")
  56. if err := testStatsAPI(); err != nil {
  57. log.Fatalf("❌ Stats API test failed: %v", err)
  58. }
  59. fmt.Println("✅ Stats API test passed")
  60. // Test instances API
  61. fmt.Println("📋 Testing instances API...")
  62. if err := testInstancesAPI(); err != nil {
  63. log.Fatalf("❌ Instances API test failed: %v", err)
  64. }
  65. fmt.Println("✅ Instances API test passed")
  66. fmt.Println("🎉 All telemetry integration tests passed!")
  67. }
  68. func startTelemetryServer() (*exec.Cmd, error) {
  69. // Get the directory where this test is running
  70. testDir, err := os.Getwd()
  71. if err != nil {
  72. return nil, fmt.Errorf("failed to get working directory: %v", err)
  73. }
  74. // Navigate to the server directory (from main seaweedfs directory)
  75. serverDir := filepath.Join(testDir, "telemetry", "server")
  76. cmd := exec.Command("go", "run", ".",
  77. "-port="+serverPort,
  78. "-dashboard=false",
  79. "-cleanup=1m",
  80. "-max-age=1h")
  81. cmd.Dir = serverDir
  82. // Create log files for server output
  83. logFile, err := os.Create("telemetry-server-test.log")
  84. if err != nil {
  85. return nil, fmt.Errorf("failed to create log file: %v", err)
  86. }
  87. cmd.Stdout = logFile
  88. cmd.Stderr = logFile
  89. if err := cmd.Start(); err != nil {
  90. return nil, fmt.Errorf("failed to start server: %v", err)
  91. }
  92. return cmd, nil
  93. }
  94. func stopServer(cmd *exec.Cmd) {
  95. if cmd != nil && cmd.Process != nil {
  96. cmd.Process.Signal(syscall.SIGTERM)
  97. cmd.Wait()
  98. // Clean up log file
  99. os.Remove("telemetry-server-test.log")
  100. }
  101. }
  102. func waitForServer(url string, timeout time.Duration) bool {
  103. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  104. defer cancel()
  105. fmt.Printf("⏳ Waiting for server at %s...\n", url)
  106. for {
  107. select {
  108. case <-ctx.Done():
  109. return false
  110. default:
  111. resp, err := http.Get(url)
  112. if err == nil {
  113. resp.Body.Close()
  114. if resp.StatusCode == http.StatusOK {
  115. return true
  116. }
  117. }
  118. time.Sleep(500 * time.Millisecond)
  119. }
  120. }
  121. }
  122. func testProtobufMarshaling() error {
  123. // Test protobuf marshaling/unmarshaling
  124. testData := &proto.TelemetryData{
  125. ClusterId: "test-cluster-12345",
  126. Version: "test-3.45",
  127. Os: "linux/amd64",
  128. VolumeServerCount: 2,
  129. TotalDiskBytes: 1000000,
  130. TotalVolumeCount: 10,
  131. FilerCount: 1,
  132. BrokerCount: 1,
  133. Timestamp: time.Now().Unix(),
  134. }
  135. // Marshal
  136. data, err := protobuf.Marshal(testData)
  137. if err != nil {
  138. return fmt.Errorf("failed to marshal protobuf: %v", err)
  139. }
  140. fmt.Printf(" Protobuf size: %d bytes\n", len(data))
  141. // Unmarshal
  142. testData2 := &proto.TelemetryData{}
  143. if err := protobuf.Unmarshal(data, testData2); err != nil {
  144. return fmt.Errorf("failed to unmarshal protobuf: %v", err)
  145. }
  146. // Verify data
  147. if testData2.ClusterId != testData.ClusterId {
  148. return fmt.Errorf("protobuf data mismatch: expected %s, got %s",
  149. testData.ClusterId, testData2.ClusterId)
  150. }
  151. if testData2.VolumeServerCount != testData.VolumeServerCount {
  152. return fmt.Errorf("volume server count mismatch: expected %d, got %d",
  153. testData.VolumeServerCount, testData2.VolumeServerCount)
  154. }
  155. return nil
  156. }
  157. func testTelemetryClient() error {
  158. // Create telemetry client
  159. client := telemetry.NewClient(serverURL+"/api/collect", true)
  160. // Create test data using protobuf format
  161. testData := &proto.TelemetryData{
  162. Version: "test-3.45",
  163. Os: "linux/amd64",
  164. VolumeServerCount: 3,
  165. TotalDiskBytes: 1073741824, // 1GB
  166. TotalVolumeCount: 50,
  167. FilerCount: 2,
  168. BrokerCount: 1,
  169. Timestamp: time.Now().Unix(),
  170. }
  171. // Send telemetry data
  172. if err := client.SendTelemetry(testData); err != nil {
  173. return fmt.Errorf("failed to send telemetry: %v", err)
  174. }
  175. fmt.Printf(" Sent telemetry for cluster: %s\n", client.GetInstanceID())
  176. // Wait a bit for processing
  177. time.Sleep(2 * time.Second)
  178. return nil
  179. }
  180. func testMetricsEndpoint() error {
  181. resp, err := http.Get(serverURL + "/metrics")
  182. if err != nil {
  183. return fmt.Errorf("failed to get metrics: %v", err)
  184. }
  185. defer resp.Body.Close()
  186. if resp.StatusCode != http.StatusOK {
  187. return fmt.Errorf("metrics endpoint returned status %d", resp.StatusCode)
  188. }
  189. // Read response and check for expected metrics
  190. content, err := io.ReadAll(resp.Body)
  191. if err != nil {
  192. return fmt.Errorf("failed to read metrics response: %v", err)
  193. }
  194. contentStr := string(content)
  195. expectedMetrics := []string{
  196. "seaweedfs_telemetry_total_clusters",
  197. "seaweedfs_telemetry_active_clusters",
  198. "seaweedfs_telemetry_reports_received_total",
  199. "seaweedfs_telemetry_volume_servers",
  200. "seaweedfs_telemetry_disk_bytes",
  201. "seaweedfs_telemetry_volume_count",
  202. "seaweedfs_telemetry_filer_count",
  203. "seaweedfs_telemetry_broker_count",
  204. }
  205. for _, metric := range expectedMetrics {
  206. if !strings.Contains(contentStr, metric) {
  207. return fmt.Errorf("missing expected metric: %s", metric)
  208. }
  209. }
  210. // Check that we have at least one report received
  211. if !strings.Contains(contentStr, "seaweedfs_telemetry_reports_received_total 1") {
  212. fmt.Printf(" Warning: Expected at least 1 report received, metrics content:\n%s\n", contentStr)
  213. }
  214. fmt.Printf(" Found %d expected metrics\n", len(expectedMetrics))
  215. return nil
  216. }
  217. func testStatsAPI() error {
  218. resp, err := http.Get(serverURL + "/api/stats")
  219. if err != nil {
  220. return fmt.Errorf("failed to get stats: %v", err)
  221. }
  222. defer resp.Body.Close()
  223. if resp.StatusCode != http.StatusOK {
  224. return fmt.Errorf("stats API returned status %d", resp.StatusCode)
  225. }
  226. // Read and verify JSON response
  227. content, err := io.ReadAll(resp.Body)
  228. if err != nil {
  229. return fmt.Errorf("failed to read stats response: %v", err)
  230. }
  231. contentStr := string(content)
  232. if !strings.Contains(contentStr, "total_instances") {
  233. return fmt.Errorf("stats response missing total_instances field")
  234. }
  235. fmt.Printf(" Stats response: %s\n", contentStr)
  236. return nil
  237. }
  238. func testInstancesAPI() error {
  239. resp, err := http.Get(serverURL + "/api/instances?limit=10")
  240. if err != nil {
  241. return fmt.Errorf("failed to get instances: %v", err)
  242. }
  243. defer resp.Body.Close()
  244. if resp.StatusCode != http.StatusOK {
  245. return fmt.Errorf("instances API returned status %d", resp.StatusCode)
  246. }
  247. // Read response
  248. content, err := io.ReadAll(resp.Body)
  249. if err != nil {
  250. return fmt.Errorf("failed to read instances response: %v", err)
  251. }
  252. fmt.Printf(" Instances response length: %d bytes\n", len(content))
  253. return nil
  254. }