| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- package main
- import (
- "context"
- "fmt"
- "io"
- "log"
- "net/http"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "syscall"
- "time"
- "github.com/seaweedfs/seaweedfs/telemetry/proto"
- "github.com/seaweedfs/seaweedfs/weed/telemetry"
- protobuf "google.golang.org/protobuf/proto"
- )
- const (
- serverPort = "18080" // Use different port to avoid conflicts
- serverURL = "http://localhost:" + serverPort
- )
- func main() {
- fmt.Println("🧪 Starting SeaweedFS Telemetry Integration Test")
- // Start telemetry server
- fmt.Println("📡 Starting telemetry server...")
- serverCmd, err := startTelemetryServer()
- if err != nil {
- log.Fatalf("❌ Failed to start telemetry server: %v", err)
- }
- defer stopServer(serverCmd)
- // Wait for server to start
- if !waitForServer(serverURL+"/health", 15*time.Second) {
- log.Fatal("❌ Telemetry server failed to start")
- }
- fmt.Println("✅ Telemetry server started successfully")
- // Test protobuf marshaling first
- fmt.Println("🔧 Testing protobuf marshaling...")
- if err := testProtobufMarshaling(); err != nil {
- log.Fatalf("❌ Protobuf marshaling test failed: %v", err)
- }
- fmt.Println("✅ Protobuf marshaling test passed")
- // Test protobuf client
- fmt.Println("🔄 Testing protobuf telemetry client...")
- if err := testTelemetryClient(); err != nil {
- log.Fatalf("❌ Telemetry client test failed: %v", err)
- }
- fmt.Println("✅ Telemetry client test passed")
- // Test server metrics endpoint
- fmt.Println("📊 Testing Prometheus metrics endpoint...")
- if err := testMetricsEndpoint(); err != nil {
- log.Fatalf("❌ Metrics endpoint test failed: %v", err)
- }
- fmt.Println("✅ Metrics endpoint test passed")
- // Test stats API
- fmt.Println("📈 Testing stats API...")
- if err := testStatsAPI(); err != nil {
- log.Fatalf("❌ Stats API test failed: %v", err)
- }
- fmt.Println("✅ Stats API test passed")
- // Test instances API
- fmt.Println("📋 Testing instances API...")
- if err := testInstancesAPI(); err != nil {
- log.Fatalf("❌ Instances API test failed: %v", err)
- }
- fmt.Println("✅ Instances API test passed")
- fmt.Println("🎉 All telemetry integration tests passed!")
- }
- func startTelemetryServer() (*exec.Cmd, error) {
- // Get the directory where this test is running
- testDir, err := os.Getwd()
- if err != nil {
- return nil, fmt.Errorf("failed to get working directory: %v", err)
- }
- // Navigate to the server directory (from main seaweedfs directory)
- serverDir := filepath.Join(testDir, "telemetry", "server")
- cmd := exec.Command("go", "run", ".",
- "-port="+serverPort,
- "-dashboard=false",
- "-cleanup=1m",
- "-max-age=1h")
- cmd.Dir = serverDir
- // Create log files for server output
- logFile, err := os.Create("telemetry-server-test.log")
- if err != nil {
- return nil, fmt.Errorf("failed to create log file: %v", err)
- }
- cmd.Stdout = logFile
- cmd.Stderr = logFile
- if err := cmd.Start(); err != nil {
- return nil, fmt.Errorf("failed to start server: %v", err)
- }
- return cmd, nil
- }
- func stopServer(cmd *exec.Cmd) {
- if cmd != nil && cmd.Process != nil {
- cmd.Process.Signal(syscall.SIGTERM)
- cmd.Wait()
- // Clean up log file
- os.Remove("telemetry-server-test.log")
- }
- }
- func waitForServer(url string, timeout time.Duration) bool {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- fmt.Printf("⏳ Waiting for server at %s...\n", url)
- for {
- select {
- case <-ctx.Done():
- return false
- default:
- resp, err := http.Get(url)
- if err == nil {
- resp.Body.Close()
- if resp.StatusCode == http.StatusOK {
- return true
- }
- }
- time.Sleep(500 * time.Millisecond)
- }
- }
- }
- func testProtobufMarshaling() error {
- // Test protobuf marshaling/unmarshaling
- testData := &proto.TelemetryData{
- ClusterId: "test-cluster-12345",
- Version: "test-3.45",
- Os: "linux/amd64",
- VolumeServerCount: 2,
- TotalDiskBytes: 1000000,
- TotalVolumeCount: 10,
- FilerCount: 1,
- BrokerCount: 1,
- Timestamp: time.Now().Unix(),
- }
- // Marshal
- data, err := protobuf.Marshal(testData)
- if err != nil {
- return fmt.Errorf("failed to marshal protobuf: %v", err)
- }
- fmt.Printf(" Protobuf size: %d bytes\n", len(data))
- // Unmarshal
- testData2 := &proto.TelemetryData{}
- if err := protobuf.Unmarshal(data, testData2); err != nil {
- return fmt.Errorf("failed to unmarshal protobuf: %v", err)
- }
- // Verify data
- if testData2.ClusterId != testData.ClusterId {
- return fmt.Errorf("protobuf data mismatch: expected %s, got %s",
- testData.ClusterId, testData2.ClusterId)
- }
- if testData2.VolumeServerCount != testData.VolumeServerCount {
- return fmt.Errorf("volume server count mismatch: expected %d, got %d",
- testData.VolumeServerCount, testData2.VolumeServerCount)
- }
- return nil
- }
- func testTelemetryClient() error {
- // Create telemetry client
- client := telemetry.NewClient(serverURL+"/api/collect", true)
- // Create test data using protobuf format
- testData := &proto.TelemetryData{
- Version: "test-3.45",
- Os: "linux/amd64",
- VolumeServerCount: 3,
- TotalDiskBytes: 1073741824, // 1GB
- TotalVolumeCount: 50,
- FilerCount: 2,
- BrokerCount: 1,
- Timestamp: time.Now().Unix(),
- }
- // Send telemetry data
- if err := client.SendTelemetry(testData); err != nil {
- return fmt.Errorf("failed to send telemetry: %v", err)
- }
- fmt.Printf(" Sent telemetry for cluster: %s\n", client.GetInstanceID())
- // Wait a bit for processing
- time.Sleep(2 * time.Second)
- return nil
- }
- func testMetricsEndpoint() error {
- resp, err := http.Get(serverURL + "/metrics")
- if err != nil {
- return fmt.Errorf("failed to get metrics: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("metrics endpoint returned status %d", resp.StatusCode)
- }
- // Read response and check for expected metrics
- content, err := io.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf("failed to read metrics response: %v", err)
- }
- contentStr := string(content)
- expectedMetrics := []string{
- "seaweedfs_telemetry_total_clusters",
- "seaweedfs_telemetry_active_clusters",
- "seaweedfs_telemetry_reports_received_total",
- "seaweedfs_telemetry_volume_servers",
- "seaweedfs_telemetry_disk_bytes",
- "seaweedfs_telemetry_volume_count",
- "seaweedfs_telemetry_filer_count",
- "seaweedfs_telemetry_broker_count",
- }
- for _, metric := range expectedMetrics {
- if !strings.Contains(contentStr, metric) {
- return fmt.Errorf("missing expected metric: %s", metric)
- }
- }
- // Check that we have at least one report received
- if !strings.Contains(contentStr, "seaweedfs_telemetry_reports_received_total 1") {
- fmt.Printf(" Warning: Expected at least 1 report received, metrics content:\n%s\n", contentStr)
- }
- fmt.Printf(" Found %d expected metrics\n", len(expectedMetrics))
- return nil
- }
- func testStatsAPI() error {
- resp, err := http.Get(serverURL + "/api/stats")
- if err != nil {
- return fmt.Errorf("failed to get stats: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("stats API returned status %d", resp.StatusCode)
- }
- // Read and verify JSON response
- content, err := io.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf("failed to read stats response: %v", err)
- }
- contentStr := string(content)
- if !strings.Contains(contentStr, "total_instances") {
- return fmt.Errorf("stats response missing total_instances field")
- }
- fmt.Printf(" Stats response: %s\n", contentStr)
- return nil
- }
- func testInstancesAPI() error {
- resp, err := http.Get(serverURL + "/api/instances?limit=10")
- if err != nil {
- return fmt.Errorf("failed to get instances: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("instances API returned status %d", resp.StatusCode)
- }
- // Read response
- content, err := io.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf("failed to read instances response: %v", err)
- }
- fmt.Printf(" Instances response length: %d bytes\n", len(content))
- return nil
- }
|